From f444daeeca23b08611a9d1161ccd532b2c102c33 Mon Sep 17 00:00:00 2001 From: Brian Hrebec Date: Mon, 9 Mar 2026 12:34:49 -0500 Subject: [PATCH] More protocol decoding --- encoding.py | 78 +++++++++++++++++ main.py | 235 ++++++++++++++++++++++++++++++++------------------- messages.txt | 6 +- 3 files changed, 230 insertions(+), 89 deletions(-) create mode 100644 encoding.py diff --git a/encoding.py b/encoding.py new file mode 100644 index 0000000..029e3c0 --- /dev/null +++ b/encoding.py @@ -0,0 +1,78 @@ +import math +from typing import Optional, List + +import mido + +PREFIX = [0x00, 0x20, 0x0D, 0x10] +SUBID_IN_STATUS = 0 +SUBID_IN_RMS = 1 +SUBID_IN_PEAK = 2 + +SUBID_OPTIONS = 0 +SUBID_MIXER = 1 +SUBID_LOOPBACK = 2 +SUBID_INPUT_OUTS = 3 +SUBID_GAIN = 4 + +def decode_message(msg: mido.Message) -> tuple[Optional[int], Optional[List[int]]]: + msg_bytes = msg.bytes() + if msg_bytes[:5] != [0xF0] + PREFIX: + return None, None # noop + + result_words = [] + try: + chunks = zip(*[iter(msg_bytes[6:-1])] * 5, strict=True) + for packed in chunks: + value = packed[4] << 28 | packed[3] << 21 | packed[2] << 14 | packed[1] << 7 | packed[0] + result_words.append(value) + except ValueError: + pass # invalid message + + return msg_bytes[5], result_words + + +def encode_message(subid: int, payload: List[int]) -> mido.Message: + data = [subid] + for word in payload: + data += [ + word & 0x7F, + word >> 7 & 0x7F, + word >> 14 & 0x7F, + word >> 21 & 0x7F, + word >> 28 & 0x7F, + ] + + return mido.Message('sysex', data=PREFIX + data) + + +def hex_format(msg, width=2): + return ' '.join([("{x:0" + str(width) + "x}").format(x=x) for x in msg]) + + +def bin_format(msg, width=32): + return ' '.join([("{x:0" + str(width) + "b}").format(x=x) for x in msg]) + + +def get_bitfield(word, position, length): + return (word >> position) & ((1 << length) - 1) + + +def set_bitfield(word, val, position, length): + mask = ((1 << length) - 1) + val = (val & mask) << position + return (word & mask << position) | val + + +def level_to_db(val): + return 6 + (val - 0xFF) / 2 + + +def fader_to_db(val): + return 6 + (abs(val) - 0xFF) / 2 + + +def format_db(val, min_db=-120): + if math.isnan(val) or val <= min_db: + return '∞' + else: + return format(val, '.1f') diff --git a/main.py b/main.py index eebc86e..58f3377 100644 --- a/main.py +++ b/main.py @@ -5,49 +5,61 @@ from rich.segment import Segment from rich.style import Style from textual.app import App, ComposeResult, Widget, RenderResult from textual.color import Color -from textual.containers import HorizontalScroll, Horizontal, Vertical +from textual.containers import HorizontalScroll, Horizontal, Vertical, Container from textual.reactive import reactive, var from textual.widgets import Static, Button, Pretty, Log, Footer, Header import mido -import sys + +from encoding import decode_message, encode_message, hex_format, get_bitfield, level_to_db, format_db +import encoding -def decode_message(msg_bytes): - result_words = [] - try: - chunks = zip(*[iter(msg_bytes)] * 5, strict=True) - for packed in chunks: - value = packed[4] << 28 | packed[3] << 21 | packed[2] << 14 | packed[1] << 7 | packed[0] - result_words.append(value) - except ValueError: - pass # invalid message +class MixerMatrix: + def __init__(self, out_port: mido.ports.BaseOutput, in_channels = 10, play_channels = 12, out_channels = 12) -> None: + self.out_port = out_port + self.in_channels = in_channels + self.in_levels = [[-120.0 for _ in range(in_channels)] for _ in range(out_channels)] + self.play_levels = [[0.0 if p == o else -120.0 for p in range(play_channels)] for o in range(out_channels)] + self.out_levels = [0.0 for _ in range(out_channels)] - return result_words + def update_out(self, channel, val): + self.out_levels[channel] = val + def update_in(self, in_channel, out_channel, val): + self.in_levels[out_channel][in_channel] = val + self._send_update(in_channel, out_channel, val) -def hex_format(msg, width=2): - return ' '.join([("{x:0" + str(width) + "x}").format(x=x) for x in msg]) + def update_play(self, play_channel, out_channel, val): + self.play_levels[out_channel][play_channel] = val + self._send_update(play_channel + self.in_channels, out_channel, val) + def _send_out_update(self, out_channel, val): + reg = 0x3e0 | out_channel + msg = val << 15 | reg + self.out_port.send(encode_message(encoding.SUBID_MIXER, msg)) + if out_channel < 4: + gain_val = int(0x3b + (val / 2**16) * 0x3b) + gain_msg = gain_val << 16 | 0x10 + out_channel + self.out_port.send(encode_message(encoding.SUBID_GAIN, gain_msg)) -def bin_format(msg, width=32): - return ' '.join([("{x:0" + str(width) + "b}").format(x=x) for x in msg]) - - -def get_bitfield(word, position, length): - return (word >> position) & ((1 << length) - 1) - - -def set_bitfield(word, val, position, length): - mask = ((1 << length) - 1) - val = (val & mask) << position - return (word & mask << position) | val - - -def level_to_db(val): - return 6 + (val - 0xFF) / 2 - + def _send_update(self, in_channel, out_channel, val): + reg = out_channel * 24 + in_channel + msg = val << 15 | reg + self.out_port.send(encode_message(encoding.SUBID_MIXER, msg)) class MeterGradient: + FRACTIONS = [ + ' ', + '▁', + '▂', + '▃', + '▄', + '▅', + '▆', + '▇', + '█', + ] + def __init__(self, value: int, color1: str, color2: str) -> None: self._color1 = Color.parse(color1) self._color2 = Color.parse(color2) @@ -61,24 +73,26 @@ class MeterGradient: height = options.height or options.max_height color1 = self._color1 color2 = self._color2 - default_color = Color(0, 0, 0).rich_color from_color = Style.from_color blend = color1.blend rich_color1 = color1.rich_color + top_char = self.FRACTIONS[math.floor((self._value * height * 8) % 8)] + top = math.ceil(self._value * height) for y in range(height): rel_y = y / (height - 1) - if rel_y < self._value: + if y < top: yield Segment(f"{width * ' '}\n") else: line_color = from_color( - default_color, ( blend(color2, rel_y).rich_color if height > 1 else rich_color1 ), + None, ) - yield Segment(f"{width * ' '}\n", line_color) + char = top_char if y == top else '█' + yield Segment(f"{width * char}\n", line_color) class Meter(Widget): @@ -87,7 +101,7 @@ class Meter(Widget): width: 1 } """ - value = reactive(0) + value = reactive(1.0) def render(self) -> RenderResult: return MeterGradient(self.value, '#04ff00', '#04aa00') @@ -95,7 +109,7 @@ class Meter(Widget): class Channel(Widget): DEFAULT_CSS = """ Channel { - width: 10; + width: 11; border: solid #333333; Horizontal { height: 100%; @@ -106,7 +120,8 @@ class Channel(Widget): } .controls { margin-left: 1; - width: 6 + margin-right: 1; + width: 5 } Button { margin-bottom: 1; @@ -126,22 +141,56 @@ class Channel(Widget): .inst { background: #ff0; } + .indicators-wrap { + margin-left: 1; + align: left bottom; + } + .indicators { + height: 4; + } + .peak, .rms { + color: #55aa00; + } + .level { + text-style: bold; + } } """ rms_value = var(0) - scaled_rms_value = var(0.0) + peak_value = var(0) + level_value = var(0x20000000) + db_rms_value = reactive(-100.0) + db_peak_value = reactive(-100.0) + db_level_value = reactive(0.0) RMS_FACTOR = float(2**54) + PEAK_FACTOR = float(2**23) - def compute_scaled_rms_value(self) -> float: + def compute_db_level_value(self) -> float: + return level_to_db(self.level_value) + + def compute_db_peak_value(self) -> float: try: - db = 10 * math.log10(self.rms_value / self.RMS_FACTOR) - return (db + 90) / 110 + return 20 * math.log10((self.peak_value >> 4) / self.PEAK_FACTOR) except ValueError: - return 0 + return -120 + + def compute_db_rms_value(self) -> float: + try: + return 10 * math.log10(self.rms_value / self.RMS_FACTOR) + except ValueError: + return -120 + + def watch_db_level_value(self, val) -> None: + self.query_one('.level').update(format_db(val)) + + def watch_db_peak_value(self, val) -> None: + self.query_one('.peak').update(format_db(val)) + + def watch_db_rms_value(self, val) -> None: + self.query_one('.rms').update(format_db(val)) + self.query_one(Meter).value = (val + 90) / 110 - def watch_scaled_rms_value(self, val) -> None: - self.query_one(Meter).value = val def __init__(self, label, phantom=False, inst=False, *children: Widget): super().__init__(*children) @@ -153,13 +202,19 @@ class Channel(Widget): yield Static(self.label, classes='label') with Horizontal(): yield Meter() - with Vertical(classes='controls'): - yield Button("M", classes='mute', compact=True) - yield Button("S", classes='solo', compact=True) - if self.phantom: - yield Button("48v", classes='phantom', compact=True) - if self.inst: - yield Button("Inst", classes='inst', compact=True) + with Vertical(): + with Vertical(classes='controls'): + yield Button("M", classes='mute', compact=True) + yield Button("S", classes='solo', compact=True) + if self.phantom: + yield Button("48v", classes='phantom', compact=True) + if self.inst: + yield Button("Inst", classes='inst', compact=True) + with Vertical(classes='indicators-wrap'): + with Container(classes='indicators'): + yield Static('0.0', classes='level') + yield Static('0.0', classes='peak') + yield Static('0.1', classes='rms') class ChannelRow(Widget): @@ -170,11 +225,21 @@ class ChannelRow(Widget): """ rms_values = var([]) + peak_values = var([]) + level_values = var([]) def watch_rms_values(self, new_values): for channel, value in zip(self.query(Channel), new_values): channel.rms_value = value + def watch_peak_values(self, new_values): + for channel, value in zip(self.query(Channel), new_values): + channel.peak_value = value + + def watch_level_values(self, new_values): + for channel, value in zip(self.query(Channel), new_values): + channel.level_value = value + class InputChannelRow(ChannelRow): def compose(self) -> ComposeResult: with HorizontalScroll(): @@ -289,6 +354,21 @@ class BFRMS: self._parse_rms(id1_msg[10:34], self.output_rms) self._parse_rms(id1_msg[34:38], self.fx_in_rms) +class BFPeak: + def __init__(self): + self.input_peaks = [0 for _ in range(10)] + self.playback_peaks = [0 for _ in range(12)] + self.fx_in_peaks = [0, 0] + self.output_peaks = [0 for _ in range(12)] + self.fx_out_peaks = [0, 0] + + def parse(self, msg): + self.input_peaks = msg[:10] + self.playback_peaks = msg[10:22] + self.fx_in_peaks = msg[22:24] + self.output_peaks = msg[24:36] + self.fx_out_peaks = msg[36:38] + class Status(Widget): status_data = reactive({}) @@ -308,7 +388,7 @@ class Mixer(App): STATUS_MSG = mido.Message('sysex', data=[0x00, 0x20, 0x0D, 0x10, 0x10]) bf_status = BFStatus() bf_rms = BFRMS() - bf_peak = [] + bf_peak = BFPeak() def __init__(self): super().__init__() @@ -339,63 +419,42 @@ class Mixer(App): yield OutputChannelRow() yield Footer() - def process_dump(self, msg: mido.Message) -> None: - msg_bytes = msg.bytes()[1:-1] - if msg_bytes[:4] != [0x00, 0x20, 0x0D, 0x10]: - return # noop - decoded_msg = decode_message(msg_bytes[5:]) - subid = msg_bytes[4] - - msg_pt = str(subid) + ": " + bin_format(decoded_msg) - if subid == 16: - return - - if decoded_msg: - self.query_one('#log').write_line(msg_pt) - else: - self.query_one('#log').write_line(bin_format(msg_bytes[5:])) - def process_message(self, msg: mido.Message) -> None: - msg_bytes = msg.bytes()[1:-1] - if msg_bytes[:4] != [0x00, 0x20, 0x0D, 0x10]: - return # noop - decoded_msg = decode_message(msg_bytes[5:]) - subid = msg_bytes[4] - if subid == 0x00: + subid, decoded_msg = decode_message(msg) + if subid is None: + return + elif subid == 0x00: self.bf_status.parse(decoded_msg) self.bf_rms.parse0(decoded_msg) elif subid == 0x01: self.bf_rms.parse1(decoded_msg) elif subid == 0x02: - self.bf_peak = decoded_msg + self.bf_peak.parse(decoded_msg) else: self.messages.append(hex_format(decoded_msg)) self.query_one('#log').update(self.messages) + return input_row = self.query_one(InputChannelRow) input_row.rms_values = self.bf_rms.input_rms + input_row.peak_values = self.bf_peak.input_peaks input_row.mutate_reactive(ChannelRow.rms_values) + input_row.mutate_reactive(ChannelRow.peak_values) + playback_row = self.query_one(PlaybackChannelRow) playback_row.rms_values = self.bf_rms.playback_rms + playback_row.peak_values = self.bf_peak.playback_peaks playback_row.mutate_reactive(ChannelRow.rms_values) + playback_row.mutate_reactive(ChannelRow.peak_values) + output_row = self.query_one(OutputChannelRow) output_row.rms_values = self.bf_rms.output_rms + output_row.peak_values = self.bf_peak.output_peaks output_row.mutate_reactive(ChannelRow.rms_values) + output_row.mutate_reactive(ChannelRow.peak_values) #self.query_one('#status').update(self.bf_status.__dict__) -""" -else: -key = str([subid] + decoded_msg) -if key not in self.messageset: -self.messageset.add(key) -self.messages.append('{subid:X}: {msg}'.format( - subid=subid, - msg=hex_format(decoded_msg, width=8) -)) -self.query_one('#messages').update('\n'.join(self.messages)) -""" - if __name__ == '__main__': app = Mixer() app.run() diff --git a/messages.txt b/messages.txt index 0f2104d..3293c72 100644 --- a/messages.txt +++ b/messages.txt @@ -42,6 +42,10 @@ Min = 00000000000000000000 0000 0000 0000 1/2 = 00000111000100011000000000000000 Uni = 00100000000000000000000000000000 Max = 01000000000000000000000000000000 +20 bits = 0 -> 0x40000 -> 0x80000 +18 bits = 0 -> 0x10000 -> 0x20000 +10**(db/20) * 0x40000 +6.0206db = 0x80000 Phase = 11000000000000000000000000000000 2's complement - @@ -96,7 +100,7 @@ gain2 = 4: 00000000000010000000000000000001 4: 00000000000010010000000000000001 4: 00000000000001110000000000000001 -4: 00000000000100100000000000000001 +4: 00000000000100100000000000000001 unsure what these are - output gain? outputs