- Add time parsing (HH:MM format) after the date. Example: /add Fix bug https://github.com/foo/bar april 15 14:30 - Update check_link_unique to return the conflicting bounty ID - add_bounty now includes the bounty ID in the duplicate-link error - cmd_add now catches PermissionError and displays an admin-only message - Update usage text and help message - Fixes #45
275 lines
9.8 KiB
Python
275 lines
9.8 KiB
Python
"""Pure business logic services for JIGAIDO."""
|
|
|
|
import time
|
|
from typing import Optional
|
|
|
|
from core.models import Bounty, RoomData, TrackedBounty, TrackingData
|
|
from core.ports import RoomStorage, TrackingStorage
|
|
|
|
|
|
class BountyService:
    """Service for bounty operations in a room.

    A room is identified by room_id:
    - Negative room_id: Telegram group (e.g., -1001)
    - Positive room_id: DM/personal context (user's Telegram ID)

    This service handles both group and personal bounties through room_id.

    Permissions:
    - /add, /edit, /delete: admin only
    - /admin, /admin add, /admin remove: admin only
    - /bounty, /show, /track, /untrack, /my: everyone
    """

    def __init__(self, storage: RoomStorage):
        """Create a service that reads and writes rooms through ``storage``."""
        self._storage = storage

    def is_admin(self, room_id: int, user_id: int) -> bool:
        """Return True if ``user_id`` is in the room's admin list.

        A room with no stored data, or with a missing/empty admin list,
        has no admins.
        """
        room_data = self._storage.load(room_id)
        if room_data is None:
            return False
        return user_id in (room_data.admin_user_ids or [])

    def add_admin(
        self, room_id: int, admin_user_id: int, requesting_user_id: int
    ) -> None:
        """Add an admin to a room. Requires admin permission.

        The room is saved only when ``admin_user_id`` was not already an admin.

        Raises:
            PermissionError: if ``requesting_user_id`` is not an admin.
        """
        if not self.is_admin(room_id, requesting_user_id):
            raise PermissionError("Only admins can add admins.")

        room_data = self._storage.load(room_id)
        if room_data is None:
            room_data = RoomData(
                room_id=room_id, bounties=[], next_id=1, admin_user_ids=[]
            )

        # Bind a real list onto the room before appending. Appending to
        # `(room_data.admin_user_ids or [])` would mutate a throwaway list
        # whenever the stored value is None or empty, losing the new admin.
        if room_data.admin_user_ids is None:
            room_data.admin_user_ids = []

        if admin_user_id not in room_data.admin_user_ids:
            room_data.admin_user_ids.append(admin_user_id)
            self._storage.save(room_data)

    def remove_admin(
        self, room_id: int, admin_user_id: int, requesting_user_id: int
    ) -> None:
        """Remove an admin from a room. Requires admin permission.

        Does nothing if the room has no stored data or the user is not an
        admin; the room is saved only when an admin was actually removed.

        Raises:
            PermissionError: if ``requesting_user_id`` is not an admin.
        """
        if not self.is_admin(room_id, requesting_user_id):
            raise PermissionError("Only admins can remove admins.")

        room_data = self._storage.load(room_id)
        if room_data is None:
            return

        admins = room_data.admin_user_ids
        # A falsy admin list (None or empty) cannot contain the user.
        if admins and admin_user_id in admins:
            admins.remove(admin_user_id)
            self._storage.save(room_data)

    def list_admins(self, room_id: int) -> list[int]:
        """Return a copy of the room's admin user IDs (empty if none)."""
        room_data = self._storage.load(room_id)
        if room_data is None:
            return []
        return list(room_data.admin_user_ids or [])

    def set_timezone(
        self, room_id: int, timezone: str, requesting_user_id: int
    ) -> None:
        """Set the timezone for a room. Requires admin permission.

        Raises:
            PermissionError: if ``requesting_user_id`` is not an admin.
        """
        if not self.is_admin(room_id, requesting_user_id):
            raise PermissionError("Only admins can set timezone.")

        room_data = self._storage.load(room_id)
        if room_data is None:
            room_data = RoomData(
                room_id=room_id, bounties=[], next_id=1, admin_user_ids=[]
            )

        room_data.timezone = timezone
        self._storage.save(room_data)

    def get_timezone(self, room_id: int) -> str:
        """Get the timezone for a room. Returns "UTC+0" if not set."""
        room_data = self._storage.load(room_id)
        if room_data is None:
            return "UTC+0"
        return room_data.timezone or "UTC+0"

    def check_link_unique(
        self, room_id: int, link: str | None, exclude_bounty_id: int | None = None
    ) -> int | None:
        """Check if a link is unique within a room (not used by another bounty).

        Soft-deleted bounties are ignored, and the bounty whose ID equals
        ``exclude_bounty_id`` is never reported as a conflict (so a bounty
        may keep its own link when edited).

        Returns the conflicting bounty ID if found, or None if unique/allowed.
        An empty or missing ``link`` is always allowed.
        """
        if not link:
            return None

        room_data = self._storage.load(room_id)
        if room_data is None:
            return None

        for bounty in room_data.bounties:
            if bounty.deleted_at is not None:
                continue
            if bounty.link == link and bounty.id != exclude_bounty_id:
                return bounty.id
        return None

    def add_bounty(
        self,
        room_id: int,
        user_id: int,
        text: Optional[str] = None,
        link: Optional[str] = None,
        due_date_ts: Optional[int] = None,
    ) -> Bounty:
        """Add a new bounty to the room. Requires admin permission.

        Returns the newly created bounty, stamped with the current time.

        Raises:
            PermissionError: if ``user_id`` is not an admin of the room.
            ValueError: if another live bounty in the room already uses
                ``link``; the message names the conflicting bounty ID.
        """
        if not self.is_admin(room_id, user_id):
            raise PermissionError("Only admins can add bounties.")

        conflicting_id = self.check_link_unique(room_id, link)
        if conflicting_id is not None:
            raise ValueError(
                f"A bounty with this link already exists: #{conflicting_id}"
            )

        room_data = self._storage.load(room_id)
        if room_data is None:
            room_data = RoomData(room_id=room_id, bounties=[], next_id=1)
        else:
            # NOTE(review): the incremented next_id is used for this bounty's
            # ID but room_data is not saved here; presumably
            # storage.add_bounty keeps the counter in sync - confirm.
            room_data.next_id += 1

        bounty = Bounty(
            id=room_data.next_id,
            created_by_user_id=user_id,
            text=text,
            link=link,
            due_date_ts=due_date_ts,
            created_at=int(time.time()),
        )
        self._storage.add_bounty(room_id, bounty)
        return bounty

    def list_bounties(self, room_id: int) -> list[Bounty]:
        """List all non-deleted bounties in a room."""
        return self._storage.list_bounties(room_id)

    def list_deleted_bounties(self, room_id: int) -> list[Bounty]:
        """List all soft-deleted bounties in a room. For /recover functionality."""
        all_bounties = self._storage.list_all_bounties(room_id, include_deleted=True)
        return [b for b in all_bounties if b.deleted_at is not None]

    def get_bounty(self, room_id: int, bounty_id: int) -> Bounty | None:
        """Get a specific bounty by ID. Excludes soft-deleted bounties."""
        bounty = self._storage.get_bounty(room_id, bounty_id)
        if bounty and bounty.deleted_at is not None:
            return None
        return bounty

    def update_bounty(
        self,
        room_id: int,
        bounty_id: int,
        user_id: int,
        text: Optional[str] = None,
        link: Optional[str] = None,
        due_date_ts: Optional[int] = None,
        clear_link: bool = False,
        clear_due: bool = False,
    ) -> bool:
        """Update a bounty. Only admins can update.

        ``text``, ``link`` and ``due_date_ts`` left as None keep the stored
        value. ``clear_link`` / ``clear_due`` reset the field to None and
        take precedence over a supplied ``link`` / ``due_date_ts``.

        Returns False if the bounty does not exist, True once it is updated.

        Raises:
            PermissionError: if ``user_id`` is not an admin of the room.
            ValueError: if ``link`` is already used by another live bounty.
        """
        # NOTE(review): this reads straight from storage, so a soft-deleted
        # bounty can apparently still be edited - confirm that is intended.
        bounty = self._storage.get_bounty(room_id, bounty_id)
        if not bounty:
            return False
        if not self.is_admin(room_id, user_id):
            raise PermissionError("Only admins can edit bounties.")

        if (
            link
            and self.check_link_unique(room_id, link, exclude_bounty_id=bounty_id)
            is not None
        ):
            raise ValueError("A bounty with this link already exists in this room.")

        if clear_link:
            new_link = None
        else:
            new_link = link if link is not None else bounty.link

        if clear_due:
            new_due = None
        else:
            new_due = due_date_ts if due_date_ts is not None else bounty.due_date_ts

        updated = Bounty(
            id=bounty.id,
            created_by_user_id=bounty.created_by_user_id,
            text=text if text is not None else bounty.text,
            link=new_link,
            due_date_ts=new_due,
            created_at=bounty.created_at,
            deleted_at=bounty.deleted_at,
            created_by_username=bounty.created_by_username,
        )
        self._storage.update_bounty(room_id, updated)
        return True

    def delete_bounty(self, room_id: int, bounty_id: int, user_id: int) -> bool:
        """Soft delete a bounty. Only admins can delete.

        The bounty is kept in storage with ``deleted_at`` set to the current
        time. Returns False if the bounty does not exist, True otherwise.

        Raises:
            PermissionError: if ``user_id`` is not an admin of the room.
        """
        bounty = self._storage.get_bounty(room_id, bounty_id)
        if not bounty:
            return False
        if not self.is_admin(room_id, user_id):
            raise PermissionError("Only admins can delete bounties.")

        bounty.deleted_at = int(time.time())
        self._storage.update_bounty(room_id, bounty)
        return True
|
|
|
|
|
|
class TrackingService:
    """Service for tracking bounty operations."""

    def __init__(self, tracking_storage: TrackingStorage, room_storage: RoomStorage):
        """Create a service backed by per-user tracking storage and room storage."""
        self._tracking = tracking_storage
        self._room = room_storage

    def track_bounty(self, room_id: int, user_id: int, bounty_id: int) -> bool:
        """Start tracking a bounty. Returns True if newly tracked.

        Returns False when the user already tracks the bounty.

        Raises:
            ValueError: if the bounty does not exist or has been soft-deleted.
        """
        bounty = self._room.get_bounty(room_id, bounty_id)
        # Storage can hand back soft-deleted bounties; treat them as missing
        # so a user cannot start tracking something that will never be listed
        # by get_tracked_bounties.
        if not bounty or bounty.deleted_at is not None:
            raise ValueError("Bounty not found.")

        tracking_data = self._tracking.load(room_id, user_id)
        if tracking_data is None:
            tracking_data = TrackingData(room_id=room_id, user_id=user_id, tracked=[])

        if any(entry.bounty_id == bounty_id for entry in tracking_data.tracked):
            return False

        tracked = TrackedBounty(bounty_id=bounty_id, created_at=int(time.time()))
        self._tracking.track_bounty(room_id, user_id, tracked)
        return True

    def untrack_bounty(self, room_id: int, user_id: int, bounty_id: int) -> bool:
        """Stop tracking a bounty. Returns True if was tracking."""
        tracking_data = self._tracking.load(room_id, user_id)
        if tracking_data is None:
            return False

        if any(entry.bounty_id == bounty_id for entry in tracking_data.tracked):
            self._tracking.untrack_bounty(room_id, user_id, bounty_id)
            return True
        return False

    def get_tracked_bounties(self, room_id: int, user_id: int) -> list[Bounty]:
        """Get all bounties tracked by a user in a room.

        Results follow the order of the user's tracking entries. Entries
        whose bounty is soft-deleted or no longer present in the room are
        skipped.
        """
        tracking_data = self._tracking.load(room_id, user_id)
        if tracking_data is None:
            return []

        room_data = self._room.load(room_id)
        if room_data is None:
            return []

        # Index live bounties by ID so each tracked entry is a dict lookup.
        bounty_map = {b.id: b for b in room_data.bounties if b.deleted_at is None}
        return [
            bounty_map[t.bounty_id]
            for t in tracking_data.tracked
            if t.bounty_id in bounty_map
        ]
|