From fbed5829dba5ad5d5c6c83a1d400acaa7d74533c Mon Sep 17 00:00:00 2001 From: beo3000 Date: Mon, 15 Jun 2026 16:57:03 +0200 Subject: [PATCH] feat(queue): file-based queue with atomic claim/complete/fail --- src/journal_bot/queue.py | 66 +++++++++++++++++++++++++++++++++++ tests/test_queue.py | 74 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 140 insertions(+) create mode 100644 src/journal_bot/queue.py create mode 100644 tests/test_queue.py diff --git a/src/journal_bot/queue.py b/src/journal_bot/queue.py new file mode 100644 index 0000000..4d75c0f --- /dev/null +++ b/src/journal_bot/queue.py @@ -0,0 +1,66 @@ +import json +from pathlib import Path +from typing import Optional +from pydantic import BaseModel, Field + + +class QueueItem(BaseModel): + update_id: int + received_at: str # ISO 8601 + type: str # "text" | "voice" | "photo" + text: str = "" + raw_audio_path: Optional[str] = None + image_embed: Optional[str] = None + image_caption: Optional[str] = None + attempts: int = 0 + + +class Queue: + """File-based queue with atomic claim/complete/fail via os.rename.""" + + def __init__(self, root: Path, max_attempts: int = 3): + self.root = root + self.pending = root / "pending" + self.working = root / "working" + self.done = root / "done" + self.failed = root / "failed" + for d in (self.pending, self.working, self.done, self.failed): + d.mkdir(parents=True, exist_ok=True) + self.max_attempts = max_attempts + + def _path(self, subdir: Path, update_id: int) -> Path: + return subdir / f"{update_id}.json" + + def enqueue(self, item: QueueItem) -> None: + target = self._path(self.pending, item.update_id) + tmp = target.with_suffix(".tmp") + tmp.write_text(item.model_dump_json()) + tmp.replace(target) + + def claim_next(self) -> Optional[QueueItem]: + for src in sorted(self.pending.iterdir()): + dst = self.working / src.name + try: + src.rename(dst) + except FileNotFoundError: + continue + return QueueItem.model_validate_json(dst.read_text()) + return None + + def complete(self, item: QueueItem) -> None: + src = self._path(self.working, item.update_id) + dst = self._path(self.done, item.update_id) + src.rename(dst) + + def fail(self, item: QueueItem) -> None: + item.attempts += 1 + src = self._path(self.working, item.update_id) + if item.attempts >= self.max_attempts: + dst = self._path(self.failed, item.update_id) + else: + dst = self._path(self.pending, item.update_id) + tmp = src.with_suffix(".tmp") + tmp.write_text(item.model_dump_json()) + tmp.replace(dst) + if src.exists(): + src.unlink() diff --git a/tests/test_queue.py b/tests/test_queue.py new file mode 100644 index 0000000..7be0313 --- /dev/null +++ b/tests/test_queue.py @@ -0,0 +1,74 @@ +import json +from pathlib import Path +import pytest +from journal_bot.queue import Queue, QueueItem + + +@pytest.fixture +def queue(tmp_path): + return Queue(tmp_path) + + +def make_item(update_id=1): + return QueueItem( + update_id=update_id, + received_at="2026-06-14T14:32:17+02:00", + type="text", + text="Hello world", + ) + + +def test_enqueue_creates_pending_file(queue, tmp_path): + queue.enqueue(make_item(1)) + pending = list((tmp_path / "pending").iterdir()) + assert len(pending) == 1 + assert pending[0].name == "1.json" + + +def test_claim_moves_to_working(queue, tmp_path): + queue.enqueue(make_item(1)) + item = queue.claim_next() + assert item is not None + assert item.update_id == 1 + assert not (tmp_path / "pending" / "1.json").exists() + assert (tmp_path / "working" / "1.json").exists() + + +def test_claim_returns_none_when_empty(queue): + assert queue.claim_next() is None + + +def test_complete_moves_to_done(queue, tmp_path): + queue.enqueue(make_item(1)) + item = queue.claim_next() + queue.complete(item) + assert (tmp_path / "done" / "1.json").exists() + assert not (tmp_path / "working" / "1.json").exists() + + +def test_fail_increments_attempts_and_returns_to_pending(queue, tmp_path): + queue.enqueue(make_item(1)) + item = queue.claim_next() + queue.fail(item) + pending_path = tmp_path / "pending" / "1.json" + assert pending_path.exists() + data = json.loads(pending_path.read_text()) + assert data["attempts"] == 1 + + +def test_fail_after_max_attempts_moves_to_failed(queue, tmp_path): + queue.enqueue(make_item(1)) + for _ in range(3): + item = queue.claim_next() + queue.fail(item) + assert (tmp_path / "failed" / "1.json").exists() + assert not (tmp_path / "pending" / "1.json").exists() + + +def test_race_condition_only_one_winner(queue, tmp_path): + queue.enqueue(make_item(1)) + pending = tmp_path / "pending" / "1.json" + target = tmp_path / "working" / "1.json" + target.parent.mkdir(exist_ok=True) + pending.rename(target) + assert queue.claim_next() is None