"""Terminal mixer/meter view fed by MIDI sysex status messages.

The app opens the first MIDI input/output whose name contains both
'Babyface' and 'Port 2' (falling back to virtual ports named 'Mixer'),
sends a status request every 100 ms, decodes the sysex replies and shows
the RMS levels as vertical meters in three channel rows.
"""
import math
from rich.console import ConsoleOptions, Console
from rich.segment import Segment
from rich.style import Style
from textual.app import App, ComposeResult, Widget, RenderResult
from textual.color import Color
from textual.containers import HorizontalScroll, Horizontal, Vertical
from textual.reactive import reactive, var
from textual.widgets import Static, Button, Pretty, Log, Footer, Header
import mido
import sys

# Leading sysex payload bytes of every message this app accepts.
SYSEX_HEADER = [0x00, 0x20, 0x0D, 0x10]

# Labels shared by the eight ADAT channels of every row.
_ADAT_LABELS = tuple(f'ADAT{n}' for n in range(1, 9))


def decode_message(msg_bytes):
    """Unpack 7-bit sysex bytes into integer words.

    Every group of five bytes (least-significant group first, 7 bits each)
    becomes one word. When the length is not a multiple of five, the
    trailing partial group is dropped and the complete words decoded so
    far are still returned.
    """
    result_words = []
    try:
        # Same iterator five times => consecutive 5-byte chunks.
        for b0, b1, b2, b3, b4 in zip(*[iter(msg_bytes)] * 5, strict=True):
            result_words.append(b4 << 28 | b3 << 21 | b2 << 14 | b1 << 7 | b0)
    except ValueError:
        # strict zip raises on the trailing partial chunk: ignore it.
        pass
    return result_words


def hex_format(msg, width=2):
    """Return the items of *msg* as space-separated, zero-padded hex."""
    return ' '.join(f"{x:0{width}x}" for x in msg)


def bin_format(msg, width=32):
    """Return the items of *msg* as space-separated, zero-padded binary."""
    return ' '.join(f"{x:0{width}b}" for x in msg)


def get_bitfield(word, position, length):
    """Return the *length*-bit field of *word* starting at bit *position*."""
    return (word >> position) & ((1 << length) - 1)


def set_bitfield(word, val, position, length):
    """Return *word* with its *length*-bit field at *position* set to *val*.

    *val* is truncated to *length* bits; all bits outside the field are
    left untouched.
    """
    mask = (1 << length) - 1
    field = (val & mask) << position
    # Clear the target field first. (The previous `word & mask << position`
    # kept only the field bits and threw away the rest of the word.)
    return (word & ~(mask << position)) | field


def level_to_db(val):
    """Map a raw level to dB: 0xFF -> +6 dB, each step down is 0.5 dB."""
    return 6 + (val - 0xFF) / 2


class MeterGradient:
    """Rich renderable drawing a vertical bar filled from the bottom up."""

    def __init__(self, value: float, color1: str, color2: str) -> None:
        self._color1 = Color.parse(color1)
        self._color2 = Color.parse(color2)
        # Stored inverted: rows whose relative position is above this
        # threshold (i.e. nearer the bottom) are drawn filled.
        self._value = 1 - value

    def __rich_console__(
        self, console: Console, options: ConsoleOptions
    ) -> RenderResult:
        width = options.max_width
        height = options.height or options.max_height
        blank_line = f"{width * ' '}\n"
        color2 = self._color2
        default_color = Color(0, 0, 0).rich_color
        from_color = Style.from_color
        blend = self._color1.blend
        rich_color1 = self._color1.rich_color
        for y in range(height):
            # A one-row meter has no vertical span; avoid dividing by zero.
            rel_y = y / (height - 1) if height > 1 else 0.0
            if rel_y < self._value:
                yield Segment(blank_line)
            else:
                background = (
                    blend(color2, rel_y).rich_color
                    if height > 1 else rich_color1
                )
                yield Segment(blank_line, from_color(default_color, background))


class Meter(Widget):
    """One-column widget rendering ``value`` as a MeterGradient."""

    DEFAULT_CSS = """
    Meter {
        width: 1
    }
    """

    value = reactive(0)

    def render(self) -> RenderResult:
        return MeterGradient(self.value, '#04ff00', '#04aa00')


class Channel(Widget):
    """Channel strip: label, level meter and mute/solo/48v/inst buttons."""

    DEFAULT_CSS = """
    Channel {
        width: 10;
        border: solid #333333;
        Horizontal {
            height: 100%;
        }
        .label {
            width: 100%;
            text-align: center;
        }
        .controls {
            margin-left: 1;
            width: 6
        }
        Button {
            margin-bottom: 1;
            padding: 0;
            min-width: 6;
            height: 1;
        }
        .mute {
            background: #00f !important;
        }
        .solo {
            background: #0ff;
        }
        .phantom {
            background: #f00;
        }
        .inst {
            background: #ff0;
        }
    }
    """

    rms_value = var(0)
    scaled_rms_value = var(0.0)

    RMS_FACTOR = float(2**54)

    def compute_scaled_rms_value(self) -> float:
        """Convert the raw RMS value to dB and rescale it for the meter.

        Returns ``(dB + 90) / 110``, or 0 when the logarithm is undefined
        (raw value of zero or below).
        """
        try:
            db = 10 * math.log10(self.rms_value / self.RMS_FACTOR)
        except ValueError:
            return 0
        return (db + 90) / 110

    def watch_scaled_rms_value(self, val) -> None:
        """Forward the new scaled level to the child Meter."""
        self.query_one(Meter).value = val

    def __init__(self, label, phantom=False, inst=False, *children: Widget):
        super().__init__(*children)
        self.label = label
        self.phantom = phantom
        self.inst = inst

    def compose(self) -> ComposeResult:
        yield Static(self.label, classes='label')
        with Horizontal():
            yield Meter()
            with Vertical(classes='controls'):
                yield Button("M", classes='mute', compact=True)
                yield Button("S", classes='solo', compact=True)
                if self.phantom:
                    yield Button("48v", classes='phantom', compact=True)
                if self.inst:
                    yield Button("Inst", classes='inst', compact=True)


class ChannelRow(Widget):
    """Base row; distributes ``rms_values`` to its Channel children."""

    DEFAULT_CSS = """
    ChannelRow {
        height: 33%
    }
    """

    rms_values = var([])

    def watch_rms_values(self, new_values):
        """Assign each value to the Channel at the same position."""
        for channel, value in zip(self.query(Channel), new_values):
            channel.rms_value = value


class InputChannelRow(ChannelRow):
    """Row of the ten input channels (two analog, eight ADAT)."""

    def compose(self) -> ComposeResult:
        with HorizontalScroll():
            yield Channel('AN1', phantom=True)
            yield Channel('AN2', phantom=True, inst=True)
            for label in _ADAT_LABELS:
                yield Channel(label)


class PlaybackChannelRow(ChannelRow):
    """Row of the twelve playback channels."""

    def compose(self) -> ComposeResult:
        with HorizontalScroll():
            for label in ('AN1', 'AN2', 'PH3', 'PH4', *_ADAT_LABELS):
                yield Channel(label)


class OutputChannelRow(ChannelRow):
    """Row of the twelve output channels."""

    def compose(self) -> ComposeResult:
        with HorizontalScroll():
            for label in ('AN1', 'AN2', 'PH3', 'PH4', *_ADAT_LABELS):
                yield Channel(label)


class BFStatus:
    """Fields extracted from the first three words of a status message."""

    # Raw words; parse() replaces them with 32-character binary strings.
    word0 = 0
    word1 = 0
    word2 = 0

    # word 0
    ch1_48v = 0
    ch2_48v = 0
    ch2_inst = 0
    digital_out = 0  # 0 = ADAT, 1 = SPDIF

    # word 1
    out_1_vol = 0
    out_2_vol = 0
    ch1_gain = 0
    ch2_gain = 0

    # word 2
    out_3_vol = 0
    out_4_vol = 0
    dim = 0

    def parse(self, msg):
        """Update the fields from decoded words *msg*.

        An empty *msg* leaves everything unchanged. Fields living in
        words 1 and 2 are only updated when at least three words are
        present, so a truncated message cannot raise IndexError.
        """
        if not msg:
            return

        first = msg[0]
        self.word0 = format(first, '032b')
        self.ch1_48v = get_bitfield(first, 31, 1)
        self.ch2_48v = get_bitfield(first, 30, 1)
        self.ch2_inst = get_bitfield(first, 29, 1)
        self.digital_out = get_bitfield(first, 28, 1)

        if len(msg) <= 2:
            return

        second, third = msg[1], msg[2]
        self.word1 = format(second, '032b')
        self.word2 = format(third, '032b')

        self.out_1_vol = level_to_db(get_bitfield(second, 0, 8))
        self.out_2_vol = level_to_db(get_bitfield(second, 9, 8))
        self.ch2_gain = get_bitfield(second, 24, 5)
        self.ch1_gain = get_bitfield(second, 19, 5)

        self.out_3_vol = level_to_db(get_bitfield(third, 15, 8))
        self.out_4_vol = level_to_db(get_bitfield(third, 23, 8))
        self.dim = get_bitfield(third, 5, 1)


class BFRMS:
    """RMS levels collected from the subid 0 and subid 1 messages."""

    def __init__(self):
        self.input_rms = [0 for _ in range(10)]
        self.playback_rms = [0 for _ in range(12)]
        self.fx_in_rms = [0, 0]
        self.output_rms = [0 for _ in range(12)]
        self.fx_out_rms = [0, 0]

    def _parse_rms(self, values, target, offset=0):
        """Combine consecutive (low, high) word pairs into 64-bit values.

        Results are stored in *target* starting at index *offset*. An
        unpaired trailing word and any pairs that would fall beyond the
        end of *target* are ignored rather than raising IndexError.
        """
        pairs = zip(values[::2], values[1::2])
        for slot, (low, high) in enumerate(pairs, start=offset):
            if slot >= len(target):
                break
            target[slot] = low | (high << 32)

    def parse0(self, id0_msg):
        """Read the input levels and the first playback levels."""
        self._parse_rms(id0_msg[3:23], self.input_rms)
        self._parse_rms(id0_msg[23:], self.playback_rms)

    def parse1(self, id1_msg):
        """Read remaining playback, FX-in, output and FX-out levels."""
        self._parse_rms(id1_msg[:6], self.playback_rms, 9)
        self._parse_rms(id1_msg[6:10], self.fx_in_rms)
        self._parse_rms(id1_msg[10:34], self.output_rms)
        # Words 34..37 used to overwrite fx_in_rms a second time, which
        # left fx_out_rms permanently at zero.
        self._parse_rms(id1_msg[34:38], self.fx_out_rms)


class Status(Widget):
    """Debug panel with status/rms/peak views and a message log."""

    status_data = reactive({})

    def compose(self) -> ComposeResult:
        yield Pretty(self.status_data, id='status')
        yield Pretty(self.status_data, id='rms')
        yield Pretty(self.status_data, id='peak')
        # Log's first positional parameter is the `highlight` flag, not
        # content, so no data is passed here.
        yield Log(id='log')


def _split_sysex(msg):
    """Return ``(subid, payload)`` for a recognised sysex message.

    Returns None when the message (without its first and last byte) does
    not start with SYSEX_HEADER or is too short to carry a subid.
    """
    msg_bytes = msg.bytes()[1:-1]
    if len(msg_bytes) < 5 or msg_bytes[:4] != SYSEX_HEADER:
        return None
    return msg_bytes[4], msg_bytes[5:]


class Mixer(App):
    """Application polling the device and refreshing the channel meters."""

    in_port: mido.ports.BaseInput
    out_port: mido.ports.BaseOutput
    messages = []
    messageset = set()

    # Request sent on every poll: the common header followed by 0x10.
    STATUS_MSG = mido.Message('sysex', data=[0x00, 0x20, 0x0D, 0x10, 0x10])

    bf_status = BFStatus()
    bf_rms = BFRMS()
    bf_peak = []

    def __init__(self):
        super().__init__()
        in_ports = [
            name for name in mido.get_input_names()
            if 'Babyface' in name and 'Port 2' in name
        ]
        out_ports = [
            name for name in mido.get_output_names()
            if 'Babyface' in name and 'Port 2' in name
        ]
        # Fall back to virtual ports when no matching device is present.
        if in_ports:
            self.in_port = mido.open_input(in_ports[0])
        else:
            self.in_port = mido.open_input('Mixer', virtual=True)
        if out_ports:
            self.out_port = mido.open_output(out_ports[0])
        else:
            self.out_port = mido.open_output('Mixer', virtual=True)

    def on_mount(self) -> None:
        self.set_interval(.1, self.update_messages)

    def update_messages(self) -> None:
        """Send a status request and process every pending reply."""
        self.out_port.send(self.STATUS_MSG)
        for msg in self.in_port.iter_pending():
            self.process_message(msg)

    def compose(self) -> ComposeResult:
        yield Header()
        yield InputChannelRow()
        yield PlaybackChannelRow()
        yield OutputChannelRow()
        yield Footer()

    def _log_line(self, text: str) -> None:
        """Write *text* to every mounted '#log' widget, if there is one.

        The Status panel holding the log is not part of compose(), so a
        missing log must not raise.
        """
        for log in self.query('#log'):
            log.write_line(text)

    def process_dump(self, msg: mido.Message) -> None:
        """Log the binary form of a recognised message (debug helper)."""
        parsed = _split_sysex(msg)
        if parsed is None:
            return  # noop
        subid, payload = parsed
        if subid == 16:
            return
        decoded_msg = decode_message(payload)
        if decoded_msg:
            self._log_line(str(subid) + ": " + bin_format(decoded_msg))
        else:
            self._log_line(bin_format(payload))

    def process_message(self, msg: mido.Message) -> None:
        """Decode one incoming message and refresh the meters."""
        parsed = _split_sysex(msg)
        if parsed is None:
            return  # noop
        subid, payload = parsed
        decoded_msg = decode_message(payload)

        if subid == 0x00:
            self.bf_status.parse(decoded_msg)
            self.bf_rms.parse0(decoded_msg)
        elif subid == 0x01:
            self.bf_rms.parse1(decoded_msg)
        elif subid == 0x02:
            self.bf_peak = decoded_msg
        else:
            line = hex_format(decoded_msg)
            self.messages.append(line)
            # Log has no update(); append the new line instead.
            self._log_line(line)

        rows = (
            (InputChannelRow, self.bf_rms.input_rms),
            (PlaybackChannelRow, self.bf_rms.playback_rms),
            (OutputChannelRow, self.bf_rms.output_rms),
        )
        for row_type, values in rows:
            row = self.query_one(row_type)
            row.rms_values = values
            # The same list object is reused, so watchers must be
            # triggered explicitly.
            row.mutate_reactive(ChannelRow.rms_values)

        #self.query_one('#status').update(self.bf_status.__dict__)
        # Disabled de-duplicating message view, kept for reference:
        # else:
        #     key = str([subid] + decoded_msg)
        #     if key not in self.messageset:
        #         self.messageset.add(key)
        #         self.messages.append('{subid:X}: {msg}'.format(
        #             subid=subid,
        #             msg=hex_format(decoded_msg, width=8)
        #         ))
        #         self.query_one('#messages').update('\n'.join(self.messages))


if __name__ == '__main__':
    app = Mixer()
    app.run()