190 lines
6.1 KiB
Python
190 lines
6.1 KiB
Python
import logging
|
|
import re
|
|
import threading
|
|
import time
|
|
|
|
import numpy as np
|
|
import sounddevice as sd
|
|
|
|
from whisper_app import app, config
|
|
|
|
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
|
|
|
|
# Matches the "(hw:X,Y)" marker in a device name and captures both numbers;
# get_input_devices() uses it to recognise real ALSA hardware devices.
_HWDEV_RE = re.compile(r"\(hw:(\d+),(\d+)\)")
|
|
|
|
|
|
def audio_callback(indata, frames, time_info, status):
    """Stream callback: keep a copy of each incoming block while recording.

    Blocks that arrive in any other application state are dropped.
    *frames*, *time_info* and *status* are accepted but not used.
    """
    if app.state != app.AppState.RECORDING:
        return
    # Store a copy rather than the buffer object handed to the callback.
    app.audio_chunks.append(indata.copy())
|
|
|
|
|
|
def resolve_device(name: str | None) -> int | None:
|
|
"""Resolve a device name to its current PortAudio index, or None for default."""
|
|
if not name:
|
|
return None
|
|
for i, d in enumerate(sd.query_devices()):
|
|
if d["max_input_channels"] > 0 and d["name"] == name:
|
|
return i
|
|
log.warning("Audio device '%s' not found, using default", name)
|
|
return None
|
|
|
|
|
|
def get_input_devices() -> list[tuple[int, str]]:
    """List (index, name) pairs for input devices on the default host API.

    If at least one device name carries an ALSA-style "(hw:X,Y)" marker,
    only such hardware devices are returned (virtual/plugin entries such as
    pulse, pipewire, jack or spdif are dropped) and repeated names are
    reduced to their first occurrence. Otherwise the list is returned
    unfiltered.
    """
    default_api_name = sd.query_hostapis(sd.default.hostapi)["name"]

    candidates: list[tuple[int, str]] = []
    for index, info in enumerate(sd.query_devices()):
        if info["max_input_channels"] <= 0:
            continue  # output-only device
        if sd.query_hostapis(info["hostapi"])["name"] != default_api_name:
            continue  # belongs to another host API
        candidates.append((index, info["name"]))

    # Detect ALSA: real hardware devices contain "(hw:X,Y)"
    hardware = [(index, label) for index, label in candidates
                if _HWDEV_RE.search(label)]
    if not hardware:
        return candidates

    # dicts keep insertion order, so setdefault retains the first index
    # seen for every name and preserves the original device ordering.
    first_index_by_name: dict[str, int] = {}
    for index, label in hardware:
        first_index_by_name.setdefault(label, index)
    return [(index, label) for label, index in first_index_by_name.items()]
|
|
|
|
|
|
def test_device(device_name: str | None, duration: float,
                on_level: callable, on_done: callable) -> None:
    """Record from a device for *duration* seconds on a background thread.

    The function returns immediately after starting a daemon thread.

    Args:
        device_name: Name of the input device; empty/None (or a name that
            cannot be resolved) selects the default device.
        duration: Length of the test in seconds.
        on_level: Called from the stream callback for every 50 ms block
            with the block's peak amplitude scaled to 0..1 (an amplitude
            of 0.1 maps to 1.0).
        on_done: Called exactly once from the worker thread when the test
            ends: True if the overall peak exceeded 0.001, False if the
            peak stayed below that or the test failed.
    """

    def _run():
        heard_signal = False
        try:
            # Resolve inside the worker so that a failing device query is
            # reported through on_done(False) instead of raising into the
            # caller of test_device().
            device = resolve_device(device_name)
            sr = config.config["sample_rate"]
            block = int(sr * 0.05)  # 50 ms blocks
            peak = 0.0

            def _cb(indata, frames, time_info, status):
                nonlocal peak
                level = float(np.abs(indata).max())
                peak = max(peak, level)
                on_level(min(level / 0.1, 1.0))  # normalize: 0.1 amplitude = 100%

            with sd.InputStream(samplerate=sr, channels=1, device=device,
                                callback=_cb, blocksize=block):
                sd.sleep(int(duration * 1000))
            heard_signal = peak > 0.001
        except Exception as e:
            log.error("Mic test failed: %s", e)
        # Report outside the try block: an exception raised by on_done itself
        # must not be logged as a mic failure nor trigger a second on_done call.
        on_done(heard_signal)

    threading.Thread(target=_run, daemon=True).start()
|
|
|
|
|
|
class AudioManager:
    """Manages audio stream with automatic device hotplug handling.

    Polls the device list every few seconds. When devices change (USB
    plug/unplug, docking station) the stream is silently restarted so
    the configured (or system-default) device is used.

    A restart that fails is logged and retried on the next poll; it does
    not end the monitor thread.
    """

    _POLL_INTERVAL = 3  # seconds

    def __init__(self):
        self._stream: sd.InputStream | None = None
        # Guards every open/close of the stream (public API and monitor thread).
        self._lock = threading.Lock()
        self._running = False
        self._monitor_thread: threading.Thread | None = None
        # Input-device names observed when the stream was last opened.
        self._last_devices: set[str] = set()

    # -- public API --

    def start(self):
        """Open audio stream and begin device monitoring.

        Does nothing if the manager is already running. Errors raised while
        opening the stream propagate to the caller; in that case no monitor
        thread is started.
        """
        if self._running:
            return  # a second start would leak the stream and add a monitor
        self._open_stream()
        self._running = True
        self._monitor_thread = threading.Thread(
            target=self._monitor_loop, daemon=True)
        self._monitor_thread.start()

    def stop(self):
        """Stop monitoring and close stream."""
        self._running = False
        with self._lock:
            self._close_stream_locked()

    def restart(self):
        """Restart with (possibly changed) config. Called after settings save."""
        with self._lock:
            self._close_stream_locked()
            self._open_stream_locked()

    # -- internals --

    def _open_stream(self):
        """Acquire the lock and open the stream."""
        with self._lock:
            self._open_stream_locked()

    def _open_stream_locked(self):
        """Open and start the input stream; the caller must hold ``self._lock``.

        Tries the configured device first and falls back to the default
        device on ``sd.PortAudioError``. If the default device fails as
        well, the error propagates and ``self._stream`` is left unchanged.
        """
        device_name = config.config.get("audio_device")
        device = resolve_device(device_name)
        sr = config.config["sample_rate"]
        try:
            self._stream = self._start_stream(device, sr)
        except sd.PortAudioError:
            if device is None:
                raise  # the default device was already the one that failed
            log.warning("Audio device %s failed, falling back to default",
                        device_name)
            self._stream = self._start_stream(None, sr)
        self._last_devices = self._device_snapshot()

    @staticmethod
    def _start_stream(device, samplerate):
        """Create and start a mono input stream, closing it if start fails."""
        stream = sd.InputStream(
            samplerate=samplerate, channels=1, device=device,
            callback=audio_callback,
        )
        try:
            stream.start()
        except Exception:
            stream.close()  # do not leak a stream that never started
            raise
        return stream

    def _close_stream_locked(self):
        """Stop and close the current stream, if any; caller holds the lock.

        Errors are logged at debug level and otherwise ignored (best effort).
        """
        stream, self._stream = self._stream, None
        if stream is None:
            return
        # Attempt close() even when stop() fails, so the stream is released.
        for step in (stream.stop, stream.close):
            try:
                step()
            except Exception as exc:
                log.debug("Ignoring error from audio stream %s(): %s",
                          step.__name__, exc)

    @staticmethod
    def _device_snapshot() -> set[str]:
        """Return the names of all input-capable devices (empty set on error)."""
        # NOTE(review): PortAudio may cache its device list after
        # initialization; confirm that sd.query_devices() reflects hotplug
        # events on the target platforms.
        try:
            return {d["name"] for d in sd.query_devices()
                    if d["max_input_channels"] > 0}
        except Exception:
            return set()

    def _monitor_loop(self):
        """Poll for device changes and reopen the stream when needed.

        Runs until ``stop()`` clears ``_running``. The stream is reopened
        when the device set differs from the last snapshot, or when there is
        currently no stream (an earlier reopen failed).
        """
        while self._running:
            time.sleep(self._POLL_INTERVAL)
            if not self._running:
                break
            if app.state == app.AppState.RECORDING:
                continue  # never disrupt an active recording
            devices_changed = self._device_snapshot() != self._last_devices
            if not devices_changed and self._stream is not None:
                continue
            if devices_changed:
                log.info("Audio devices changed, restarting stream")
            try:
                with self._lock:
                    if not self._running:
                        break  # stop() won the race; do not reopen
                    self._close_stream_locked()
                    self._open_stream_locked()
            except Exception as exc:
                # Keep the monitor alive; the next poll tries again.
                log.warning("Audio stream restart failed, retrying in %s s: %s",
                            self._POLL_INTERVAL, exc)
                continue
            app.log("Audio-Gerät neu verbunden")
|