babymix/main.py
Brian Hrebec 04060ef5a1 Added basic functionality
- gain
- mute
- mixer
- channel setup options
- keybinds and mousebinds
- focus behavior
2026-03-12 23:33:13 -05:00

828 lines
26 KiB
Python

import math
from collections import namedtuple
from rich.console import ConsoleOptions, Console
from rich.segment import Segment
from rich.style import Style
from textual import events
from textual.app import App, ComposeResult, Widget, RenderResult
from textual.color import Color
from textual.containers import HorizontalScroll, Horizontal, Vertical, Container
from textual.events import Event, MouseEvent
from textual.geometry import Size
from textual.reactive import reactive, var
from textual.strip import Strip
from textual.widgets import Static, Button, Pretty, Log, Footer, Header
from textual.messages import Message
import mido
from encoding import decode_message, encode_message, hex_format, get_bitfield, level_to_db, format_db, clamp
import encoding
ChannelState = namedtuple('ChannelState', ['level', 'muted', 'soloed'])
class MixerMatrix:
def __init__(self, out_port: mido.ports.BaseOutput, in_channels=10, play_channels=12, out_channels=12) -> None:
self.out_port = out_port
self.matrix = [[ -120.0 for _ in range(out_channels) ] for _ in range(in_channels)] + [
[ 0.0 if p == o else -120.0 for p in range(play_channels)
] for o in range(out_channels)]
self.outputs = [0.0 for _ in range(out_channels)]
self.output_mutes = [False for _ in range(out_channels)]
self.output_solos = [False for _ in range(out_channels)]
self.input_mutes = [False for _ in range(in_channels + play_channels)]
self.input_solos = [False for _ in range(in_channels + play_channels)]
def toggle_out_solo_status(self, channel):
solo = self.output_solos[channel] = not self.output_solos[channel]
if solo or any(self.output_solos):
for channel, channel_solo in enumerate(self.output_solos):
if channel_solo:
self._send_out_update(channel, self.outputs[channel])
else:
self._send_out_update(channel, -120)
else:
for channel, level in enumerate(self.outputs):
if not self.output_mutes[channel]:
self._send_out_update(channel, level)
def toggle_out_mute_status(self, channel):
mute = self.output_mutes[channel] = not self.output_mutes[channel]
disabled = self.outputs[channel] == -120.0
soloed = self.output_solos[channel]
if mute and not disabled and not soloed:
self._send_out_update(channel, -120)
elif not mute and not disabled:
self._send_out_update(channel, self.outputs[channel])
def set_out_level(self, channel, val):
self.outputs[channel] = clamp(val, -120, 6)
if not self.output_mutes[channel]:
self._send_out_update(channel, self.outputs[channel])
def toggle_mix_mute_status(self, in_channel):
mute = self.input_mutes[in_channel] = not self.input_mutes[in_channel]
for out_channel, level in enumerate(self.matrix[in_channel]):
if mute and level != -120:
self._send_mix_update(in_channel, out_channel, -120)
else:
self._send_mix_update(in_channel, out_channel, level)
def set_mix_level(self, in_channel, out_channel, val):
self.matrix[in_channel][out_channel] = clamp(val, -120, 6)
if not self.input_mutes[in_channel]:
self._send_mix_update(in_channel, out_channel, val)
def _send_out_update(self, out_channel, db_val):
reg = 0x3e0 | out_channel
msg = encoding.db_to_fader(db_val) << 12 | reg
self.out_port.send(encode_message(encoding.SUBID_MIXER, [msg]))
if out_channel < 4:
gain_val = encoding.db_to_level(clamp(db_val, -92, 6))
gain_msg = gain_val << 16 | 0x2 + out_channel
self.out_port.send(encode_message(encoding.SUBID_GAIN, [gain_msg]))
def _send_mix_update(self, in_channel, out_channel, db_val):
reg = out_channel * 24 + in_channel
msg = encoding.db_to_fader(db_val) << 12 | reg
self.out_port.send(encode_message(encoding.SUBID_MIXER, [msg]))
class FaderIndicator(Widget):
DEFAULT_CSS = """
FaderIndicator {
width: 1;
}
"""
value = reactive(0.925)
def render_line(self, y: int):
if y == math.floor((self.region.height - 1) * (1 - self.value)):
c = '<'
else:
c = ' '
return Strip((Segment(c),), 1)
def get_content_width(self, container: Size, viewport: Size) -> int:
return 1
class MeterGradient:
FRACTIONS = [
' ',
'',
'',
'',
'',
'',
'',
'',
'',
]
def __init__(self, value: int, color1: str, color2: str) -> None:
self._color1 = Color.parse(color1)
self._color2 = Color.parse(color2)
self._value = 1 - value
def __rich_console__(
self, console: Console, options: ConsoleOptions
) -> RenderResult:
width = options.max_width
height = options.height or options.max_height
color1 = self._color1
color2 = self._color2
from_color = Style.from_color
blend = color1.blend
rich_color1 = color1.rich_color
top_char = self.FRACTIONS[math.floor((self._value * height * 8) % 8)]
top = math.ceil(self._value * height)
for y in range(height):
rel_y = y / (height - 1)
if y < top:
yield Segment(f"{width * ' '}\n")
else:
line_color = from_color(
(
blend(color2, rel_y).rich_color
if height > 1
else rich_color1
),
None,
)
char = top_char if y == top else ''
yield Segment(f"{width * char}\n", line_color)
class Meter(Widget):
DEFAULT_CSS = """
Meter {
width: 1
}
"""
value = reactive(1.0)
def render(self) -> RenderResult:
return MeterGradient(self.value, '#04ff00', '#04aa00')
class Channel(Widget):
DEFAULT_CSS = """
Channel {
width: 12;
border: solid #333333;
Horizontal {
height: 100%;
}
.label {
width: 100%;
text-align: center;
}
.controls {
margin-left: 1;
margin-right: 1;
width: 5
}
Button {
margin-bottom: 1;
padding: 0;
min-width: 6;
height: 1;
}
.mute.enabled {
background: #00f !important;
}
.solo.enabled {
background: #0ff !important;
}
.phantom.enabled {
background: #f00 !important;
}
.inst.enabled {
background: #ff0 !important;
}
.indicators-wrap {
margin-left: 1;
align: left bottom;
}
.indicators {
height: 4;
}
.peak, .rms {
color: #55aa00;
}
.level {
text-style: bold;
}
}
Channel.selected {
border: #fff;
}
Channel:focus {
background: #222;
border: #fff;
}
"""
BINDINGS = [
('equals', 'move_fader(.5)'),
('plus', 'move_fader(.5)'),
('minus', 'move_fader(-.5)'),
('m', 'mute'),
('s', 'solo'),
('p', 'phantom'),
('i', 'inst'),
('q', 'gain(-3)'),
('e', 'gain(3)'),
]
can_focus = True
selected = reactive(False)
mute = reactive(False)
solo = reactive(False)
phantom = reactive(False)
inst = reactive(False)
gain = reactive(0.0)
level = reactive(0.0)
rms_value = var(0)
peak_value = var(0)
db_rms_value = reactive(-100.0)
db_peak_value = reactive(-100.0)
RMS_FACTOR = float(2 ** 54)
PEAK_FACTOR = float(2 ** 23)
class MoveFader(Message):
def __init__(self, channel: int, output=False, change=0):
self.channel = channel
self.output = output
self.change = change
super().__init__()
class ChangeGain(MoveFader):
pass
class ChannelMessage(Message):
def __init__(self, channel: int, output=False):
self.channel = channel
self.output = output
super().__init__()
class MuteToggle(ChannelMessage):
pass
class SoloToggle(ChannelMessage):
pass
class PhantomToggle(ChannelMessage):
pass
class InstToggle(ChannelMessage):
pass
class LevelSet(Message):
def __init__(self, channel: int, val: int, output=False):
self.channel = channel
self.val = val
self.output = output
super().__init__()
def __init__(self, channel: int, label: str, phantom=False, inst=False, output=False, *children: Widget):
super().__init__(*children)
self.channel = channel
self.label = label
self.phantom_available = phantom
self.inst_available = inst
self.output = output
def compute_db_peak_value(self) -> float:
try:
return 20 * math.log10((self.peak_value >> 4) / self.PEAK_FACTOR)
except ValueError:
return -120
def compute_db_rms_value(self) -> float:
try:
return 10 * math.log10(self.rms_value / self.RMS_FACTOR)
except ValueError:
return -120
def watch_level(self, val) -> None:
self.query_one('.level').update(format_db(val))
self.query_one('#level-indicator').value = encoding.map_range(val, -75, 6, 0, 1)
def watch_gain(self, val) -> None:
if not self.phantom_available:
return
self.query_one('#gain').update(format_db(val))
def watch_mute(self, val) -> None:
if val:
self.query_one('#mute').add_class('enabled')
else:
self.query_one('#mute').remove_class('enabled')
def watch_solo(self, val) -> None:
if val:
self.query_one('#solo').add_class('enabled')
else:
self.query_one('#solo').remove_class('enabled')
def watch_phantom(self, val) -> None:
if not self.phantom_available:
return
if val:
self.query_one('#phantom').add_class('enabled')
else:
self.query_one('#phantom').remove_class('enabled')
def watch_inst(self, val) -> None:
if not self.inst_available:
return
if val:
self.query_one('#inst').add_class('enabled')
else:
self.query_one('#inst').remove_class('enabled')
def watch_selected(self, val) -> None:
if val:
self.add_class('selected')
else:
self.remove_class('selected')
def watch_db_peak_value(self, val) -> None:
self.query_one('.peak').update(format_db(val))
def watch_db_rms_value(self, val) -> None:
self.query_one('.rms').update(format_db(val))
self.query_one(Meter).value = encoding.map_range(val, -75, 6, 0, 1)
def compose(self) -> ComposeResult:
yield Static(self.label, classes='label')
with Horizontal():
yield Meter(id='meter')
yield FaderIndicator(id='level-indicator')
with Vertical():
with Vertical(classes='controls'):
yield Button("M", classes='mute', compact=True, id='mute')
yield Button("S", classes='solo', compact=True, id='solo')
if self.phantom_available:
yield Button("48v", classes='phantom', compact=True, id='phantom')
if self.inst_available:
yield Button("Inst", classes='inst', compact=True, id='inst')
if self.phantom_available:
yield Static('0.0', id='gain')
with Vertical(classes='indicators-wrap'):
with Container(classes='indicators'):
yield Static('0.0', classes='level')
yield Static('0.0', classes='peak')
yield Static('0.1', classes='rms')
def action_mute(self) -> None:
self.post_message(self.MuteToggle(self.channel, output=self.output))
def action_solo(self) -> None:
self.post_message(self.SoloToggle(self.channel, output=self.output))
def action_phantom(self) -> None:
if not self.phantom_available:
return
self.post_message(self.PhantomToggle(self.channel, output=self.output))
def action_inst(self) -> None:
if not self.inst_available:
return
self.post_message(self.InstToggle(self.channel, output=self.output))
def action_move_fader(self, change) -> None:
self.post_message(self.MoveFader(self.channel, output=self.output, change=change))
def action_gain(self, change) -> None:
self.post_message(self.ChangeGain(self.channel, output=self.output, change=change))
def _on_mouse_scroll_up(self, event: events.MouseScrollUp) -> None:
if event.shift and self.phantom_available:
self.action_gain(3)
else:
self.action_move_fader(1)
def _on_mouse_scroll_down(self, event: events.MouseScrollUp) -> None:
if event.shift and self.phantom_available:
self.action_gain(-3)
else:
self.action_move_fader(-1)
def on_button_pressed(self, event):
if 'mute' == event.button.id:
self.action_mute()
elif 'solo' == event.button.id:
self.action_solo()
elif 'phantom' == event.button.id:
self.action_phantom()
elif 'inst' == event.button.id:
self.action_inst()
class ChannelRow(Widget):
DEFAULT_CSS = """
ChannelRow {
height: 33%
}
"""
rms_values = var([])
peak_values = var([])
level_values = var([])
def __init__(self, channels: range, *children: Widget):
super().__init__(*children)
self.channels = channels
def watch_rms_values(self, new_values):
for channel, value in zip(self.query(Channel), new_values):
channel.rms_value = value
def watch_peak_values(self, new_values):
for channel, value in zip(self.query(Channel), new_values):
channel.peak_value = value
def watch_level_values(self, new_values):
for channel, value in zip(self.query(Channel), new_values):
channel.level_value = value
def update_mixer_display(self, mixer: MixerMatrix, current_out: int):
pass
class InputChannelRow(ChannelRow):
def compose(self) -> ComposeResult:
with HorizontalScroll():
yield Channel(self.channels[0], 'AN1', phantom=True)
yield Channel(self.channels[1], 'AN2', phantom=True, inst=True)
for n in range(0, 8):
yield Channel(channel=self.channels[n+2], label='ADAT{n}'.format(n=n+1))
def update_mixer_display(self, mixer: MixerMatrix, current_out: int):
for channel_widget, levels, mute, solo in zip(
self.query(Channel),
mixer.matrix[:self.channels[-1]],
mixer.input_mutes[:self.channels[-1]],
mixer.input_solos[:self.channels[-1]]):
channel_widget.level = levels[current_out]
channel_widget.mute = mute
channel_widget.solo = solo
class PlaybackChannelRow(ChannelRow):
LABELS = [
'AN1',
'AN2',
'PH3',
'PH4',
'ADAT1',
'ADAT2',
'ADAT3',
'ADAT4',
'ADAT5',
'ADAT6',
'ADAT7',
'ADAT8',
]
def compose(self) -> ComposeResult:
with HorizontalScroll():
for n, label in zip(self.channels, self.LABELS):
yield Channel(channel=n, label=label)
def update_mixer_display(self, mixer: MixerMatrix, current_out: int):
for channel_widget, levels, mute, solo in zip(
self.query(Channel),
mixer.matrix[self.channels[0]:self.channels[-1]],
mixer.input_mutes[self.channels[0]:self.channels[-1]],
mixer.input_solos[self.channels[0]:self.channels[-1]]):
channel_widget.level = levels[current_out]
channel_widget.mute = mute
channel_widget.solo = solo
class OutputChannelRow(ChannelRow):
LABELS = [
'AN1',
'AN2',
'PH3',
'PH4',
'ADAT1',
'ADAT2',
'ADAT3',
'ADAT4',
'ADAT5',
'ADAT6',
'ADAT7',
'ADAT8',
]
def compose(self) -> ComposeResult:
with HorizontalScroll():
with HorizontalScroll():
for n, label in zip(self.channels, self.LABELS):
yield Channel(channel=n, label=label, output=True)
def update_mixer_display(self, mixer: MixerMatrix, current_out: int):
for idx, channel_widget, level, mute, solo in zip(
self.channels,
self.query(Channel),
mixer.outputs,
mixer.output_mutes,
mixer.output_solos):
channel_widget.level = level
channel_widget.mute = mute
channel_widget.solo = solo
channel_widget.selected = idx == current_out
class BFStatus:
word0 = 0
word1 = 0
word2 = 0
# word 0
ch1_48v = 0
ch2_48v = 0
ch2_inst = 0
digital_out = 0 # 0 = ADAT, 1 = SPDIF
# word 1
out_1_vol = 0
out_2_vol = 0
ch1_gain = 0
ch2_gain = 0
# word 2
out_3_vol = 0
out_4_vol = 0
dim = 0
def parse(self, msg):
self.word0 = format(msg[0], '032b')
if len(msg) > 2:
self.word1 = format(msg[1], '032b')
self.word2 = format(msg[2], '032b')
self.ch1_48v = get_bitfield(msg[0], 30, 1)
self.ch2_48v = get_bitfield(msg[0], 31, 1)
self.ch2_inst = get_bitfield(msg[0], 29, 1)
self.digital_out = get_bitfield(msg[0], 28, 1)
self.out_1_vol = level_to_db(get_bitfield(msg[1], 0, 8))
self.out_2_vol = level_to_db(get_bitfield(msg[1], 9, 8))
self.ch2_gain = get_bitfield(msg[1], 24, 5) * 3 + 6
self.ch1_gain = get_bitfield(msg[1], 19, 5) * 3 + 6
self.out_3_vol = level_to_db(get_bitfield(msg[2], 14, 8))
self.out_4_vol = level_to_db(get_bitfield(msg[2], 23, 8))
self.dim = get_bitfield(msg[2], 5, 1)
def out_levels(self):
return (
self.out_1_vol,
self.out_2_vol,
self.out_3_vol,
self.out_4_vol,
)
class BFRMS:
def __init__(self):
self.input_rms = [0 for _ in range(10)]
self.playback_rms = [0 for _ in range(12)]
self.fx_in_rms = [0, 0]
self.output_rms = [0 for _ in range(12)]
self.fx_out_rms = [0, 0]
def _parse_rms(self, values, target, offset=0):
n = offset
for i in range(0, len(values), 2):
target[n] = (values[i] | (values[i + 1] << 32))
n += 1
def parse0(self, id0_msg):
self._parse_rms(id0_msg[3:23], self.input_rms)
self._parse_rms(id0_msg[23:], self.playback_rms)
def parse1(self, id1_msg):
self._parse_rms(id1_msg[:6], self.playback_rms, 9)
self._parse_rms(id1_msg[6:10], self.fx_in_rms)
self._parse_rms(id1_msg[10:34], self.output_rms)
self._parse_rms(id1_msg[34:38], self.fx_in_rms)
class BFPeak:
def __init__(self):
self.input_peaks = [0 for _ in range(10)]
self.playback_peaks = [0 for _ in range(12)]
self.fx_in_peaks = [0, 0]
self.output_peaks = [0 for _ in range(12)]
self.fx_out_peaks = [0, 0]
def parse(self, msg):
self.input_peaks = msg[:10]
self.playback_peaks = msg[10:22]
self.fx_in_peaks = msg[22:24]
self.output_peaks = msg[24:36]
self.fx_out_peaks = msg[36:38]
class Status(Widget):
status_data = reactive({})
def compose(self) -> ComposeResult:
yield Pretty(self.status_data, id='status')
yield Pretty(self.status_data, id='rms')
yield Pretty(self.status_data, id='peak')
yield Log(self.status_data, id='log')
class Mixer(App):
in_port: mido.ports.BaseInput
out_port: mido.ports.BaseOutput
messages = []
messageset = set()
STATUS_MSG = mido.Message('sysex', data=[0x00, 0x20, 0x0D, 0x10, 0x10])
bf_status = BFStatus()
bf_rms = BFRMS()
bf_peak = BFPeak()
current_mix_out = 0
focused_channel = (0, 2)
INPUTS = 10
PLAYBACKS = 12
OUTPUTS = 12
BINDINGS = [
('o', 'mix_select(1)'),
('u', 'mix_select(-1)'),
('h', 'change_focus(-1, 0)'),
('j', 'change_focus(0, 1)'),
('k', 'change_focus(0, -1)'),
('l', 'change_focus(1, 0)'),
]
def __init__(self):
super().__init__()
in_ports = [name for name in mido.get_input_names() if 'Babyface' in name and 'Port 2' in name]
out_ports = [name for name in mido.get_output_names() if 'Babyface' in name and 'Port 2' in name]
if in_ports:
self.in_port = mido.open_input(in_ports[0])
else:
self.in_port = mido.open_input('Mixer', virtual=True)
if out_ports:
self.out_port = mido.open_output(out_ports[0])
else:
self.out_port = mido.open_output('Mixer', virtual=True)
self.mixer = MixerMatrix(self.out_port)
def on_mount(self) -> None:
self.set_interval(.1, self.update_messages)
def update_messages(self) -> None:
self.out_port.send(self.STATUS_MSG)
for msg in self.in_port.iter_pending():
self.process_message(msg)
def compose(self) -> ComposeResult:
yield Header()
yield InputChannelRow(channels=range(0, self.INPUTS))
yield PlaybackChannelRow(channels=range(self.INPUTS, self.PLAYBACKS + self.INPUTS))
yield OutputChannelRow(channels=range(0, self.OUTPUTS))
yield Footer()
"""
Change the mix output
"""
def action_mix_select(self, n):
self.current_mix_out = (self.current_mix_out + n) % self.OUTPUTS
self.update_mixer_display()
"""
Update focused channel. Mix output follows focus.
"""
def action_change_focus(self, dx, dy):
(x, y) = self.focused_channel
y = (y + dy) % 3
x = x + dx
if y == 0:
x = x % self.INPUTS
elif y == 1:
x = x % self.PLAYBACKS
elif y == 2:
x = x % self.OUTPUTS
self.current_mix_out = x
self.focused_channel = (x, y)
ch = self.query(ChannelRow)[y].query(Channel)[x]
self.query(ChannelRow)[y].query(Channel)[x].focus()
self.update_mixer_display()
def _move_fader(self, dval):
self.update_mixer_display()
def on_channel_phantom_toggle(self, message: Channel.PhantomToggle) -> None:
if message.channel == 0:
self.out_port.send(encode_message(3, [0x00010001 ^ self.bf_status.ch1_48v << 16]))
elif message.channel == 1:
self.out_port.send(encode_message(3, [0x00020002 ^ self.bf_status.ch2_48v << 17]))
def on_channel_inst_toggle(self, message: Channel.PhantomToggle) -> None:
if message.channel == 1:
self.out_port.send(encode_message(3, [0x00100010 ^ self.bf_status.ch2_inst << 20]))
def on_channel_move_fader(self, message: Channel.MoveFader) -> None:
if message.output:
self.mixer.set_out_level(message.channel, self.mixer.outputs[message.channel] + message.change)
else:
self.mixer.set_mix_level(message.channel, self.current_mix_out,
self.mixer.matrix[message.channel][self.current_mix_out] + message.change)
self.update_mixer_display()
def on_channel_change_gain(self, message: Channel.ChangeGain) -> None:
if message.channel > 1:
return
if message.channel == 0:
gain = clamp(message.change + self.bf_status.ch1_gain, 6, 60)
elif message.channel == 1:
gain = clamp(message.change + self.bf_status.ch2_gain, 6, 60)
gain_scaled = int((gain - 6) / 3)
self.out_port.send(encode_message(4, [gain_scaled << 16 | message.channel]))
def on_channel_mute_toggle(self, message: Channel.MuteToggle) -> None:
if message.output:
self.mixer.toggle_out_mute_status(message.channel)
else:
self.mixer.toggle_mix_mute_status(message.channel)
self.update_mixer_display()
def update_mixer_display(self):
for row in self.query(ChannelRow):
row.update_mixer_display(self.mixer, self.current_mix_out)
def process_message(self, msg: mido.Message) -> None:
subid, decoded_msg = decode_message(msg)
if subid is None:
return
elif subid == 0x00:
self.bf_status.parse(decoded_msg)
self.bf_rms.parse0(decoded_msg)
elif subid == 0x01:
self.bf_rms.parse1(decoded_msg)
elif subid == 0x02:
self.bf_peak.parse(decoded_msg)
else:
self.messages.append(hex_format(decoded_msg))
self.query_one('#log').update(self.messages)
input_row = self.query_one(InputChannelRow)
input_row.rms_values = self.bf_rms.input_rms
input_row.peak_values = self.bf_peak.input_peaks
channels = input_row.query(Channel)
channels[0].phantom = self.bf_status.ch1_48v
channels[1].phantom = self.bf_status.ch2_48v
channels[1].inst = self.bf_status.ch2_inst
channels[0].gain = self.bf_status.ch1_gain
channels[1].gain = self.bf_status.ch2_gain
playback_row = self.query_one(PlaybackChannelRow)
playback_row.rms_values = self.bf_rms.playback_rms
playback_row.peak_values = self.bf_peak.playback_peaks
output_row = self.query_one(OutputChannelRow)
output_row.rms_values = self.bf_rms.output_rms
output_row.peak_values = self.bf_peak.output_peaks
dirty_faders = False
for n, mixvol in enumerate(self.bf_status.out_levels()):
if self.mixer.outputs[n] != mixvol:
self.mixer.outputs[n] = mixvol
dirty_faders = True
if dirty_faders:
self.update_mixer_display()
for row in [input_row, playback_row, output_row]:
row.mutate_reactive(ChannelRow.rms_values)
row.mutate_reactive(ChannelRow.peak_values)
if __name__ == '__main__':
app = Mixer()
app.run()