journal-bot/tests/test_queue.py

75 lines
2.0 KiB
Python

import json
from pathlib import Path
import pytest
from journal_bot.queue import Queue, QueueItem
@pytest.fixture
def queue(tmp_path):
return Queue(tmp_path)
def make_item(update_id=1):
return QueueItem(
update_id=update_id,
received_at="2026-06-14T14:32:17+02:00",
type="text",
text="Hello world",
)
def test_enqueue_creates_pending_file(queue, tmp_path):
queue.enqueue(make_item(1))
pending = list((tmp_path / "pending").iterdir())
assert len(pending) == 1
assert pending[0].name == "1.json"
def test_claim_moves_to_working(queue, tmp_path):
queue.enqueue(make_item(1))
item = queue.claim_next()
assert item is not None
assert item.update_id == 1
assert not (tmp_path / "pending" / "1.json").exists()
assert (tmp_path / "working" / "1.json").exists()
def test_claim_returns_none_when_empty(queue):
assert queue.claim_next() is None
def test_complete_moves_to_done(queue, tmp_path):
queue.enqueue(make_item(1))
item = queue.claim_next()
queue.complete(item)
assert (tmp_path / "done" / "1.json").exists()
assert not (tmp_path / "working" / "1.json").exists()
def test_fail_increments_attempts_and_returns_to_pending(queue, tmp_path):
queue.enqueue(make_item(1))
item = queue.claim_next()
queue.fail(item)
pending_path = tmp_path / "pending" / "1.json"
assert pending_path.exists()
data = json.loads(pending_path.read_text())
assert data["attempts"] == 1
def test_fail_after_max_attempts_moves_to_failed(queue, tmp_path):
queue.enqueue(make_item(1))
for _ in range(3):
item = queue.claim_next()
queue.fail(item)
assert (tmp_path / "failed" / "1.json").exists()
assert not (tmp_path / "pending" / "1.json").exists()
def test_race_condition_only_one_winner(queue, tmp_path):
queue.enqueue(make_item(1))
pending = tmp_path / "pending" / "1.json"
target = tmp_path / "working" / "1.json"
target.parent.mkdir(exist_ok=True)
pending.rename(target)
assert queue.claim_next() is None