commit 5294df34c7391d3d0d0a3366648b67077f45ce61 Author: Brian Hrebec Date: Fri Mar 6 22:58:36 2026 -0600 Initial commit diff --git a/main.py b/main.py new file mode 100644 index 0000000..95ef38e --- /dev/null +++ b/main.py @@ -0,0 +1,394 @@ +from rich.console import ConsoleOptions, Console +from rich.segment import Segment +from rich.style import Style +from textual.app import App, ComposeResult, Widget, RenderResult +from textual.color import Color +from textual.containers import HorizontalScroll, Horizontal, Vertical +from textual.reactive import reactive, var +from textual.widgets import Static, Button, Pretty, Log, Footer, Header +import mido +import sys + + +def decode_message(msg_bytes): + result_words = [] + try: + chunks = zip(*[iter(msg_bytes)] * 5, strict=True) + for packed in chunks: + value = packed[4] << 28 | packed[3] << 21 | packed[2] << 14 | packed[1] << 7 | packed[0] + result_words.append(value) + except ValueError: + pass # invalid message + + return result_words + + +def hex_format(msg, width=2): + return ' '.join([("{x:0" + str(width) + "x}").format(x=x) for x in msg]) + + +def bin_format(msg, width=32): + return ' '.join([("{x:0" + str(width) + "b}").format(x=x) for x in msg]) + + +def get_bitfield(word, position, length): + return (word >> position) & ((1 << length) - 1) + + +def set_bitfield(word, val, position, length): + mask = ((1 << length) - 1) + val = (val & mask) << position + return (word & mask << position) | val + + +def level_to_db(val): + return 6 + (val - 0xFF) / 2 + + +class MeterGradient: + def __init__(self, value: int, color1: str, color2: str) -> None: + self._color1 = Color.parse(color1) + self._color2 = Color.parse(color2) + self._value = 1 - value + + + def __rich_console__( + self, console: Console, options: ConsoleOptions + ) -> RenderResult: + width = options.max_width + height = options.height or options.max_height + color1 = self._color1 + color2 = self._color2 + default_color = Color(0, 0, 0).rich_color + from_color = Style.from_color + blend = color1.blend + rich_color1 = color1.rich_color + for y in range(height): + rel_y = y / (height - 1) + if rel_y < self._value: + yield Segment(f"{width * ' '}\n") + else: + line_color = from_color( + default_color, + ( + blend(color2, rel_y).rich_color + if height > 1 + else rich_color1 + ), + ) + yield Segment(f"{width * ' '}\n", line_color) + + +class Meter(Widget): + DEFAULT_CSS = """ + Meter { + width: 1 + } + """ + value = reactive(0) + def render(self) -> RenderResult: + return MeterGradient(self.value, '#04ff00', '#04aa00') + + +class Channel(Widget): + DEFAULT_CSS = """ + Channel { + width: 10; + border: solid #333333; + Horizontal { + height: 100%; + } + .label { + width: 100%; + text-align: center; + } + .controls { + margin-left: 1; + width: 6 + } + Button { + margin-bottom: 1; + padding: 0; + min-width: 6; + height: 1; + } + .mute { + background: #00f !important; + } + .solo { + background: #0ff; + } + .phantom { + background: #f00; + } + .inst { + background: #ff0; + } + } + """ + + rms_value = var(0) + scaled_rms_value = var(0) + MAX_RMS = 2**64 + + def compute_scaled_rms_value(self) -> float: + return self.rms_value / self.MAX_RMS + + def watch_scaled_rms_value(self, val) -> None: + self.query_one(Meter).value = val + + def __init__(self, label, phantom=False, inst=False, *children: Widget): + super().__init__(*children) + self.label = label + self.phantom = phantom + self.inst = inst + + def compose(self) -> ComposeResult: + yield Static(self.label, classes='label') + with Horizontal(): + yield Meter() + with Vertical(classes='controls'): + yield Button("M", classes='mute', compact=True) + yield Button("S", classes='solo', compact=True) + if self.phantom: + yield Button("48v", classes='phantom', compact=True) + if self.inst: + yield Button("Inst", classes='inst', compact=True) + + +class ChannelRow(Widget): + DEFAULT_CSS = """ + ChannelRow { + height: 33% + } + """ + + rms_values = var([]) + + def watch_rms_values(self, old_values, new_values): + print('watch_rms_values', new_values) + for channel, value in zip(self.query(Channel), new_values): + channel.rms_value = value + +class InputChannelRow(ChannelRow): + def compose(self) -> ComposeResult: + with HorizontalScroll(): + yield Channel('AN1', phantom=True) + yield Channel('AN2', phantom=True, inst=True) + yield Channel('ADAT1') + yield Channel('ADAT2') + yield Channel('ADAT3') + yield Channel('ADAT4') + yield Channel('ADAT5') + yield Channel('ADAT6') + yield Channel('ADAT7') + yield Channel('ADAT8') + +class PlaybackChannelRow(ChannelRow): + def compose(self) -> ComposeResult: + with HorizontalScroll(): + yield Channel('AN1') + yield Channel('AN2') + yield Channel('PH3') + yield Channel('PH4') + yield Channel('ADAT1') + yield Channel('ADAT2') + yield Channel('ADAT3') + yield Channel('ADAT4') + yield Channel('ADAT5') + yield Channel('ADAT6') + yield Channel('ADAT7') + yield Channel('ADAT8') + +class OutputChannelRow(ChannelRow): + def compose(self) -> ComposeResult: + with HorizontalScroll(): + yield Channel('AN1') + yield Channel('AN2') + yield Channel('PH3') + yield Channel('PH4') + yield Channel('ADAT1') + yield Channel('ADAT2') + yield Channel('ADAT3') + yield Channel('ADAT4') + yield Channel('ADAT5') + yield Channel('ADAT6') + yield Channel('ADAT7') + yield Channel('ADAT8') + + +class BFStatus: + word0 = 0 + word1 = 0 + word2 = 0 + + # word 0 + ch1_48v = 0 + ch2_48v = 0 + ch2_inst = 0 + digital_out = 0 # 0 = ADAT, 1 = SPDIF + + # word 1 + out_1_vol = 0 + out_2_vol = 0 + ch1_gain = 0 + ch2_gain = 0 + + # word 2 + out_3_vol = 0 + out_4_vol = 0 + dim = 0 + + def parse(self, msg): + self.word0 = format(msg[0], '032b') + + if len(msg) > 2: + self.word1 = format(msg[1], '032b') + self.word2 = format(msg[2], '032b') + self.ch1_48v = get_bitfield(msg[0], 31, 1) + self.ch2_48v = get_bitfield(msg[0], 30, 1) + self.ch2_inst = get_bitfield(msg[0], 29, 1) + self.digital_out = get_bitfield(msg[0], 28, 1) + + self.out_1_vol = level_to_db(get_bitfield(msg[1], 0, 8)) + self.out_2_vol = level_to_db(get_bitfield(msg[1], 9, 8)) + + self.ch2_gain = get_bitfield(msg[1], 24, 5) + self.ch1_gain = get_bitfield(msg[1], 19, 5) + self.out_3_vol = level_to_db(get_bitfield(msg[2], 15, 8)) + self.out_4_vol = level_to_db(get_bitfield(msg[2], 23, 8)) + self.dim = get_bitfield(msg[2], 5, 1) + + +class BFRMS: + def __init__(self): + self.input_rms = [0 for _ in range(10)] + self.playback_rms = [0 for _ in range(12)] + self.fx_in_rms = [0, 0] + self.output_rms = [0 for _ in range(12)] + self.fx_out_rms = [0, 0] + + def _parse_rms(self, values, target, offset=0): + n = offset + for i in range(0, len(values), 2): + target[n] = (values[i] | (values[i + 1] << 32)) + n += 1 + + def parse0(self, id0_msg): + self._parse_rms(id0_msg[3:23], self.input_rms) + self._parse_rms(id0_msg[23:], self.playback_rms) + + def parse1(self, id1_msg): + self._parse_rms(id1_msg[:6], self.playback_rms, 9) + self._parse_rms(id1_msg[6:10], self.fx_in_rms) + self._parse_rms(id1_msg[10:34], self.output_rms) + self._parse_rms(id1_msg[34:38], self.fx_in_rms) + + +class Status(Widget): + status_data = reactive({}) + + def compose(self) -> ComposeResult: + yield Pretty(self.status_data, id='status') + yield Pretty(self.status_data, id='rms') + yield Pretty(self.status_data, id='peak') + yield Log(self.status_data, id='log') + + +class Mixer(App): + in_port: mido.ports.BaseInput + out_port: mido.ports.BaseInput + messages = [] + messageset = set() + STATUS_MSG = mido.Message('sysex', data=[0x00, 0x20, 0x0D, 0x10, 0x10]) + bf_status = BFStatus() + bf_rms = BFRMS() + bf_peak = [] + + def __init__(self): + super().__init__() + in_ports = [name for name in mido.get_input_names() if 'Babyface' in name and 'Port 2' in name] + out_ports = [name for name in mido.get_output_names() if 'Babyface' in name and 'Port 2' in name] + if in_ports: + self.in_port = mido.open_input(in_ports[0]) + else: + self.in_port = mido.open_input('Mixer', virtual=True) + + if out_ports: + self.out_port = mido.open_output(out_ports[0]) + else: + self.out_port = mido.open_output('Mixer', virtual=True) + + def on_mount(self) -> None: + self.set_interval(.5, self.update_messages) + + def update_messages(self) -> None: + self.out_port.send(self.STATUS_MSG) + for msg in self.in_port.iter_pending(): + self.process_message(msg) + + def compose(self) -> ComposeResult: + yield Header() + yield InputChannelRow() + yield PlaybackChannelRow() + yield OutputChannelRow() + yield Footer() + + def process_dump(self, msg: mido.Message) -> None: + msg_bytes = msg.bytes()[1:-1] + if msg_bytes[:4] != [0x00, 0x20, 0x0D, 0x10]: + return # noop + decoded_msg = decode_message(msg_bytes[5:]) + subid = msg_bytes[4] + + msg_pt = str(subid) + ": " + bin_format(decoded_msg) + if subid == 16: + return + + if decoded_msg: + self.query_one('#log').write_line(msg_pt) + else: + self.query_one('#log').write_line(bin_format(msg_bytes[5:])) + + def process_message(self, msg: mido.Message) -> None: + msg_bytes = msg.bytes()[1:-1] + if msg_bytes[:4] != [0x00, 0x20, 0x0D, 0x10]: + return # noop + decoded_msg = decode_message(msg_bytes[5:]) + subid = msg_bytes[4] + if subid == 0x00: + self.bf_status.parse(decoded_msg) + self.bf_rms.parse0(decoded_msg) + elif subid == 0x01: + self.bf_rms.parse1(decoded_msg) + elif subid == 0x02: + self.bf_peak = decoded_msg + else: + self.messages.append(hex_format(decoded_msg)) + self.query_one('#log').update(self.messages) + + print('update') + input_row = self.query_one(InputChannelRow) + input_row.rms_values = self.bf_rms.input_rms + playback_row = self.query_one(PlaybackChannelRow) + playback_row.rms_values = self.bf_rms.playback_rms + output_row = self.query_one(OutputChannelRow) + output_row.rms_values = self.bf_rms.output_rms + #self.query_one('#status').update(self.bf_status.__dict__) + + +""" +else: +key = str([subid] + decoded_msg) +if key not in self.messageset: +self.messageset.add(key) +self.messages.append('{subid:X}: {msg}'.format( + subid=subid, + msg=hex_format(decoded_msg, width=8) +)) +self.query_one('#messages').update('\n'.join(self.messages)) +""" + +if __name__ == '__main__': + app = Mixer() + app.run()