aboutsummaryrefslogtreecommitdiff
path: root/xv
diff options
context:
space:
mode:
Diffstat (limited to 'xv')
-rw-r--r--xv/__init__.py1
-rw-r--r--xv/amap.py265
-rw-r--r--xv/model.py140
-rw-r--r--xv/util.py90
4 files changed, 496 insertions, 0 deletions
diff --git a/xv/__init__.py b/xv/__init__.py
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/xv/__init__.py
@@ -0,0 +1 @@
+
diff --git a/xv/amap.py b/xv/amap.py
new file mode 100644
index 0000000..4ad7ff5
--- /dev/null
+++ b/xv/amap.py
@@ -0,0 +1,265 @@
+# Part of the Fifteen-Thieves Project
+# Chris Xiong 2020
+# License: Expat (MIT)
+
+import re
+from collections import namedtuple
+import xv.util as u
+
+LeafNode = namedtuple("LeafNode", ["offset", "name", "size", "elements", "stride"])
+
+def strsize2bytesize(s):
+ if s.startswith('0'):
+ return 1
+ else:
+ return int(s[1:])
+
+def get_leaf_data(data, leaf):
+ if leaf.size.startswith('0'):
+ return data[leaf.offset]
+ else:
+ ret = 0
+ for i in range(0, int(leaf.size[1:])):
+ ret <<= 4
+ ret |= data[leaf.offset + i] & 0xf
+ return ret
+
+def set_leaf_data(data, leaf, value):
+ sz = int(leaf.size[1:])
+ if leaf.size.startswith('0'):
+ if value >= (1 << sz):
+ raise ValueError("Value overflow")
+ data[leaf.offset] = value
+ else:
+ if value >= (1 << (sz * 4)):
+ raise ValueError("Value overflow")
+ for i in range(0, sz):
+ data[leaf.offset + sz - 1 - i] = value & 0xf
+ value >>= 4
+
+class AddrMapTemplateNode:
+ def __init__(self, lines, is_root=False):
+ self.valid = True
+ self.name = lines[0].strip()
+ self.is_root = is_root
+ self.is_aggregate_node = False
+ self.children_nodes = []
+ self.children_names = dict()
+ if self.name.endswith('*'):
+ self.is_aggregate_node = True
+ self.name = self.name[:-1]
+ for line in lines[1:]:
+ el = line.split(',')
+ if not 2 <= len(el) <= 3:
+ self.valid = False
+ break
+ offset = u.hexrep2b7val(int(el[0].strip(), 16))
+ size = None if len(el) == 2 else el[1].strip()
+ m = re.match("([a-z_0-9]*)(\[([0-9]*)(\+[0-9a-f]*)?\])?", el[-1].strip())
+ if not m:
+ self.valid = False
+ break
+ name = m.groups()[0]
+ elements = None if not m.groups()[2] else int(m.groups()[2])
+ stride = None if not m.groups()[-1] else u.hexrep2b7val(int(m.groups()[-1][1:], 16))
+
+ child = LeafNode(offset, name, size, elements, stride)
+ self.children_names[name] = len(self.children_nodes)
+ self.children_nodes.append(child)
+
+class AddrMapTemplate:
+ def __init__(self, amapfile=None):
+ self.root_node_name = None
+ self.nodes = dict()
+ self.parse(amapfile)
+ if not self.verify():
+ raise ValueError("invalid address map file")
+
+ def parse(self,amapfile=None):
+ linebuf = []
+ with open(amapfile, "r", encoding="utf-8") as f:
+ for line in f:
+ line = line[:line.find('#')]
+ if len(line.strip()) == 0:
+ continue
+ if re.match("^[a-z]", line):
+ if len(linebuf) > 0:
+ newnode = AddrMapTemplateNode(linebuf, len(self.nodes) == 0)
+ if newnode.valid:
+ self.nodes[newnode.name] = newnode
+ if newnode.is_root:
+ self.root_node_name = newnode.name
+ linebuf = [line]
+ else:
+ linebuf.append(line)
+ if len(linebuf) > 0:
+ newnode = AddrMapTemplateNode(linebuf)
+ if newnode.valid:
+ self.nodes[newnode.name] = newnode
+
+ def verify(self):
+ for nodename, node in self.nodes.items():
+ if not node.is_aggregate_node:
+ continue
+ for child in node.children_nodes:
+ if not child.name in self.nodes:
+ print(f"referencing undefined node {child.name} in {node.name}")
+ return False
+ return True
+
+class AddrMapNode:
+ def __init__(self, template_node, base_address, data, template):
+ self.template = template_node
+ self.base_address = base_address
+ self.upper_address = base_address
+ self.children_nodes = []
+ self.data = data
+ self._populate(template)
+
+ def _populate(self, template):
+ for t in self.template.children_nodes:
+ if self.template.is_aggregate_node:
+ ba = self.base_address + t.offset
+ if t.elements:
+ stride = t.stride if t.stride else 1
+ l = []
+ for i in range(0, t.elements):
+ l.append(AddrMapNode(template.nodes[t.name], ba, self.data, template))
+ ba += stride
+ self.children_nodes.append(l)
+ if l[-1].upper_address > self.upper_address:
+ self.upper_address = l[-1].upper_address
+ else:
+ self.children_nodes.append(AddrMapNode(template.nodes[t.name], ba, self.data, template))
+ if self.children_nodes[-1].upper_address > self.upper_address:
+ self.upper_address = self.children_nodes[-1].upper_address
+ else:
+ ba = self.base_address + t.offset
+ if t.elements:
+ stride = t.stride if t.stride else 1
+ l = []
+ for i in range(0, t.elements):
+ cn = LeafNode(ba, t.name, t.size, t.elements, t.stride)
+ l.append(cn)
+ ba += stride
+ self.children_nodes.append(l)
+ if ba - stride + strsize2bytesize(t.size) > self.upper_address:
+ self.upper_address = ba - stride + strsize2bytesize(t.size)
+ else:
+ cn = LeafNode(ba, t.name, t.size, t.elements, t.stride)
+ self.children_nodes.append(cn)
+ if ba + strsize2bytesize(t.size) > self.upper_address:
+ self.upper_address = ba + strsize2bytesize(t.size)
+
+ def dump(self, level=0, instance=None):
+ ba_rep = ''.join([format(x, "02x") for x in u.padbelist(u.int2b7belist(self.base_address))])
+ print(f"{ba_rep}{level*' '} {self.template.name}{'['+str(instance)+']' if instance is not None else ''}")
+ for child in self.children_nodes:
+ if self.template.is_aggregate_node:
+ if type(child) == list:
+ for i, element in enumerate(child):
+ element.dump(level + 1, i)
+ else:
+ child.dump(level + 1)
+ else:
+ if type(child) == list:
+ for i, element in enumerate(child):
+ ba_rep = ''.join([format(x, "02x") for x in u.padbelist(u.int2b7belist(element.offset))])
+ print(f"{ba_rep}{(level + 1)*' '} {element.name}[{i}] {element.size}")
+ else:
+ ba_rep = ''.join([format(x, "02x") for x in u.padbelist(u.int2b7belist(child.offset))])
+ print(f"{ba_rep}{(level + 1)*' '} {child.name} {child.size}")
+
+ def set_data(self, data):
+ if len(data) > self.size():
+ raise ValueError("excessive data")
+ self.data[self.base_address:self.base_address + len(data)] = data
+
+ def get_data(self):
+ return self.data[self.base_address:self.upper_address + 1]
+
+ def size(self):
+ return self.upper_address - self.base_address + 1
+
+ def find_node(self, key):
+ if type(key) != list:
+ key = key.split('.')
+ if len(key) == 0:
+ raise ValueError("???")
+ if key[0] == self.template.name:
+ key = key[1:]
+ if len(key) == 0:
+ return self
+ m = re.match("([a-z_0-9]*)(\[([0-9]*)\])?", key[0])
+ if not m:
+ raise ValueError(f"Invalid key: {key[0]}")
+ if m.groups()[0] not in self.template.children_names:
+ raise ValueError(f"No such region: {m.groups()[0]}")
+ child = self.children_nodes[self.template.children_names[m.groups()[0]]]
+ if m.groups()[2] and type(child) != list:
+ raise ValueError(f"{m.groups()[0]} is not a list")
+ if not m.groups()[2] and type(child) == list:
+ if len(key) > 1:
+ raise ValueError(f"index needed for {m.groups()[0]}")
+ if not self.template.is_aggregate_node and len(key) > 1:
+ raise ValueError(f"{key[0]} has no more subregions")
+ if type(child) == list:
+ if len(key) > 1:
+ return child[int(m.groups()[2])].find_node(key[1:])
+ else:
+ return child if not m.groups()[2] else child[int(m.groups()[2])]
+ else:
+ if len(key) > 1:
+ return child.find_node(key[1:])
+ else:
+ return child
+
+ def value(self, key):
+ node = self.find_node(key)
+ if type(node) == list:
+ if type(node[0]) == AddrMapNode:
+ return node.data[node[0].base_address:node[-1].upper_address + 1]
+ else:
+ stride = node[0].stride if node[0].stride else 1
+ return self.data[node[0].offset:node[0].offset + len(node) * stride:stride]
+ else:
+ if type(node) == AddrMapNode:
+ return node.data[node.base_address:node.upper_address + 1]
+ else:
+ return get_leaf_data(self.data, node)
+
+ def set_value(self, key, data):
+ node = self.find_node(key)
+ if type(node) == list:
+ if type(node[0]) == AddrMapNode:
+ if len(data) > node[-1].upper_address + 1 - node[0].base_address:
+ raise ValueError("excessive data")
+ node.data[node[0].base_address:node[0].base_address + len(data)] = data
+ else:
+ stride = node[0].stride if node[0].stride else 1
+ if len(data) > len(self.data[node[0].offset:node[0].offset + len(node) * stride:stride]):
+ raise ValueError("excessive data")
+ self.data[node[0].offset:node[0].offset + len(node) * stride:stride] = data
+ else:
+ if type(node) == AddrMapNode:
+ if len(data) > node.upper_address + 1 - node.base_address:
+ raise ValueError("excessive data")
+ node.data[node.base_address:node.base_address + len(data)] = data
+ else:
+ return set_leaf_data(self.data, node, data)
+
+ def __getitem__(self, key):
+ return self.children_nodes[self.template.children_names[key]]
+
+ def __iter__(self):
+ return self.children_nodes.__iter__()
+
+ def __len__(self):
+ return len(self.children_nodes)
+
+def create_addr_map_tree(template):
+ r = AddrMapNode(template.nodes[template.root_node_name], 0, bytearray(), template)
+ r.data.extend(bytearray(r.upper_address))
+ return r
+
+# vim: expandtab shiftwidth=4 tabstop=4
diff --git a/xv/model.py b/xv/model.py
new file mode 100644
index 0000000..0c0c014
--- /dev/null
+++ b/xv/model.py
@@ -0,0 +1,140 @@
+# Part of the Fifteen-Thieves Project
+# Chris Xiong 2020
+# License: Expat (MIT)
+
+import rtmidi.midiutil as r
+import xv.util as u
+import xv.amap
+import threading
+
+class Model:
+ def __init__(self, port_name=None, model=[0x00, 0x00], devn=0x10, address_mapping=None):
+ self.midii,_ = r.open_midiinput(port_name)
+ self.midio,_ = r.open_midioutput(port_name)
+ self.midii.ignore_types(sysex=False)
+ self.model = model
+ self.devn = devn
+ self.aroot = address_mapping
+
+ def __del__(self):
+ del self.midii
+ del self.midio
+
+ def writeraw(self, addr, data):
+ if type(addr) == int:
+ addr = u.int2belist(addr)
+ addr = u.padbelist(addr)
+ if len(addr) > 4:
+ raise ValueError("invalid address")
+
+ for a in addr:
+ if a > 0x80 or a < 0:
+ raise ValueError("invalid address")
+
+ cs = u.roland_checksum(addr + data)
+ d = [0xf0, 0x41, self.devn] + self.model + [0x12] + addr + data + [cs, 0xf7]
+ #print(' '.join([format(a, "02x") for a in d]))
+
+ self.midio.send_message(d)
+
+ def readraw(self, addr, size, timeout=None):
+ if type(addr) == int:
+ addr = u.int2belist(addr)
+ addr = u.padbelist(addr)
+ if len(addr) > 4:
+ raise ValueError("invalid address")
+ if type(size) == int:
+ size = u.padbelist(u.int2b7belist(size))
+ rsize = u.b7belist2int(size)
+
+ for a in addr:
+ if a > 0x80 or a < 0:
+ raise ValueError("invalid address")
+
+ cs = u.roland_checksum(addr + size)
+ d = [0xf0, 0x41, self.devn] + self.model + [0x11] + addr + size + [cs, 0xf7]
+ #print(' '.join([format(a, "02x") for a in d]))
+
+ cv = threading.Condition()
+ m = []
+ def msgcb(d, cd):
+ _m, dt = d
+ nonlocal m, cv, addr
+ is_last = False
+
+ if u.check_roland_checksum(_m):
+ m.append( (_m[6:10], _m[10:-2]) )
+ if u.b7belist2int(_m[6:10]) - u.b7belist2int(addr) + len(_m[10:-2]) >= rsize:
+ is_last = True
+ else:
+ raise RuntimeError("checksum error")
+
+ if is_last:
+ cv.acquire()
+ cv.notify()
+ cv.release()
+
+ self.midii.set_callback(msgcb)
+ cv.acquire()
+ self.midio.send_message(d)
+ cv.wait(timeout)
+ cv.release()
+ self.midii.cancel_callback()
+ return m
+
+ #current implementation doesn't support reading entire strided list of primitives
+ def read_locations(self, key, timeout=None):
+ node = self.aroot.find_node(key)
+ rqsz = 0
+ rqba = 0
+ if type(node) == list:
+ if type(node[0]) == xv.amap.AddrMapNode:
+ rqsz = node[-1].upper_address + 1 - node[0].base_address
+ rqba = node[0].base_address
+ else:
+ rqsz = node[-1].offset - node[0].offset + xv.amap.strsize2bytesize(node[-1].size) + 1
+ rqba = node[0].offset
+ else:
+ if type(node) == xv.amap.AddrMapNode:
+ rqba = node.base_address
+ rqsz = node.upper_address + 1 - node.base_address
+ else:
+ rqba = node.offset
+ rqsz = xv.amap.strsize2bytesize(node.size)
+ rqba = u.padbelist(u.int2b7belist(rqba))
+ r = self.readraw(rqba, rqsz, timeout)
+ for m in r:
+ mba = u.b7belist2int(m[0])
+ self.aroot.data[mba:mba + len(m[1])] = m[1]
+ return r
+
+ #current implementation doesn't support writing entire strided list of primitives
+ def write_locations(self, key, data):
+ node = self.aroot.find_node(key)
+ dstsz = 0
+ dstba = 0
+ if type(node) == list:
+ if type(node[0]) == xv.amap.AddrMapNode:
+ dstsz = node[-1].upper_address + 1 - node[0].base_address
+ dstba = node[0].base_address
+ else:
+ dstsz = node[-1].offset - node[0].offset + xv.amap.strsize2bytesize(node[-1].size) + 1
+ dstba = node[0].offset
+ else:
+ if type(node) == xv.amap.AddrMapNode:
+ dstba = node.base_address
+ dstsz = node.upper_address + 1 - node.base_address
+ else:
+ dstba = node.offset
+ dstsz = xv.amap.strsize2bytesize(node.size)
+ if type(data) != list:
+ self.aroot.set_value(key, data)
+ data = list(self.aroot.data[node.offset:node.offset + dstsz])
+ if len(data) > dstsz:
+ raise ValueError("excessive data")
+ self.aroot.data[dstba:dstba + len(data)] = data
+
+ dstba = u.padbelist(u.int2b7belist(dstba))
+ self.writeraw(dstba, data)
+
+# vim: expandtab shiftwidth=4 tabstop=4
diff --git a/xv/util.py b/xv/util.py
new file mode 100644
index 0000000..0b3f2e6
--- /dev/null
+++ b/xv/util.py
@@ -0,0 +1,90 @@
+# Part of the Fifteen-Thieves Project
+# Chris Xiong 2020
+# License: Expat (MIT)
+
+def roland_checksum(payload):
+ return (0x80 - sum(payload) % 0x80) % 0x80
+
+def check_roland_checksum(msg):
+ return (0x80 - sum(msg[6:-2]) % 0x80) % 0x80 == msg[-2]
+
+def int2belist(x):
+ ret = []
+ while x > 0:
+ ret = [x & 0xff] + ret
+ x >>= 8
+ return ret
+
+def belist2int(x):
+ ret = 0
+ for i in x:
+ ret <<= 8
+ ret |= i
+ return ret
+
+def int2b7belist(x):
+ ret = []
+ while x > 0:
+ ret = [x & 0x7f] + ret
+ x >>= 7
+ return ret
+
+def strb7list(x):
+ return ' '.join([format(a, "02x") for a in x])
+
+def b7belist2int(x):
+ ret = 0
+ for i in x:
+ ret <<= 7
+ ret |= i
+ return ret
+
+def hexrep2b7val(x):
+ ret = 0
+ b = 0
+ while x > 0:
+ if x & 0xff > 0x7f:
+ raise ValueError("invalid value")
+ ret |= (x & 0xff) << (7 * b)
+ b += 1
+ x >>= 8
+ return ret
+
+def padbelist(x, l=4):
+ return ([0]*(l-len(x))) + x
+
+def int2packetint(x):
+ ret = []
+ while x > 0:
+ ret = [x & 0x0f] + ret
+ x >>= 4
+ return ret
+
+def packetint2int(x):
+ ret = 0
+ for i in x:
+ ret <<= 4
+ ret |= i
+ return ret
+
+def load_memoryregion(fn):
+ ret = []
+ with open(fn, "rb") as f:
+ while True:
+ addr = list(f.read(4))
+ if len(addr) < 4:
+ break
+ l = belist2int(list(f.read(4)))
+ data = list(f.read(l))
+ ret.append( (addr, data) )
+ return ret
+
+def write_memoryregion(fn, mr):
+ with open(fn, "wb") as f:
+ for addr, data in mr:
+ l = padbelist(int2belist(len(data)))
+ f.write(bytearray(addr))
+ f.write(bytearray(l))
+ f.write(bytearray(data))
+
+# vim: expandtab shiftwidth=4 tabstop=4