# Part of the Fifteen-Thieves Project
# Chris Xiong 2020
# License: Expat (MIT)
import rtmidi.midiutil as r
import xv.util as u
import xv.amap
import threading


class Model:
    """Client for a Roland-style SysEx device reachable through rtmidi.

    Messages are framed as
    ``F0 41 <devn> <model...> <cmd> <addr(4)> <payload...> <checksum> F7``
    where ``cmd`` is 0x11 for a read request and 0x12 for a data write.

    Attributes:
        midii: MIDI input port (SysEx reception enabled).
        midio: MIDI output port.
        model: list of model-ID bytes inserted after the device number.
        devn: device number byte.
        aroot: optional address map; must provide ``find_node(key)``,
            ``set_value(key, value)`` and a sliceable ``data`` buffer for
            ``read_locations`` / ``write_locations`` to work.
    """

    def __init__(self, port_name=None, model=[0x00, 0x00], devn=0x10, address_mapping=None):
        """Open the MIDI input and output ports and store the device identity.

        Args:
            port_name: forwarded to ``open_midiinput`` / ``open_midioutput``.
            model: model-ID bytes of the target device.
            devn: device number byte.
            address_mapping: root of the address map (may be None if only the
                raw read/write methods are used).
        """
        self.midii, _ = r.open_midiinput(port_name)
        self.midio, _ = r.open_midioutput(port_name)
        # SysEx is filtered out by default; replies arrive as SysEx.
        self.midii.ignore_types(sysex=False)
        # Copy so instances never alias the shared default list (or the
        # caller's list).
        self.model = list(model)
        self.devn = devn
        self.aroot = address_mapping

    def __del__(self):
        """Drop the port objects; tolerate a partially constructed instance."""
        for attr in ("midii", "midio"):
            # __init__ may have failed before the attribute was assigned.
            if hasattr(self, attr):
                delattr(self, attr)

    @staticmethod
    def _normalize_addr(addr):
        """Return *addr* as a validated big-endian byte list.

        An int is converted with ``int2belist``; the list is then passed
        through ``padbelist``.

        Raises:
            ValueError: if the result has more than 4 bytes or any byte is
                outside the 7-bit range 0x00-0x7f.
        """
        if isinstance(addr, int):
            addr = u.int2belist(addr)
        addr = u.padbelist(addr)
        if len(addr) > 4:
            raise ValueError("invalid address")
        # MIDI data bytes are 7-bit: 0x80 and above are status bytes.
        if any(a < 0 or a > 0x7f for a in addr):
            raise ValueError("invalid address")
        return addr

    @staticmethod
    def _node_span(node):
        """Return ``(base, size)`` of the region covered by *node*.

        *node* is either a single node or a list of nodes, as returned by the
        address map's ``find_node``. ``AddrMapNode`` objects are measured via
        ``base_address`` / ``upper_address``; anything else via ``offset`` and
        ``strsize2bytesize(size)``. For a list, the span runs from the first
        element to the end of the last one.
        """
        if isinstance(node, list):
            first, last = node[0], node[-1]
            if type(first) is xv.amap.AddrMapNode:
                return first.base_address, last.upper_address + 1 - first.base_address
            # NOTE(review): the trailing "+ 1" is not applied in the
            # single-primitive case below; kept as in the original - confirm
            # whether it is intentional.
            size = last.offset - first.offset + xv.amap.strsize2bytesize(last.size) + 1
            return first.offset, size
        if type(node) is xv.amap.AddrMapNode:
            return node.base_address, node.upper_address + 1 - node.base_address
        return node.offset, xv.amap.strsize2bytesize(node.size)

    def writeraw(self, addr, data):
        """Send a data-write (command 0x12) message.

        Args:
            addr: target address, as an int or a list of at most 4 bytes.
            data: sequence of payload bytes.

        Raises:
            ValueError: if *addr* is not a valid address.
        """
        addr = self._normalize_addr(addr)
        data = list(data)
        cs = u.roland_checksum(addr + data)
        msg = [0xf0, 0x41, self.devn] + self.model + [0x12] + addr + data + [cs, 0xf7]
        self.midio.send_message(msg)

    def readraw(self, addr, size, timeout=None):
        """Send a read request (command 0x11) and collect the replies.

        Blocks until the replies cover *size* bytes starting at *addr*, a
        reply fails its checksum, or *timeout* seconds elapse.

        Args:
            addr: start address, as an int or a list of at most 4 bytes.
            size: number of bytes to read, as an int or an already encoded
                7-bit big-endian byte list.
            timeout: seconds to wait, or None to wait indefinitely.

        Returns:
            A list of ``(address_bytes, data_bytes)`` tuples, one per reply
            received. It may be incomplete (or empty) if the timeout expired.

        Raises:
            ValueError: if *addr* is not a valid address.
            RuntimeError: if a received message fails the checksum test.
        """
        addr = self._normalize_addr(addr)
        if isinstance(size, int):
            size = u.padbelist(u.int2b7belist(size))
        rsize = u.b7belist2int(size)
        base = u.b7belist2int(addr)
        cs = u.roland_checksum(addr + size)
        request = [0xf0, 0x41, self.devn] + self.model + [0x11] + addr + size + [cs, 0xf7]

        cv = threading.Condition()
        replies = []
        state = {"done": False, "error": None}

        def msgcb(event, _cbdata):
            # Runs on rtmidi's input thread. An exception raised here would
            # never reach the caller of readraw(), so failures are handed
            # over through `state` and re-raised by the waiting thread.
            msg, _delta = event
            if u.check_roland_checksum(msg):
                # Slices assume a 2-byte model ID: address at [6:10],
                # payload up to the checksum and the closing 0xf7.
                maddr, mdata = msg[6:10], msg[10:-2]
                with cv:
                    replies.append((maddr, mdata))
                    if u.b7belist2int(maddr) - base + len(mdata) >= rsize:
                        state["done"] = True
                        cv.notify()
            else:
                with cv:
                    state["error"] = RuntimeError("checksum error")
                    cv.notify()

        self.midii.set_callback(msgcb)
        try:
            with cv:
                self.midio.send_message(request)
                # The predicate guards against spurious wake-ups.
                cv.wait_for(
                    lambda: state["done"] or state["error"] is not None,
                    timeout,
                )
        finally:
            # Always uninstall the callback, even if sending failed.
            self.midii.cancel_callback()
        if state["error"] is not None:
            raise state["error"]
        return replies

    # current implementation doesn't support reading entire strided list of primitives
    def read_locations(self, key, timeout=None):
        """Read the region named by *key* from the device into the address map.

        Every reply is copied into ``self.aroot.data`` at the offset decoded
        from the reply's address.

        Args:
            key: key understood by the address map's ``find_node``.
            timeout: forwarded to ``readraw``.

        Returns:
            The list of ``(address_bytes, data_bytes)`` tuples from ``readraw``.
        """
        node = self.aroot.find_node(key)
        base, size = self._node_span(node)
        replies = self.readraw(u.padbelist(u.int2b7belist(base)), size, timeout)
        for maddr, mdata in replies:
            off = u.b7belist2int(maddr)
            self.aroot.data[off:off + len(mdata)] = mdata
        return replies

    # current implementation doesn't support writing entire strided list of primitives
    def write_locations(self, key, data):
        """Write *data* to the region named by *key*, locally and on the device.

        Args:
            key: key understood by the address map's ``find_node``.
            data: either a list of raw bytes, or any other value, which is
                stored through the address map's ``set_value`` and then read
                back from its ``data`` buffer as bytes.

        Raises:
            ValueError: if *data* is longer than the target region.
        """
        node = self.aroot.find_node(key)
        base, size = self._node_span(node)
        if not isinstance(data, list):
            self.aroot.set_value(key, data)
            # Slice from the computed base so this also works when `node` is
            # a list or an AddrMapNode (neither is indexed by `.offset`).
            data = list(self.aroot.data[base:base + size])
        if len(data) > size:
            raise ValueError("excessive data")
        self.aroot.data[base:base + len(data)] = data
        self.writeraw(u.padbelist(u.int2b7belist(base)), data)

# vim: expandtab shiftwidth=4 tabstop=4