Files
jigaido/core/services.py
shokollm 8494b4621c feat: switch admin identification from user_id to username
- Replace admin_user_ids (list[int]) with admin_usernames (list[str])
- Update all service methods to use username for permission checks
- Add delete button to bot responses for message cleanup
- Update tests to match new implementation

Note: Breaking change - existing data files need fresh start
2026-04-09 08:02:36 +00:00

346 lines
12 KiB
Python

"""Pure business logic services for JIGAIDO."""
import time
from typing import Optional
from core.models import Bounty, RoomData, TrackedBounty, TrackingData
from core.ports import RoomStorage, TrackingStorage
class BountyService:
"""Service for bounty operations in a room.
A room is identified by room_id:
- Negative room_id: Telegram group (e.g., -1001)
- Positive room_id: DM/personal context (user's Telegram ID)
This service handles both group and personal bounties through room_id.
Permissions:
- /add, /edit, /delete: admin only
- /admin, /admin add, /admin remove: admin only
- /bounty, /show, /track, /untrack, /my: everyone
"""
def __init__(self, storage: RoomStorage):
self._storage = storage
def is_admin(self, room_id: int, username: str | None) -> bool:
"""Check if user is admin in a room by username."""
if not username:
return False
room_data = self._storage.load(room_id)
if room_data is None:
return False
return username in (room_data.admin_usernames or [])
def add_admin(
self, room_id: int, username: str, requesting_username: str | None
) -> None:
"""Add an admin to a room. Requires admin permission, or self-promotion if first admin."""
room_data = self._storage.load(room_id)
has_no_admins = room_data is None or not room_data.admin_usernames
is_self_promotion = requesting_username == username
if not self.is_admin(room_id, requesting_username):
if not (has_no_admins and is_self_promotion):
raise PermissionError("Only admins can add admins.")
if room_data is None:
room_data = RoomData(
room_id=room_id, bounties=[], next_id=1, admin_usernames=[]
)
admin_usernames = room_data.admin_usernames
if admin_usernames is None:
admin_usernames = []
room_data.admin_usernames = []
if username in admin_usernames:
raise ValueError(f"@{username} is already an admin.")
admin_usernames.append(username)
self._storage.save(room_data)
def remove_admin(
self, room_id: int, username: str, requesting_username: str | None
) -> None:
"""Remove an admin from a room. Requires admin permission."""
if not self.is_admin(room_id, requesting_username):
raise PermissionError("Only admins can remove admins.")
room_data = self._storage.load(room_id)
if room_data is None or not (room_data.admin_usernames or []):
raise ValueError(f"@{username} is not an admin.")
if username not in (room_data.admin_usernames or []):
raise ValueError(f"@{username} is not an admin.")
(room_data.admin_usernames or []).remove(username)
self._storage.save(room_data)
def list_admins(self, room_id: int) -> list[str]:
"""List all admin usernames in a room."""
room_data = self._storage.load(room_id)
if room_data is None:
return []
return list(room_data.admin_usernames or [])
def set_timezone(
self, room_id: int, timezone: str, requesting_username: str | None
) -> None:
"""Set the timezone for a room. Requires admin permission."""
if not self.is_admin(room_id, requesting_username):
raise PermissionError("Only admins can set timezone.")
room_data = self._storage.load(room_id)
if room_data is None:
room_data = RoomData(
room_id=room_id, bounties=[], next_id=1, admin_usernames=[]
)
room_data.timezone = timezone
self._storage.save(room_data)
def get_timezone(self, room_id: int) -> str:
"""Get the timezone for a room. Returns UTC+0 if not set."""
room_data = self._storage.load(room_id)
if room_data is None:
return "UTC+0"
return room_data.timezone or "UTC+0"
def check_link_unique(
self, room_id: int, link: str | None, exclude_bounty_id: int | None = None
) -> bool:
"""Check if a link is unique within a room (not used by another bounty)."""
if not link:
return True
room_data = self._storage.load(room_id)
if room_data is None:
return True
for bounty in room_data.bounties:
if bounty.deleted_at is not None:
continue
if bounty.link == link and bounty.id != exclude_bounty_id:
return False
return True
def add_bounty(
self,
room_id: int,
user_id: int,
username: str | None,
text: Optional[str] = None,
link: Optional[str] = None,
due_date_ts: Optional[int] = None,
created_by_username: Optional[str] = None,
) -> Bounty:
"""Add a new bounty to the room. Requires admin permission."""
if not self.is_admin(room_id, username):
raise PermissionError("Only admins can add bounties.")
if not self.check_link_unique(room_id, link):
raise ValueError("A bounty with this link already exists in this room.")
room_data = self._storage.load(room_id)
if room_data is None:
room_data = RoomData(room_id=room_id, bounties=[], next_id=1)
else:
room_data.next_id += 1
bounty = Bounty(
id=room_data.next_id,
created_by_user_id=user_id,
created_by_username=created_by_username,
text=text,
link=link,
due_date_ts=due_date_ts,
created_at=int(time.time()),
)
self._storage.add_bounty(room_id, bounty)
return bounty
def list_bounties(self, room_id: int) -> list[Bounty]:
"""List all non-deleted bounties in a room."""
return self._storage.list_bounties(room_id)
def list_deleted_bounties(self, room_id: int) -> list[Bounty]:
"""List all soft-deleted bounties in a room. For /recover functionality."""
all_bounties = self._storage.list_all_bounties(room_id, include_deleted=True)
return [b for b in all_bounties if b.deleted_at is not None]
def get_deleted_bounty(self, room_id: int, bounty_id: int) -> Bounty | None:
"""Get a specific soft-deleted bounty by ID."""
all_bounties = self._storage.list_all_bounties(room_id, include_deleted=True)
for b in all_bounties:
if b.id == bounty_id and b.deleted_at is not None:
return b
return None
def recover_bounty(self, room_id: int, bounty_id: int, username: str | None) -> str:
"""Recover a soft-deleted bounty. Admin only.
Returns: 'recovered', 'not_found', 'not_deleted', 'permission_denied'
"""
if not self.is_admin(room_id, username):
return "permission_denied"
bounty = self.get_deleted_bounty(room_id, bounty_id)
if not bounty:
return "not_found"
if bounty.deleted_at is None:
return "not_deleted"
bounty.deleted_at = None
self._storage.update_bounty(room_id, bounty)
return "recovered"
def recover_bounties(
self, room_id: int, bounty_ids: list[int], username: str | None
) -> dict[int, str]:
"""Recover multiple soft-deleted bounties. Admin only.
Returns dict of bounty_id -> result ('recovered', 'not_found', 'not_deleted', 'permission_denied')
"""
results = {}
for bounty_id in bounty_ids:
results[bounty_id] = self.recover_bounty(room_id, bounty_id, username)
return results
def get_bounty(self, room_id: int, bounty_id: int) -> Bounty | None:
"""Get a specific bounty by ID. Excludes soft-deleted bounties."""
bounty = self._storage.get_bounty(room_id, bounty_id)
if bounty and bounty.deleted_at is not None:
return None
return bounty
def update_bounty(
self,
room_id: int,
bounty_id: int,
username: str | None,
text: Optional[str] = None,
link: Optional[str] = None,
due_date_ts: Optional[int] = None,
clear_link: bool = False,
clear_due: bool = False,
) -> bool:
"""Update a bounty. Only admins can update."""
bounty = self._storage.get_bounty(room_id, bounty_id)
if not bounty:
return False
if not self.is_admin(room_id, username):
raise PermissionError("Only admins can edit bounties.")
if link and not self.check_link_unique(
room_id, link, exclude_bounty_id=bounty_id
):
raise ValueError("A bounty with this link already exists in this room.")
updated = Bounty(
id=bounty.id,
created_by_user_id=bounty.created_by_user_id,
text=text if text is not None else bounty.text,
link=None if clear_link else (link if link is not None else bounty.link),
due_date_ts=None
if clear_due
else (due_date_ts if due_date_ts is not None else bounty.due_date_ts),
created_at=bounty.created_at,
deleted_at=bounty.deleted_at,
created_by_username=bounty.created_by_username,
)
self._storage.update_bounty(room_id, updated)
return True
def delete_bounty(self, room_id: int, bounty_id: int, username: str | None) -> bool:
"""Soft delete a bounty. Only admins can delete."""
bounty = self._storage.get_bounty(room_id, bounty_id)
if not bounty:
return False
if not self.is_admin(room_id, username):
raise PermissionError("Only admins can delete bounties.")
bounty.deleted_at = int(time.time())
self._storage.update_bounty(room_id, bounty)
return True
def delete_bounties(
self, room_id: int, bounty_ids: list[int], username: str | None
) -> dict[int, str]:
"""Soft delete multiple bounties. Returns dict of bounty_id -> result.
Results can be: 'deleted', 'not_found', 'permission_denied'
"""
results = {}
for bounty_id in bounty_ids:
bounty = self._storage.get_bounty(room_id, bounty_id)
if not bounty:
results[bounty_id] = "not_found"
continue
if not self.is_admin(room_id, username):
results[bounty_id] = "permission_denied"
continue
bounty.deleted_at = int(time.time())
self._storage.update_bounty(room_id, bounty)
results[bounty_id] = "deleted"
return results
class TrackingService:
"""Service for tracking bounty operations."""
def __init__(self, tracking_storage: TrackingStorage, room_storage: RoomStorage):
self._tracking = tracking_storage
self._room = room_storage
def track_bounty(self, room_id: int, user_id: int, bounty_id: int) -> bool:
"""Start tracking a bounty. Returns True if newly tracked."""
bounty = self._room.get_bounty(room_id, bounty_id)
if not bounty:
raise ValueError("Bounty not found.")
tracking_data = self._tracking.load(room_id, user_id)
if tracking_data is None:
tracking_data = TrackingData(room_id=room_id, user_id=user_id, tracked=[])
for tracked in tracking_data.tracked:
if tracked.bounty_id == bounty_id:
return False
tracked = TrackedBounty(bounty_id=bounty_id, created_at=int(time.time()))
self._tracking.track_bounty(room_id, user_id, tracked)
return True
def untrack_bounty(self, room_id: int, user_id: int, bounty_id: int) -> bool:
"""Stop tracking a bounty. Returns True if was tracking."""
tracking_data = self._tracking.load(room_id, user_id)
if tracking_data is None:
return False
for tracked in tracking_data.tracked:
if tracked.bounty_id == bounty_id:
self._tracking.untrack_bounty(room_id, user_id, bounty_id)
return True
return False
def get_tracked_bounties(self, room_id: int, user_id: int) -> list[Bounty]:
"""Get all bounties tracked by a user in a room."""
tracking_data = self._tracking.load(room_id, user_id)
if tracking_data is None:
return []
room_data = self._room.load(room_id)
if room_data is None:
return []
bounty_map = {b.id: b for b in room_data.bounties if b.deleted_at is None}
return [
bounty_map[t.bounty_id]
for t in tracking_data.tracked
if t.bounty_id in bounty_map
]