import math from rich.console import ConsoleOptions, Console from rich.segment import Segment from rich.style import Style from textual.app import App, ComposeResult, Widget, RenderResult from textual.color import Color from textual.containers import HorizontalScroll, Horizontal, Vertical, Container from textual.reactive import reactive, var from textual.widgets import Static, Button, Pretty, Log, Footer, Header import mido from encoding import decode_message, encode_message, hex_format, get_bitfield, level_to_db, format_db import encoding class MixerMatrix: def __init__(self, out_port: mido.ports.BaseOutput, in_channels = 10, play_channels = 12, out_channels = 12) -> None: self.out_port = out_port self.in_channels = in_channels self.in_levels = [[-120.0 for _ in range(in_channels)] for _ in range(out_channels)] self.play_levels = [[0.0 if p == o else -120.0 for p in range(play_channels)] for o in range(out_channels)] self.out_levels = [0.0 for _ in range(out_channels)] def update_out(self, channel, val): self.out_levels[channel] = val def update_in(self, in_channel, out_channel, val): self.in_levels[out_channel][in_channel] = val self._send_update(in_channel, out_channel, val) def update_play(self, play_channel, out_channel, val): self.play_levels[out_channel][play_channel] = val self._send_update(play_channel + self.in_channels, out_channel, val) def _send_out_update(self, out_channel, val): reg = 0x3e0 | out_channel msg = val << 15 | reg self.out_port.send(encode_message(encoding.SUBID_MIXER, msg)) if out_channel < 4: gain_val = int(0x3b + (val / 2**16) * 0x3b) gain_msg = gain_val << 16 | 0x10 + out_channel self.out_port.send(encode_message(encoding.SUBID_GAIN, gain_msg)) def _send_update(self, in_channel, out_channel, val): reg = out_channel * 24 + in_channel msg = val << 15 | reg self.out_port.send(encode_message(encoding.SUBID_MIXER, msg)) class MeterGradient: FRACTIONS = [ ' ', '▁', '▂', '▃', '▄', '▅', '▆', '▇', '█', ] def __init__(self, value: int, color1: str, color2: str) -> None: self._color1 = Color.parse(color1) self._color2 = Color.parse(color2) self._value = 1 - value def __rich_console__( self, console: Console, options: ConsoleOptions ) -> RenderResult: width = options.max_width height = options.height or options.max_height color1 = self._color1 color2 = self._color2 from_color = Style.from_color blend = color1.blend rich_color1 = color1.rich_color top_char = self.FRACTIONS[math.floor((self._value * height * 8) % 8)] top = math.ceil(self._value * height) for y in range(height): rel_y = y / (height - 1) if y < top: yield Segment(f"{width * ' '}\n") else: line_color = from_color( ( blend(color2, rel_y).rich_color if height > 1 else rich_color1 ), None, ) char = top_char if y == top else '█' yield Segment(f"{width * char}\n", line_color) class Meter(Widget): DEFAULT_CSS = """ Meter { width: 1 } """ value = reactive(1.0) def render(self) -> RenderResult: return MeterGradient(self.value, '#04ff00', '#04aa00') class Channel(Widget): DEFAULT_CSS = """ Channel { width: 11; border: solid #333333; Horizontal { height: 100%; } .label { width: 100%; text-align: center; } .controls { margin-left: 1; margin-right: 1; width: 5 } Button { margin-bottom: 1; padding: 0; min-width: 6; height: 1; } .mute { background: #00f !important; } .solo { background: #0ff; } .phantom { background: #f00; } .inst { background: #ff0; } .indicators-wrap { margin-left: 1; align: left bottom; } .indicators { height: 4; } .peak, .rms { color: #55aa00; } .level { text-style: bold; } } """ rms_value = var(0) peak_value = var(0) level_value = var(0x20000000) db_rms_value = reactive(-100.0) db_peak_value = reactive(-100.0) db_level_value = reactive(0.0) RMS_FACTOR = float(2**54) PEAK_FACTOR = float(2**23) def compute_db_level_value(self) -> float: return level_to_db(self.level_value) def compute_db_peak_value(self) -> float: try: return 20 * math.log10((self.peak_value >> 4) / self.PEAK_FACTOR) except ValueError: return -120 def compute_db_rms_value(self) -> float: try: return 10 * math.log10(self.rms_value / self.RMS_FACTOR) except ValueError: return -120 def watch_db_level_value(self, val) -> None: self.query_one('.level').update(format_db(val)) def watch_db_peak_value(self, val) -> None: self.query_one('.peak').update(format_db(val)) def watch_db_rms_value(self, val) -> None: self.query_one('.rms').update(format_db(val)) self.query_one(Meter).value = (val + 90) / 110 def __init__(self, label, phantom=False, inst=False, *children: Widget): super().__init__(*children) self.label = label self.phantom = phantom self.inst = inst def compose(self) -> ComposeResult: yield Static(self.label, classes='label') with Horizontal(): yield Meter() with Vertical(): with Vertical(classes='controls'): yield Button("M", classes='mute', compact=True) yield Button("S", classes='solo', compact=True) if self.phantom: yield Button("48v", classes='phantom', compact=True) if self.inst: yield Button("Inst", classes='inst', compact=True) with Vertical(classes='indicators-wrap'): with Container(classes='indicators'): yield Static('0.0', classes='level') yield Static('0.0', classes='peak') yield Static('0.1', classes='rms') class ChannelRow(Widget): DEFAULT_CSS = """ ChannelRow { height: 33% } """ rms_values = var([]) peak_values = var([]) level_values = var([]) def watch_rms_values(self, new_values): for channel, value in zip(self.query(Channel), new_values): channel.rms_value = value def watch_peak_values(self, new_values): for channel, value in zip(self.query(Channel), new_values): channel.peak_value = value def watch_level_values(self, new_values): for channel, value in zip(self.query(Channel), new_values): channel.level_value = value class InputChannelRow(ChannelRow): def compose(self) -> ComposeResult: with HorizontalScroll(): yield Channel('AN1', phantom=True) yield Channel('AN2', phantom=True, inst=True) yield Channel('ADAT1') yield Channel('ADAT2') yield Channel('ADAT3') yield Channel('ADAT4') yield Channel('ADAT5') yield Channel('ADAT6') yield Channel('ADAT7') yield Channel('ADAT8') class PlaybackChannelRow(ChannelRow): def compose(self) -> ComposeResult: with HorizontalScroll(): yield Channel('AN1') yield Channel('AN2') yield Channel('PH3') yield Channel('PH4') yield Channel('ADAT1') yield Channel('ADAT2') yield Channel('ADAT3') yield Channel('ADAT4') yield Channel('ADAT5') yield Channel('ADAT6') yield Channel('ADAT7') yield Channel('ADAT8') class OutputChannelRow(ChannelRow): def compose(self) -> ComposeResult: with HorizontalScroll(): yield Channel('AN1') yield Channel('AN2') yield Channel('PH3') yield Channel('PH4') yield Channel('ADAT1') yield Channel('ADAT2') yield Channel('ADAT3') yield Channel('ADAT4') yield Channel('ADAT5') yield Channel('ADAT6') yield Channel('ADAT7') yield Channel('ADAT8') class BFStatus: word0 = 0 word1 = 0 word2 = 0 # word 0 ch1_48v = 0 ch2_48v = 0 ch2_inst = 0 digital_out = 0 # 0 = ADAT, 1 = SPDIF # word 1 out_1_vol = 0 out_2_vol = 0 ch1_gain = 0 ch2_gain = 0 # word 2 out_3_vol = 0 out_4_vol = 0 dim = 0 def parse(self, msg): self.word0 = format(msg[0], '032b') if len(msg) > 2: self.word1 = format(msg[1], '032b') self.word2 = format(msg[2], '032b') self.ch1_48v = get_bitfield(msg[0], 31, 1) self.ch2_48v = get_bitfield(msg[0], 30, 1) self.ch2_inst = get_bitfield(msg[0], 29, 1) self.digital_out = get_bitfield(msg[0], 28, 1) self.out_1_vol = level_to_db(get_bitfield(msg[1], 0, 8)) self.out_2_vol = level_to_db(get_bitfield(msg[1], 9, 8)) self.ch2_gain = get_bitfield(msg[1], 24, 5) self.ch1_gain = get_bitfield(msg[1], 19, 5) self.out_3_vol = level_to_db(get_bitfield(msg[2], 15, 8)) self.out_4_vol = level_to_db(get_bitfield(msg[2], 23, 8)) self.dim = get_bitfield(msg[2], 5, 1) class BFRMS: def __init__(self): self.input_rms = [0 for _ in range(10)] self.playback_rms = [0 for _ in range(12)] self.fx_in_rms = [0, 0] self.output_rms = [0 for _ in range(12)] self.fx_out_rms = [0, 0] def _parse_rms(self, values, target, offset=0): n = offset for i in range(0, len(values), 2): target[n] = (values[i] | (values[i + 1] << 32)) n += 1 def parse0(self, id0_msg): self._parse_rms(id0_msg[3:23], self.input_rms) self._parse_rms(id0_msg[23:], self.playback_rms) def parse1(self, id1_msg): self._parse_rms(id1_msg[:6], self.playback_rms, 9) self._parse_rms(id1_msg[6:10], self.fx_in_rms) self._parse_rms(id1_msg[10:34], self.output_rms) self._parse_rms(id1_msg[34:38], self.fx_in_rms) class BFPeak: def __init__(self): self.input_peaks = [0 for _ in range(10)] self.playback_peaks = [0 for _ in range(12)] self.fx_in_peaks = [0, 0] self.output_peaks = [0 for _ in range(12)] self.fx_out_peaks = [0, 0] def parse(self, msg): self.input_peaks = msg[:10] self.playback_peaks = msg[10:22] self.fx_in_peaks = msg[22:24] self.output_peaks = msg[24:36] self.fx_out_peaks = msg[36:38] class Status(Widget): status_data = reactive({}) def compose(self) -> ComposeResult: yield Pretty(self.status_data, id='status') yield Pretty(self.status_data, id='rms') yield Pretty(self.status_data, id='peak') yield Log(self.status_data, id='log') class Mixer(App): in_port: mido.ports.BaseInput out_port: mido.ports.BaseInput messages = [] messageset = set() STATUS_MSG = mido.Message('sysex', data=[0x00, 0x20, 0x0D, 0x10, 0x10]) bf_status = BFStatus() bf_rms = BFRMS() bf_peak = BFPeak() def __init__(self): super().__init__() in_ports = [name for name in mido.get_input_names() if 'Babyface' in name and 'Port 2' in name] out_ports = [name for name in mido.get_output_names() if 'Babyface' in name and 'Port 2' in name] if in_ports: self.in_port = mido.open_input(in_ports[0]) else: self.in_port = mido.open_input('Mixer', virtual=True) if out_ports: self.out_port = mido.open_output(out_ports[0]) else: self.out_port = mido.open_output('Mixer', virtual=True) def on_mount(self) -> None: self.set_interval(.1, self.update_messages) def update_messages(self) -> None: self.out_port.send(self.STATUS_MSG) for msg in self.in_port.iter_pending(): self.process_message(msg) def compose(self) -> ComposeResult: yield Header() yield InputChannelRow() yield PlaybackChannelRow() yield OutputChannelRow() yield Footer() def process_message(self, msg: mido.Message) -> None: subid, decoded_msg = decode_message(msg) if subid is None: return elif subid == 0x00: self.bf_status.parse(decoded_msg) self.bf_rms.parse0(decoded_msg) elif subid == 0x01: self.bf_rms.parse1(decoded_msg) elif subid == 0x02: self.bf_peak.parse(decoded_msg) else: self.messages.append(hex_format(decoded_msg)) self.query_one('#log').update(self.messages) return input_row = self.query_one(InputChannelRow) input_row.rms_values = self.bf_rms.input_rms input_row.peak_values = self.bf_peak.input_peaks input_row.mutate_reactive(ChannelRow.rms_values) input_row.mutate_reactive(ChannelRow.peak_values) playback_row = self.query_one(PlaybackChannelRow) playback_row.rms_values = self.bf_rms.playback_rms playback_row.peak_values = self.bf_peak.playback_peaks playback_row.mutate_reactive(ChannelRow.rms_values) playback_row.mutate_reactive(ChannelRow.peak_values) output_row = self.query_one(OutputChannelRow) output_row.rms_values = self.bf_rms.output_rms output_row.peak_values = self.bf_peak.output_peaks output_row.mutate_reactive(ChannelRow.rms_values) output_row.mutate_reactive(ChannelRow.peak_values) #self.query_one('#status').update(self.bf_status.__dict__) if __name__ == '__main__': app = Mixer() app.run()