hinawa-maudio-bebob-cui 10.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13
#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-3.0-or-later
# Copyright (C) 2018 Takashi Sakamoto

import time
import signal
import sys

from hinawa_utils.misc.cui_kit import CuiKit

from hinawa_utils.bebob.maudio_unit import MaudioUnit

def _handle_target_volume(unit, args, cmd, targets_func, set_func, get_func):
    """Parse and run a ``TARGET CH OP [dB]`` style command.

    Args:
        unit: Unused here; kept so every handler is called the same way.
        args: Argument strings that follow the command name.
        cmd: Command name, used only in the usage text.
        targets_func: Callable returning the valid TARGET labels.
        set_func: Callable ``(target, ch, db)`` invoked when OP is ``set``.
        get_func: Callable ``(target, ch)`` whose result is printed when OP
            is ``get``.

    Returns:
        True when an operation was performed, False when the arguments were
        rejected and the usage text was printed instead.
    """
    chs = (0, 1)
    ops = ('set', 'get')
    targets = targets_func()

    if len(args) >= 3 and args[0] in targets and args[2] in ops:
        target, op = args[0], args[2]
        # A non-numeric channel is a usage error, not a crash.
        try:
            ch = int(args[1])
        except ValueError:
            ch = None
        if ch in chs:
            if op == 'get':
                print(get_func(target, ch))
                return True
            if len(args) >= 4:
                # Only the conversion is guarded so that errors raised by
                # set_func itself still propagate to the caller.
                try:
                    db = float(args[3])
                except ValueError:
                    db = None
                if db is not None:
                    set_func(target, ch, db)
                    return True

    print('Arguments for {0} command:'.format(cmd))
    print('  {0} TARGET CH OP [dB]'.format(cmd))
    print('    TARGET:   [{0}]'.format('|'.join(targets)))
    # chs holds integers; str.join() accepts strings only.
    print('    CH:       [{0}]'.format('|'.join(map(str, chs))))
    print('    OP:       [{0}]'.format('|'.join(ops)))
    print('    dB:       [-128.00..128.00] if OP=set')
    return False

def handle_input_gain(unit, args):
    """Run the 'input-gain' command via the shared TARGET/CH/OP handler.

    Targets come from get_input_labels(); values are written with
    set_input_gain() and read with get_input_gain() on unit.protocol.
    """
    protocol = unit.protocol
    return _handle_target_volume(
        unit, args, 'input-gain',
        protocol.get_input_labels,
        protocol.set_input_gain,
        protocol.get_input_gain,
    )

def handle_aux_input(unit, args):
    """Run the 'aux-input' command via the shared TARGET/CH/OP handler.

    Targets come from get_aux_input_labels(); values are written with
    set_aux_input() and read with get_aux_input() on unit.protocol.
    """
    protocol = unit.protocol
    return _handle_target_volume(
        unit, args, 'aux-input',
        protocol.get_aux_input_labels,
        protocol.set_aux_input,
        protocol.get_aux_input,
    )

def handle_output_volume(unit, args):
    """Run the 'output-volume' command via the shared TARGET/CH/OP handler.

    Targets come from get_output_labels(); values are written with
    set_output_volume() and read with get_output_volume() on unit.protocol.
    """
    protocol = unit.protocol
    return _handle_target_volume(
        unit, args, 'output-volume',
        protocol.get_output_labels,
        protocol.set_output_volume,
        protocol.get_output_volume,
    )

def handle_headphone_volume(unit, args):
    """Run the 'headphone-volume' command via the shared TARGET/CH/OP handler.

    Targets come from get_headphone_labels(); values are written with
    set_headphone_volume() and read with get_headphone_volume() on
    unit.protocol.
    """
    protocol = unit.protocol
    return _handle_target_volume(
        unit, args, 'headphone-volume',
        protocol.get_headphone_labels,
        protocol.set_headphone_volume,
        protocol.get_headphone_volume,
    )

def handle_input_balance(unit, args):
    """Handle the 'input-balance' command: ``TARGET CH OP [BALANCE]``.

    Args:
        unit: Object whose ``protocol`` provides get_input_balance_labels(),
            set_input_balance(target, ch, balance) and
            get_input_balance(target, ch).
        args: Argument strings that follow the command name.

    Returns:
        True when an operation was performed, False when the arguments were
        rejected and the usage text was printed instead.
    """
    chs = (0, 1)
    ops = ('set', 'get')
    targets = unit.protocol.get_input_balance_labels()

    if len(args) >= 3 and args[0] in targets and args[2] in ops:
        target, op = args[0], args[2]
        # A non-numeric channel is a usage error, not a crash.
        try:
            ch = int(args[1])
        except ValueError:
            ch = None
        if ch in chs:
            if op == 'get':
                print(unit.protocol.get_input_balance(target, ch))
                return True
            if len(args) >= 4:
                # Only the conversion is guarded so that errors raised by
                # the protocol itself still propagate to the caller.
                try:
                    balance = float(args[3])
                except ValueError:
                    balance = None
                if balance is not None:
                    unit.protocol.set_input_balance(target, ch, balance)
                    return True

    print('Arguments for input-balance command')
    print('  input-balance TARGET CH OP [BALANCE]')
    print('    TARGET: [{0}]'.format('|'.join(targets)))
    # chs holds integers; str.join() accepts strings only.
    print('    CH:     [{0}]'.format('|'.join(map(str, chs))))
    print('    OP:     [{0}]'.format('|'.join(ops)))
    print('    BALANCE:[-128.0..128.0] (left-right) if OP=set')
    return False

def handle_aux_balance(unit, args):
    """Handle the 'aux-balance' command: ``CH OP [BALANCE]``.

    Args:
        unit: Object whose ``protocol`` provides
            set_aux_balance(ch, balance) and get_aux_balance(ch).
        args: Argument strings that follow the command name.

    Returns:
        True when an operation was performed, False when the arguments were
        rejected and the usage text was printed instead.
    """
    chs = (0, 1)
    ops = ('set', 'get')

    if len(args) >= 2 and args[1] in ops:
        op = args[1]
        # A non-numeric channel is a usage error, not a crash.
        try:
            ch = int(args[0])
        except ValueError:
            ch = None
        if ch in chs:
            # 'set' without a value must not silently behave like 'get'.
            if op == 'get':
                print(unit.protocol.get_aux_balance(ch))
                return True
            if len(args) >= 3:
                # Only the conversion is guarded so that errors raised by
                # the protocol itself still propagate to the caller.
                try:
                    balance = float(args[2])
                except ValueError:
                    balance = None
                if balance is not None:
                    unit.protocol.set_aux_balance(ch, balance)
                    return True

    print('Arguments for aux-balance command:')
    print('  aux-balance CH OP [BALANCE]')
    # chs holds integers; str.join() accepts strings only.
    print('    CH:     [{0}]'.format('|'.join(map(str, chs))))
    print('    OP:     [{0}]'.format('|'.join(ops)))
    print('    BALANCE:[-128.0..128.0] (left-right) if OP=set')
    return False

def handle_aux_volume(unit, args):
    """Handle the 'aux-volume' command: ``CH OP [dB]``.

    Args:
        unit: Object whose ``protocol`` provides set_aux_volume(ch, db) and
            get_aux_volume(ch).
        args: Argument strings that follow the command name.

    Returns:
        True when an operation was performed, False when the arguments were
        rejected and the usage text was printed instead.
    """
    chs = (0, 1)
    ops = ('set', 'get')

    if len(args) >= 2 and args[1] in ops:
        op = args[1]
        # A non-numeric channel is a usage error, not a crash.
        try:
            ch = int(args[0])
        except ValueError:
            ch = None
        if ch in chs:
            if op == 'get':
                print(unit.protocol.get_aux_volume(ch))
                return True
            if len(args) >= 3:
                # Only the conversion is guarded so that errors raised by
                # the protocol itself still propagate to the caller.
                try:
                    db = float(args[2])
                except ValueError:
                    db = None
                if db is not None:
                    unit.protocol.set_aux_volume(ch, db)
                    return True

    print('Arguments for aux-volume command:')
    print('  aux-volume CH OP [dB]')
    # chs holds integers; str.join() accepts strings only.
    print('    CH:     [{0}]'.format('|'.join(map(str, chs))))
    print('    OP:     [{0}]'.format('|'.join(ops)))
    print('    dB:     [-128.0..128.0] if OP=set')
    return False

def handle_mixer_routing(unit, args):
    """Handle the 'mixer-routing' command: ``TARGET SOURCE OP [ENABLE]``.

    Args:
        unit: Object whose ``protocol`` provides get_mixer_labels(),
            get_mixer_source_labels(),
            set_mixer_routing(target, source, enable) and
            get_mixer_routing(target, source).
        args: Argument strings that follow the command name.

    Returns:
        True when an operation was performed, False when the arguments were
        rejected and the usage text was printed instead.
    """
    ops = ('set', 'get')
    targets = unit.protocol.get_mixer_labels()
    sources = unit.protocol.get_mixer_source_labels()

    if (len(args) >= 3 and args[0] in targets and args[1] in sources
            and args[2] in ops):
        target, source, op = args[0], args[1], args[2]
        if op == 'get':
            print(unit.protocol.get_mixer_routing(target, source))
            return True
        if len(args) >= 4:
            # A non-numeric ENABLE is a usage error, not a crash. Only the
            # conversion is guarded so protocol errors still propagate.
            try:
                enable = bool(int(args[3]))
            except ValueError:
                enable = None
            if enable is not None:
                unit.protocol.set_mixer_routing(target, source, enable)
                return True

    print('Arguments for mixer-routing command:')
    # OP is a mandatory positional argument, so show it in the synopsis.
    print('  mixer-routing TARGET SOURCE OP [ENABLE]')
    print('    TARGET: [{0}]'.format('|'.join(targets)))
    print('    SOURCE: [{0}]'.format('|'.join(sources)))
    print('    OP:     [{0}]'.format('|'.join(ops)))
    print('    ENABLE: [0|1]')
    return False

def _handle_target_source(unit, args, cmd, targets_func, sources_func,
                          set_func, get_func):
    """Parse and run a ``TARGET OP [SRC]`` style command.

    Args:
        unit: Unused here; kept so every handler is called the same way.
        args: Argument strings that follow the command name.
        cmd: Command name, used only in the usage text.
        targets_func: Callable returning the valid TARGET labels.
        sources_func: Callable ``(target)`` returning the SRC labels that are
            valid for that target.
        set_func: Callable ``(target, source)`` invoked when OP is ``set``.
        get_func: Callable ``(target)`` whose result is printed when OP is
            ``get``.

    Returns:
        True when an operation was performed, False when the arguments were
        rejected and the usage text was printed instead.
    """
    ops = ('set', 'get')
    targets = targets_func()

    if len(args) >= 2 and args[0] in targets and args[1] in ops:
        target, op = args[0], args[1]
        if op == 'get':
            print(get_func(target))
            return True
        # The source list is only needed to validate a 'set' request.
        if len(args) >= 3 and args[2] in sources_func(target):
            set_func(target, args[2])
            return True

    print('Arguments for {0} command:'.format(cmd))
    print('  {0} TARGET OP [SRC]'.format(cmd))
    print('    TARGET:    [{0}]'.format('|'.join(targets)))
    print('    OP:        [{0}]'.format('|'.join(ops)))
    # Valid sources depend on the target, so list them per target.
    for label in targets:
        print('    SRC:       [{0}] if TARGET={1} and OP=set'.format(
            '|'.join(sources_func(label)), label))
    return False

def handle_headphone_source(unit, args):
    """Run the 'headphone-source' command via the shared TARGET/OP handler.

    Targets come from get_headphone_labels(), per-target sources from
    get_headphone_source_labels(); the selection is written with
    set_headphone_source() and read with get_headphone_source() on
    unit.protocol.
    """
    protocol = unit.protocol
    return _handle_target_source(
        unit, args, 'headphone-source',
        protocol.get_headphone_labels,
        protocol.get_headphone_source_labels,
        protocol.set_headphone_source,
        protocol.get_headphone_source,
    )

def handle_output_source(unit, args):
    """Run the 'output-source' command via the shared TARGET/OP handler.

    Targets come from get_output_labels(), per-target sources from
    get_output_source_labels(); the selection is written with
    set_output_source() and read with get_output_source() on unit.protocol.
    """
    protocol = unit.protocol
    return _handle_target_source(
        unit, args, 'output-source',
        protocol.get_output_labels,
        protocol.get_output_source_labels,
        protocol.set_output_source,
        protocol.get_output_source,
    )

def handle_clock_source(unit, args):
    """Handle the 'clock-source' command: ``OP [SRC]``.

    Args:
        unit: Object whose ``protocol`` provides get_clock_source_labels(),
            set_clock_source(source) and get_clock_source().
        args: Argument strings that follow the command name.

    Returns:
        True when an operation was performed, False when the arguments were
        rejected and the usage text was printed instead.
    """
    ops = ('set', 'get')
    sources = unit.protocol.get_clock_source_labels()

    if len(args) >= 1 and args[0] in ops:
        if args[0] == 'get':
            print(unit.protocol.get_clock_source())
            return True
        # OP is 'set': a source named in the label list is required.
        if len(args) >= 2 and args[1] in sources:
            unit.protocol.set_clock_source(args[1])
            return True

    print('Arguments for clock-source command:')
    print('  clock-source OP [SRC]')
    print('    OP:     {0}'.format('|'.join(ops)))
    print('    SRC:    {0} if OP=set'.format('|'.join(sources)))
    print('  Packet streaming should be stopped.')
    return False

def handle_listen_metering(unit, args):
    """Print the unit's meter values in an endless loop.

    Each pass prints one ``name: value`` line per meter, sorted by name,
    then an empty line, then sleeps for 0.1 second. SIGINT is rebound to a
    handler that calls sys.exit(), which is how the loop is normally left;
    the function never returns a value.
    """
    # The signal is delivered in another context than the polling loop.
    def _exit_on_sigint(signum, frame):
        sys.exit()

    signal.signal(signal.SIGINT, _exit_on_sigint)

    while True:
        # At higher sampling rate, this causes timeout frequently, so a
        # failed read is skipped and retried on the next pass.
        try:
            meters = unit.protocol.get_meters()
            for name in sorted(meters):
                print('{0}: {1:08x}'.format(name, meters[name]))
        except Exception:
            pass
        print('')
        time.sleep(0.1)

def handle_sampling_rate(unit, args):
    """Handle the 'sampling-rate' command: ``OP [RATE]``.

    For OP=set, RATE must be an integer whose string form matches one of
    the entries returned by get_sampling_rate_labels(); anything else is
    treated as a usage error.

    Args:
        unit: Object whose ``protocol`` provides get_sampling_rate_labels(),
            set_sampling_rate(rate) and get_sampling_rate().
        args: Argument strings that follow the command name.

    Returns:
        True when an operation was performed, False when the arguments were
        rejected and the usage text was printed instead.
    """
    ops = ('set', 'get')
    rates = unit.protocol.get_sampling_rate_labels()

    if len(args) >= 1 and args[0] in ops:
        if args[0] == 'get':
            print(unit.protocol.get_sampling_rate())
            return True
        if len(args) >= 2:
            # A non-numeric RATE is a usage error, not a crash. Only the
            # conversion is guarded so protocol errors still propagate.
            try:
                rate = int(args[1])
            except ValueError:
                rate = None
            # Compare string forms so the check works whether the labels
            # are integers or strings.
            if rate is not None and str(rate) in {str(r) for r in rates}:
                unit.protocol.set_sampling_rate(rate)
                return True

    print('Arguments for sampling-rate command:')
    print('  sampling-rate OP [RATE]')
    print('    OP:     {0}'.format('|'.join(ops)))
    print('    RATE:   {0}'.format('|'.join(map(str, rates))))
    return False

# Commands that are registered regardless of what the unit reports.
cmds = {
    'input-gain': handle_input_gain,
    'input-balance': handle_input_balance,
    'mixer-routing': handle_mixer_routing,
    'listen-metering': handle_listen_metering,
    'clock-source': handle_clock_source,
    'sampling-rate': handle_sampling_rate,
}

fullpath = CuiKit.seek_snd_unit_path()
if fullpath:
    unit = MaudioUnit(fullpath)

    # The remaining commands are registered only when the protocol reports
    # at least one label for the corresponding feature.
    if unit.protocol.get_aux_input_labels():
        cmds.update({
            'aux-input': handle_aux_input,
            'aux-volume': handle_aux_volume,
        })
        # aux-balance additionally needs both accessors on the protocol.
        if all(hasattr(unit.protocol, attr)
               for attr in ('set_aux_balance', 'get_aux_balance')):
            cmds['aux-balance'] = handle_aux_balance

    if unit.protocol.get_output_labels():
        cmds.update({
            'output-volume': handle_output_volume,
            'output-source': handle_output_source,
        })

    if unit.protocol.get_headphone_labels():
        cmds.update({
            'headphone-volume': handle_headphone_volume,
            'headphone-source': handle_headphone_source,
        })

    CuiKit.dispatch_command(unit, cmds)