aboutsummaryrefslogtreecommitdiff
path: root/xv/model.py
diff options
context:
space:
mode:
Diffstat (limited to 'xv/model.py')
-rw-r--r--xv/model.py140
1 files changed, 140 insertions, 0 deletions
diff --git a/xv/model.py b/xv/model.py
new file mode 100644
index 0000000..0c0c014
--- /dev/null
+++ b/xv/model.py
@@ -0,0 +1,140 @@
+# Part of the Fifteen-Thieves Project
+# Chris Xiong 2020
+# License: Expat (MIT)
+
+import rtmidi.midiutil as r
+import xv.util as u
+import xv.amap
+import threading
+
+class Model:
+ def __init__(self, port_name=None, model=[0x00, 0x00], devn=0x10, address_mapping=None):
+ self.midii,_ = r.open_midiinput(port_name)
+ self.midio,_ = r.open_midioutput(port_name)
+ self.midii.ignore_types(sysex=False)
+ self.model = model
+ self.devn = devn
+ self.aroot = address_mapping
+
+ def __del__(self):
+ del self.midii
+ del self.midio
+
+ def writeraw(self, addr, data):
+ if type(addr) == int:
+ addr = u.int2belist(addr)
+ addr = u.padbelist(addr)
+ if len(addr) > 4:
+ raise ValueError("invalid address")
+
+ for a in addr:
+ if a > 0x80 or a < 0:
+ raise ValueError("invalid address")
+
+ cs = u.roland_checksum(addr + data)
+ d = [0xf0, 0x41, self.devn] + self.model + [0x12] + addr + data + [cs, 0xf7]
+ #print(' '.join([format(a, "02x") for a in d]))
+
+ self.midio.send_message(d)
+
+ def readraw(self, addr, size, timeout=None):
+ if type(addr) == int:
+ addr = u.int2belist(addr)
+ addr = u.padbelist(addr)
+ if len(addr) > 4:
+ raise ValueError("invalid address")
+ if type(size) == int:
+ size = u.padbelist(u.int2b7belist(size))
+ rsize = u.b7belist2int(size)
+
+ for a in addr:
+ if a > 0x80 or a < 0:
+ raise ValueError("invalid address")
+
+ cs = u.roland_checksum(addr + size)
+ d = [0xf0, 0x41, self.devn] + self.model + [0x11] + addr + size + [cs, 0xf7]
+ #print(' '.join([format(a, "02x") for a in d]))
+
+ cv = threading.Condition()
+ m = []
+ def msgcb(d, cd):
+ _m, dt = d
+ nonlocal m, cv, addr
+ is_last = False
+
+ if u.check_roland_checksum(_m):
+ m.append( (_m[6:10], _m[10:-2]) )
+ if u.b7belist2int(_m[6:10]) - u.b7belist2int(addr) + len(_m[10:-2]) >= rsize:
+ is_last = True
+ else:
+ raise RuntimeError("checksum error")
+
+ if is_last:
+ cv.acquire()
+ cv.notify()
+ cv.release()
+
+ self.midii.set_callback(msgcb)
+ cv.acquire()
+ self.midio.send_message(d)
+ cv.wait(timeout)
+ cv.release()
+ self.midii.cancel_callback()
+ return m
+
+ #current implementation doesn't support reading entire strided list of primitives
+ def read_locations(self, key, timeout=None):
+ node = self.aroot.find_node(key)
+ rqsz = 0
+ rqba = 0
+ if type(node) == list:
+ if type(node[0]) == xv.amap.AddrMapNode:
+ rqsz = node[-1].upper_address + 1 - node[0].base_address
+ rqba = node[0].base_address
+ else:
+ rqsz = node[-1].offset - node[0].offset + xv.amap.strsize2bytesize(node[-1].size) + 1
+ rqba = node[0].offset
+ else:
+ if type(node) == xv.amap.AddrMapNode:
+ rqba = node.base_address
+ rqsz = node.upper_address + 1 - node.base_address
+ else:
+ rqba = node.offset
+ rqsz = xv.amap.strsize2bytesize(node.size)
+ rqba = u.padbelist(u.int2b7belist(rqba))
+ r = self.readraw(rqba, rqsz, timeout)
+ for m in r:
+ mba = u.b7belist2int(m[0])
+ self.aroot.data[mba:mba + len(m[1])] = m[1]
+ return r
+
+ #current implementation doesn't support writing entire strided list of primitives
+ def write_locations(self, key, data):
+ node = self.aroot.find_node(key)
+ dstsz = 0
+ dstba = 0
+ if type(node) == list:
+ if type(node[0]) == xv.amap.AddrMapNode:
+ dstsz = node[-1].upper_address + 1 - node[0].base_address
+ dstba = node[0].base_address
+ else:
+ dstsz = node[-1].offset - node[0].offset + xv.amap.strsize2bytesize(node[-1].size) + 1
+ dstba = node[0].offset
+ else:
+ if type(node) == xv.amap.AddrMapNode:
+ dstba = node.base_address
+ dstsz = node.upper_address + 1 - node.base_address
+ else:
+ dstba = node.offset
+ dstsz = xv.amap.strsize2bytesize(node.size)
+ if type(data) != list:
+ self.aroot.set_value(key, data)
+ data = list(self.aroot.data[node.offset:node.offset + dstsz])
+ if len(data) > dstsz:
+ raise ValueError("excessive data")
+ self.aroot.data[dstba:dstba + len(data)] = data
+
+ dstba = u.padbelist(u.int2b7belist(dstba))
+ self.writeraw(dstba, data)
+
+# vim: expandtab shiftwidth=4 tabstop=4