From aefdc8d0edad5d77d73ac40d8002363e12da00da Mon Sep 17 00:00:00 2001 From: Chris Xiong Date: Tue, 9 Aug 2022 02:56:10 -0400 Subject: fures, ostendite se! --- xv/__init__.py | 1 + xv/amap.py | 265 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ xv/model.py | 140 ++++++++++++++++++++++++++++++ xv/util.py | 90 ++++++++++++++++++++ 4 files changed, 496 insertions(+) create mode 100644 xv/__init__.py create mode 100644 xv/amap.py create mode 100644 xv/model.py create mode 100644 xv/util.py (limited to 'xv') diff --git a/xv/__init__.py b/xv/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/xv/__init__.py @@ -0,0 +1 @@ + diff --git a/xv/amap.py b/xv/amap.py new file mode 100644 index 0000000..4ad7ff5 --- /dev/null +++ b/xv/amap.py @@ -0,0 +1,265 @@ +# Part of the Fifteen-Thieves Project +# Chris Xiong 2020 +# License: Expat (MIT) + +import re +from collections import namedtuple +import xv.util as u + +LeafNode = namedtuple("LeafNode", ["offset", "name", "size", "elements", "stride"]) + +def strsize2bytesize(s): + if s.startswith('0'): + return 1 + else: + return int(s[1:]) + +def get_leaf_data(data, leaf): + if leaf.size.startswith('0'): + return data[leaf.offset] + else: + ret = 0 + for i in range(0, int(leaf.size[1:])): + ret <<= 4 + ret |= data[leaf.offset + i] & 0xf + return ret + +def set_leaf_data(data, leaf, value): + sz = int(leaf.size[1:]) + if leaf.size.startswith('0'): + if value >= (1 << sz): + raise ValueError("Value overflow") + data[leaf.offset] = value + else: + if value >= (1 << (sz * 4)): + raise ValueError("Value overflow") + for i in range(0, sz): + data[leaf.offset + sz - 1 - i] = value & 0xf + value >>= 4 + +class AddrMapTemplateNode: + def __init__(self, lines, is_root=False): + self.valid = True + self.name = lines[0].strip() + self.is_root = is_root + self.is_aggregate_node = False + self.children_nodes = [] + self.children_names = dict() + if self.name.endswith('*'): + self.is_aggregate_node = True + self.name = self.name[:-1] + for line in lines[1:]: + el = line.split(',') + if not 2 <= len(el) <= 3: + self.valid = False + break + offset = u.hexrep2b7val(int(el[0].strip(), 16)) + size = None if len(el) == 2 else el[1].strip() + m = re.match("([a-z_0-9]*)(\[([0-9]*)(\+[0-9a-f]*)?\])?", el[-1].strip()) + if not m: + self.valid = False + break + name = m.groups()[0] + elements = None if not m.groups()[2] else int(m.groups()[2]) + stride = None if not m.groups()[-1] else u.hexrep2b7val(int(m.groups()[-1][1:], 16)) + + child = LeafNode(offset, name, size, elements, stride) + self.children_names[name] = len(self.children_nodes) + self.children_nodes.append(child) + +class AddrMapTemplate: + def __init__(self, amapfile=None): + self.root_node_name = None + self.nodes = dict() + self.parse(amapfile) + if not self.verify(): + raise ValueError("invalid address map file") + + def parse(self,amapfile=None): + linebuf = [] + with open(amapfile, "r", encoding="utf-8") as f: + for line in f: + line = line[:line.find('#')] + if len(line.strip()) == 0: + continue + if re.match("^[a-z]", line): + if len(linebuf) > 0: + newnode = AddrMapTemplateNode(linebuf, len(self.nodes) == 0) + if newnode.valid: + self.nodes[newnode.name] = newnode + if newnode.is_root: + self.root_node_name = newnode.name + linebuf = [line] + else: + linebuf.append(line) + if len(linebuf) > 0: + newnode = AddrMapTemplateNode(linebuf) + if newnode.valid: + self.nodes[newnode.name] = newnode + + def verify(self): + for nodename, node in self.nodes.items(): + if not node.is_aggregate_node: + continue + for child in node.children_nodes: + if not child.name in self.nodes: + print(f"referencing undefined node {child.name} in {node.name}") + return False + return True + +class AddrMapNode: + def __init__(self, template_node, base_address, data, template): + self.template = template_node + self.base_address = base_address + self.upper_address = base_address + self.children_nodes = [] + self.data = data + self._populate(template) + + def _populate(self, template): + for t in self.template.children_nodes: + if self.template.is_aggregate_node: + ba = self.base_address + t.offset + if t.elements: + stride = t.stride if t.stride else 1 + l = [] + for i in range(0, t.elements): + l.append(AddrMapNode(template.nodes[t.name], ba, self.data, template)) + ba += stride + self.children_nodes.append(l) + if l[-1].upper_address > self.upper_address: + self.upper_address = l[-1].upper_address + else: + self.children_nodes.append(AddrMapNode(template.nodes[t.name], ba, self.data, template)) + if self.children_nodes[-1].upper_address > self.upper_address: + self.upper_address = self.children_nodes[-1].upper_address + else: + ba = self.base_address + t.offset + if t.elements: + stride = t.stride if t.stride else 1 + l = [] + for i in range(0, t.elements): + cn = LeafNode(ba, t.name, t.size, t.elements, t.stride) + l.append(cn) + ba += stride + self.children_nodes.append(l) + if ba - stride + strsize2bytesize(t.size) > self.upper_address: + self.upper_address = ba - stride + strsize2bytesize(t.size) + else: + cn = LeafNode(ba, t.name, t.size, t.elements, t.stride) + self.children_nodes.append(cn) + if ba + strsize2bytesize(t.size) > self.upper_address: + self.upper_address = ba + strsize2bytesize(t.size) + + def dump(self, level=0, instance=None): + ba_rep = ''.join([format(x, "02x") for x in u.padbelist(u.int2b7belist(self.base_address))]) + print(f"{ba_rep}{level*' '} {self.template.name}{'['+str(instance)+']' if instance is not None else ''}") + for child in self.children_nodes: + if self.template.is_aggregate_node: + if type(child) == list: + for i, element in enumerate(child): + element.dump(level + 1, i) + else: + child.dump(level + 1) + else: + if type(child) == list: + for i, element in enumerate(child): + ba_rep = ''.join([format(x, "02x") for x in u.padbelist(u.int2b7belist(element.offset))]) + print(f"{ba_rep}{(level + 1)*' '} {element.name}[{i}] {element.size}") + else: + ba_rep = ''.join([format(x, "02x") for x in u.padbelist(u.int2b7belist(child.offset))]) + print(f"{ba_rep}{(level + 1)*' '} {child.name} {child.size}") + + def set_data(self, data): + if len(data) > self.size(): + raise ValueError("excessive data") + self.data[self.base_address:self.base_address + len(data)] = data + + def get_data(self): + return self.data[self.base_address:self.upper_address + 1] + + def size(self): + return self.upper_address - self.base_address + 1 + + def find_node(self, key): + if type(key) != list: + key = key.split('.') + if len(key) == 0: + raise ValueError("???") + if key[0] == self.template.name: + key = key[1:] + if len(key) == 0: + return self + m = re.match("([a-z_0-9]*)(\[([0-9]*)\])?", key[0]) + if not m: + raise ValueError(f"Invalid key: {key[0]}") + if m.groups()[0] not in self.template.children_names: + raise ValueError(f"No such region: {m.groups()[0]}") + child = self.children_nodes[self.template.children_names[m.groups()[0]]] + if m.groups()[2] and type(child) != list: + raise ValueError(f"{m.groups()[0]} is not a list") + if not m.groups()[2] and type(child) == list: + if len(key) > 1: + raise ValueError(f"index needed for {m.groups()[0]}") + if not self.template.is_aggregate_node and len(key) > 1: + raise ValueError(f"{key[0]} has no more subregions") + if type(child) == list: + if len(key) > 1: + return child[int(m.groups()[2])].find_node(key[1:]) + else: + return child if not m.groups()[2] else child[int(m.groups()[2])] + else: + if len(key) > 1: + return child.find_node(key[1:]) + else: + return child + + def value(self, key): + node = self.find_node(key) + if type(node) == list: + if type(node[0]) == AddrMapNode: + return node.data[node[0].base_address:node[-1].upper_address + 1] + else: + stride = node[0].stride if node[0].stride else 1 + return self.data[node[0].offset:node[0].offset + len(node) * stride:stride] + else: + if type(node) == AddrMapNode: + return node.data[node.base_address:node.upper_address + 1] + else: + return get_leaf_data(self.data, node) + + def set_value(self, key, data): + node = self.find_node(key) + if type(node) == list: + if type(node[0]) == AddrMapNode: + if len(data) > node[-1].upper_address + 1 - node[0].base_address: + raise ValueError("excessive data") + node.data[node[0].base_address:node[0].base_address + len(data)] = data + else: + stride = node[0].stride if node[0].stride else 1 + if len(data) > len(self.data[node[0].offset:node[0].offset + len(node) * stride:stride]): + raise ValueError("excessive data") + self.data[node[0].offset:node[0].offset + len(node) * stride:stride] = data + else: + if type(node) == AddrMapNode: + if len(data) > node.upper_address + 1 - node.base_address: + raise ValueError("excessive data") + node.data[node.base_address:node.base_address + len(data)] = data + else: + return set_leaf_data(self.data, node, data) + + def __getitem__(self, key): + return self.children_nodes[self.template.children_names[key]] + + def __iter__(self): + return self.children_nodes.__iter__() + + def __len__(self): + return len(self.children_nodes) + +def create_addr_map_tree(template): + r = AddrMapNode(template.nodes[template.root_node_name], 0, bytearray(), template) + r.data.extend(bytearray(r.upper_address)) + return r + +# vim: expandtab shiftwidth=4 tabstop=4 diff --git a/xv/model.py b/xv/model.py new file mode 100644 index 0000000..0c0c014 --- /dev/null +++ b/xv/model.py @@ -0,0 +1,140 @@ +# Part of the Fifteen-Thieves Project +# Chris Xiong 2020 +# License: Expat (MIT) + +import rtmidi.midiutil as r +import xv.util as u +import xv.amap +import threading + +class Model: + def __init__(self, port_name=None, model=[0x00, 0x00], devn=0x10, address_mapping=None): + self.midii,_ = r.open_midiinput(port_name) + self.midio,_ = r.open_midioutput(port_name) + self.midii.ignore_types(sysex=False) + self.model = model + self.devn = devn + self.aroot = address_mapping + + def __del__(self): + del self.midii + del self.midio + + def writeraw(self, addr, data): + if type(addr) == int: + addr = u.int2belist(addr) + addr = u.padbelist(addr) + if len(addr) > 4: + raise ValueError("invalid address") + + for a in addr: + if a > 0x80 or a < 0: + raise ValueError("invalid address") + + cs = u.roland_checksum(addr + data) + d = [0xf0, 0x41, self.devn] + self.model + [0x12] + addr + data + [cs, 0xf7] + #print(' '.join([format(a, "02x") for a in d])) + + self.midio.send_message(d) + + def readraw(self, addr, size, timeout=None): + if type(addr) == int: + addr = u.int2belist(addr) + addr = u.padbelist(addr) + if len(addr) > 4: + raise ValueError("invalid address") + if type(size) == int: + size = u.padbelist(u.int2b7belist(size)) + rsize = u.b7belist2int(size) + + for a in addr: + if a > 0x80 or a < 0: + raise ValueError("invalid address") + + cs = u.roland_checksum(addr + size) + d = [0xf0, 0x41, self.devn] + self.model + [0x11] + addr + size + [cs, 0xf7] + #print(' '.join([format(a, "02x") for a in d])) + + cv = threading.Condition() + m = [] + def msgcb(d, cd): + _m, dt = d + nonlocal m, cv, addr + is_last = False + + if u.check_roland_checksum(_m): + m.append( (_m[6:10], _m[10:-2]) ) + if u.b7belist2int(_m[6:10]) - u.b7belist2int(addr) + len(_m[10:-2]) >= rsize: + is_last = True + else: + raise RuntimeError("checksum error") + + if is_last: + cv.acquire() + cv.notify() + cv.release() + + self.midii.set_callback(msgcb) + cv.acquire() + self.midio.send_message(d) + cv.wait(timeout) + cv.release() + self.midii.cancel_callback() + return m + + #current implementation doesn't support reading entire strided list of primitives + def read_locations(self, key, timeout=None): + node = self.aroot.find_node(key) + rqsz = 0 + rqba = 0 + if type(node) == list: + if type(node[0]) == xv.amap.AddrMapNode: + rqsz = node[-1].upper_address + 1 - node[0].base_address + rqba = node[0].base_address + else: + rqsz = node[-1].offset - node[0].offset + xv.amap.strsize2bytesize(node[-1].size) + 1 + rqba = node[0].offset + else: + if type(node) == xv.amap.AddrMapNode: + rqba = node.base_address + rqsz = node.upper_address + 1 - node.base_address + else: + rqba = node.offset + rqsz = xv.amap.strsize2bytesize(node.size) + rqba = u.padbelist(u.int2b7belist(rqba)) + r = self.readraw(rqba, rqsz, timeout) + for m in r: + mba = u.b7belist2int(m[0]) + self.aroot.data[mba:mba + len(m[1])] = m[1] + return r + + #current implementation doesn't support writing entire strided list of primitives + def write_locations(self, key, data): + node = self.aroot.find_node(key) + dstsz = 0 + dstba = 0 + if type(node) == list: + if type(node[0]) == xv.amap.AddrMapNode: + dstsz = node[-1].upper_address + 1 - node[0].base_address + dstba = node[0].base_address + else: + dstsz = node[-1].offset - node[0].offset + xv.amap.strsize2bytesize(node[-1].size) + 1 + dstba = node[0].offset + else: + if type(node) == xv.amap.AddrMapNode: + dstba = node.base_address + dstsz = node.upper_address + 1 - node.base_address + else: + dstba = node.offset + dstsz = xv.amap.strsize2bytesize(node.size) + if type(data) != list: + self.aroot.set_value(key, data) + data = list(self.aroot.data[node.offset:node.offset + dstsz]) + if len(data) > dstsz: + raise ValueError("excessive data") + self.aroot.data[dstba:dstba + len(data)] = data + + dstba = u.padbelist(u.int2b7belist(dstba)) + self.writeraw(dstba, data) + +# vim: expandtab shiftwidth=4 tabstop=4 diff --git a/xv/util.py b/xv/util.py new file mode 100644 index 0000000..0b3f2e6 --- /dev/null +++ b/xv/util.py @@ -0,0 +1,90 @@ +# Part of the Fifteen-Thieves Project +# Chris Xiong 2020 +# License: Expat (MIT) + +def roland_checksum(payload): + return (0x80 - sum(payload) % 0x80) % 0x80 + +def check_roland_checksum(msg): + return (0x80 - sum(msg[6:-2]) % 0x80) % 0x80 == msg[-2] + +def int2belist(x): + ret = [] + while x > 0: + ret = [x & 0xff] + ret + x >>= 8 + return ret + +def belist2int(x): + ret = 0 + for i in x: + ret <<= 8 + ret |= i + return ret + +def int2b7belist(x): + ret = [] + while x > 0: + ret = [x & 0x7f] + ret + x >>= 7 + return ret + +def strb7list(x): + return ' '.join([format(a, "02x") for a in x]) + +def b7belist2int(x): + ret = 0 + for i in x: + ret <<= 7 + ret |= i + return ret + +def hexrep2b7val(x): + ret = 0 + b = 0 + while x > 0: + if x & 0xff > 0x7f: + raise ValueError("invalid value") + ret |= (x & 0xff) << (7 * b) + b += 1 + x >>= 8 + return ret + +def padbelist(x, l=4): + return ([0]*(l-len(x))) + x + +def int2packetint(x): + ret = [] + while x > 0: + ret = [x & 0x0f] + ret + x >>= 4 + return ret + +def packetint2int(x): + ret = 0 + for i in x: + ret <<= 4 + ret |= i + return ret + +def load_memoryregion(fn): + ret = [] + with open(fn, "rb") as f: + while True: + addr = list(f.read(4)) + if len(addr) < 4: + break + l = belist2int(list(f.read(4))) + data = list(f.read(l)) + ret.append( (addr, data) ) + return ret + +def write_memoryregion(fn, mr): + with open(fn, "wb") as f: + for addr, data in mr: + l = padbelist(int2belist(len(data))) + f.write(bytearray(addr)) + f.write(bytearray(l)) + f.write(bytearray(data)) + +# vim: expandtab shiftwidth=4 tabstop=4 -- cgit v1.2.3