# Part of the Fifteen-Thieves Project
# Chris Xiong 2020
# License: Expat (MIT)
import rtmidi.midiutil as r
import xv.util as u
import xv.amap
import threading
class Model:
def __init__(self, port_name=None, model=[0x00, 0x00], devn=0x10, address_mapping=None):
self.midii,_ = r.open_midiinput(port_name)
self.midio,_ = r.open_midioutput(port_name)
self.midii.ignore_types(sysex=False)
self.model = model
self.devn = devn
self.aroot = address_mapping
def __del__(self):
del self.midii
del self.midio
def writeraw(self, addr, data):
if type(addr) == int:
addr = u.int2belist(addr)
addr = u.padbelist(addr)
if len(addr) > 4:
raise ValueError("invalid address")
for a in addr:
if a > 0x80 or a < 0:
raise ValueError("invalid address")
cs = u.roland_checksum(addr + data)
d = [0xf0, 0x41, self.devn] + self.model + [0x12] + addr + data + [cs, 0xf7]
#print(' '.join([format(a, "02x") for a in d]))
self.midio.send_message(d)
def readraw(self, addr, size, timeout=None):
if type(addr) == int:
addr = u.int2belist(addr)
addr = u.padbelist(addr)
if len(addr) > 4:
raise ValueError("invalid address")
if type(size) == int:
size = u.padbelist(u.int2b7belist(size))
rsize = u.b7belist2int(size)
for a in addr:
if a > 0x80 or a < 0:
raise ValueError("invalid address")
cs = u.roland_checksum(addr + size)
d = [0xf0, 0x41, self.devn] + self.model + [0x11] + addr + size + [cs, 0xf7]
#print(' '.join([format(a, "02x") for a in d]))
cv = threading.Condition()
m = []
def msgcb(d, cd):
_m, dt = d
nonlocal m, cv, addr
is_last = False
if u.check_roland_checksum(_m):
m.append( (_m[6:10], _m[10:-2]) )
if u.b7belist2int(_m[6:10]) - u.b7belist2int(addr) + len(_m[10:-2]) >= rsize:
is_last = True
else:
raise RuntimeError("checksum error")
if is_last:
cv.acquire()
cv.notify()
cv.release()
self.midii.set_callback(msgcb)
cv.acquire()
self.midio.send_message(d)
cv.wait(timeout)
cv.release()
self.midii.cancel_callback()
return m
#current implementation doesn't support reading entire strided list of primitives
def read_locations(self, key, timeout=None):
node = self.aroot.find_node(key)
rqsz = 0
rqba = 0
if type(node) == list:
if type(node[0]) == xv.amap.AddrMapNode:
rqsz = node[-1].upper_address + 1 - node[0].base_address
rqba = node[0].base_address
else:
rqsz = node[-1].offset - node[0].offset + xv.amap.strsize2bytesize(node[-1].size) + 1
rqba = node[0].offset
else:
if type(node) == xv.amap.AddrMapNode:
rqba = node.base_address
rqsz = node.upper_address + 1 - node.base_address
else:
rqba = node.offset
rqsz = xv.amap.strsize2bytesize(node.size)
rqba = u.padbelist(u.int2b7belist(rqba))
r = self.readraw(rqba, rqsz, timeout)
for m in r:
mba = u.b7belist2int(m[0])
self.aroot.data[mba:mba + len(m[1])] = m[1]
return r
#current implementation doesn't support writing entire strided list of primitives
def write_locations(self, key, data):
node = self.aroot.find_node(key)
dstsz = 0
dstba = 0
if type(node) == list:
if type(node[0]) == xv.amap.AddrMapNode:
dstsz = node[-1].upper_address + 1 - node[0].base_address
dstba = node[0].base_address
else:
dstsz = node[-1].offset - node[0].offset + xv.amap.strsize2bytesize(node[-1].size) + 1
dstba = node[0].offset
else:
if type(node) == xv.amap.AddrMapNode:
dstba = node.base_address
dstsz = node.upper_address + 1 - node.base_address
else:
dstba = node.offset
dstsz = xv.amap.strsize2bytesize(node.size)
if type(data) != list:
self.aroot.set_value(key, data)
data = list(self.aroot.data[node.offset:node.offset + dstsz])
if len(data) > dstsz:
raise ValueError("excessive data")
self.aroot.data[dstba:dstba + len(data)] = data
dstba = u.padbelist(u.int2b7belist(dstba))
self.writeraw(dstba, data)
# vim: expandtab shiftwidth=4 tabstop=4