whisper-dictation/whisper_app/audio.py

190 lines
6.1 KiB
Python

import logging
import re
import threading
import time
import numpy as np
import sounddevice as sd
from whisper_app import app, config
log = logging.getLogger(__name__)
_HWDEV_RE = re.compile(r"\(hw:(\d+),(\d+)\)")
def audio_callback(indata, frames, time_info, status):
if app.state == app.AppState.RECORDING:
app.audio_chunks.append(indata.copy())
def resolve_device(name: str | None) -> int | None:
"""Resolve a device name to its current PortAudio index, or None for default."""
if not name:
return None
for i, d in enumerate(sd.query_devices()):
if d["max_input_channels"] > 0 and d["name"] == name:
return i
log.warning("Audio device '%s' not found, using default", name)
return None
def get_input_devices() -> list[tuple[int, str]]:
"""Return list of (index, name) for real input devices on the default host API.
On ALSA (Linux): filters out virtual/plugin devices (pulse, pipewire, jack,
spdif, etc.) and deduplicates entries with identical names.
"""
default_api = sd.query_hostapis(sd.default.hostapi)["name"]
devs = [(i, d["name"]) for i, d in enumerate(sd.query_devices())
if d["max_input_channels"] > 0
and sd.query_hostapis(d["hostapi"])["name"] == default_api]
# Detect ALSA: real hardware devices contain "(hw:X,Y)"
has_hw = any(_HWDEV_RE.search(name) for _, name in devs)
if not has_hw:
return devs
seen_names: set[str] = set()
result = []
for i, name in devs:
if not _HWDEV_RE.search(name):
continue # virtual/plugin device
if name in seen_names:
continue # exact duplicate
seen_names.add(name)
result.append((i, name))
return result
def test_device(device_name: str | None, duration: float,
on_level: callable, on_done: callable) -> None:
"""Record from device for *duration* seconds, calling on_level(float 0..1) periodically."""
device = resolve_device(device_name)
def _run():
try:
sr = config.config["sample_rate"]
block = int(sr * 0.05) # 50 ms blocks
peak = 0.0
def _cb(indata, frames, time_info, status):
nonlocal peak
level = float(np.abs(indata).max())
peak = max(peak, level)
on_level(min(level / 0.1, 1.0)) # normalize: 0.1 amplitude = 100%
with sd.InputStream(samplerate=sr, channels=1, device=device,
callback=_cb, blocksize=block):
sd.sleep(int(duration * 1000))
on_done(peak > 0.001)
except Exception as e:
log.error("Mic test failed: %s", e)
on_done(False)
threading.Thread(target=_run, daemon=True).start()
class AudioManager:
"""Manages audio stream with automatic device hotplug handling.
Polls the device list every few seconds. When devices change (USB
plug/unplug, docking station) the stream is silently restarted so
the configured (or system-default) device is used.
"""
_POLL_INTERVAL = 3 # seconds
def __init__(self):
self._stream: sd.InputStream | None = None
self._lock = threading.Lock()
self._running = False
self._monitor_thread: threading.Thread | None = None
self._last_devices: set[str] = set()
# -- public API --
def start(self):
"""Open audio stream and begin device monitoring."""
self._open_stream()
self._running = True
self._monitor_thread = threading.Thread(
target=self._monitor_loop, daemon=True)
self._monitor_thread.start()
def stop(self):
"""Stop monitoring and close stream."""
self._running = False
with self._lock:
if self._stream:
try:
self._stream.stop()
self._stream.close()
except Exception:
pass
self._stream = None
def restart(self):
"""Restart with (possibly changed) config. Called after settings save."""
with self._lock:
self._close_stream_locked()
self._open_stream_locked()
# -- internals --
def _open_stream(self):
with self._lock:
self._open_stream_locked()
def _open_stream_locked(self):
device_name = config.config.get("audio_device")
device = resolve_device(device_name)
sr = config.config["sample_rate"]
try:
self._stream = sd.InputStream(
samplerate=sr, channels=1, device=device,
callback=audio_callback,
)
self._stream.start()
except sd.PortAudioError:
log.warning("Audio device %s failed, falling back to default",
device_name)
self._stream = sd.InputStream(
samplerate=sr, channels=1, device=None,
callback=audio_callback,
)
self._stream.start()
self._last_devices = self._device_snapshot()
def _close_stream_locked(self):
if self._stream:
try:
self._stream.stop()
self._stream.close()
except Exception:
pass
self._stream = None
@staticmethod
def _device_snapshot() -> set[str]:
try:
return {d["name"] for d in sd.query_devices()
if d["max_input_channels"] > 0}
except Exception:
return set()
def _monitor_loop(self):
while self._running:
time.sleep(self._POLL_INTERVAL)
if not self._running:
break
if app.state == app.AppState.RECORDING:
continue # never disrupt an active recording
current = self._device_snapshot()
if current != self._last_devices:
log.info("Audio devices changed, restarting stream")
with self._lock:
self._close_stream_locked()
self._open_stream_locked()
app.log("Audio-Gerät neu verbunden")