From aefdc8d0edad5d77d73ac40d8002363e12da00da Mon Sep 17 00:00:00 2001 From: Chris Xiong Date: Tue, 9 Aug 2022 02:56:10 -0400 Subject: fures, ostendite se! --- xv/model.py | 140 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 140 insertions(+) create mode 100644 xv/model.py (limited to 'xv/model.py') diff --git a/xv/model.py b/xv/model.py new file mode 100644 index 0000000..0c0c014 --- /dev/null +++ b/xv/model.py @@ -0,0 +1,140 @@ +# Part of the Fifteen-Thieves Project +# Chris Xiong 2020 +# License: Expat (MIT) + +import rtmidi.midiutil as r +import xv.util as u +import xv.amap +import threading + +class Model: + def __init__(self, port_name=None, model=[0x00, 0x00], devn=0x10, address_mapping=None): + self.midii,_ = r.open_midiinput(port_name) + self.midio,_ = r.open_midioutput(port_name) + self.midii.ignore_types(sysex=False) + self.model = model + self.devn = devn + self.aroot = address_mapping + + def __del__(self): + del self.midii + del self.midio + + def writeraw(self, addr, data): + if type(addr) == int: + addr = u.int2belist(addr) + addr = u.padbelist(addr) + if len(addr) > 4: + raise ValueError("invalid address") + + for a in addr: + if a > 0x80 or a < 0: + raise ValueError("invalid address") + + cs = u.roland_checksum(addr + data) + d = [0xf0, 0x41, self.devn] + self.model + [0x12] + addr + data + [cs, 0xf7] + #print(' '.join([format(a, "02x") for a in d])) + + self.midio.send_message(d) + + def readraw(self, addr, size, timeout=None): + if type(addr) == int: + addr = u.int2belist(addr) + addr = u.padbelist(addr) + if len(addr) > 4: + raise ValueError("invalid address") + if type(size) == int: + size = u.padbelist(u.int2b7belist(size)) + rsize = u.b7belist2int(size) + + for a in addr: + if a > 0x80 or a < 0: + raise ValueError("invalid address") + + cs = u.roland_checksum(addr + size) + d = [0xf0, 0x41, self.devn] + self.model + [0x11] + addr + size + [cs, 0xf7] + #print(' '.join([format(a, "02x") for a in d])) + + cv = threading.Condition() + m = [] + def msgcb(d, cd): + _m, dt = d + nonlocal m, cv, addr + is_last = False + + if u.check_roland_checksum(_m): + m.append( (_m[6:10], _m[10:-2]) ) + if u.b7belist2int(_m[6:10]) - u.b7belist2int(addr) + len(_m[10:-2]) >= rsize: + is_last = True + else: + raise RuntimeError("checksum error") + + if is_last: + cv.acquire() + cv.notify() + cv.release() + + self.midii.set_callback(msgcb) + cv.acquire() + self.midio.send_message(d) + cv.wait(timeout) + cv.release() + self.midii.cancel_callback() + return m + + #current implementation doesn't support reading entire strided list of primitives + def read_locations(self, key, timeout=None): + node = self.aroot.find_node(key) + rqsz = 0 + rqba = 0 + if type(node) == list: + if type(node[0]) == xv.amap.AddrMapNode: + rqsz = node[-1].upper_address + 1 - node[0].base_address + rqba = node[0].base_address + else: + rqsz = node[-1].offset - node[0].offset + xv.amap.strsize2bytesize(node[-1].size) + 1 + rqba = node[0].offset + else: + if type(node) == xv.amap.AddrMapNode: + rqba = node.base_address + rqsz = node.upper_address + 1 - node.base_address + else: + rqba = node.offset + rqsz = xv.amap.strsize2bytesize(node.size) + rqba = u.padbelist(u.int2b7belist(rqba)) + r = self.readraw(rqba, rqsz, timeout) + for m in r: + mba = u.b7belist2int(m[0]) + self.aroot.data[mba:mba + len(m[1])] = m[1] + return r + + #current implementation doesn't support writing entire strided list of primitives + def write_locations(self, key, data): + node = self.aroot.find_node(key) + dstsz = 0 + dstba = 0 + if type(node) == list: + if type(node[0]) == xv.amap.AddrMapNode: + dstsz = node[-1].upper_address + 1 - node[0].base_address + dstba = node[0].base_address + else: + dstsz = node[-1].offset - node[0].offset + xv.amap.strsize2bytesize(node[-1].size) + 1 + dstba = node[0].offset + else: + if type(node) == xv.amap.AddrMapNode: + dstba = node.base_address + dstsz = node.upper_address + 1 - node.base_address + else: + dstba = node.offset + dstsz = xv.amap.strsize2bytesize(node.size) + if type(data) != list: + self.aroot.set_value(key, data) + data = list(self.aroot.data[node.offset:node.offset + dstsz]) + if len(data) > dstsz: + raise ValueError("excessive data") + self.aroot.data[dstba:dstba + len(data)] = data + + dstba = u.padbelist(u.int2b7belist(dstba)) + self.writeraw(dstba, data) + +# vim: expandtab shiftwidth=4 tabstop=4 -- cgit v1.2.3