Initial commit

This commit is contained in:
Brian Hrebec 2026-03-06 22:58:36 -06:00
commit 5294df34c7

394
main.py Normal file
View file

@ -0,0 +1,394 @@
from rich.console import ConsoleOptions, Console
from rich.segment import Segment
from rich.style import Style
from textual.app import App, ComposeResult, Widget, RenderResult
from textual.color import Color
from textual.containers import HorizontalScroll, Horizontal, Vertical
from textual.reactive import reactive, var
from textual.widgets import Static, Button, Pretty, Log, Footer, Header
import mido
import sys
def decode_message(msg_bytes):
result_words = []
try:
chunks = zip(*[iter(msg_bytes)] * 5, strict=True)
for packed in chunks:
value = packed[4] << 28 | packed[3] << 21 | packed[2] << 14 | packed[1] << 7 | packed[0]
result_words.append(value)
except ValueError:
pass # invalid message
return result_words
def hex_format(msg, width=2):
return ' '.join([("{x:0" + str(width) + "x}").format(x=x) for x in msg])
def bin_format(msg, width=32):
return ' '.join([("{x:0" + str(width) + "b}").format(x=x) for x in msg])
def get_bitfield(word, position, length):
return (word >> position) & ((1 << length) - 1)
def set_bitfield(word, val, position, length):
mask = ((1 << length) - 1)
val = (val & mask) << position
return (word & mask << position) | val
def level_to_db(val):
return 6 + (val - 0xFF) / 2
class MeterGradient:
def __init__(self, value: int, color1: str, color2: str) -> None:
self._color1 = Color.parse(color1)
self._color2 = Color.parse(color2)
self._value = 1 - value
def __rich_console__(
self, console: Console, options: ConsoleOptions
) -> RenderResult:
width = options.max_width
height = options.height or options.max_height
color1 = self._color1
color2 = self._color2
default_color = Color(0, 0, 0).rich_color
from_color = Style.from_color
blend = color1.blend
rich_color1 = color1.rich_color
for y in range(height):
rel_y = y / (height - 1)
if rel_y < self._value:
yield Segment(f"{width * ' '}\n")
else:
line_color = from_color(
default_color,
(
blend(color2, rel_y).rich_color
if height > 1
else rich_color1
),
)
yield Segment(f"{width * ' '}\n", line_color)
class Meter(Widget):
DEFAULT_CSS = """
Meter {
width: 1
}
"""
value = reactive(0)
def render(self) -> RenderResult:
return MeterGradient(self.value, '#04ff00', '#04aa00')
class Channel(Widget):
DEFAULT_CSS = """
Channel {
width: 10;
border: solid #333333;
Horizontal {
height: 100%;
}
.label {
width: 100%;
text-align: center;
}
.controls {
margin-left: 1;
width: 6
}
Button {
margin-bottom: 1;
padding: 0;
min-width: 6;
height: 1;
}
.mute {
background: #00f !important;
}
.solo {
background: #0ff;
}
.phantom {
background: #f00;
}
.inst {
background: #ff0;
}
}
"""
rms_value = var(0)
scaled_rms_value = var(0)
MAX_RMS = 2**64
def compute_scaled_rms_value(self) -> float:
return self.rms_value / self.MAX_RMS
def watch_scaled_rms_value(self, val) -> None:
self.query_one(Meter).value = val
def __init__(self, label, phantom=False, inst=False, *children: Widget):
super().__init__(*children)
self.label = label
self.phantom = phantom
self.inst = inst
def compose(self) -> ComposeResult:
yield Static(self.label, classes='label')
with Horizontal():
yield Meter()
with Vertical(classes='controls'):
yield Button("M", classes='mute', compact=True)
yield Button("S", classes='solo', compact=True)
if self.phantom:
yield Button("48v", classes='phantom', compact=True)
if self.inst:
yield Button("Inst", classes='inst', compact=True)
class ChannelRow(Widget):
DEFAULT_CSS = """
ChannelRow {
height: 33%
}
"""
rms_values = var([])
def watch_rms_values(self, old_values, new_values):
print('watch_rms_values', new_values)
for channel, value in zip(self.query(Channel), new_values):
channel.rms_value = value
class InputChannelRow(ChannelRow):
def compose(self) -> ComposeResult:
with HorizontalScroll():
yield Channel('AN1', phantom=True)
yield Channel('AN2', phantom=True, inst=True)
yield Channel('ADAT1')
yield Channel('ADAT2')
yield Channel('ADAT3')
yield Channel('ADAT4')
yield Channel('ADAT5')
yield Channel('ADAT6')
yield Channel('ADAT7')
yield Channel('ADAT8')
class PlaybackChannelRow(ChannelRow):
def compose(self) -> ComposeResult:
with HorizontalScroll():
yield Channel('AN1')
yield Channel('AN2')
yield Channel('PH3')
yield Channel('PH4')
yield Channel('ADAT1')
yield Channel('ADAT2')
yield Channel('ADAT3')
yield Channel('ADAT4')
yield Channel('ADAT5')
yield Channel('ADAT6')
yield Channel('ADAT7')
yield Channel('ADAT8')
class OutputChannelRow(ChannelRow):
def compose(self) -> ComposeResult:
with HorizontalScroll():
yield Channel('AN1')
yield Channel('AN2')
yield Channel('PH3')
yield Channel('PH4')
yield Channel('ADAT1')
yield Channel('ADAT2')
yield Channel('ADAT3')
yield Channel('ADAT4')
yield Channel('ADAT5')
yield Channel('ADAT6')
yield Channel('ADAT7')
yield Channel('ADAT8')
class BFStatus:
word0 = 0
word1 = 0
word2 = 0
# word 0
ch1_48v = 0
ch2_48v = 0
ch2_inst = 0
digital_out = 0 # 0 = ADAT, 1 = SPDIF
# word 1
out_1_vol = 0
out_2_vol = 0
ch1_gain = 0
ch2_gain = 0
# word 2
out_3_vol = 0
out_4_vol = 0
dim = 0
def parse(self, msg):
self.word0 = format(msg[0], '032b')
if len(msg) > 2:
self.word1 = format(msg[1], '032b')
self.word2 = format(msg[2], '032b')
self.ch1_48v = get_bitfield(msg[0], 31, 1)
self.ch2_48v = get_bitfield(msg[0], 30, 1)
self.ch2_inst = get_bitfield(msg[0], 29, 1)
self.digital_out = get_bitfield(msg[0], 28, 1)
self.out_1_vol = level_to_db(get_bitfield(msg[1], 0, 8))
self.out_2_vol = level_to_db(get_bitfield(msg[1], 9, 8))
self.ch2_gain = get_bitfield(msg[1], 24, 5)
self.ch1_gain = get_bitfield(msg[1], 19, 5)
self.out_3_vol = level_to_db(get_bitfield(msg[2], 15, 8))
self.out_4_vol = level_to_db(get_bitfield(msg[2], 23, 8))
self.dim = get_bitfield(msg[2], 5, 1)
class BFRMS:
def __init__(self):
self.input_rms = [0 for _ in range(10)]
self.playback_rms = [0 for _ in range(12)]
self.fx_in_rms = [0, 0]
self.output_rms = [0 for _ in range(12)]
self.fx_out_rms = [0, 0]
def _parse_rms(self, values, target, offset=0):
n = offset
for i in range(0, len(values), 2):
target[n] = (values[i] | (values[i + 1] << 32))
n += 1
def parse0(self, id0_msg):
self._parse_rms(id0_msg[3:23], self.input_rms)
self._parse_rms(id0_msg[23:], self.playback_rms)
def parse1(self, id1_msg):
self._parse_rms(id1_msg[:6], self.playback_rms, 9)
self._parse_rms(id1_msg[6:10], self.fx_in_rms)
self._parse_rms(id1_msg[10:34], self.output_rms)
self._parse_rms(id1_msg[34:38], self.fx_in_rms)
class Status(Widget):
status_data = reactive({})
def compose(self) -> ComposeResult:
yield Pretty(self.status_data, id='status')
yield Pretty(self.status_data, id='rms')
yield Pretty(self.status_data, id='peak')
yield Log(self.status_data, id='log')
class Mixer(App):
in_port: mido.ports.BaseInput
out_port: mido.ports.BaseInput
messages = []
messageset = set()
STATUS_MSG = mido.Message('sysex', data=[0x00, 0x20, 0x0D, 0x10, 0x10])
bf_status = BFStatus()
bf_rms = BFRMS()
bf_peak = []
def __init__(self):
super().__init__()
in_ports = [name for name in mido.get_input_names() if 'Babyface' in name and 'Port 2' in name]
out_ports = [name for name in mido.get_output_names() if 'Babyface' in name and 'Port 2' in name]
if in_ports:
self.in_port = mido.open_input(in_ports[0])
else:
self.in_port = mido.open_input('Mixer', virtual=True)
if out_ports:
self.out_port = mido.open_output(out_ports[0])
else:
self.out_port = mido.open_output('Mixer', virtual=True)
def on_mount(self) -> None:
self.set_interval(.5, self.update_messages)
def update_messages(self) -> None:
self.out_port.send(self.STATUS_MSG)
for msg in self.in_port.iter_pending():
self.process_message(msg)
def compose(self) -> ComposeResult:
yield Header()
yield InputChannelRow()
yield PlaybackChannelRow()
yield OutputChannelRow()
yield Footer()
def process_dump(self, msg: mido.Message) -> None:
msg_bytes = msg.bytes()[1:-1]
if msg_bytes[:4] != [0x00, 0x20, 0x0D, 0x10]:
return # noop
decoded_msg = decode_message(msg_bytes[5:])
subid = msg_bytes[4]
msg_pt = str(subid) + ": " + bin_format(decoded_msg)
if subid == 16:
return
if decoded_msg:
self.query_one('#log').write_line(msg_pt)
else:
self.query_one('#log').write_line(bin_format(msg_bytes[5:]))
def process_message(self, msg: mido.Message) -> None:
msg_bytes = msg.bytes()[1:-1]
if msg_bytes[:4] != [0x00, 0x20, 0x0D, 0x10]:
return # noop
decoded_msg = decode_message(msg_bytes[5:])
subid = msg_bytes[4]
if subid == 0x00:
self.bf_status.parse(decoded_msg)
self.bf_rms.parse0(decoded_msg)
elif subid == 0x01:
self.bf_rms.parse1(decoded_msg)
elif subid == 0x02:
self.bf_peak = decoded_msg
else:
self.messages.append(hex_format(decoded_msg))
self.query_one('#log').update(self.messages)
print('update')
input_row = self.query_one(InputChannelRow)
input_row.rms_values = self.bf_rms.input_rms
playback_row = self.query_one(PlaybackChannelRow)
playback_row.rms_values = self.bf_rms.playback_rms
output_row = self.query_one(OutputChannelRow)
output_row.rms_values = self.bf_rms.output_rms
#self.query_one('#status').update(self.bf_status.__dict__)
"""
else:
key = str([subid] + decoded_msg)
if key not in self.messageset:
self.messageset.add(key)
self.messages.append('{subid:X}: {msg}'.format(
subid=subid,
msg=hex_format(decoded_msg, width=8)
))
self.query_one('#messages').update('\n'.join(self.messages))
"""
if __name__ == '__main__':
app = Mixer()
app.run()