aboutsummaryrefslogblamecommitdiff
path: root/xv/model.py
blob: 0c0c014fe391a52acd2ba0c5b264b015bd44035a (plain) (tree)











































































































































                                                                                                      
# Part of the Fifteen-Thieves Project
# Chris Xiong 2020
# License: Expat (MIT)

import rtmidi.midiutil as r
import xv.util as u
import xv.amap
import threading

class Model:
    def __init__(self, port_name=None, model=[0x00, 0x00], devn=0x10, address_mapping=None):
        self.midii,_ = r.open_midiinput(port_name)
        self.midio,_ = r.open_midioutput(port_name)
        self.midii.ignore_types(sysex=False)
        self.model = model
        self.devn = devn
        self.aroot = address_mapping

    def __del__(self):
        del self.midii
        del self.midio

    def writeraw(self, addr, data):
        if type(addr) == int:
            addr = u.int2belist(addr)
        addr = u.padbelist(addr)
        if len(addr) > 4:
            raise ValueError("invalid address")

        for a in addr:
            if a > 0x80 or a < 0:
                raise ValueError("invalid address")

        cs = u.roland_checksum(addr + data)
        d = [0xf0, 0x41, self.devn] + self.model + [0x12] + addr + data + [cs, 0xf7]
        #print(' '.join([format(a, "02x") for a in d]))

        self.midio.send_message(d)

    def readraw(self, addr, size, timeout=None):
        if type(addr) == int:
            addr = u.int2belist(addr)
        addr = u.padbelist(addr)
        if len(addr) > 4:
            raise ValueError("invalid address")
        if type(size) == int:
            size = u.padbelist(u.int2b7belist(size))
        rsize = u.b7belist2int(size)

        for a in addr:
            if a > 0x80 or a < 0:
                raise ValueError("invalid address")

        cs = u.roland_checksum(addr + size)
        d = [0xf0, 0x41, self.devn] + self.model + [0x11] + addr + size + [cs, 0xf7]
        #print(' '.join([format(a, "02x") for a in d]))

        cv = threading.Condition()
        m = []
        def msgcb(d, cd):
            _m, dt = d
            nonlocal m, cv, addr
            is_last = False

            if u.check_roland_checksum(_m):
                m.append( (_m[6:10], _m[10:-2]) )
                if u.b7belist2int(_m[6:10]) - u.b7belist2int(addr) + len(_m[10:-2]) >= rsize:
                    is_last = True
            else:
                raise RuntimeError("checksum error")

            if is_last:
                cv.acquire()
                cv.notify()
                cv.release()

        self.midii.set_callback(msgcb)
        cv.acquire()
        self.midio.send_message(d)
        cv.wait(timeout)
        cv.release()
        self.midii.cancel_callback()
        return m

    #current implementation doesn't support reading entire strided list of primitives
    def read_locations(self, key, timeout=None):
        node = self.aroot.find_node(key)
        rqsz = 0
        rqba = 0
        if type(node) == list:
            if type(node[0]) == xv.amap.AddrMapNode:
                rqsz = node[-1].upper_address + 1 - node[0].base_address
                rqba = node[0].base_address
            else:
                rqsz = node[-1].offset - node[0].offset + xv.amap.strsize2bytesize(node[-1].size) + 1
                rqba = node[0].offset
        else:
            if type(node) == xv.amap.AddrMapNode:
                rqba = node.base_address
                rqsz = node.upper_address + 1 - node.base_address
            else:
                rqba = node.offset
                rqsz = xv.amap.strsize2bytesize(node.size)
        rqba = u.padbelist(u.int2b7belist(rqba))
        r = self.readraw(rqba, rqsz, timeout)
        for m in r:
            mba = u.b7belist2int(m[0])
            self.aroot.data[mba:mba + len(m[1])] = m[1]
        return r

    #current implementation doesn't support writing entire strided list of primitives
    def write_locations(self, key, data):
        node = self.aroot.find_node(key)
        dstsz = 0
        dstba = 0
        if type(node) == list:
            if type(node[0]) == xv.amap.AddrMapNode:
                dstsz = node[-1].upper_address + 1 - node[0].base_address
                dstba = node[0].base_address
            else:
                dstsz = node[-1].offset - node[0].offset + xv.amap.strsize2bytesize(node[-1].size) + 1
                dstba = node[0].offset
        else:
            if type(node) == xv.amap.AddrMapNode:
                dstba = node.base_address
                dstsz = node.upper_address + 1 - node.base_address
            else:
                dstba = node.offset
                dstsz = xv.amap.strsize2bytesize(node.size)
                if type(data) != list:
                    self.aroot.set_value(key, data)
                    data = list(self.aroot.data[node.offset:node.offset + dstsz])
        if len(data) > dstsz:
            raise ValueError("excessive data")
        self.aroot.data[dstba:dstba + len(data)] = data

        dstba = u.padbelist(u.int2b7belist(dstba))
        self.writeraw(dstba, data)

# vim: expandtab shiftwidth=4 tabstop=4