From c3d345b6a7ed1d32cb63255a29718da2517b40aa Mon Sep 17 00:00:00 2001 From: beo3000 Date: Mon, 15 Jun 2026 17:36:02 +0200 Subject: [PATCH] feat(cli): ingest/process/both/write subcommands wiring all components --- src/journal_bot/__main__.py | 94 +++++++++++++++++++++++++++++++++++++ src/journal_bot/ingest.py | 66 ++++++++++++++++++++++++++ src/journal_bot/process.py | 48 +++++++++++++++++++ tests/test_ingest.py | 77 ++++++++++++++++++++++++++++++ tests/test_process.py | 73 ++++++++++++++++++++++++++++ 5 files changed, 358 insertions(+) create mode 100644 src/journal_bot/__main__.py create mode 100644 src/journal_bot/ingest.py create mode 100644 src/journal_bot/process.py create mode 100644 tests/test_ingest.py create mode 100644 tests/test_process.py diff --git a/src/journal_bot/__main__.py b/src/journal_bot/__main__.py new file mode 100644 index 0000000..9960f13 --- /dev/null +++ b/src/journal_bot/__main__.py @@ -0,0 +1,94 @@ +"""CLI entrypoint for journal-bot.""" +import argparse +import asyncio +from datetime import date +from pathlib import Path +import sys +from .config import Config +from .queue import Queue +from .state import State +from .vault_writer import VaultWriter +from .telegram_client import TelegramClient +from .transcribe import Transcriber +from .ingest import ingest_once +from .process import process_once +from .processor_lmstudio import LMStudioProcessor + + +def _load_prompt() -> str: + here = Path(__file__).parent + return (here / "prompts" / "journal_system.md").read_text(encoding="utf-8") + + +async def cmd_ingest(cfg: Config) -> int: + tg = TelegramClient( + token=cfg.telegram_token, + allowed_user_id=cfg.allowed_user_id, + download_dir=cfg.attachments_dir, + ) + transcriber = Transcriber(model_name=cfg.whisper_model, device=cfg.whisper_device) + queue = Queue(cfg.queue_dir) + state = State(cfg.state_dir) + try: + n = await ingest_once(tg, transcriber, queue, state, attachments_dir=cfg.attachments_dir) + finally: + await tg.aclose() + print(f"Ingested {n} items") + return 0 + + +def cmd_process(cfg: Config) -> int: + processor = LMStudioProcessor( + base_url=cfg.lmstudio_url, + model=cfg.lmstudio_model, + system_prompt=_load_prompt(), + ) + queue = Queue(cfg.queue_dir) + writer = VaultWriter(cfg.vault_path) + n = process_once(processor, queue, writer, vault_path=cfg.vault_path, today=date.today()) + print(f"Processed {n} items") + return 0 + + +async def cmd_both(cfg: Config) -> int: + rc = await cmd_ingest(cfg) + if rc != 0: + return rc + return cmd_process(cfg) + + +def cmd_write(cfg: Config, target_path: str, entry_file: Path) -> int: + """Helper used by the Claude Code skill to write a single entry.""" + writer = VaultWriter(cfg.vault_path) + body = entry_file.read_text(encoding="utf-8") + writer.append(target_path, body) + print(f"Wrote to {target_path}") + return 0 + + +def main() -> int: + parser = argparse.ArgumentParser(prog="journal_bot") + sub = parser.add_subparsers(dest="cmd", required=True) + sub.add_parser("ingest") + sub.add_parser("process") + sub.add_parser("both") + w = sub.add_parser("write") + w.add_argument("--target-path", required=True) + w.add_argument("--entry-file", required=True, type=Path) + args = parser.parse_args() + + cfg = Config() + + if args.cmd == "ingest": + return asyncio.run(cmd_ingest(cfg)) + if args.cmd == "process": + return cmd_process(cfg) + if args.cmd == "both": + return asyncio.run(cmd_both(cfg)) + if args.cmd == "write": + return cmd_write(cfg, args.target_path, args.entry_file) + return 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/src/journal_bot/ingest.py b/src/journal_bot/ingest.py new file mode 100644 index 0000000..d94c9ae --- /dev/null +++ b/src/journal_bot/ingest.py @@ -0,0 +1,66 @@ +from pathlib import Path +from .queue import Queue, QueueItem +from .state import State + + +async def ingest_once( + telegram, + transcriber, + queue: Queue, + state: State, + attachments_dir: Path, +) -> int: + """Polls telegram once, queues new items, returns count of items enqueued.""" + attachments_dir.mkdir(parents=True, exist_ok=True) + updates = await telegram.get_updates(offset=state.last_update_id, timeout=0) + count = 0 + for upd in updates: + if state.has_processed(upd.update_id): + state.set_last_update_id(max(state.last_update_id, upd.update_id)) + continue + + local_dt = upd.date.astimezone() + stamp = local_dt.strftime("%Y%m%d-%H%M%S") + item_text = "" + raw_audio_path: str | None = None + image_embed: str | None = None + image_caption: str | None = None + + if upd.kind == "text": + item_text = upd.text + elif upd.kind == "voice": + audio_dest = attachments_dir / f"journal-{stamp}.ogg" + await telegram.download_file(upd.voice_file_id, audio_dest) + res = transcriber.transcribe(audio_dest, premium_text=upd.transcribed_text) + if res.success: + item_text = res.text + else: + item_text = "\U0001F399️ Nicht transkribiert" + raw_audio_path = str(audio_dest.relative_to(attachments_dir.parent)) + elif upd.kind == "photo": + img_dest = attachments_dir / f"journal-{stamp}.jpg" + await telegram.download_file(upd.photo_file_id, img_dest) + rel = img_dest.relative_to(attachments_dir.parent) + image_embed = f"![[{rel.as_posix()}]]" + image_caption = upd.caption + item_text = upd.caption or "" + else: + state.set_last_update_id(max(state.last_update_id, upd.update_id)) + state.mark_processed(upd.update_id) + continue + + item = QueueItem( + update_id=upd.update_id, + received_at=local_dt.isoformat(), + type=upd.kind, + text=item_text, + raw_audio_path=raw_audio_path, + image_embed=image_embed, + image_caption=image_caption, + ) + queue.enqueue(item) + state.mark_processed(upd.update_id) + state.set_last_update_id(max(state.last_update_id, upd.update_id)) + await telegram.react_ok(upd.chat_id, upd.message_id) + count += 1 + return count diff --git a/src/journal_bot/process.py b/src/journal_bot/process.py new file mode 100644 index 0000000..7af3921 --- /dev/null +++ b/src/journal_bot/process.py @@ -0,0 +1,48 @@ +from datetime import date, datetime +from pathlib import Path +from .context import collect_vault_context +from .queue import Queue +from .vault_writer import VaultWriter +from .processor_protocol import ProcessorInput + + +def process_once(processor, queue: Queue, writer: VaultWriter, vault_path: Path, today: date) -> int: + if not processor.health_check(): + return 0 + ctx = collect_vault_context(vault_path, today=today) + processed = 0 + failed_ids: set[int] = set() + while True: + item = queue.claim_next() + if item is None: + break + if item.update_id in failed_ids: + # Already failed this run; re-queue without incrementing and stop. + queue.enqueue(item) + (queue.working / f"{item.update_id}.json").unlink(missing_ok=True) + break + try: + received_dt = datetime.fromisoformat(item.received_at) + received_time = received_dt.strftime("%H:%M") + payload = ProcessorInput( + today=ctx["today"], + weekday=ctx["weekday"], + received_time=received_time, + persons=ctx["persons"], + projects=ctx["projects"], + text=item.text, + image_embed=item.image_embed, + image_caption=item.image_caption, + image_local_path=None, + ) + output = processor.process(payload) + entry = output.entry_markdown + if item.image_embed and item.image_embed not in entry: + entry = entry + "\n\n" + item.image_embed + writer.append(output.target_path, entry, clarifications=output.clarifications) + queue.complete(item) + processed += 1 + except Exception: + queue.fail(item) + failed_ids.add(item.update_id) + return processed diff --git a/tests/test_ingest.py b/tests/test_ingest.py new file mode 100644 index 0000000..f28811a --- /dev/null +++ b/tests/test_ingest.py @@ -0,0 +1,77 @@ +from datetime import datetime, timezone +from pathlib import Path +from journal_bot.ingest import ingest_once +from journal_bot.queue import Queue +from journal_bot.state import State +from journal_bot.telegram_client import ParsedUpdate + + +class FakeTelegram: + def __init__(self, updates): + self._updates = updates + self.reactions: list[tuple[int, int]] = [] + self.downloads: list[str] = [] + + async def get_updates(self, offset, timeout=0): + return [u for u in self._updates if u.update_id > offset] + + async def download_file(self, file_id, dest): + self.downloads.append(file_id) + dest.write_bytes(b"audio") + return dest + + async def react_ok(self, chat_id, message_id): + self.reactions.append((chat_id, message_id)) + + +class FakeTranscriber: + def transcribe(self, path, premium_text=None): + from journal_bot.transcribe import TranscriptionResult + return TranscriptionResult(True, "transkribiert", "whisper") + + +async def test_ingest_text_creates_queue_item(tmp_path): + upd = ParsedUpdate( + update_id=10, message_id=1, chat_id=42, + date=datetime(2026, 6, 14, 14, 32, tzinfo=timezone.utc), + kind="text", text="Hallo", + ) + queue = Queue(tmp_path / "q") + state = State(tmp_path / "s") + tg = FakeTelegram([upd]) + await ingest_once(tg, FakeTranscriber(), queue, state, attachments_dir=tmp_path / "att") + items = list((tmp_path / "q" / "pending").iterdir()) + assert len(items) == 1 + assert state.last_update_id == 10 + assert state.has_processed(10) + assert (42, 1) in tg.reactions + + +async def test_ingest_voice_transcribes(tmp_path): + upd = ParsedUpdate( + update_id=20, message_id=2, chat_id=42, + date=datetime(2026, 6, 14, 14, 32, tzinfo=timezone.utc), + kind="voice", voice_file_id="VOICE", + ) + queue = Queue(tmp_path / "q") + state = State(tmp_path / "s") + tg = FakeTelegram([upd]) + await ingest_once(tg, FakeTranscriber(), queue, state, attachments_dir=tmp_path / "att") + import json + item_data = json.loads(next((tmp_path / "q" / "pending").iterdir()).read_text(encoding="utf-8")) + assert item_data["text"] == "transkribiert" + assert item_data["type"] == "voice" + + +async def test_ingest_skips_already_processed(tmp_path): + upd = ParsedUpdate( + update_id=30, message_id=3, chat_id=42, + date=datetime(2026, 6, 14, 14, 32, tzinfo=timezone.utc), + kind="text", text="Hallo", + ) + queue = Queue(tmp_path / "q") + state = State(tmp_path / "s") + state.mark_processed(30) + tg = FakeTelegram([upd]) + await ingest_once(tg, FakeTranscriber(), queue, state, attachments_dir=tmp_path / "att") + assert not any((tmp_path / "q" / "pending").iterdir()) diff --git a/tests/test_process.py b/tests/test_process.py new file mode 100644 index 0000000..fb38ed7 --- /dev/null +++ b/tests/test_process.py @@ -0,0 +1,73 @@ +from datetime import date +from pathlib import Path +from journal_bot.process import process_once +from journal_bot.queue import Queue, QueueItem +from journal_bot.processor_protocol import ProcessorOutput +from journal_bot.vault_writer import VaultWriter + + +class FakeProcessor: + def __init__(self, output: ProcessorOutput | None = None, healthy=True, raises=None): + self._output = output + self._healthy = healthy + self._raises = raises + + def health_check(self): + return self._healthy + + def process(self, payload): + if self._raises: + raise self._raises + return self._output + + +def make_item(update_id=1, text="Hallo"): + return QueueItem( + update_id=update_id, + received_at="2026-06-14T14:32:17+02:00", + type="text", + text=text, + ) + + +def test_process_writes_to_vault_and_completes(tmp_path): + vault = tmp_path / "vault" + (vault / "05 Daily Notes").mkdir(parents=True) + queue = Queue(tmp_path / "q") + queue.enqueue(make_item(1)) + processor = FakeProcessor(ProcessorOutput( + target_date="2026-06-14", + target_path="05 Daily Notes/2026-06-14.md", + entry_markdown="## 14:32\nHallo", + )) + writer = VaultWriter(vault) + n = process_once(processor, queue, writer, vault_path=vault, today=date(2026, 6, 14)) + assert n == 1 + assert (tmp_path / "q" / "done" / "1.json").exists() + assert (vault / "05 Daily Notes" / "2026-06-14.md").exists() + + +def test_process_skips_when_unhealthy(tmp_path): + vault = tmp_path / "vault" + (vault / "05 Daily Notes").mkdir(parents=True) + queue = Queue(tmp_path / "q") + queue.enqueue(make_item(1)) + processor = FakeProcessor(healthy=False) + writer = VaultWriter(vault) + n = process_once(processor, queue, writer, vault_path=vault, today=date(2026, 6, 14)) + assert n == 0 + assert (tmp_path / "q" / "pending" / "1.json").exists() + + +def test_process_fails_item_on_error(tmp_path): + vault = tmp_path / "vault" + (vault / "05 Daily Notes").mkdir(parents=True) + queue = Queue(tmp_path / "q") + queue.enqueue(make_item(1)) + processor = FakeProcessor(raises=RuntimeError("boom")) + writer = VaultWriter(vault) + process_once(processor, queue, writer, vault_path=vault, today=date(2026, 6, 14)) + assert (tmp_path / "q" / "pending" / "1.json").exists() + import json + data = json.loads((tmp_path / "q" / "pending" / "1.json").read_text(encoding="utf-8")) + assert data["attempts"] == 1