import math from collections import namedtuple from rich.console import ConsoleOptions, Console from rich.segment import Segment from rich.style import Style from textual import events from textual.app import App, ComposeResult, Widget, RenderResult from textual.binding import Binding from textual.color import Color from textual.containers import HorizontalScroll, Horizontal, Vertical, Container from textual.events import Event, MouseEvent from textual.geometry import Size from textual.reactive import reactive, var from textual.strip import Strip from textual.widgets import Static, Button, Pretty, Log, Footer, Header from textual.messages import Message import mido from encoding import decode_message, encode_message, hex_format, get_bitfield, level_to_db, format_db, clamp import encoding ChannelState = namedtuple('ChannelState', ['level', 'muted', 'soloed']) class MixerMatrix: def __init__(self, out_port: mido.ports.BaseOutput, in_channels=10, play_channels=12, out_channels=12) -> None: self.out_port = out_port self.matrix = [[ -120.0 for _ in range(out_channels) ] for _ in range(in_channels)] + [ [ 0.0 if p == o else -120.0 for p in range(play_channels) ] for o in range(out_channels)] self.outputs = [0.0 for _ in range(out_channels)] self.output_mutes = [False for _ in range(out_channels)] self.output_solos = [False for _ in range(out_channels)] self.input_mutes = [False for _ in range(in_channels + play_channels)] self.input_solos = [False for _ in range(in_channels + play_channels)] def toggle_out_solo_status(self, channel): solo = self.output_solos[channel] = not self.output_solos[channel] if solo or any(self.output_solos): for channel, channel_solo in enumerate(self.output_solos): if channel_solo: self._send_out_update(channel, self.outputs[channel]) else: self._send_out_update(channel, -120) else: for channel, level in enumerate(self.outputs): if not self.output_mutes[channel]: self._send_out_update(channel, level) def toggle_out_mute_status(self, channel): mute = self.output_mutes[channel] = not self.output_mutes[channel] disabled = self.outputs[channel] == -120.0 soloed = self.output_solos[channel] if mute and not disabled and not soloed: self._send_out_update(channel, -120) elif not mute and not disabled: self._send_out_update(channel, self.outputs[channel]) def set_out_level(self, channel, val): self.outputs[channel] = clamp(val, -120, 6) if not self.output_mutes[channel]: self._send_out_update(channel, self.outputs[channel]) def toggle_mix_mute_status(self, in_channel): mute = self.input_mutes[in_channel] = not self.input_mutes[in_channel] for out_channel, level in enumerate(self.matrix[in_channel]): if mute and level != -120: self._send_mix_update(in_channel, out_channel, -120) else: self._send_mix_update(in_channel, out_channel, level) def set_mix_level(self, in_channel, out_channel, val): self.matrix[in_channel][out_channel] = clamp(val, -120, 6) if not self.input_mutes[in_channel]: self._send_mix_update(in_channel, out_channel, val) def _send_out_update(self, out_channel, db_val): reg = 0x3e0 | out_channel msg = encoding.db_to_fader(db_val) << 12 | reg self.out_port.send(encode_message(encoding.SUBID_MIXER, [msg])) if out_channel < 4: gain_val = encoding.db_to_level(clamp(db_val, -92, 6)) gain_msg = gain_val << 16 | 0x2 + out_channel self.out_port.send(encode_message(encoding.SUBID_GAIN, [gain_msg])) def _send_mix_update(self, in_channel, out_channel, db_val): reg = out_channel * 24 + in_channel msg = encoding.db_to_fader(db_val) << 12 | reg self.out_port.send(encode_message(encoding.SUBID_MIXER, [msg])) class FaderIndicator(Widget): DEFAULT_CSS = """ FaderIndicator { width: 1; } """ value = reactive(0.925) def render_line(self, y: int): if y == math.floor((self.region.height - 1) * (1 - self.value)): c = '<' else: c = ' ' return Strip((Segment(c),), 1) def get_content_width(self, container: Size, viewport: Size) -> int: return 1 class MeterGradient: FRACTIONS = [ ' ', '▁', '▂', '▃', '▄', '▅', '▆', '▇', '█', ] def __init__(self, value: int, color1: str, color2: str) -> None: self._color1 = Color.parse(color1) self._color2 = Color.parse(color2) self._value = 1 - value def __rich_console__( self, console: Console, options: ConsoleOptions ) -> RenderResult: width = options.max_width height = options.height or options.max_height color1 = self._color1 color2 = self._color2 from_color = Style.from_color blend = color1.blend rich_color1 = color1.rich_color top_char = self.FRACTIONS[math.floor((self._value * height * 8) % 8)] top = math.ceil(self._value * height) for y in range(height): rel_y = y / (height - 1) if y < top: yield Segment(f"{width * ' '}\n") else: line_color = from_color( ( blend(color2, rel_y).rich_color if height > 1 else rich_color1 ), None, ) char = top_char if y == top else '█' yield Segment(f"{width * char}\n", line_color) class Meter(Widget): DEFAULT_CSS = """ Meter { width: 1 } """ value = reactive(1.0) def render(self) -> RenderResult: return MeterGradient(self.value, '#04ff00', '#04aa00') class Channel(Widget): DEFAULT_CSS = """ Channel { width: 12; border: solid #333333; Horizontal { height: 100%; } .label { width: 100%; text-align: center; } .controls { margin-left: 1; margin-right: 1; width: 5 } Button { margin-bottom: 1; padding: 0; min-width: 6; height: 1; } .mute.enabled { background: #00f !important; } .solo.enabled { background: #0ff !important; } .phantom.enabled { background: #f00 !important; } .inst.enabled { background: #ff0 !important; } .indicators-wrap { margin-left: 1; align: left bottom; } .indicators { height: 4; } .peak, .rms { color: #55aa00; } .level { text-style: bold; } } Channel.selected { border: #fff; } Channel:focus { background: #222; border: #fff; } """ BINDINGS = [ ('equals', 'move_fader(.5)'), ('plus', 'move_fader(.5)'), ('minus', 'move_fader(-.5)'), Binding('m', 'mute', "Mute", show=True), Binding('s', 'solo', "Solo", show=True), Binding('p', 'phantom', "48v", show=True), Binding('i', 'inst', "Inst.", show=True), Binding('q', 'gain(-3)', "+gain", show=True), Binding('e', 'gain(3)', "-gain", show=True), ] can_focus = True selected = reactive(False) mute = reactive(False) solo = reactive(False) phantom = reactive(False) inst = reactive(False) gain = reactive(0.0) level = reactive(0.0) rms_value = var(0) peak_value = var(0) db_rms_value = reactive(-100.0) db_peak_value = reactive(-100.0) RMS_FACTOR = float(2 ** 54) PEAK_FACTOR = float(2 ** 23) class MoveFader(Message): def __init__(self, channel: int, output=False, change=0): self.channel = channel self.output = output self.change = change super().__init__() class SetFader(Message): def __init__(self, channel: int, output=False, val=0): self.channel = channel self.output = output self.val = val super().__init__() class ChangeGain(MoveFader): pass class ChannelMessage(Message): def __init__(self, channel: int, output=False): self.channel = channel self.output = output super().__init__() class MuteToggle(ChannelMessage): pass class SoloToggle(ChannelMessage): pass class PhantomToggle(ChannelMessage): pass class InstToggle(ChannelMessage): pass class LevelSet(Message): def __init__(self, channel: int, val: int, output=False): self.channel = channel self.val = val self.output = output super().__init__() def __init__(self, channel: int, label: str, phantom=False, inst=False, output=False, *children: Widget): super().__init__(*children) self.channel = channel self.label = label self.phantom_available = phantom self.inst_available = inst self.output = output self.mouse_moved = False def compute_db_peak_value(self) -> float: try: return 20 * math.log10((self.peak_value >> 4) / self.PEAK_FACTOR) except ValueError: return -120 def compute_db_rms_value(self) -> float: try: return 10 * math.log10(self.rms_value / self.RMS_FACTOR) except ValueError: return -120 def watch_level(self, val) -> None: self.query_one('.level').update(format_db(val)) self.query_one('#level-indicator').value = encoding.map_range(val, -75, 6, 0, 1) def watch_gain(self, val) -> None: if not self.phantom_available: return self.query_one('#gain').update(format_db(val)) def watch_mute(self, val) -> None: if val: self.query_one('#mute').add_class('enabled') else: self.query_one('#mute').remove_class('enabled') def watch_solo(self, val) -> None: if val: self.query_one('#solo').add_class('enabled') else: self.query_one('#solo').remove_class('enabled') def watch_phantom(self, val) -> None: if not self.phantom_available: return if val: self.query_one('#phantom').add_class('enabled') else: self.query_one('#phantom').remove_class('enabled') def watch_inst(self, val) -> None: if not self.inst_available: return if val: self.query_one('#inst').add_class('enabled') else: self.query_one('#inst').remove_class('enabled') def watch_selected(self, val) -> None: if val: self.add_class('selected') else: self.remove_class('selected') def watch_db_peak_value(self, val) -> None: self.query_one('.peak').update(format_db(val)) def watch_db_rms_value(self, val) -> None: self.query_one('.rms').update(format_db(val)) self.query_one(Meter).value = encoding.map_range(val, -75, 6, 0, 1) def compose(self) -> ComposeResult: yield Static(self.label, classes='label') with Horizontal(): yield Meter(id='meter') yield FaderIndicator(id='level-indicator') with Vertical(): with Vertical(classes='controls'): yield Button("M", classes='mute', compact=True, id='mute') yield Button("S", classes='solo', compact=True, id='solo') if self.phantom_available: yield Button("48v", classes='phantom', compact=True, id='phantom') if self.inst_available: yield Button("Inst", classes='inst', compact=True, id='inst') if self.phantom_available: yield Static('0.0', id='gain') with Vertical(classes='indicators-wrap'): with Container(classes='indicators'): yield Static('0.0', classes='level') yield Static('0.0', classes='peak') yield Static('0.1', classes='rms') def action_mute(self) -> None: self.post_message(self.MuteToggle(self.channel, output=self.output)) def action_solo(self) -> None: self.post_message(self.SoloToggle(self.channel, output=self.output)) def action_phantom(self) -> None: if not self.phantom_available: return self.post_message(self.PhantomToggle(self.channel, output=self.output)) def action_inst(self) -> None: if not self.inst_available: return self.post_message(self.InstToggle(self.channel, output=self.output)) def action_move_fader(self, change) -> None: self.post_message(self.MoveFader(self.channel, output=self.output, change=change)) def action_gain(self, change) -> None: self.post_message(self.ChangeGain(self.channel, output=self.output, change=change)) def _on_mouse_scroll_up(self, event: events.MouseScrollUp) -> None: if event.shift and self.phantom_available: self.action_gain(3) else: self.action_move_fader(1) def _on_mouse_scroll_down(self, event: events.MouseScrollUp) -> None: if event.shift and self.phantom_available: self.action_gain(-3) else: self.action_move_fader(-1) def _on_mouse_down(self, event: events.MouseDown) -> None: pass def _on_mouse_up(self, event: events.MouseUp) -> None: pass def _set_level_from_mouse_event(self, event: events.MouseEvent): position = 1 - (event.pointer_y / (event.widget.region.height - 1)) db = encoding.map_range(position, 0, 1, -75, 6) self.post_message(self.SetFader(self.channel, output=self.output, val=db)) def _on_mouse_move(self, event: events.MouseMove) -> None: if event.button == 1 and event.widget == self.query_one('#level-indicator'): self._set_level_from_mouse_event(event) self.mouse_moved = True def _on_click(self, event: events.Click) -> None: if not self.mouse_moved and event.button == 1 and event.widget == self.query_one('#level-indicator'): self._set_level_from_mouse_event(event) self.mouse_moved = False def on_button_pressed(self, event): if 'mute' == event.button.id: self.action_mute() elif 'solo' == event.button.id: self.action_solo() elif 'phantom' == event.button.id: self.action_phantom() elif 'inst' == event.button.id: self.action_inst() class ChannelRow(Widget): DEFAULT_CSS = """ ChannelRow { height: 33% } """ rms_values = var([]) peak_values = var([]) level_values = var([]) def __init__(self, channels: range, *children: Widget): super().__init__(*children) self.channels = channels def watch_rms_values(self, new_values): for channel, value in zip(self.query(Channel), new_values): channel.rms_value = value def watch_peak_values(self, new_values): for channel, value in zip(self.query(Channel), new_values): channel.peak_value = value def watch_level_values(self, new_values): for channel, value in zip(self.query(Channel), new_values): channel.level_value = value def update_mixer_display(self, mixer: MixerMatrix, current_out: int): pass class InputChannelRow(ChannelRow): def compose(self) -> ComposeResult: with HorizontalScroll(): yield Channel(self.channels[0], 'AN1', phantom=True) yield Channel(self.channels[1], 'AN2', phantom=True, inst=True) for n in range(0, 8): yield Channel(channel=self.channels[n+2], label='ADAT{n}'.format(n=n+1)) def update_mixer_display(self, mixer: MixerMatrix, current_out: int): for channel_widget, levels, mute, solo in zip( self.query(Channel), mixer.matrix[:self.channels[-1]], mixer.input_mutes[:self.channels[-1]], mixer.input_solos[:self.channels[-1]]): channel_widget.level = levels[current_out] channel_widget.mute = mute channel_widget.solo = solo class PlaybackChannelRow(ChannelRow): LABELS = [ 'AN1', 'AN2', 'PH3', 'PH4', 'ADAT1', 'ADAT2', 'ADAT3', 'ADAT4', 'ADAT5', 'ADAT6', 'ADAT7', 'ADAT8', ] def compose(self) -> ComposeResult: with HorizontalScroll(): for n, label in zip(self.channels, self.LABELS): yield Channel(channel=n, label=label) def update_mixer_display(self, mixer: MixerMatrix, current_out: int): for channel_widget, levels, mute, solo in zip( self.query(Channel), mixer.matrix[self.channels[0]:self.channels[-1]], mixer.input_mutes[self.channels[0]:self.channels[-1]], mixer.input_solos[self.channels[0]:self.channels[-1]]): channel_widget.level = levels[current_out] channel_widget.mute = mute channel_widget.solo = solo class OutputChannelRow(ChannelRow): LABELS = [ 'AN1', 'AN2', 'PH3', 'PH4', 'ADAT1', 'ADAT2', 'ADAT3', 'ADAT4', 'ADAT5', 'ADAT6', 'ADAT7', 'ADAT8', ] def compose(self) -> ComposeResult: with HorizontalScroll(): with HorizontalScroll(): for n, label in zip(self.channels, self.LABELS): yield Channel(channel=n, label=label, output=True) def update_mixer_display(self, mixer: MixerMatrix, current_out: int): for idx, channel_widget, level, mute, solo in zip( self.channels, self.query(Channel), mixer.outputs, mixer.output_mutes, mixer.output_solos): channel_widget.level = level channel_widget.mute = mute channel_widget.solo = solo channel_widget.selected = idx == current_out class BFStatus: word0 = 0 word1 = 0 word2 = 0 # word 0 ch1_48v = 0 ch2_48v = 0 ch2_inst = 0 digital_out = 0 # 0 = ADAT, 1 = SPDIF # word 1 out_1_vol = 0 out_2_vol = 0 ch1_gain = 0 ch2_gain = 0 # word 2 out_3_vol = 0 out_4_vol = 0 dim = 0 def parse(self, msg): self.word0 = format(msg[0], '032b') if len(msg) > 2: self.word1 = format(msg[1], '032b') self.word2 = format(msg[2], '032b') self.ch1_48v = get_bitfield(msg[0], 30, 1) self.ch2_48v = get_bitfield(msg[0], 31, 1) self.ch2_inst = get_bitfield(msg[0], 29, 1) self.digital_out = get_bitfield(msg[0], 28, 1) self.out_1_vol = level_to_db(get_bitfield(msg[1], 0, 8)) self.out_2_vol = level_to_db(get_bitfield(msg[1], 9, 8)) self.ch2_gain = get_bitfield(msg[1], 24, 5) * 3 + 6 self.ch1_gain = get_bitfield(msg[1], 19, 5) * 3 + 6 self.out_3_vol = level_to_db(get_bitfield(msg[2], 14, 8)) self.out_4_vol = level_to_db(get_bitfield(msg[2], 23, 8)) self.dim = get_bitfield(msg[2], 5, 1) def out_levels(self): return ( self.out_1_vol, self.out_2_vol, self.out_3_vol, self.out_4_vol, ) class BFRMS: def __init__(self): self.input_rms = [0 for _ in range(10)] self.playback_rms = [0 for _ in range(12)] self.fx_in_rms = [0, 0] self.output_rms = [0 for _ in range(12)] self.fx_out_rms = [0, 0] def _parse_rms(self, values, target, offset=0): n = offset for i in range(0, len(values), 2): target[n] = (values[i] | (values[i + 1] << 32)) n += 1 def parse0(self, id0_msg): self._parse_rms(id0_msg[3:23], self.input_rms) self._parse_rms(id0_msg[23:], self.playback_rms) def parse1(self, id1_msg): self._parse_rms(id1_msg[:6], self.playback_rms, 9) self._parse_rms(id1_msg[6:10], self.fx_in_rms) self._parse_rms(id1_msg[10:34], self.output_rms) self._parse_rms(id1_msg[34:38], self.fx_in_rms) class BFPeak: def __init__(self): self.input_peaks = [0 for _ in range(10)] self.playback_peaks = [0 for _ in range(12)] self.fx_in_peaks = [0, 0] self.output_peaks = [0 for _ in range(12)] self.fx_out_peaks = [0, 0] def parse(self, msg): self.input_peaks = msg[:10] self.playback_peaks = msg[10:22] self.fx_in_peaks = msg[22:24] self.output_peaks = msg[24:36] self.fx_out_peaks = msg[36:38] class Status(Widget): status_data = reactive({}) def compose(self) -> ComposeResult: yield Pretty(self.status_data, id='status') yield Pretty(self.status_data, id='rms') yield Pretty(self.status_data, id='peak') yield Log(self.status_data, id='log') class Mixer(App): in_port: mido.ports.BaseInput out_port: mido.ports.BaseOutput messages = [] messageset = set() STATUS_MSG = mido.Message('sysex', data=[0x00, 0x20, 0x0D, 0x10, 0x10]) bf_status = BFStatus() bf_rms = BFRMS() bf_peak = BFPeak() current_mix_out = 0 focused_channel = (0, 2) INPUTS = 10 PLAYBACKS = 12 OUTPUTS = 12 BINDINGS = [ ('o', 'mix_select(1)'), ('u', 'mix_select(-1)'), ('h', 'change_focus(-1, 0)'), ('j', 'change_focus(0, 1)'), ('k', 'change_focus(0, -1)'), ('l', 'change_focus(1, 0)'), ] def __init__(self): super().__init__() in_ports = [name for name in mido.get_input_names() if 'Babyface' in name and 'Port 2' in name] out_ports = [name for name in mido.get_output_names() if 'Babyface' in name and 'Port 2' in name] if in_ports: self.in_port = mido.open_input(in_ports[0]) else: self.in_port = mido.open_input('Mixer', virtual=True) if out_ports: self.out_port = mido.open_output(out_ports[0]) else: self.out_port = mido.open_output('Mixer', virtual=True) self.mixer = MixerMatrix(self.out_port) def on_mount(self) -> None: self.set_interval(.1, self.update_messages) def update_messages(self) -> None: self.out_port.send(self.STATUS_MSG) for msg in self.in_port.iter_pending(): self.process_message(msg) def compose(self) -> ComposeResult: yield Header() yield InputChannelRow(channels=range(0, self.INPUTS)) yield PlaybackChannelRow(channels=range(self.INPUTS, self.PLAYBACKS + self.INPUTS)) yield OutputChannelRow(channels=range(0, self.OUTPUTS)) yield Footer() """ Change the mix output """ def action_mix_select(self, n): self.current_mix_out = (self.current_mix_out + n) % self.OUTPUTS self.update_mixer_display() """ Update focused channel. Mix output follows focus. """ def action_change_focus(self, dx, dy): (x, y) = self.focused_channel y = (y + dy) % 3 x = x + dx if y == 0: x = x % self.INPUTS elif y == 1: x = x % self.PLAYBACKS elif y == 2: x = x % self.OUTPUTS self.current_mix_out = x self.focused_channel = (x, y) ch = self.query(ChannelRow)[y].query(Channel)[x] self.query(ChannelRow)[y].query(Channel)[x].focus() self.update_mixer_display() def on_channel_phantom_toggle(self, message: Channel.PhantomToggle) -> None: if message.channel == 0: self.out_port.send(encode_message(3, [0x00010001 ^ self.bf_status.ch1_48v << 16])) elif message.channel == 1: self.out_port.send(encode_message(3, [0x00020002 ^ self.bf_status.ch2_48v << 17])) def on_channel_inst_toggle(self, message: Channel.PhantomToggle) -> None: if message.channel == 1: self.out_port.send(encode_message(3, [0x00100010 ^ self.bf_status.ch2_inst << 20])) def on_channel_move_fader(self, message: Channel.MoveFader) -> None: if message.output: self.mixer.set_out_level(message.channel, self.mixer.outputs[message.channel] + message.change) else: self.mixer.set_mix_level(message.channel, self.current_mix_out, self.mixer.matrix[message.channel][self.current_mix_out] + message.change) self.update_mixer_display() def on_channel_set_fader(self, message: Channel.SetFader) -> None: if message.output: self.mixer.set_out_level(message.channel, message.val) else: self.mixer.set_mix_level(message.channel, self.current_mix_out, message.val) self.update_mixer_display() def on_channel_change_gain(self, message: Channel.ChangeGain) -> None: if message.channel > 1: return if message.channel == 0: gain = clamp(message.change + self.bf_status.ch1_gain, 6, 60) elif message.channel == 1: gain = clamp(message.change + self.bf_status.ch2_gain, 6, 60) gain_scaled = int((gain - 6) / 3) self.out_port.send(encode_message(4, [gain_scaled << 16 | message.channel])) def on_channel_mute_toggle(self, message: Channel.MuteToggle) -> None: if message.output: self.mixer.toggle_out_mute_status(message.channel) else: self.mixer.toggle_mix_mute_status(message.channel) self.update_mixer_display() def update_mixer_display(self): for row in self.query(ChannelRow): row.update_mixer_display(self.mixer, self.current_mix_out) def process_message(self, msg: mido.Message) -> None: subid, decoded_msg = decode_message(msg) if subid is None: return elif subid == 0x00: self.bf_status.parse(decoded_msg) self.bf_rms.parse0(decoded_msg) elif subid == 0x01: self.bf_rms.parse1(decoded_msg) elif subid == 0x02: self.bf_peak.parse(decoded_msg) else: self.messages.append(hex_format(decoded_msg)) self.query_one('#log').update(self.messages) input_row = self.query_one(InputChannelRow) input_row.rms_values = self.bf_rms.input_rms input_row.peak_values = self.bf_peak.input_peaks channels = input_row.query(Channel) channels[0].phantom = self.bf_status.ch1_48v channels[1].phantom = self.bf_status.ch2_48v channels[1].inst = self.bf_status.ch2_inst channels[0].gain = self.bf_status.ch1_gain channels[1].gain = self.bf_status.ch2_gain playback_row = self.query_one(PlaybackChannelRow) playback_row.rms_values = self.bf_rms.playback_rms playback_row.peak_values = self.bf_peak.playback_peaks output_row = self.query_one(OutputChannelRow) output_row.rms_values = self.bf_rms.output_rms output_row.peak_values = self.bf_peak.output_peaks dirty_faders = False for n, mixvol in enumerate(self.bf_status.out_levels()): if self.mixer.outputs[n] != mixvol: self.mixer.outputs[n] = mixvol dirty_faders = True if dirty_faders: self.update_mixer_display() for row in [input_row, playback_row, output_row]: row.mutate_reactive(ChannelRow.rms_values) row.mutate_reactive(ChannelRow.peak_values) if __name__ == '__main__': app = Mixer() app.run()