Files
jigaido/core/services.py
shokollm 8ac8cd21ec fix: allow self-promotion to first admin in room
Users can now add themselves as the first admin without existing
admin permission. This enables /start in DMs to work correctly.
2026-04-04 15:34:58 +00:00

331 lines
12 KiB
Python

"""Pure business logic services for JIGAIDO."""
import time
from typing import Optional
from core.models import Bounty, RoomData, TrackedBounty, TrackingData
from core.ports import RoomStorage, TrackingStorage
class BountyService:
    """Service for bounty operations in a room.

    A room is identified by room_id:
    - Negative room_id: Telegram group (e.g., -1001)
    - Positive room_id: DM/personal context (user's Telegram ID)

    This service handles both group and personal bounties through room_id.

    Permissions:
    - /add, /edit, /delete: admin only
    - /admin, /admin add, /admin remove: admin only
    - /bounty, /show, /track, /untrack, /my: everyone
    """

    def __init__(self, storage: RoomStorage):
        """Bind the service to the room storage it reads from and writes to."""
        self._storage = storage

    # ------------------------------------------------------------------
    # Admin management
    # ------------------------------------------------------------------

    def is_admin(self, room_id: int, user_id: int) -> bool:
        """Return True if ``user_id`` is listed as an admin of the room.

        A room with no stored data, or with no admin list, has no admins.
        """
        room_data = self._storage.load(room_id)
        if room_data is None:
            return False
        return user_id in (room_data.admin_user_ids or [])

    def add_admin(
        self, room_id: int, admin_user_id: int, requesting_user_id: int
    ) -> None:
        """Add an admin to a room.

        Requires admin permission, with one exception: when the room has no
        admins at all, a user may promote themselves to become the first one.

        Raises:
            PermissionError: if the requester is not an admin and this is not
                a first-admin self-promotion.
        """
        room_data = self._storage.load(room_id)
        has_no_admins = room_data is None or not room_data.admin_user_ids
        is_self_promotion = requesting_user_id == admin_user_id

        if not self.is_admin(room_id, requesting_user_id) and not (
            has_no_admins and is_self_promotion
        ):
            raise PermissionError("Only admins can add admins.")

        if room_data is None:
            room_data = RoomData(
                room_id=room_id, bounties=[], next_id=1, admin_user_ids=[]
            )

        # Append to the list stored on room_data itself. Writing
        # ``(room_data.admin_user_ids or []).append(...)`` would append to a
        # throwaway list whenever the stored list is empty or None, silently
        # dropping the very first admin.
        if room_data.admin_user_ids is None:
            room_data.admin_user_ids = []
        if admin_user_id not in room_data.admin_user_ids:
            room_data.admin_user_ids.append(admin_user_id)
            self._storage.save(room_data)

    def remove_admin(
        self, room_id: int, admin_user_id: int, requesting_user_id: int
    ) -> None:
        """Remove an admin from a room. Requires admin permission.

        Removing a user who is not an admin is a no-op.

        Raises:
            PermissionError: if the requester is not an admin of the room.
        """
        if not self.is_admin(room_id, requesting_user_id):
            raise PermissionError("Only admins can remove admins.")

        room_data = self._storage.load(room_id)
        if room_data is None:
            return

        admins = room_data.admin_user_ids
        if admins and admin_user_id in admins:
            admins.remove(admin_user_id)
            self._storage.save(room_data)

    def list_admins(self, room_id: int) -> list[int]:
        """Return a copy of the room's admin user IDs (empty if none)."""
        room_data = self._storage.load(room_id)
        if room_data is None:
            return []
        return list(room_data.admin_user_ids or [])

    # ------------------------------------------------------------------
    # Room settings
    # ------------------------------------------------------------------

    def set_timezone(
        self, room_id: int, timezone: str, requesting_user_id: int
    ) -> None:
        """Set the timezone for a room. Requires admin permission.

        Raises:
            PermissionError: if the requester is not an admin of the room.
        """
        if not self.is_admin(room_id, requesting_user_id):
            raise PermissionError("Only admins can set timezone.")

        room_data = self._storage.load(room_id)
        if room_data is None:
            room_data = RoomData(
                room_id=room_id, bounties=[], next_id=1, admin_user_ids=[]
            )
        room_data.timezone = timezone
        self._storage.save(room_data)

    def get_timezone(self, room_id: int) -> str:
        """Get the timezone for a room. Returns "UTC+0" if not set."""
        room_data = self._storage.load(room_id)
        if room_data is None:
            return "UTC+0"
        return room_data.timezone or "UTC+0"

    # ------------------------------------------------------------------
    # Bounties
    # ------------------------------------------------------------------

    def check_link_unique(
        self, room_id: int, link: str | None, exclude_bounty_id: int | None = None
    ) -> bool:
        """Check that no other active bounty in the room already uses ``link``.

        An empty or missing link is always considered unique. Soft-deleted
        bounties are ignored, as is the bounty whose id equals
        ``exclude_bounty_id`` (used when editing a bounty in place).
        """
        if not link:
            return True

        room_data = self._storage.load(room_id)
        if room_data is None:
            return True

        return not any(
            bounty.deleted_at is None
            and bounty.link == link
            and bounty.id != exclude_bounty_id
            for bounty in room_data.bounties
        )

    def add_bounty(
        self,
        room_id: int,
        user_id: int,
        text: Optional[str] = None,
        link: Optional[str] = None,
        due_date_ts: Optional[int] = None,
    ) -> Bounty:
        """Add a new bounty to the room. Requires admin permission.

        Returns:
            The newly created bounty, after handing it to storage.

        Raises:
            PermissionError: if ``user_id`` is not an admin of the room.
            ValueError: if another active bounty already uses ``link``.
        """
        if not self.is_admin(room_id, user_id):
            raise PermissionError("Only admins can add bounties.")
        if not self.check_link_unique(room_id, link):
            raise ValueError("A bounty with this link already exists in this room.")

        room_data = self._storage.load(room_id)
        if room_data is None:
            room_data = RoomData(room_id=room_id, bounties=[], next_id=1)
        else:
            # NOTE(review): the incremented counter is only used to pick the
            # new id; it is not saved here. Presumably storage.add_bounty
            # persists it -- confirm against the RoomStorage implementation.
            room_data.next_id += 1

        bounty = Bounty(
            id=room_data.next_id,
            created_by_user_id=user_id,
            text=text,
            link=link,
            due_date_ts=due_date_ts,
            created_at=int(time.time()),
        )
        self._storage.add_bounty(room_id, bounty)
        return bounty

    def list_bounties(self, room_id: int) -> list[Bounty]:
        """List all non-deleted bounties in a room (delegates to storage)."""
        return self._storage.list_bounties(room_id)

    def list_deleted_bounties(self, room_id: int) -> list[Bounty]:
        """List all soft-deleted bounties in a room. For /recover functionality."""
        all_bounties = self._storage.list_all_bounties(room_id, include_deleted=True)
        return [b for b in all_bounties if b.deleted_at is not None]

    def get_deleted_bounty(self, room_id: int, bounty_id: int) -> Bounty | None:
        """Get a specific soft-deleted bounty by ID, or None if there is none."""
        all_bounties = self._storage.list_all_bounties(room_id, include_deleted=True)
        return next(
            (
                b
                for b in all_bounties
                if b.id == bounty_id and b.deleted_at is not None
            ),
            None,
        )

    def recover_bounty(self, room_id: int, bounty_id: int, user_id: int) -> str:
        """Recover a soft-deleted bounty. Admin only.

        Returns one of:
            'recovered'         -- the bounty was soft-deleted and is active again
            'not_deleted'       -- the bounty exists but is not deleted
            'not_found'         -- no bounty with this id exists in the room
            'permission_denied' -- ``user_id`` is not an admin of the room
        """
        if not self.is_admin(room_id, user_id):
            return "permission_denied"

        all_bounties = self._storage.list_all_bounties(room_id, include_deleted=True)
        matches = [b for b in all_bounties if b.id == bounty_id]
        bounty = next((b for b in matches if b.deleted_at is not None), None)
        if bounty is None:
            # Tell "exists but is active" apart from "does not exist"; looking
            # only at deleted bounties would report both as 'not_found'.
            return "not_deleted" if matches else "not_found"

        bounty.deleted_at = None
        self._storage.update_bounty(room_id, bounty)
        return "recovered"

    def recover_bounties(
        self, room_id: int, bounty_ids: list[int], user_id: int
    ) -> dict[int, str]:
        """Recover multiple soft-deleted bounties. Admin only.

        Returns dict of bounty_id -> result ('recovered', 'not_found',
        'not_deleted', 'permission_denied'), one entry per distinct id.
        """
        return {
            bounty_id: self.recover_bounty(room_id, bounty_id, user_id)
            for bounty_id in bounty_ids
        }

    def get_bounty(self, room_id: int, bounty_id: int) -> Bounty | None:
        """Get a specific bounty by ID. Excludes soft-deleted bounties."""
        bounty = self._storage.get_bounty(room_id, bounty_id)
        if bounty and bounty.deleted_at is not None:
            return None
        return bounty

    def update_bounty(
        self,
        room_id: int,
        bounty_id: int,
        user_id: int,
        text: Optional[str] = None,
        link: Optional[str] = None,
        due_date_ts: Optional[int] = None,
        clear_link: bool = False,
        clear_due: bool = False,
    ) -> bool:
        """Update a bounty. Only admins can update.

        ``text``, ``link`` and ``due_date_ts`` left as None keep the current
        value. ``clear_link`` / ``clear_due`` reset the field to None and take
        precedence over a supplied ``link`` / ``due_date_ts``.

        Returns:
            False if storage has no bounty with this id, True after the
            updated bounty has been handed to storage.

        Raises:
            PermissionError: if the bounty exists and ``user_id`` is not an admin.
            ValueError: if ``link`` is already used by another active bounty.
        """
        # NOTE(review): this reads straight from storage, so unlike
        # get_bounty() it does not filter on deleted_at -- confirm whether
        # editing a soft-deleted bounty is intended.
        bounty = self._storage.get_bounty(room_id, bounty_id)
        if not bounty:
            return False
        if not self.is_admin(room_id, user_id):
            raise PermissionError("Only admins can edit bounties.")
        if link and not self.check_link_unique(
            room_id, link, exclude_bounty_id=bounty_id
        ):
            raise ValueError("A bounty with this link already exists in this room.")

        if clear_link:
            new_link = None
        else:
            new_link = link if link is not None else bounty.link

        if clear_due:
            new_due = None
        else:
            new_due = due_date_ts if due_date_ts is not None else bounty.due_date_ts

        updated = Bounty(
            id=bounty.id,
            created_by_user_id=bounty.created_by_user_id,
            text=text if text is not None else bounty.text,
            link=new_link,
            due_date_ts=new_due,
            created_at=bounty.created_at,
            deleted_at=bounty.deleted_at,
            created_by_username=bounty.created_by_username,
        )
        self._storage.update_bounty(room_id, updated)
        return True

    def delete_bounty(self, room_id: int, bounty_id: int, user_id: int) -> bool:
        """Soft delete a bounty. Only admins can delete.

        Soft deletion stamps ``deleted_at`` with the current Unix time.

        Returns:
            False if storage has no bounty with this id, True once deleted.

        Raises:
            PermissionError: if the bounty exists and ``user_id`` is not an admin.
        """
        # NOTE(review): reads straight from storage without filtering on
        # deleted_at; if storage returns soft-deleted bounties, deleting one
        # again refreshes its timestamp -- confirm this is intended.
        bounty = self._storage.get_bounty(room_id, bounty_id)
        if not bounty:
            return False
        if not self.is_admin(room_id, user_id):
            raise PermissionError("Only admins can delete bounties.")

        bounty.deleted_at = int(time.time())
        self._storage.update_bounty(room_id, bounty)
        return True

    def delete_bounties(
        self, room_id: int, bounty_ids: list[int], user_id: int
    ) -> dict[int, str]:
        """Soft delete multiple bounties. Returns dict of bounty_id -> result.

        Results can be: 'deleted', 'not_found', 'permission_denied'.
        Unlike delete_bounty(), a missing permission is reported per bounty
        instead of being raised.
        """
        # The caller's admin status does not depend on the bounty, so look it
        # up once instead of reloading the room for every id.
        caller_is_admin = self.is_admin(room_id, user_id)

        results = {}
        for bounty_id in bounty_ids:
            bounty = self._storage.get_bounty(room_id, bounty_id)
            if not bounty:
                results[bounty_id] = "not_found"
                continue
            if not caller_is_admin:
                results[bounty_id] = "permission_denied"
                continue
            bounty.deleted_at = int(time.time())
            self._storage.update_bounty(room_id, bounty)
            results[bounty_id] = "deleted"
        return results
class TrackingService:
    """Service for tracking bounty operations.

    Tracking is stored per (room, user) in the tracking storage; the room
    storage is consulted to resolve bounty ids into bounties.
    """

    def __init__(self, tracking_storage: TrackingStorage, room_storage: RoomStorage):
        """Bind the service to its tracking storage and room storage."""
        self._tracking = tracking_storage
        self._room = room_storage

    def track_bounty(self, room_id: int, user_id: int, bounty_id: int) -> bool:
        """Start tracking a bounty.

        Returns:
            True if the bounty is newly tracked, False if the user was
            already tracking it.

        Raises:
            ValueError: if the bounty does not exist in the room or has been
                soft-deleted.
        """
        bounty = self._room.get_bounty(room_id, bounty_id)
        # Treat a soft-deleted bounty as missing, matching
        # BountyService.get_bounty and get_tracked_bounties below, which both
        # hide deleted bounties from users.
        if not bounty or bounty.deleted_at is not None:
            raise ValueError("Bounty not found.")

        tracking_data = self._tracking.load(room_id, user_id)
        if tracking_data is None:
            tracking_data = TrackingData(room_id=room_id, user_id=user_id, tracked=[])

        if any(entry.bounty_id == bounty_id for entry in tracking_data.tracked):
            return False

        new_entry = TrackedBounty(bounty_id=bounty_id, created_at=int(time.time()))
        self._tracking.track_bounty(room_id, user_id, new_entry)
        return True

    def untrack_bounty(self, room_id: int, user_id: int, bounty_id: int) -> bool:
        """Stop tracking a bounty. Returns True if the user was tracking it."""
        tracking_data = self._tracking.load(room_id, user_id)
        if tracking_data is None:
            return False

        if not any(entry.bounty_id == bounty_id for entry in tracking_data.tracked):
            return False

        self._tracking.untrack_bounty(room_id, user_id, bounty_id)
        return True

    def get_tracked_bounties(self, room_id: int, user_id: int) -> list[Bounty]:
        """Get all bounties tracked by a user in a room.

        Results follow the order of the user's tracking list. Tracked ids
        that no longer resolve to an active (non-deleted) bounty are skipped.
        """
        tracking_data = self._tracking.load(room_id, user_id)
        if tracking_data is None:
            return []

        room_data = self._room.load(room_id)
        if room_data is None:
            return []

        # Index active bounties by id so each tracked entry is an O(1) lookup.
        bounty_map = {b.id: b for b in room_data.bounties if b.deleted_at is None}
        return [
            bounty_map[t.bounty_id]
            for t in tracking_data.tracked
            if t.bounty_id in bounty_map
        ]