feat(core): implement services for issue #8 #25
160
core/services.py
Normal file
160
core/services.py
Normal file
@@ -0,0 +1,160 @@
|
||||
"""Business logic services for JIGAIDO bounty tracker.
|
||||
|
||||
These services orchestrate operations through storage ports.
|
||||
No implementation details - pure orchestration layer.
|
||||
"""
|
||||
|
||||
import time
|
||||
from typing import Optional
|
||||
|
||||
from core.models import Bounty, RoomData, TrackingData, TrackedBounty
|
||||
from core.ports import RoomStorage, TrackingStorage
|
||||
|
||||
|
||||
class RoomBountyService:
|
||||
"""Service for managing bounties within a room.
|
||||
|
||||
Orchestrates bounty operations through a RoomStorage port.
|
||||
|
||||
A room is identified by room_id:
|
||||
- Negative room_id: Telegram group
|
||||
- Positive room_id: DM/personal context
|
||||
|
||||
This single service handles both group and personal bounties.
|
||||
"""
|
||||
|
||||
def __init__(self, storage: RoomStorage):
|
||||
self._storage = storage
|
||||
|
||||
def add_bounty(
|
||||
self,
|
||||
room_id: int,
|
||||
user_id: int,
|
||||
text: Optional[str] = None,
|
||||
link: Optional[str] = None,
|
||||
due_date_ts: Optional[int] = None,
|
||||
) -> Bounty:
|
||||
"""Add a new bounty to a room. Returns the created bounty."""
|
||||
room_data = self._storage.load(room_id)
|
||||
if room_data is None:
|
||||
room_data = RoomData(room_id=room_id, bounties=[], next_id=1)
|
||||
|
||||
bounty = Bounty(
|
||||
id=room_data.next_id,
|
||||
text=text,
|
||||
link=link,
|
||||
due_date_ts=due_date_ts,
|
||||
created_at=int(time.time()),
|
||||
created_by_user_id=user_id,
|
||||
)
|
||||
|
||||
room_data.bounties.append(bounty)
|
||||
room_data.next_id += 1
|
||||
self._storage.save(room_data)
|
||||
|
||||
return bounty
|
||||
|
||||
def get_bounty(self, room_id: int, bounty_id: int) -> Bounty | None:
|
||||
"""Get a bounty by ID. Returns None if not found."""
|
||||
return self._storage.get_bounty(room_id, bounty_id)
|
||||
|
||||
def list_bounties(self, room_id: int) -> list[Bounty]:
|
||||
"""List all bounties in a room. Returns empty list if room not found."""
|
||||
room_data = self._storage.load(room_id)
|
||||
if room_data is None:
|
||||
return []
|
||||
return room_data.bounties.copy()
|
||||
|
||||
def update_bounty(
|
||||
self,
|
||||
room_id: int,
|
||||
bounty_id: int,
|
||||
user_id: int,
|
||||
text: Optional[str] = None,
|
||||
link: Optional[str] = None,
|
||||
due_date_ts: Optional[int] = None,
|
||||
clear_link: bool = False,
|
||||
clear_due: bool = False,
|
||||
) -> bool:
|
||||
"""Update a bounty. Only creator can update. Raises PermissionError if not creator."""
|
||||
bounty = self._storage.get_bounty(room_id, bounty_id)
|
||||
if bounty is None:
|
||||
return False
|
||||
|
||||
if bounty.created_by_user_id != user_id:
|
||||
raise PermissionError("Only the creator can edit this bounty.")
|
||||
|
||||
updated = Bounty(
|
||||
id=bounty.id,
|
||||
text=text if text is not None else bounty.text,
|
||||
link=None if clear_link else (link if link is not None else bounty.link),
|
||||
due_date_ts=None
|
||||
if clear_due
|
||||
else (due_date_ts if due_date_ts is not None else bounty.due_date_ts),
|
||||
created_at=bounty.created_at,
|
||||
created_by_user_id=bounty.created_by_user_id,
|
||||
)
|
||||
|
||||
self._storage.update_bounty(room_id, updated)
|
||||
return True
|
||||
|
||||
def delete_bounty(self, room_id: int, bounty_id: int, user_id: int) -> bool:
|
||||
"""Delete a bounty. Only creator can delete. Raises PermissionError if not creator."""
|
||||
bounty = self._storage.get_bounty(room_id, bounty_id)
|
||||
if bounty is None:
|
||||
return False
|
||||
|
||||
if bounty.created_by_user_id != user_id:
|
||||
raise PermissionError("Only the creator can delete this bounty.")
|
||||
|
||||
self._storage.delete_bounty(room_id, bounty_id)
|
||||
return True
|
||||
|
||||
|
||||
class TrackingService:
|
||||
"""Service for managing bounty tracking.
|
||||
|
||||
Orchestrates tracking operations through a TrackingStorage port.
|
||||
"""
|
||||
|
||||
def __init__(self, storage: TrackingStorage, room_storage: RoomStorage):
|
||||
self._storage = storage
|
||||
self._room_storage = room_storage
|
||||
|
||||
def track_bounty(self, room_id: int, user_id: int, bounty_id: int) -> bool:
|
||||
"""Start tracking a bounty. Verifies bounty exists first. Raises ValueError if not found."""
|
||||
bounty = self._room_storage.get_bounty(room_id, bounty_id)
|
||||
if bounty is None:
|
||||
raise ValueError("Bounty not found.")
|
||||
|
||||
tracking_data = self._storage.load(room_id, user_id)
|
||||
if tracking_data is None:
|
||||
tracking_data = TrackingData(room_id=room_id, user_id=user_id, tracked=[])
|
||||
|
||||
for tracked in tracking_data.tracked:
|
||||
if tracked.bounty_id == bounty_id:
|
||||
return False
|
||||
|
||||
tracked = TrackedBounty(bounty_id=bounty_id, created_at=int(time.time()))
|
||||
self._storage.track_bounty(room_id, user_id, tracked)
|
||||
return True
|
||||
|
||||
def untrack_bounty(self, room_id: int, user_id: int, bounty_id: int) -> bool:
|
||||
"""Stop tracking a bounty. Returns True if was tracking, False if not found."""
|
||||
tracking_data = self._storage.load(room_id, user_id)
|
||||
if tracking_data is None:
|
||||
return False
|
||||
|
||||
for tracked in tracking_data.tracked:
|
||||
if tracked.bounty_id == bounty_id:
|
||||
self._storage.untrack_bounty(room_id, user_id, bounty_id)
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def list_tracked(self, room_id: int, user_id: int) -> list[TrackedBounty]:
|
||||
"""List all tracked bounties for a user in a room. Returns empty list if not tracking."""
|
||||
tracking_data = self._storage.load(room_id, user_id)
|
||||
if tracking_data is None:
|
||||
return []
|
||||
return tracking_data.tracked.copy()
|
||||
198
tests/test_services.py
Normal file
198
tests/test_services.py
Normal file
@@ -0,0 +1,198 @@
|
||||
"""Tests for core/services.py — business logic services."""
|
||||
|
||||
import pytest
|
||||
|
||||
from core.models import Bounty, RoomData, TrackingData, TrackedBounty
|
||||
from core.services import RoomBountyService, TrackingService
|
||||
|
||||
|
||||
class MockRoomStorage:
|
||||
"""Mock implementation of RoomStorage for testing."""
|
||||
|
||||
def __init__(self):
|
||||
self._rooms: dict[int, RoomData] = {}
|
||||
|
||||
def load(self, room_id: int) -> RoomData | None:
|
||||
return self._rooms.get(room_id)
|
||||
|
||||
def save(self, room_data: RoomData) -> None:
|
||||
self._rooms[room_data.room_id] = room_data
|
||||
|
||||
def add_bounty(self, room_id: int, bounty: Bounty) -> None:
|
||||
if room_id not in self._rooms:
|
||||
self._rooms[room_id] = RoomData(room_id=room_id, bounties=[], next_id=1)
|
||||
self._rooms[room_id].bounties.append(bounty)
|
||||
if bounty.id >= self._rooms[room_id].next_id:
|
||||
self._rooms[room_id].next_id = bounty.id + 1
|
||||
|
||||
def update_bounty(self, room_id: int, bounty: Bounty) -> None:
|
||||
if room_id in self._rooms:
|
||||
for i, b in enumerate(self._rooms[room_id].bounties):
|
||||
if b.id == bounty.id:
|
||||
self._rooms[room_id].bounties[i] = bounty
|
||||
break
|
||||
|
||||
def delete_bounty(self, room_id: int, bounty_id: int) -> None:
|
||||
if room_id in self._rooms:
|
||||
self._rooms[room_id].bounties = [
|
||||
b for b in self._rooms[room_id].bounties if b.id != bounty_id
|
||||
]
|
||||
|
||||
def get_bounty(self, room_id: int, bounty_id: int) -> Bounty | None:
|
||||
if room_id in self._rooms:
|
||||
for b in self._rooms[room_id].bounties:
|
||||
if b.id == bounty_id:
|
||||
return b
|
||||
return None
|
||||
|
||||
|
||||
class MockTrackingStorage:
|
||||
"""Mock implementation of TrackingStorage for testing."""
|
||||
|
||||
def __init__(self):
|
||||
self._tracking: dict[tuple[int, int], TrackingData] = {}
|
||||
|
||||
def load(self, room_id: int, user_id: int) -> TrackingData | None:
|
||||
return self._tracking.get((room_id, user_id))
|
||||
|
||||
def save(self, tracking_data: TrackingData) -> None:
|
||||
self._tracking[(tracking_data.room_id, tracking_data.user_id)] = tracking_data
|
||||
|
||||
def track_bounty(self, room_id: int, user_id: int, tracked: TrackedBounty) -> None:
|
||||
key = (room_id, user_id)
|
||||
if key not in self._tracking:
|
||||
self._tracking[key] = TrackingData(
|
||||
room_id=room_id, user_id=user_id, tracked=[]
|
||||
)
|
||||
self._tracking[key].tracked.append(tracked)
|
||||
|
||||
def untrack_bounty(self, room_id: int, user_id: int, bounty_id: int) -> None:
|
||||
key = (room_id, user_id)
|
||||
if key in self._tracking:
|
||||
self._tracking[key].tracked = [
|
||||
t for t in self._tracking[key].tracked if t.bounty_id != bounty_id
|
||||
]
|
||||
|
||||
|
||||
class TestRoomBountyService:
|
||||
def test_create_bounty(self):
|
||||
storage = MockRoomStorage()
|
||||
service = RoomBountyService(storage)
|
||||
bounty = service.create_bounty(
|
||||
room_id=-1001,
|
||||
text="Test bounty",
|
||||
link="https://example.com",
|
||||
due_date_ts=1234567890,
|
||||
created_by_user_id=123,
|
||||
)
|
||||
assert bounty.id == 1
|
||||
assert bounty.text == "Test bounty"
|
||||
assert bounty.link == "https://example.com"
|
||||
assert bounty.created_by_user_id == 123
|
||||
|
||||
def test_create_bounty_increments_id(self):
|
||||
storage = MockRoomStorage()
|
||||
service = RoomBountyService(storage)
|
||||
b1 = service.create_bounty(-1001, "B1", None, None, 123)
|
||||
b2 = service.create_bounty(-1001, "B2", None, None, 123)
|
||||
assert b2.id == 2
|
||||
|
||||
def test_get_bounty(self):
|
||||
storage = MockRoomStorage()
|
||||
service = RoomBountyService(storage)
|
||||
created = service.create_bounty(-1001, "Test", None, None, 123)
|
||||
result = service.get_bounty(-1001, created.id)
|
||||
assert result is not None
|
||||
assert result.text == "Test"
|
||||
|
||||
def test_get_bounty_not_found(self):
|
||||
storage = MockRoomStorage()
|
||||
service = RoomBountyService(storage)
|
||||
result = service.get_bounty(-1001, 999)
|
||||
assert result is None
|
||||
|
||||
def test_list_bounties(self):
|
||||
storage = MockRoomStorage()
|
||||
service = RoomBountyService(storage)
|
||||
service.create_bounty(-1001, "B1", None, None, 123)
|
||||
service.create_bounty(-1001, "B2", None, None, 123)
|
||||
bounties = service.list_bounties(-1001)
|
||||
assert len(bounties) == 2
|
||||
|
||||
def test_list_bounties_empty_room(self):
|
||||
storage = MockRoomStorage()
|
||||
service = RoomBountyService(storage)
|
||||
bounties = service.list_bounties(-1001)
|
||||
assert bounties == []
|
||||
|
||||
def test_update_bounty(self):
|
||||
storage = MockRoomStorage()
|
||||
service = RoomBountyService(storage)
|
||||
created = service.create_bounty(-1001, "Original", None, None, 123)
|
||||
result = service.update_bounty(-1001, created.id, text="Updated")
|
||||
assert result is True
|
||||
updated = service.get_bounty(-1001, created.id)
|
||||
assert updated is not None
|
||||
assert updated.text == "Updated"
|
||||
|
||||
def test_update_bounty_not_found(self):
|
||||
storage = MockRoomStorage()
|
||||
service = RoomBountyService(storage)
|
||||
result = service.update_bounty(-1001, 999, text="Updated")
|
||||
assert result is False
|
||||
|
||||
def test_delete_bounty(self):
|
||||
storage = MockRoomStorage()
|
||||
service = RoomBountyService(storage)
|
||||
created = service.create_bounty(-1001, "Test", None, None, 123)
|
||||
result = service.delete_bounty(-1001, created.id)
|
||||
assert result is True
|
||||
assert service.get_bounty(-1001, created.id) is None
|
||||
|
||||
def test_delete_bounty_not_found(self):
|
||||
storage = MockRoomStorage()
|
||||
service = RoomBountyService(storage)
|
||||
result = service.delete_bounty(-1001, 999)
|
||||
assert result is False
|
||||
|
||||
|
||||
class TestTrackingService:
|
||||
def test_track_bounty(self):
|
||||
storage = MockTrackingStorage()
|
||||
service = TrackingService(storage)
|
||||
result = service.track_bounty(-1001, 123456, 1)
|
||||
assert result is True
|
||||
|
||||
def test_track_bounty_duplicate(self):
|
||||
storage = MockTrackingStorage()
|
||||
service = TrackingService(storage)
|
||||
service.track_bounty(-1001, 123456, 1)
|
||||
result = service.track_bounty(-1001, 123456, 1)
|
||||
assert result is False
|
||||
|
||||
def test_untrack_bounty(self):
|
||||
storage = MockTrackingStorage()
|
||||
service = TrackingService(storage)
|
||||
service.track_bounty(-1001, 123456, 1)
|
||||
result = service.untrack_bounty(-1001, 123456, 1)
|
||||
assert result is True
|
||||
|
||||
def test_untrack_bounty_not_tracking(self):
|
||||
storage = MockTrackingStorage()
|
||||
service = TrackingService(storage)
|
||||
result = service.untrack_bounty(-1001, 123456, 1)
|
||||
assert result is False
|
||||
|
||||
def test_list_tracked(self):
|
||||
storage = MockTrackingStorage()
|
||||
service = TrackingService(storage)
|
||||
service.track_bounty(-1001, 123456, 1)
|
||||
service.track_bounty(-1001, 123456, 2)
|
||||
tracked = service.list_tracked(-1001, 123456)
|
||||
assert len(tracked) == 2
|
||||
|
||||
def test_list_tracked_empty(self):
|
||||
storage = MockTrackingStorage()
|
||||
service = TrackingService(storage)
|
||||
tracked = service.list_tracked(-1001, 123456)
|
||||
assert tracked == []
|
||||
Reference in New Issue
Block a user