862 lines
28 KiB
Python
862 lines
28 KiB
Python
import math
|
|
from collections import namedtuple
|
|
|
|
from rich.console import ConsoleOptions, Console
|
|
from rich.segment import Segment
|
|
from rich.style import Style
|
|
from textual import events
|
|
from textual.app import App, ComposeResult, Widget, RenderResult
|
|
from textual.binding import Binding
|
|
from textual.color import Color
|
|
from textual.containers import HorizontalScroll, Horizontal, Vertical, Container
|
|
from textual.events import Event, MouseEvent
|
|
from textual.geometry import Size
|
|
from textual.reactive import reactive, var
|
|
from textual.strip import Strip
|
|
from textual.widgets import Static, Button, Pretty, Log, Footer, Header
|
|
from textual.messages import Message
|
|
import mido
|
|
|
|
from encoding import decode_message, encode_message, hex_format, get_bitfield, level_to_db, format_db, clamp
|
|
import encoding
|
|
|
|
# Immutable record with three fields: level, muted and soloed.
ChannelState = namedtuple('ChannelState', ['level', 'muted', 'soloed'])
|
|
|
|
|
|
class MixerMatrix:
    """Local model of the routing matrix and output faders.

    ``matrix[source][output]`` holds one level per source/output pair. The
    first ``in_channels`` rows are inputs, the following ``play_channels``
    rows are playbacks. Levels are clamped to the range -120..6, and -120 is
    the value sent when a channel is silenced. Every change that should be
    audible is forwarded to ``out_port`` as an encoded message.
    """

    def __init__(self, out_port: "mido.ports.BaseOutput", in_channels=10, play_channels=12, out_channels=12) -> None:
        """Create the default state.

        Inputs start at -120 on every output; playback ``p`` starts at 0.0 on
        output ``p`` and -120 elsewhere. Nothing is sent to ``out_port`` here.
        """
        self.out_port = out_port

        input_rows = [[-120.0] * out_channels for _ in range(in_channels)]
        # One row per playback and one column per output, so that
        # matrix[source][output] is valid even when the two counts differ.
        playback_rows = [
            [0.0 if p == o else -120.0 for o in range(out_channels)]
            for p in range(play_channels)
        ]
        self.matrix = input_rows + playback_rows

        self.outputs = [0.0] * out_channels
        self.output_mutes = [False] * out_channels
        self.output_solos = [False] * out_channels
        self.input_mutes = [False] * (in_channels + play_channels)
        self.input_solos = [False] * (in_channels + play_channels)

    def toggle_out_solo_status(self, channel):
        """Flip the solo flag of output ``channel`` and resend output levels.

        While at least one output is soloed, soloed outputs get their stored
        level and all others get -120. When the last solo is released, every
        output that is not muted gets its stored level again.
        """
        self.output_solos[channel] = not self.output_solos[channel]

        if any(self.output_solos):
            for index, soloed in enumerate(self.output_solos):
                self._send_out_update(index, self.outputs[index] if soloed else -120)
            return

        for index, level in enumerate(self.outputs):
            if not self.output_mutes[index]:
                self._send_out_update(index, level)

    def toggle_out_mute_status(self, channel):
        """Flip the mute flag of output ``channel`` and send the resulting level.

        Nothing is sent when the stored level is already -120. Muting a
        soloed output only records the flag; it does not silence it.
        """
        muted = self.output_mutes[channel] = not self.output_mutes[channel]
        if self.outputs[channel] == -120.0:
            return

        if not muted:
            self._send_out_update(channel, self.outputs[channel])
        elif not self.output_solos[channel]:
            self._send_out_update(channel, -120)

    def set_out_level(self, channel, val):
        """Store ``val`` (clamped to -120..6) for an output; send it unless muted."""
        self.outputs[channel] = clamp(val, -120, 6)
        if not self.output_mutes[channel]:
            self._send_out_update(channel, self.outputs[channel])

    def toggle_mix_mute_status(self, in_channel):
        """Flip the mute flag of a source row and resend its level on every output.

        A muted row is sent as -120 everywhere; an unmuted row is sent with
        its stored levels.
        """
        muted = self.input_mutes[in_channel] = not self.input_mutes[in_channel]
        for out_channel, level in enumerate(self.matrix[in_channel]):
            self._send_mix_update(in_channel, out_channel, -120 if muted else level)

    def set_mix_level(self, in_channel, out_channel, val):
        """Store ``val`` (clamped to -120..6) for one matrix cell; send it unless muted."""
        level = clamp(val, -120, 6)
        self.matrix[in_channel][out_channel] = level
        if not self.input_mutes[in_channel]:
            # Send the clamped value so the device and the model never diverge.
            self._send_mix_update(in_channel, out_channel, level)

    def _send_out_update(self, out_channel, db_val):
        """Send the level of one output.

        A mixer message is always sent. Outputs 0..3 additionally get a gain
        message whose level is clamped to -92..6 before conversion.
        """
        reg = 0x3e0 | out_channel
        msg = encoding.db_to_fader(db_val) << 12 | reg
        self.out_port.send(encode_message(encoding.SUBID_MIXER, [msg]))

        if out_channel < 4:
            gain_val = encoding.db_to_level(clamp(db_val, -92, 6))
            gain_msg = (gain_val << 16) | (0x2 + out_channel)
            self.out_port.send(encode_message(encoding.SUBID_GAIN, [gain_msg]))

    def _send_mix_update(self, in_channel, out_channel, db_val):
        """Send the level of one matrix cell; the register is ``out * 24 + in``."""
        reg = out_channel * 24 + in_channel
        msg = encoding.db_to_fader(db_val) << 12 | reg
        self.out_port.send(encode_message(encoding.SUBID_MIXER, [msg]))
|
|
|
|
|
|
class FaderIndicator(Widget):
    """One-cell-wide widget that draws a ``<`` marker on the row matching ``value``.

    ``value`` of 1 puts the marker on the top row and 0 on the bottom row.
    """

    DEFAULT_CSS = """
    FaderIndicator {
        width: 1;
    }
    """
    value = reactive(0.925)

    def render_line(self, y: int):
        """Return the strip for row ``y``: the marker on the fader row, a space elsewhere."""
        marker_row = math.floor((self.region.height - 1) * (1 - self.value))
        glyph = '<' if y == marker_row else ' '
        return Strip((Segment(glyph),), 1)

    def get_content_width(self, container: Size, viewport: Size) -> int:
        """Report a fixed content width of one cell."""
        return 1
|
|
|
|
|
|
class MeterGradient:
    """Rich renderable drawing a vertical bar that is filled from the bottom.

    The bar colour is blended from ``color1`` (top row) to ``color2``
    (bottom row).
    """

    # Block glyphs ordered from empty to full, in eighths of a cell.
    FRACTIONS = [
        ' ',
        '▁',
        '▂',
        '▃',
        '▄',
        '▅',
        '▆',
        '▇',
        '█',
    ]

    def __init__(self, value: float, color1: str, color2: str) -> None:
        """Store the colours and the empty fraction ``1 - value`` of the bar."""
        self._color1 = Color.parse(color1)
        self._color2 = Color.parse(color2)
        self._value = 1 - value

    def __rich_console__(
        self, console: Console, options: ConsoleOptions
    ) -> RenderResult:
        """Yield one newline-terminated segment per row, top to bottom.

        Rows above the bar are blank. The first bar row uses a partial glyph
        from ``FRACTIONS`` and the rows below it are full blocks.
        """
        width = options.max_width
        height = options.height or options.max_height
        start_color = self._color1
        end_color = self._color2

        empty_rows = self._value * height
        top = math.ceil(empty_rows)
        # NOTE(review): the glyph is picked from the fractional part of the
        # *empty* height - confirm this is the intended cell for the bar top.
        top_char = self.FRACTIONS[math.floor((empty_rows * 8) % 8)]
        blank_line = f"{width * ' '}\n"

        for y in range(height):
            if y < top:
                yield Segment(blank_line)
                continue

            if height > 1:
                row_color = start_color.blend(end_color, y / (height - 1)).rich_color
            else:
                # A single row has no gradient; computing y / (height - 1)
                # here would divide by zero.
                row_color = start_color.rich_color

            char = top_char if y == top else '█'
            yield Segment(f"{width * char}\n", Style.from_color(row_color, None))
|
|
|
|
|
|
class Meter(Widget):
    """One-cell-wide level meter; ``value`` is handed to ``MeterGradient`` as the fill."""

    DEFAULT_CSS = """
    Meter {
        width: 1
    }
    """
    value = reactive(1.0)

    def render(self) -> RenderResult:
        """Build a green gradient bar for the current ``value``."""
        bar = MeterGradient(self.value, '#04ff00', '#04aa00')
        return bar
|
|
|
|
|
|
class Channel(Widget):
    """One channel strip: label, meter, fader marker, toggle buttons and readouts.

    The widget never changes mixer state itself. Key presses, button presses
    and mouse input are turned into the nested ``Message`` classes below and
    posted with ``post_message``. The reactive attributes only drive what is
    displayed.
    """

    DEFAULT_CSS = """
    Channel {
        width: 12;
        border: solid #333333;
        Horizontal {
            height: 100%;
        }
        .label {
            width: 100%;
            text-align: center;
        }
        .controls {
            margin-left: 1;
            margin-right: 1;
            width: 5
        }
        Button {
            margin-bottom: 1;
            padding: 0;
            min-width: 6;
            height: 1;
        }
        .mute.enabled {
            background: #00f !important;
        }
        .solo.enabled {
            background: #0ff !important;
        }
        .phantom.enabled {
            background: #f00 !important;
        }
        .inst.enabled {
            background: #ff0 !important;
        }
        .indicators-wrap {
            margin-left: 1;
            align: left bottom;
        }
        .indicators {
            height: 4;
        }
        .peak, .rms {
            color: #55aa00;
        }
        .level {
            text-style: bold;
        }
    }
    Channel.selected {
        border: #fff;
    }
    Channel:focus {
        background: #222;
        border: #fff;
    }
    """

    BINDINGS = [
        ('equals', 'move_fader(.5)'),
        ('plus', 'move_fader(.5)'),
        ('minus', 'move_fader(-.5)'),
        Binding('m', 'mute', "Mute", show=True),
        Binding('s', 'solo', "Solo", show=True),
        Binding('p', 'phantom', "48v", show=True),
        Binding('i', 'inst', "Inst.", show=True),
        # The labels describe the direction of the change each key requests.
        Binding('q', 'gain(-3)', "-gain", show=True),
        Binding('e', 'gain(3)', "+gain", show=True),
    ]

    can_focus = True
    selected = reactive(False)
    mute = reactive(False)
    solo = reactive(False)
    phantom = reactive(False)
    inst = reactive(False)
    gain = reactive(0.0)
    level = reactive(0.0)
    # Raw meter readings; the db_* values below are computed from them.
    rms_value = var(0)
    peak_value = var(0)
    db_rms_value = reactive(-100.0)
    db_peak_value = reactive(-100.0)
    RMS_FACTOR = float(2 ** 54)
    PEAK_FACTOR = float(2 ** 23)

    class MoveFader(Message):
        """Request to change a fader by ``change`` relative to its current level."""

        def __init__(self, channel: int, output=False, change=0):
            self.channel = channel
            self.output = output
            self.change = change
            super().__init__()

    class SetFader(Message):
        """Request to set a fader to the absolute level ``val``."""

        def __init__(self, channel: int, output=False, val=0):
            self.channel = channel
            self.output = output
            self.val = val
            super().__init__()

    class ChangeGain(MoveFader):
        """Request to change the gain by ``change``."""

    class ChannelMessage(Message):
        """Base for messages that only identify a channel."""

        def __init__(self, channel: int, output=False):
            self.channel = channel
            self.output = output
            super().__init__()

    class MuteToggle(ChannelMessage):
        """Request to toggle mute."""

    class SoloToggle(ChannelMessage):
        """Request to toggle solo."""

    class PhantomToggle(ChannelMessage):
        """Request to toggle the "48v" switch."""

    class InstToggle(ChannelMessage):
        """Request to toggle the "Inst" switch."""

    class LevelSet(Message):
        """Carries a channel, a value and the output flag."""

        def __init__(self, channel: int, val: int, output=False):
            self.channel = channel
            self.val = val
            self.output = output
            super().__init__()

    def __init__(self, channel: int, label: str, phantom=False, inst=False, output=False, *children: Widget):
        """Create a strip.

        Args:
            channel: Index reported in every posted message.
            label: Text shown at the top of the strip.
            phantom: Whether the "48v" button and the gain readout exist.
            inst: Whether the "Inst" button exists.
            output: Copied into the ``output`` field of every posted message.
            children: Passed through to ``Widget``.
        """
        super().__init__(*children)
        self.channel = channel
        self.label = label
        self.phantom_available = phantom
        self.inst_available = inst
        self.output = output
        # Set while the fader is dragged so the click that ends the drag
        # does not set the level a second time.
        self.mouse_moved = False

    def compute_db_peak_value(self) -> float:
        """Convert ``peak_value`` to a logarithmic value; -120 when the log is undefined."""
        try:
            return 20 * math.log10((self.peak_value >> 4) / self.PEAK_FACTOR)
        except ValueError:
            return -120

    def compute_db_rms_value(self) -> float:
        """Convert ``rms_value`` to a logarithmic value; -120 when the log is undefined."""
        try:
            return 10 * math.log10(self.rms_value / self.RMS_FACTOR)
        except ValueError:
            return -120

    def _set_enabled(self, selector: str, enabled) -> None:
        """Add or remove the ``enabled`` CSS class on the widget matching ``selector``."""
        target = self.query_one(selector)
        if enabled:
            target.add_class('enabled')
        else:
            target.remove_class('enabled')

    def watch_level(self, val) -> None:
        """Show the new level as text and move the fader marker (-75..6 mapped to 0..1)."""
        self.query_one('.level').update(format_db(val))
        self.query_one('#level-indicator').value = encoding.map_range(val, -75, 6, 0, 1)

    def watch_gain(self, val) -> None:
        """Update the gain readout, which only exists when ``phantom_available``."""
        if not self.phantom_available:
            return
        self.query_one('#gain').update(format_db(val))

    def watch_mute(self, val) -> None:
        """Highlight the mute button while muted."""
        self._set_enabled('#mute', val)

    def watch_solo(self, val) -> None:
        """Highlight the solo button while soloed."""
        self._set_enabled('#solo', val)

    def watch_phantom(self, val) -> None:
        """Highlight the "48v" button, if the strip has one."""
        if self.phantom_available:
            self._set_enabled('#phantom', val)

    def watch_inst(self, val) -> None:
        """Highlight the "Inst" button, if the strip has one."""
        if self.inst_available:
            self._set_enabled('#inst', val)

    def watch_selected(self, val) -> None:
        """Mirror ``selected`` as the ``selected`` CSS class on the strip."""
        if val:
            self.add_class('selected')
        else:
            self.remove_class('selected')

    def watch_db_peak_value(self, val) -> None:
        """Update the peak readout."""
        self.query_one('.peak').update(format_db(val))

    def watch_db_rms_value(self, val) -> None:
        """Update the RMS readout and the meter (-75..6 mapped to 0..1)."""
        self.query_one('.rms').update(format_db(val))
        self.query_one(Meter).value = encoding.map_range(val, -75, 6, 0, 1)

    def compose(self) -> ComposeResult:
        """Build the strip: label, then meter, fader marker, buttons and readouts."""
        # NOTE(review): container nesting is reconstructed from statement
        # order - confirm against the intended layout.
        yield Static(self.label, classes='label')
        with Horizontal():
            yield Meter(id='meter')
            yield FaderIndicator(id='level-indicator')
            with Vertical():
                with Vertical(classes='controls'):
                    yield Button("M", classes='mute', compact=True, id='mute')
                    yield Button("S", classes='solo', compact=True, id='solo')
                    if self.phantom_available:
                        yield Button("48v", classes='phantom', compact=True, id='phantom')
                    if self.inst_available:
                        yield Button("Inst", classes='inst', compact=True, id='inst')
                    if self.phantom_available:
                        yield Static('0.0', id='gain')
                with Vertical(classes='indicators-wrap'):
                    with Container(classes='indicators'):
                        yield Static('0.0', classes='level')
                        yield Static('0.0', classes='peak')
                        yield Static('0.1', classes='rms')

    def action_mute(self) -> None:
        """Post a ``MuteToggle`` for this strip."""
        self.post_message(self.MuteToggle(self.channel, output=self.output))

    def action_solo(self) -> None:
        """Post a ``SoloToggle`` for this strip."""
        self.post_message(self.SoloToggle(self.channel, output=self.output))

    def action_phantom(self) -> None:
        """Post a ``PhantomToggle``; ignored when the strip has no "48v" button."""
        if not self.phantom_available:
            return
        self.post_message(self.PhantomToggle(self.channel, output=self.output))

    def action_inst(self) -> None:
        """Post an ``InstToggle``; ignored when the strip has no "Inst" button."""
        if not self.inst_available:
            return
        self.post_message(self.InstToggle(self.channel, output=self.output))

    def action_move_fader(self, change) -> None:
        """Post a ``MoveFader`` asking for a relative change of ``change``."""
        self.post_message(self.MoveFader(self.channel, output=self.output, change=change))

    def action_gain(self, change) -> None:
        """Post a ``ChangeGain``; ignored on strips without a gain readout.

        The scroll handlers and ``watch_gain`` already gate gain on
        ``phantom_available``. The key bindings reach this method directly,
        so the same gate is applied here.
        """
        if not self.phantom_available:
            return
        self.post_message(self.ChangeGain(self.channel, output=self.output, change=change))

    def _on_mouse_scroll_up(self, event: events.MouseScrollUp) -> None:
        """Scroll up raises the fader by 1, or the gain by 3 with shift held."""
        if event.shift and self.phantom_available:
            self.action_gain(3)
        else:
            self.action_move_fader(1)

    def _on_mouse_scroll_down(self, event: events.MouseScrollDown) -> None:
        """Scroll down lowers the fader by 1, or the gain by 3 with shift held."""
        if event.shift and self.phantom_available:
            self.action_gain(-3)
        else:
            self.action_move_fader(-1)

    def _on_mouse_down(self, event: events.MouseDown) -> None:
        """No action is taken on mouse down."""
        pass

    def _on_mouse_up(self, event: events.MouseUp) -> None:
        """No action is taken on mouse up."""
        pass

    def _set_level_from_mouse_event(self, event: events.MouseEvent):
        """Post a ``SetFader`` for the pointer row (top row maps to 6, bottom to -75)."""
        span = event.widget.region.height - 1
        if span <= 0:
            # A one-row widget has no usable range and would divide by zero.
            return
        position = 1 - (event.pointer_y / span)
        db = encoding.map_range(position, 0, 1, -75, 6)
        self.post_message(self.SetFader(self.channel, output=self.output, val=db))

    def _on_mouse_move(self, event: events.MouseMove) -> None:
        """Dragging with button 1 over the fader marker sets the level continuously."""
        if event.button == 1 and event.widget == self.query_one('#level-indicator'):
            self._set_level_from_mouse_event(event)
            self.mouse_moved = True

    def _on_click(self, event: events.Click) -> None:
        """A plain button-1 click on the fader marker sets the level once."""
        on_fader = event.button == 1 and event.widget == self.query_one('#level-indicator')
        if on_fader and not self.mouse_moved:
            self._set_level_from_mouse_event(event)
        self.mouse_moved = False

    def on_button_pressed(self, event):
        """Route a button press to the matching action, keyed by the button id."""
        actions = {
            'mute': self.action_mute,
            'solo': self.action_solo,
            'phantom': self.action_phantom,
            'inst': self.action_inst,
        }
        handler = actions.get(event.button.id)
        if handler is not None:
            handler()
|
|
|
|
|
|
|
|
class ChannelRow(Widget):
    """Base class for a row of ``Channel`` strips.

    Assigning a sequence to ``rms_values``, ``peak_values`` or
    ``level_values`` distributes the values over the row's channels in query
    order. Surplus values or channels are ignored.
    """

    DEFAULT_CSS = """
    ChannelRow {
        height: 33%
    }
    """

    rms_values = var([])
    peak_values = var([])
    level_values = var([])

    def __init__(self, channels: range, *children: Widget):
        """Remember ``channels``, the channel indices this row covers."""
        super().__init__(*children)
        self.channels = channels

    def watch_rms_values(self, new_values):
        """Push each RMS reading into the matching channel."""
        for channel, value in zip(self.query(Channel), new_values):
            channel.rms_value = value

    def watch_peak_values(self, new_values):
        """Push each peak reading into the matching channel."""
        for channel, value in zip(self.query(Channel), new_values):
            channel.peak_value = value

    def watch_level_values(self, new_values):
        """Push each fader level into the matching channel."""
        for channel, value in zip(self.query(Channel), new_values):
            # ``Channel`` declares ``level``; there is no ``level_value``
            # reactive, so writing that name would never update the display.
            channel.level = value

    def update_mixer_display(self, mixer: MixerMatrix, current_out: int):
        """Hook for subclasses to refresh their strips from ``mixer``; does nothing here."""
        pass
|
|
|
|
|
|
class InputChannelRow(ChannelRow):
    """Row of ten input strips: AN1, AN2 and ADAT1..ADAT8."""

    def compose(self) -> ComposeResult:
        """Create the strips; AN1 and AN2 get "48v", AN2 additionally gets "Inst"."""
        with HorizontalScroll():
            yield Channel(self.channels[0], 'AN1', phantom=True)
            yield Channel(self.channels[1], 'AN2', phantom=True, inst=True)
            for n in range(8):
                yield Channel(channel=self.channels[n + 2], label=f'ADAT{n + 1}')

    def update_mixer_display(self, mixer: MixerMatrix, current_out: int):
        """Show, per strip, its level towards ``current_out`` and its mute/solo flags."""
        # The end bound is one past the last index so the final channel of
        # the row is included in the slices.
        first = self.channels[0]
        end = self.channels[-1] + 1
        rows = zip(
            self.query(Channel),
            mixer.matrix[first:end],
            mixer.input_mutes[first:end],
            mixer.input_solos[first:end],
        )
        for channel_widget, levels, mute, solo in rows:
            channel_widget.level = levels[current_out]
            channel_widget.mute = mute
            channel_widget.solo = solo
|
|
|
|
|
|
class PlaybackChannelRow(ChannelRow):
    """Row of playback strips, one per entry of ``LABELS``."""

    LABELS = [
        'AN1',
        'AN2',
        'PH3',
        'PH4',
        'ADAT1',
        'ADAT2',
        'ADAT3',
        'ADAT4',
        'ADAT5',
        'ADAT6',
        'ADAT7',
        'ADAT8',
    ]

    def compose(self) -> ComposeResult:
        """Create one strip per (channel index, label) pair."""
        with HorizontalScroll():
            for n, label in zip(self.channels, self.LABELS):
                yield Channel(channel=n, label=label)

    def update_mixer_display(self, mixer: MixerMatrix, current_out: int):
        """Show, per strip, its level towards ``current_out`` and its mute/solo flags."""
        # The end bound is one past the last index so the final channel of
        # the row is included in the slices.
        first = self.channels[0]
        end = self.channels[-1] + 1
        rows = zip(
            self.query(Channel),
            mixer.matrix[first:end],
            mixer.input_mutes[first:end],
            mixer.input_solos[first:end],
        )
        for channel_widget, levels, mute, solo in rows:
            channel_widget.level = levels[current_out]
            channel_widget.mute = mute
            channel_widget.solo = solo
|
|
|
|
class OutputChannelRow(ChannelRow):
    """Row of output strips, one per entry of ``LABELS``."""

    LABELS = [
        'AN1',
        'AN2',
        'PH3',
        'PH4',
        'ADAT1',
        'ADAT2',
        'ADAT3',
        'ADAT4',
        'ADAT5',
        'ADAT6',
        'ADAT7',
        'ADAT8',
    ]

    def compose(self) -> ComposeResult:
        """Create one output strip per (channel index, label) pair."""
        # NOTE(review): the scroll container is nested twice here, unlike the
        # other rows - confirm whether the second level is intentional.
        with HorizontalScroll():
            with HorizontalScroll():
                for index, text in zip(self.channels, self.LABELS):
                    yield Channel(channel=index, label=text, output=True)

    def update_mixer_display(self, mixer: MixerMatrix, current_out: int):
        """Copy level, mute and solo of every output into its strip and mark ``current_out``."""
        state = zip(
            self.channels,
            self.query(Channel),
            mixer.outputs,
            mixer.output_mutes,
            mixer.output_solos,
        )
        for index, strip, out_level, is_muted, is_soloed in state:
            strip.level = out_level
            strip.mute = is_muted
            strip.solo = is_soloed
            strip.selected = index == current_out
|
|
|
|
class BFStatus:
    """Decoded fields of a status message made of up to three words."""

    # The raw words; ``parse`` replaces them with 32-character binary strings.
    word0 = 0
    word1 = 0
    word2 = 0

    # word 0
    ch1_48v = 0
    ch2_48v = 0
    ch2_inst = 0
    digital_out = 0  # 0 = ADAT, 1 = SPDIF

    # word 1
    out_1_vol = 0
    out_2_vol = 0
    ch1_gain = 0
    ch2_gain = 0

    # word 2
    out_3_vol = 0
    out_4_vol = 0
    dim = 0

    def parse(self, msg):
        """Decode ``msg`` (a sequence of integer words) into the attributes.

        Fields of word 0 are always decoded. Fields of words 1 and 2 are only
        decoded when the message has more than two words, so a short message
        leaves them at their previous values instead of raising IndexError.
        """
        first = msg[0]
        self.word0 = format(first, '032b')
        self.ch1_48v = get_bitfield(first, 30, 1)
        self.ch2_48v = get_bitfield(first, 31, 1)
        self.ch2_inst = get_bitfield(first, 29, 1)
        self.digital_out = get_bitfield(first, 28, 1)

        if len(msg) <= 2:
            return

        second, third = msg[1], msg[2]
        self.word1 = format(second, '032b')
        self.word2 = format(third, '032b')

        self.out_1_vol = level_to_db(get_bitfield(second, 0, 8))
        self.out_2_vol = level_to_db(get_bitfield(second, 9, 8))

        # The gain fields are stored in steps of 3 above a base of 6.
        self.ch2_gain = get_bitfield(second, 24, 5) * 3 + 6
        self.ch1_gain = get_bitfield(second, 19, 5) * 3 + 6

        self.out_3_vol = level_to_db(get_bitfield(third, 14, 8))
        self.out_4_vol = level_to_db(get_bitfield(third, 23, 8))
        self.dim = get_bitfield(third, 5, 1)

    def out_levels(self):
        """Return the four output volumes as a tuple, outputs 1 to 4 in order."""
        return (
            self.out_1_vol,
            self.out_2_vol,
            self.out_3_vol,
            self.out_4_vol,
        )
|
|
|
|
|
|
class BFRMS:
    """RMS meter readings, each assembled from two consecutive message words."""

    def __init__(self):
        """Start with every reading at zero."""
        self.input_rms = [0] * 10
        self.playback_rms = [0] * 12
        self.fx_in_rms = [0, 0]
        self.output_rms = [0] * 12
        self.fx_out_rms = [0, 0]

    def _parse_rms(self, values, target, offset=0):
        """Combine word pairs from ``values`` into ``target``, starting at ``offset``.

        The first word of a pair is the low 32 bits, the second the high
        bits. An unpaired trailing word and pairs that would fall beyond the
        end of ``target`` are ignored.
        """
        pairs = zip(values[0::2], values[1::2])
        for n, (low, high) in enumerate(pairs, start=offset):
            if n >= len(target):
                break
            target[n] = low | (high << 32)

    def parse0(self, id0_msg):
        """Read the inputs (words 3..22) and the first playbacks (words 23 onwards)."""
        self._parse_rms(id0_msg[3:23], self.input_rms)
        self._parse_rms(id0_msg[23:], self.playback_rms)

    def parse1(self, id1_msg):
        """Read playbacks 9..11, FX in, the outputs and FX out from the second message."""
        self._parse_rms(id1_msg[:6], self.playback_rms, 9)
        self._parse_rms(id1_msg[6:10], self.fx_in_rms)
        self._parse_rms(id1_msg[10:34], self.output_rms)
        # The final two pairs belong to the FX outputs; storing them in
        # fx_in_rms would overwrite the FX input readings parsed above.
        self._parse_rms(id1_msg[34:38], self.fx_out_rms)
|
|
|
|
|
|
class BFPeak:
    """Peak meter readings, split out of one message by fixed positions."""

    # (attribute, start, stop) of each group within the message.
    _LAYOUT = (
        ('input_peaks', 0, 10),
        ('playback_peaks', 10, 22),
        ('fx_in_peaks', 22, 24),
        ('output_peaks', 24, 36),
        ('fx_out_peaks', 36, 38),
    )

    def __init__(self):
        """Start with every reading at zero."""
        self.input_peaks = [0] * 10
        self.playback_peaks = [0] * 12
        self.fx_in_peaks = [0] * 2
        self.output_peaks = [0] * 12
        self.fx_out_peaks = [0] * 2

    def parse(self, msg):
        """Replace every group with the corresponding slice of ``msg``."""
        for attribute, start, stop in self._LAYOUT:
            setattr(self, attribute, msg[start:stop])
|
|
|
|
|
|
class Status(Widget):
    """Debug panel with three ``Pretty`` views (status, rms, peak) and a ``Log``."""

    status_data = reactive({})

    def compose(self) -> ComposeResult:
        """Create the child widgets, each constructed from ``status_data``."""
        for widget_id in ('status', 'rms', 'peak'):
            yield Pretty(self.status_data, id=widget_id)
        # NOTE(review): ``status_data`` is handed to ``Log`` as its first
        # positional argument, like for ``Pretty`` - confirm this is intended.
        yield Log(self.status_data, id='log')
|
|
|
|
|
|
class Mixer(App):
    """The application: three channel rows wired to a ``MixerMatrix`` and MIDI ports.

    Every 0.1 s ``STATUS_MSG`` is sent and all pending incoming messages are
    decoded into ``bf_status``, ``bf_rms`` and ``bf_peak``, which are then
    pushed into the rows. Messages posted by ``Channel`` strips are turned
    into ``MixerMatrix`` calls or sent directly to the output port.
    """

    in_port: mido.ports.BaseInput
    out_port: mido.ports.BaseOutput
    # Formatted copies of messages whose subid is not handled.
    messages = []
    messageset = set()
    STATUS_MSG = mido.Message('sysex', data=[0x00, 0x20, 0x0D, 0x10, 0x10])
    bf_status = BFStatus()
    bf_rms = BFRMS()
    bf_peak = BFPeak()
    # Output whose mix the input and playback rows currently display.
    current_mix_out = 0
    # (column, row) of the focused strip; row 0/1/2 = input/playback/output.
    focused_channel = (0, 2)

    INPUTS = 10
    PLAYBACKS = 12
    OUTPUTS = 12
    BINDINGS = [
        ('o', 'mix_select(1)'),
        ('u', 'mix_select(-1)'),
        ('h', 'change_focus(-1, 0)'),
        ('j', 'change_focus(0, 1)'),
        ('k', 'change_focus(0, -1)'),
        ('l', 'change_focus(1, 0)'),
    ]

    def __init__(self):
        """Open the MIDI ports and create the mixer model.

        The first port whose name contains both 'Babyface' and 'Port 2' is
        used in each direction; when there is none, a virtual port named
        'Mixer' is opened instead.
        """
        super().__init__()

        def wanted(name):
            return 'Babyface' in name and 'Port 2' in name

        in_ports = [name for name in mido.get_input_names() if wanted(name)]
        out_ports = [name for name in mido.get_output_names() if wanted(name)]

        if in_ports:
            self.in_port = mido.open_input(in_ports[0])
        else:
            self.in_port = mido.open_input('Mixer', virtual=True)

        if out_ports:
            self.out_port = mido.open_output(out_ports[0])
        else:
            self.out_port = mido.open_output('Mixer', virtual=True)

        self.mixer = MixerMatrix(self.out_port)

    def on_mount(self) -> None:
        """Start the 0.1 s polling timer."""
        self.set_interval(.1, self.update_messages)

    def update_messages(self) -> None:
        """Send ``STATUS_MSG`` and process every pending incoming message."""
        self.out_port.send(self.STATUS_MSG)
        for msg in self.in_port.iter_pending():
            self.process_message(msg)

    def compose(self) -> ComposeResult:
        """Lay out header, input row, playback row, output row and footer."""
        yield Header()
        yield InputChannelRow(channels=range(0, self.INPUTS))
        yield PlaybackChannelRow(channels=range(self.INPUTS, self.PLAYBACKS + self.INPUTS))
        yield OutputChannelRow(channels=range(0, self.OUTPUTS))
        yield Footer()

    def action_mix_select(self, n):
        """Change the mix output by ``n`` steps, wrapping around."""
        self.current_mix_out = (self.current_mix_out + n) % self.OUTPUTS
        self.update_mixer_display()

    def action_change_focus(self, dx, dy):
        """Update focused channel. Mix output follows focus.

        Both coordinates wrap around. The mix output is only changed when
        the focus lands on the output row.
        """
        (x, y) = self.focused_channel
        y = (y + dy) % 3
        row_sizes = (self.INPUTS, self.PLAYBACKS, self.OUTPUTS)
        x = (x + dx) % row_sizes[y]
        if y == 2:
            self.current_mix_out = x

        self.focused_channel = (x, y)
        self.query(ChannelRow)[y].query(Channel)[x].focus()
        self.update_mixer_display()

    def on_channel_phantom_toggle(self, message: Channel.PhantomToggle) -> None:
        """Send the inverse of the reported "48v" state for channel 0 or 1."""
        if message.channel == 0:
            self.out_port.send(encode_message(3, [0x00010001 ^ self.bf_status.ch1_48v << 16]))
        elif message.channel == 1:
            self.out_port.send(encode_message(3, [0x00020002 ^ self.bf_status.ch2_48v << 17]))

    def on_channel_inst_toggle(self, message: Channel.InstToggle) -> None:
        """Send the inverse of the reported "Inst" state; only channel 1 has it."""
        if message.channel == 1:
            self.out_port.send(encode_message(3, [0x00100010 ^ self.bf_status.ch2_inst << 20]))

    def on_channel_move_fader(self, message: Channel.MoveFader) -> None:
        """Apply a relative fader change to an output or to the current mix."""
        if message.output:
            current = self.mixer.outputs[message.channel]
            self.mixer.set_out_level(message.channel, current + message.change)
        else:
            current = self.mixer.matrix[message.channel][self.current_mix_out]
            self.mixer.set_mix_level(message.channel, self.current_mix_out, current + message.change)
        self.update_mixer_display()

    def on_channel_set_fader(self, message: Channel.SetFader) -> None:
        """Apply an absolute fader value to an output or to the current mix."""
        if message.output:
            self.mixer.set_out_level(message.channel, message.val)
        else:
            self.mixer.set_mix_level(message.channel, self.current_mix_out, message.val)
        self.update_mixer_display()

    def on_channel_change_gain(self, message: Channel.ChangeGain) -> None:
        """Send a new gain for input 0 or 1, clamped to 6..60 in steps of 3.

        Requests from output strips are ignored: their channel numbers
        overlap with the inputs and would otherwise change an input's gain.
        """
        if message.output or message.channel > 1:
            return

        if message.channel == 0:
            current = self.bf_status.ch1_gain
        else:
            current = self.bf_status.ch2_gain
        gain = clamp(message.change + current, 6, 60)

        gain_scaled = int((gain - 6) / 3)
        self.out_port.send(encode_message(4, [gain_scaled << 16 | message.channel]))

    # NOTE(review): no handler for Channel.SoloToggle is visible in this
    # class, so the solo key and button appear to have no effect - confirm.
    def on_channel_mute_toggle(self, message: Channel.MuteToggle) -> None:
        """Toggle the mute of an output or of a source row."""
        if message.output:
            self.mixer.toggle_out_mute_status(message.channel)
        else:
            self.mixer.toggle_mix_mute_status(message.channel)
        self.update_mixer_display()

    def update_mixer_display(self):
        """Ask every row to refresh its strips from the mixer model."""
        for row in self.query(ChannelRow):
            row.update_mixer_display(self.mixer, self.current_mix_out)

    def process_message(self, msg: mido.Message) -> None:
        """Decode one incoming message and push the resulting state into the rows.

        Subid 0x00 updates the status and the first RMS block, 0x01 the
        second RMS block and 0x02 the peaks. Any other subid is recorded in
        ``messages``. A message that does not decode is ignored.
        """
        subid, decoded_msg = decode_message(msg)
        if subid is None:
            return

        if subid == 0x00:
            self.bf_status.parse(decoded_msg)
            self.bf_rms.parse0(decoded_msg)
        elif subid == 0x01:
            self.bf_rms.parse1(decoded_msg)
        elif subid == 0x02:
            self.bf_peak.parse(decoded_msg)
        else:
            formatted = hex_format(decoded_msg)
            self.messages.append(formatted)
            # ``compose`` above mounts no '#log' widget, so the message is
            # only written when a Log with that id happens to be present.
            for log_widget in self.query('#log').results(Log):
                log_widget.write_line(str(formatted))

        input_row = self.query_one(InputChannelRow)
        input_row.rms_values = self.bf_rms.input_rms
        input_row.peak_values = self.bf_peak.input_peaks
        channels = input_row.query(Channel)
        channels[0].phantom = self.bf_status.ch1_48v
        channels[1].phantom = self.bf_status.ch2_48v
        channels[1].inst = self.bf_status.ch2_inst
        channels[0].gain = self.bf_status.ch1_gain
        channels[1].gain = self.bf_status.ch2_gain

        playback_row = self.query_one(PlaybackChannelRow)
        playback_row.rms_values = self.bf_rms.playback_rms
        playback_row.peak_values = self.bf_peak.playback_peaks

        output_row = self.query_one(OutputChannelRow)
        output_row.rms_values = self.bf_rms.output_rms
        output_row.peak_values = self.bf_peak.output_peaks

        # Adopt the output volumes reported in the status message.
        dirty_faders = False
        for n, mixvol in enumerate(self.bf_status.out_levels()):
            if self.mixer.outputs[n] != mixvol:
                self.mixer.outputs[n] = mixvol
                dirty_faders = True

        if dirty_faders:
            self.update_mixer_display()

        # The RMS lists are updated in place, so assigning them again does
        # not register as a change; the watchers are triggered explicitly.
        for row in [input_row, playback_row, output_row]:
            row.mutate_reactive(ChannelRow.rms_values)
            row.mutate_reactive(ChannelRow.peak_values)
|
|
|
|
|
if __name__ == '__main__':
    # Script entry point: build the application, then run it.
    app = Mixer()
    app.run()
|