feat(cli): ingest/process/both/write subcommands wiring all components

This commit is contained in:
beo3000 2026-06-15 17:36:02 +02:00
parent 2cbe19fecc
commit c3d345b6a7
5 changed files with 358 additions and 0 deletions

View File

@ -0,0 +1,94 @@
"""CLI entrypoint for journal-bot."""
import argparse
import asyncio
from datetime import date
from pathlib import Path
import sys
from .config import Config
from .queue import Queue
from .state import State
from .vault_writer import VaultWriter
from .telegram_client import TelegramClient
from .transcribe import Transcriber
from .ingest import ingest_once
from .process import process_once
from .processor_lmstudio import LMStudioProcessor
def _load_prompt() -> str:
here = Path(__file__).parent
return (here / "prompts" / "journal_system.md").read_text(encoding="utf-8")
async def cmd_ingest(cfg: Config) -> int:
tg = TelegramClient(
token=cfg.telegram_token,
allowed_user_id=cfg.allowed_user_id,
download_dir=cfg.attachments_dir,
)
transcriber = Transcriber(model_name=cfg.whisper_model, device=cfg.whisper_device)
queue = Queue(cfg.queue_dir)
state = State(cfg.state_dir)
try:
n = await ingest_once(tg, transcriber, queue, state, attachments_dir=cfg.attachments_dir)
finally:
await tg.aclose()
print(f"Ingested {n} items")
return 0
def cmd_process(cfg: Config) -> int:
processor = LMStudioProcessor(
base_url=cfg.lmstudio_url,
model=cfg.lmstudio_model,
system_prompt=_load_prompt(),
)
queue = Queue(cfg.queue_dir)
writer = VaultWriter(cfg.vault_path)
n = process_once(processor, queue, writer, vault_path=cfg.vault_path, today=date.today())
print(f"Processed {n} items")
return 0
async def cmd_both(cfg: Config) -> int:
rc = await cmd_ingest(cfg)
if rc != 0:
return rc
return cmd_process(cfg)
def cmd_write(cfg: Config, target_path: str, entry_file: Path) -> int:
"""Helper used by the Claude Code skill to write a single entry."""
writer = VaultWriter(cfg.vault_path)
body = entry_file.read_text(encoding="utf-8")
writer.append(target_path, body)
print(f"Wrote to {target_path}")
return 0
def main() -> int:
parser = argparse.ArgumentParser(prog="journal_bot")
sub = parser.add_subparsers(dest="cmd", required=True)
sub.add_parser("ingest")
sub.add_parser("process")
sub.add_parser("both")
w = sub.add_parser("write")
w.add_argument("--target-path", required=True)
w.add_argument("--entry-file", required=True, type=Path)
args = parser.parse_args()
cfg = Config()
if args.cmd == "ingest":
return asyncio.run(cmd_ingest(cfg))
if args.cmd == "process":
return cmd_process(cfg)
if args.cmd == "both":
return asyncio.run(cmd_both(cfg))
if args.cmd == "write":
return cmd_write(cfg, args.target_path, args.entry_file)
return 1
if __name__ == "__main__":
sys.exit(main())

66
src/journal_bot/ingest.py Normal file
View File

@ -0,0 +1,66 @@
from pathlib import Path
from .queue import Queue, QueueItem
from .state import State
async def ingest_once(
telegram,
transcriber,
queue: Queue,
state: State,
attachments_dir: Path,
) -> int:
"""Polls telegram once, queues new items, returns count of items enqueued."""
attachments_dir.mkdir(parents=True, exist_ok=True)
updates = await telegram.get_updates(offset=state.last_update_id, timeout=0)
count = 0
for upd in updates:
if state.has_processed(upd.update_id):
state.set_last_update_id(max(state.last_update_id, upd.update_id))
continue
local_dt = upd.date.astimezone()
stamp = local_dt.strftime("%Y%m%d-%H%M%S")
item_text = ""
raw_audio_path: str | None = None
image_embed: str | None = None
image_caption: str | None = None
if upd.kind == "text":
item_text = upd.text
elif upd.kind == "voice":
audio_dest = attachments_dir / f"journal-{stamp}.ogg"
await telegram.download_file(upd.voice_file_id, audio_dest)
res = transcriber.transcribe(audio_dest, premium_text=upd.transcribed_text)
if res.success:
item_text = res.text
else:
item_text = "\U0001F399 Nicht transkribiert"
raw_audio_path = str(audio_dest.relative_to(attachments_dir.parent))
elif upd.kind == "photo":
img_dest = attachments_dir / f"journal-{stamp}.jpg"
await telegram.download_file(upd.photo_file_id, img_dest)
rel = img_dest.relative_to(attachments_dir.parent)
image_embed = f"![[{rel.as_posix()}]]"
image_caption = upd.caption
item_text = upd.caption or ""
else:
state.set_last_update_id(max(state.last_update_id, upd.update_id))
state.mark_processed(upd.update_id)
continue
item = QueueItem(
update_id=upd.update_id,
received_at=local_dt.isoformat(),
type=upd.kind,
text=item_text,
raw_audio_path=raw_audio_path,
image_embed=image_embed,
image_caption=image_caption,
)
queue.enqueue(item)
state.mark_processed(upd.update_id)
state.set_last_update_id(max(state.last_update_id, upd.update_id))
await telegram.react_ok(upd.chat_id, upd.message_id)
count += 1
return count

View File

@ -0,0 +1,48 @@
from datetime import date, datetime
from pathlib import Path
from .context import collect_vault_context
from .queue import Queue
from .vault_writer import VaultWriter
from .processor_protocol import ProcessorInput
def process_once(processor, queue: Queue, writer: VaultWriter, vault_path: Path, today: date) -> int:
if not processor.health_check():
return 0
ctx = collect_vault_context(vault_path, today=today)
processed = 0
failed_ids: set[int] = set()
while True:
item = queue.claim_next()
if item is None:
break
if item.update_id in failed_ids:
# Already failed this run; re-queue without incrementing and stop.
queue.enqueue(item)
(queue.working / f"{item.update_id}.json").unlink(missing_ok=True)
break
try:
received_dt = datetime.fromisoformat(item.received_at)
received_time = received_dt.strftime("%H:%M")
payload = ProcessorInput(
today=ctx["today"],
weekday=ctx["weekday"],
received_time=received_time,
persons=ctx["persons"],
projects=ctx["projects"],
text=item.text,
image_embed=item.image_embed,
image_caption=item.image_caption,
image_local_path=None,
)
output = processor.process(payload)
entry = output.entry_markdown
if item.image_embed and item.image_embed not in entry:
entry = entry + "\n\n" + item.image_embed
writer.append(output.target_path, entry, clarifications=output.clarifications)
queue.complete(item)
processed += 1
except Exception:
queue.fail(item)
failed_ids.add(item.update_id)
return processed

77
tests/test_ingest.py Normal file
View File

@ -0,0 +1,77 @@
from datetime import datetime, timezone
from pathlib import Path
from journal_bot.ingest import ingest_once
from journal_bot.queue import Queue
from journal_bot.state import State
from journal_bot.telegram_client import ParsedUpdate
class FakeTelegram:
def __init__(self, updates):
self._updates = updates
self.reactions: list[tuple[int, int]] = []
self.downloads: list[str] = []
async def get_updates(self, offset, timeout=0):
return [u for u in self._updates if u.update_id > offset]
async def download_file(self, file_id, dest):
self.downloads.append(file_id)
dest.write_bytes(b"audio")
return dest
async def react_ok(self, chat_id, message_id):
self.reactions.append((chat_id, message_id))
class FakeTranscriber:
def transcribe(self, path, premium_text=None):
from journal_bot.transcribe import TranscriptionResult
return TranscriptionResult(True, "transkribiert", "whisper")
async def test_ingest_text_creates_queue_item(tmp_path):
upd = ParsedUpdate(
update_id=10, message_id=1, chat_id=42,
date=datetime(2026, 6, 14, 14, 32, tzinfo=timezone.utc),
kind="text", text="Hallo",
)
queue = Queue(tmp_path / "q")
state = State(tmp_path / "s")
tg = FakeTelegram([upd])
await ingest_once(tg, FakeTranscriber(), queue, state, attachments_dir=tmp_path / "att")
items = list((tmp_path / "q" / "pending").iterdir())
assert len(items) == 1
assert state.last_update_id == 10
assert state.has_processed(10)
assert (42, 1) in tg.reactions
async def test_ingest_voice_transcribes(tmp_path):
upd = ParsedUpdate(
update_id=20, message_id=2, chat_id=42,
date=datetime(2026, 6, 14, 14, 32, tzinfo=timezone.utc),
kind="voice", voice_file_id="VOICE",
)
queue = Queue(tmp_path / "q")
state = State(tmp_path / "s")
tg = FakeTelegram([upd])
await ingest_once(tg, FakeTranscriber(), queue, state, attachments_dir=tmp_path / "att")
import json
item_data = json.loads(next((tmp_path / "q" / "pending").iterdir()).read_text(encoding="utf-8"))
assert item_data["text"] == "transkribiert"
assert item_data["type"] == "voice"
async def test_ingest_skips_already_processed(tmp_path):
upd = ParsedUpdate(
update_id=30, message_id=3, chat_id=42,
date=datetime(2026, 6, 14, 14, 32, tzinfo=timezone.utc),
kind="text", text="Hallo",
)
queue = Queue(tmp_path / "q")
state = State(tmp_path / "s")
state.mark_processed(30)
tg = FakeTelegram([upd])
await ingest_once(tg, FakeTranscriber(), queue, state, attachments_dir=tmp_path / "att")
assert not any((tmp_path / "q" / "pending").iterdir())

73
tests/test_process.py Normal file
View File

@ -0,0 +1,73 @@
from datetime import date
from pathlib import Path
from journal_bot.process import process_once
from journal_bot.queue import Queue, QueueItem
from journal_bot.processor_protocol import ProcessorOutput
from journal_bot.vault_writer import VaultWriter
class FakeProcessor:
def __init__(self, output: ProcessorOutput | None = None, healthy=True, raises=None):
self._output = output
self._healthy = healthy
self._raises = raises
def health_check(self):
return self._healthy
def process(self, payload):
if self._raises:
raise self._raises
return self._output
def make_item(update_id=1, text="Hallo"):
return QueueItem(
update_id=update_id,
received_at="2026-06-14T14:32:17+02:00",
type="text",
text=text,
)
def test_process_writes_to_vault_and_completes(tmp_path):
vault = tmp_path / "vault"
(vault / "05 Daily Notes").mkdir(parents=True)
queue = Queue(tmp_path / "q")
queue.enqueue(make_item(1))
processor = FakeProcessor(ProcessorOutput(
target_date="2026-06-14",
target_path="05 Daily Notes/2026-06-14.md",
entry_markdown="## 14:32\nHallo",
))
writer = VaultWriter(vault)
n = process_once(processor, queue, writer, vault_path=vault, today=date(2026, 6, 14))
assert n == 1
assert (tmp_path / "q" / "done" / "1.json").exists()
assert (vault / "05 Daily Notes" / "2026-06-14.md").exists()
def test_process_skips_when_unhealthy(tmp_path):
vault = tmp_path / "vault"
(vault / "05 Daily Notes").mkdir(parents=True)
queue = Queue(tmp_path / "q")
queue.enqueue(make_item(1))
processor = FakeProcessor(healthy=False)
writer = VaultWriter(vault)
n = process_once(processor, queue, writer, vault_path=vault, today=date(2026, 6, 14))
assert n == 0
assert (tmp_path / "q" / "pending" / "1.json").exists()
def test_process_fails_item_on_error(tmp_path):
vault = tmp_path / "vault"
(vault / "05 Daily Notes").mkdir(parents=True)
queue = Queue(tmp_path / "q")
queue.enqueue(make_item(1))
processor = FakeProcessor(raises=RuntimeError("boom"))
writer = VaultWriter(vault)
process_once(processor, queue, writer, vault_path=vault, today=date(2026, 6, 14))
assert (tmp_path / "q" / "pending" / "1.json").exists()
import json
data = json.loads((tmp_path / "q" / "pending" / "1.json").read_text(encoding="utf-8"))
assert data["attempts"] == 1