feat(queue): file-based queue with atomic claim/complete/fail
This commit is contained in:
parent
16b75e212a
commit
fbed5829db
|
|
@ -0,0 +1,66 @@
|
|||
import json
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class QueueItem(BaseModel):
|
||||
update_id: int
|
||||
received_at: str # ISO 8601
|
||||
type: str # "text" | "voice" | "photo"
|
||||
text: str = ""
|
||||
raw_audio_path: Optional[str] = None
|
||||
image_embed: Optional[str] = None
|
||||
image_caption: Optional[str] = None
|
||||
attempts: int = 0
|
||||
|
||||
|
||||
class Queue:
|
||||
"""File-based queue with atomic claim/complete/fail via os.rename."""
|
||||
|
||||
def __init__(self, root: Path, max_attempts: int = 3):
|
||||
self.root = root
|
||||
self.pending = root / "pending"
|
||||
self.working = root / "working"
|
||||
self.done = root / "done"
|
||||
self.failed = root / "failed"
|
||||
for d in (self.pending, self.working, self.done, self.failed):
|
||||
d.mkdir(parents=True, exist_ok=True)
|
||||
self.max_attempts = max_attempts
|
||||
|
||||
def _path(self, subdir: Path, update_id: int) -> Path:
|
||||
return subdir / f"{update_id}.json"
|
||||
|
||||
def enqueue(self, item: QueueItem) -> None:
|
||||
target = self._path(self.pending, item.update_id)
|
||||
tmp = target.with_suffix(".tmp")
|
||||
tmp.write_text(item.model_dump_json())
|
||||
tmp.replace(target)
|
||||
|
||||
def claim_next(self) -> Optional[QueueItem]:
|
||||
for src in sorted(self.pending.iterdir()):
|
||||
dst = self.working / src.name
|
||||
try:
|
||||
src.rename(dst)
|
||||
except FileNotFoundError:
|
||||
continue
|
||||
return QueueItem.model_validate_json(dst.read_text())
|
||||
return None
|
||||
|
||||
def complete(self, item: QueueItem) -> None:
|
||||
src = self._path(self.working, item.update_id)
|
||||
dst = self._path(self.done, item.update_id)
|
||||
src.rename(dst)
|
||||
|
||||
def fail(self, item: QueueItem) -> None:
|
||||
item.attempts += 1
|
||||
src = self._path(self.working, item.update_id)
|
||||
if item.attempts >= self.max_attempts:
|
||||
dst = self._path(self.failed, item.update_id)
|
||||
else:
|
||||
dst = self._path(self.pending, item.update_id)
|
||||
tmp = src.with_suffix(".tmp")
|
||||
tmp.write_text(item.model_dump_json())
|
||||
tmp.replace(dst)
|
||||
if src.exists():
|
||||
src.unlink()
|
||||
|
|
@ -0,0 +1,74 @@
|
|||
import json
|
||||
from pathlib import Path
|
||||
import pytest
|
||||
from journal_bot.queue import Queue, QueueItem
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def queue(tmp_path):
|
||||
return Queue(tmp_path)
|
||||
|
||||
|
||||
def make_item(update_id=1):
|
||||
return QueueItem(
|
||||
update_id=update_id,
|
||||
received_at="2026-06-14T14:32:17+02:00",
|
||||
type="text",
|
||||
text="Hello world",
|
||||
)
|
||||
|
||||
|
||||
def test_enqueue_creates_pending_file(queue, tmp_path):
|
||||
queue.enqueue(make_item(1))
|
||||
pending = list((tmp_path / "pending").iterdir())
|
||||
assert len(pending) == 1
|
||||
assert pending[0].name == "1.json"
|
||||
|
||||
|
||||
def test_claim_moves_to_working(queue, tmp_path):
|
||||
queue.enqueue(make_item(1))
|
||||
item = queue.claim_next()
|
||||
assert item is not None
|
||||
assert item.update_id == 1
|
||||
assert not (tmp_path / "pending" / "1.json").exists()
|
||||
assert (tmp_path / "working" / "1.json").exists()
|
||||
|
||||
|
||||
def test_claim_returns_none_when_empty(queue):
|
||||
assert queue.claim_next() is None
|
||||
|
||||
|
||||
def test_complete_moves_to_done(queue, tmp_path):
|
||||
queue.enqueue(make_item(1))
|
||||
item = queue.claim_next()
|
||||
queue.complete(item)
|
||||
assert (tmp_path / "done" / "1.json").exists()
|
||||
assert not (tmp_path / "working" / "1.json").exists()
|
||||
|
||||
|
||||
def test_fail_increments_attempts_and_returns_to_pending(queue, tmp_path):
|
||||
queue.enqueue(make_item(1))
|
||||
item = queue.claim_next()
|
||||
queue.fail(item)
|
||||
pending_path = tmp_path / "pending" / "1.json"
|
||||
assert pending_path.exists()
|
||||
data = json.loads(pending_path.read_text())
|
||||
assert data["attempts"] == 1
|
||||
|
||||
|
||||
def test_fail_after_max_attempts_moves_to_failed(queue, tmp_path):
|
||||
queue.enqueue(make_item(1))
|
||||
for _ in range(3):
|
||||
item = queue.claim_next()
|
||||
queue.fail(item)
|
||||
assert (tmp_path / "failed" / "1.json").exists()
|
||||
assert not (tmp_path / "pending" / "1.json").exists()
|
||||
|
||||
|
||||
def test_race_condition_only_one_winner(queue, tmp_path):
|
||||
queue.enqueue(make_item(1))
|
||||
pending = tmp_path / "pending" / "1.json"
|
||||
target = tmp_path / "working" / "1.json"
|
||||
target.parent.mkdir(exist_ok=True)
|
||||
pending.rename(target)
|
||||
assert queue.claim_next() is None
|
||||
Loading…
Reference in New Issue