Files
jigaido/core/services.py
shokollm 7c17bff110 feat: add category service layer methods
Service layer for category feature (Issue #86):

Category Management:
- add_category() - Create category (admin only, validates slug format)
- delete_category() - Soft delete category (admin only)
- list_categories() - List active categories
- get_category() - Get category by slug

Category-to-Bounty Association:
- add_category_to_bounty() - Add category to bounty (admin only)
- remove_category_from_bounty() - Remove category from bounty (admin only)
- update_bounty_categories() - Replace all categories on bounty (admin only)

All methods properly validate permissions, slug format, and existence.
Soft delete preserves category data for bounties that reference it.
2026-04-09 10:38:08 +00:00

574 lines
19 KiB
Python

"""Pure business logic services for JIGAIDO."""
import time
from typing import Optional
from core.models import Bounty, Category, RoomData, TrackedBounty, TrackingData
from core.ports import RoomStorage, TrackingStorage
class BountyService:
    """Service for bounty operations in a room.

    A room is identified by room_id:
    - Negative room_id: Telegram group (e.g., -1001)
    - Positive room_id: DM/personal context (user's Telegram ID)

    This service handles both group and personal bounties through room_id.

    Permissions:
    - /add, /edit, /delete: admin only
    - /admin, /admin add, /admin remove: admin only
    - /bounty, /show, /track, /untrack, /my: everyone
    """

    def __init__(self, storage: RoomStorage):
        self._storage = storage

    # --- Admin Management ---

    def is_admin(self, room_id: int, username: str | None) -> bool:
        """Check if user is admin in a room by username.

        Returns False for a missing/empty username or an unknown room.
        """
        if not username:
            return False
        room_data = self._storage.load(room_id)
        if room_data is None:
            return False
        return username in (room_data.admin_usernames or [])

    def add_admin(
        self, room_id: int, username: str, requesting_username: str | None
    ) -> None:
        """Add an admin to a room.

        Requires admin permission, except that a user may promote themselves
        when the room has no admins yet.

        Raises:
            PermissionError: If the requester is neither an admin nor
                bootstrapping themselves as the first admin.
            ValueError: If username is already an admin.
        """
        room_data = self._storage.load(room_id)
        has_no_admins = room_data is None or not room_data.admin_usernames
        is_self_promotion = requesting_username == username
        if not self.is_admin(room_id, requesting_username):
            if not (has_no_admins and is_self_promotion):
                raise PermissionError("Only admins can add admins.")
        if room_data is None:
            room_data = RoomData(
                room_id=room_id, bounties=[], next_id=1, admin_usernames=[]
            )
        # Normalise a missing list on the object that gets saved, so the
        # append below lands in the persisted list rather than in a detached
        # local copy.
        if room_data.admin_usernames is None:
            room_data.admin_usernames = []
        if username in room_data.admin_usernames:
            raise ValueError(f"@{username} is already an admin.")
        room_data.admin_usernames.append(username)
        self._storage.save(room_data)

    def remove_admin(
        self, room_id: int, username: str, requesting_username: str | None
    ) -> None:
        """Remove an admin from a room. Requires admin permission.

        Raises:
            PermissionError: If the requester is not an admin.
            ValueError: If username is not an admin of the room.
        """
        if not self.is_admin(room_id, requesting_username):
            raise PermissionError("Only admins can remove admins.")
        room_data = self._storage.load(room_id)
        admins = room_data.admin_usernames if room_data is not None else None
        if not admins or username not in admins:
            raise ValueError(f"@{username} is not an admin.")
        admins.remove(username)
        self._storage.save(room_data)

    def list_admins(self, room_id: int) -> list[str]:
        """List all admin usernames in a room (a copy; empty if room unknown)."""
        room_data = self._storage.load(room_id)
        if room_data is None:
            return []
        return list(room_data.admin_usernames or [])

    # --- Timezone ---

    def set_timezone(
        self, room_id: int, timezone: str, requesting_username: str | None
    ) -> None:
        """Set the timezone for a room. Requires admin permission.

        The timezone string is stored as given; it is not validated here.

        Raises:
            PermissionError: If the requester is not an admin.
        """
        if not self.is_admin(room_id, requesting_username):
            raise PermissionError("Only admins can set timezone.")
        room_data = self._storage.load(room_id)
        if room_data is None:
            room_data = RoomData(
                room_id=room_id, bounties=[], next_id=1, admin_usernames=[]
            )
        room_data.timezone = timezone
        self._storage.save(room_data)

    def get_timezone(self, room_id: int) -> str:
        """Get the timezone for a room. Returns UTC+0 if not set."""
        room_data = self._storage.load(room_id)
        if room_data is None:
            return "UTC+0"
        return room_data.timezone or "UTC+0"

    # --- Bounty Management ---

    def check_link_unique(
        self, room_id: int, link: str | None, exclude_bounty_id: int | None = None
    ) -> bool:
        """Check if a link is unique within a room (not used by another bounty).

        Soft-deleted bounties are ignored, as is the bounty whose id equals
        exclude_bounty_id. An empty/None link is always considered unique.
        """
        if not link:
            return True
        room_data = self._storage.load(room_id)
        if room_data is None:
            return True
        return not any(
            bounty.deleted_at is None
            and bounty.link == link
            and bounty.id != exclude_bounty_id
            for bounty in room_data.bounties
        )

    def add_bounty(
        self,
        room_id: int,
        user_id: int,
        username: str | None,
        text: Optional[str] = None,
        link: Optional[str] = None,
        due_date_ts: Optional[int] = None,
        created_by_username: Optional[str] = None,
    ) -> Bounty:
        """Add a new bounty to the room. Requires admin permission.

        Args:
            room_id: Room identifier
            user_id: Telegram ID of the creator
            username: Requesting user's username (used for the admin check)
            text: Bounty description
            link: Bounty link; must be unique among the room's active bounties
            due_date_ts: Due date as a timestamp
            created_by_username: Username recorded on the bounty

        Returns:
            The created Bounty

        Raises:
            PermissionError: If not admin
            ValueError: If another active bounty already uses the link
        """
        if not self.is_admin(room_id, username):
            raise PermissionError("Only admins can add bounties.")
        if not self.check_link_unique(room_id, link):
            raise ValueError("A bounty with this link already exists in this room.")
        room_data = self._storage.load(room_id)
        if room_data is None:
            room_data = RoomData(room_id=room_id, bounties=[], next_id=1)
        else:
            # NOTE(review): for an existing room the counter is bumped before
            # it is used and room_data is not saved here; presumably
            # storage.add_bounty() persists the counter - confirm against
            # the RoomStorage implementation.
            room_data.next_id += 1
        bounty = Bounty(
            id=room_data.next_id,
            created_by_user_id=user_id,
            created_by_username=created_by_username,
            text=text,
            link=link,
            due_date_ts=due_date_ts,
            created_at=int(time.time()),
        )
        self._storage.add_bounty(room_id, bounty)
        return bounty

    def list_bounties(self, room_id: int) -> list[Bounty]:
        """List all non-deleted bounties in a room."""
        return self._storage.list_bounties(room_id)

    def list_deleted_bounties(self, room_id: int) -> list[Bounty]:
        """List all soft-deleted bounties in a room. For /recover functionality."""
        all_bounties = self._storage.list_all_bounties(room_id, include_deleted=True)
        return [b for b in all_bounties if b.deleted_at is not None]

    def get_deleted_bounty(self, room_id: int, bounty_id: int) -> Bounty | None:
        """Get a specific soft-deleted bounty by ID (None if absent or active)."""
        all_bounties = self._storage.list_all_bounties(room_id, include_deleted=True)
        return next(
            (b for b in all_bounties if b.id == bounty_id and b.deleted_at is not None),
            None,
        )

    def recover_bounty(self, room_id: int, bounty_id: int, username: str | None) -> str:
        """Recover a soft-deleted bounty. Admin only.

        Returns: 'recovered', 'not_found', 'not_deleted', 'permission_denied'
        """
        if not self.is_admin(room_id, username):
            return "permission_denied"
        # Search active AND deleted bounties so that an existing, active
        # bounty is reported as 'not_deleted' rather than 'not_found'.
        all_bounties = self._storage.list_all_bounties(room_id, include_deleted=True)
        bounty = next((b for b in all_bounties if b.id == bounty_id), None)
        if bounty is None:
            return "not_found"
        if bounty.deleted_at is None:
            return "not_deleted"
        bounty.deleted_at = None
        self._storage.update_bounty(room_id, bounty)
        return "recovered"

    def recover_bounties(
        self, room_id: int, bounty_ids: list[int], username: str | None
    ) -> dict[int, str]:
        """Recover multiple soft-deleted bounties. Admin only.

        Returns dict of bounty_id -> result
        ('recovered', 'not_found', 'not_deleted', 'permission_denied')
        """
        return {
            bounty_id: self.recover_bounty(room_id, bounty_id, username)
            for bounty_id in bounty_ids
        }

    def get_bounty(self, room_id: int, bounty_id: int) -> Bounty | None:
        """Get a specific bounty by ID. Excludes soft-deleted bounties."""
        bounty = self._storage.get_bounty(room_id, bounty_id)
        if bounty and bounty.deleted_at is not None:
            return None
        return bounty

    def update_bounty(
        self,
        room_id: int,
        bounty_id: int,
        username: str | None,
        text: Optional[str] = None,
        link: Optional[str] = None,
        due_date_ts: Optional[int] = None,
        clear_link: bool = False,
        clear_due: bool = False,
    ) -> bool:
        """Update a bounty. Only admins can update.

        Fields passed as None keep their current value; clear_link and
        clear_due reset the respective field to None and take precedence over
        link / due_date_ts. The bounty is looked up directly in storage,
        without the soft-delete filter that get_bounty() applies.

        Returns:
            True if updated, False if the bounty does not exist

        Raises:
            PermissionError: If not admin
            ValueError: If another active bounty already uses the link
        """
        bounty = self._storage.get_bounty(room_id, bounty_id)
        if not bounty:
            return False
        if not self.is_admin(room_id, username):
            raise PermissionError("Only admins can edit bounties.")
        if link and not self.check_link_unique(
            room_id, link, exclude_bounty_id=bounty_id
        ):
            raise ValueError("A bounty with this link already exists in this room.")

        if clear_link:
            new_link = None
        elif link is not None:
            new_link = link
        else:
            new_link = bounty.link

        if clear_due:
            new_due = None
        elif due_date_ts is not None:
            new_due = due_date_ts
        else:
            new_due = bounty.due_date_ts

        updated = Bounty(
            id=bounty.id,
            created_by_user_id=bounty.created_by_user_id,
            text=text if text is not None else bounty.text,
            link=new_link,
            due_date_ts=new_due,
            created_at=bounty.created_at,
            deleted_at=bounty.deleted_at,
            created_by_username=bounty.created_by_username,
        )
        # Carry the category associations over; rebuilding the bounty without
        # them would silently drop its categories on every edit.
        updated.category_ids = list(bounty.category_ids)
        self._storage.update_bounty(room_id, updated)
        return True

    def delete_bounty(self, room_id: int, bounty_id: int, username: str | None) -> bool:
        """Soft delete a bounty. Only admins can delete.

        Returns:
            True if deleted, False if the bounty does not exist

        Raises:
            PermissionError: If not admin
        """
        bounty = self._storage.get_bounty(room_id, bounty_id)
        if not bounty:
            return False
        if not self.is_admin(room_id, username):
            raise PermissionError("Only admins can delete bounties.")
        bounty.deleted_at = int(time.time())
        self._storage.update_bounty(room_id, bounty)
        return True

    def delete_bounties(
        self, room_id: int, bounty_ids: list[int], username: str | None
    ) -> dict[int, str]:
        """Soft delete multiple bounties. Returns dict of bounty_id -> result.

        Results can be: 'deleted', 'not_found', 'permission_denied'
        """
        # The requester's admin status is the same for every id: check once.
        requester_is_admin = self.is_admin(room_id, username)
        results: dict[int, str] = {}
        for bounty_id in bounty_ids:
            bounty = self._storage.get_bounty(room_id, bounty_id)
            if not bounty:
                results[bounty_id] = "not_found"
            elif not requester_is_admin:
                results[bounty_id] = "permission_denied"
            else:
                bounty.deleted_at = int(time.time())
                self._storage.update_bounty(room_id, bounty)
                results[bounty_id] = "deleted"
        return results

    # --- Category Management ---

    def add_category(
        self,
        room_id: int,
        slug: str,
        name: str,
        username: str | None,
    ) -> Category:
        """Create a new category. Admin only.

        Args:
            room_id: Room identifier
            slug: Category ID (lowercase alphabetic, e.g., "bug")
            name: Display name (e.g., "Bug Report")
            username: Requesting admin's username

        Returns:
            Created Category

        Raises:
            PermissionError: If not admin
            ValueError: If slug already exists or invalid
        """
        if not self.is_admin(room_id, username):
            raise PermissionError("Only admins can add categories.")
        # Validate slug format (lowercase alphabetic only)
        if not slug or not slug.isalpha() or not slug.islower():
            raise ValueError(
                "Category slug must be lowercase alphabetic only (e.g., 'bug', 'feature')."
            )
        room_data = self._storage.load(room_id)
        if room_data is None:
            room_data = RoomData(
                room_id=room_id, bounties=[], next_id=1, admin_usernames=[], categories=[]
            )
        # Tolerate a room whose category list is unset, mirroring the
        # `admin_usernames or []` guards used for admins.
        if room_data.categories is None:
            room_data.categories = []
        # Only active categories block a slug; a soft-deleted one may be re-created.
        if any(
            cat.id == slug and cat.deleted_at is None for cat in room_data.categories
        ):
            raise ValueError(f"Category '{slug}' already exists.")
        category = Category(
            id=slug,
            name=name,
            created_at=int(time.time()),
            deleted_at=None,
        )
        room_data.categories.append(category)
        self._storage.save(room_data)
        return category

    def delete_category(
        self,
        room_id: int,
        slug: str,
        username: str | None,
    ) -> bool:
        """Soft delete a category. Admin only.

        Args:
            room_id: Room identifier
            slug: Category slug to delete
            username: Requesting admin's username

        Returns:
            True if deleted, False if not found

        Raises:
            PermissionError: If not admin
        """
        if not self.is_admin(room_id, username):
            raise PermissionError("Only admins can delete categories.")
        room_data = self._storage.load(room_id)
        if room_data is None:
            return False
        for cat in room_data.categories or []:
            if cat.id == slug and cat.deleted_at is None:
                cat.deleted_at = int(time.time())
                self._storage.save(room_data)
                return True
        return False

    def list_categories(self, room_id: int) -> list[Category]:
        """List active categories (excludes soft-deleted).

        Args:
            room_id: Room identifier

        Returns:
            List of active categories
        """
        room_data = self._storage.load(room_id)
        if room_data is None:
            return []
        return [c for c in (room_data.categories or []) if c.deleted_at is None]

    def get_category(self, room_id: int, slug: str) -> Category | None:
        """Get a category by slug (excludes soft-deleted).

        Args:
            room_id: Room identifier
            slug: Category slug

        Returns:
            Category or None if not found
        """
        room_data = self._storage.load(room_id)
        if room_data is None:
            return None
        return next(
            (
                cat
                for cat in (room_data.categories or [])
                if cat.id == slug and cat.deleted_at is None
            ),
            None,
        )

    def _validate_category_exists(self, room_id: int, slug: str) -> None:
        """Validate that a category exists (and is not deleted). Raises ValueError if not found."""
        if not self.get_category(room_id, slug):
            raise ValueError(f"Category '{slug}' not found.")

    # --- Category-to-Bounty Association ---

    def add_category_to_bounty(
        self,
        room_id: int,
        bounty_id: int,
        category_slug: str,
        username: str | None,
    ) -> bool:
        """Add category to a bounty. Admin only.

        Args:
            room_id: Room identifier
            bounty_id: Bounty ID
            category_slug: Category slug to add
            username: Requesting admin's username

        Returns:
            True if newly added, False if already exists

        Raises:
            PermissionError: If not admin
            ValueError: If bounty or category not found
        """
        if not self.is_admin(room_id, username):
            raise PermissionError("Only admins can manage bounty categories.")
        bounty = self.get_bounty(room_id, bounty_id)
        if not bounty:
            raise ValueError("Bounty not found.")
        self._validate_category_exists(room_id, category_slug)
        if category_slug in bounty.category_ids:
            return False  # Already exists
        bounty.category_ids.append(category_slug)
        self._storage.update_bounty(room_id, bounty)
        return True

    def remove_category_from_bounty(
        self,
        room_id: int,
        bounty_id: int,
        category_slug: str,
        username: str | None,
    ) -> bool:
        """Remove category from a bounty. Admin only.

        The category itself is not validated, so a slug whose category has
        since been soft-deleted can still be detached from a bounty.

        Args:
            room_id: Room identifier
            bounty_id: Bounty ID
            category_slug: Category slug to remove
            username: Requesting admin's username

        Returns:
            True if removed, False if not found

        Raises:
            PermissionError: If not admin
            ValueError: If bounty not found
        """
        if not self.is_admin(room_id, username):
            raise PermissionError("Only admins can manage bounty categories.")
        bounty = self.get_bounty(room_id, bounty_id)
        if not bounty:
            raise ValueError("Bounty not found.")
        if category_slug not in bounty.category_ids:
            return False
        bounty.category_ids.remove(category_slug)
        self._storage.update_bounty(room_id, bounty)
        return True

    def update_bounty_categories(
        self,
        room_id: int,
        bounty_id: int,
        category_slugs: list[str],
        username: str | None,
    ) -> bool:
        """Replace all categories on a bounty. Admin only.

        Duplicate slugs are collapsed (first occurrence wins) and the bounty
        receives its own list, so later changes to the caller's list do not
        affect it.

        Args:
            room_id: Room identifier
            bounty_id: Bounty ID
            category_slugs: New list of category slugs
            username: Requesting admin's username

        Returns:
            True if updated

        Raises:
            PermissionError: If not admin
            ValueError: If bounty or any category not found
        """
        if not self.is_admin(room_id, username):
            raise PermissionError("Only admins can manage bounty categories.")
        bounty = self.get_bounty(room_id, bounty_id)
        if not bounty:
            raise ValueError("Bounty not found.")
        # Validate all categories exist, loading the room's categories once.
        active_slugs = {cat.id for cat in self.list_categories(room_id)}
        for slug in category_slugs:
            if slug not in active_slugs:
                raise ValueError(f"Category '{slug}' not found.")
        # dict.fromkeys() de-duplicates while keeping first-seen order.
        bounty.category_ids = list(dict.fromkeys(category_slugs))
        self._storage.update_bounty(room_id, bounty)
        return True
class TrackingService:
    """Service for tracking bounty operations.

    Tracking records are kept per (room_id, user_id) in the tracking storage;
    the bounties themselves are read from the room storage.
    """

    def __init__(self, tracking_storage: TrackingStorage, room_storage: RoomStorage):
        self._tracking = tracking_storage
        self._room = room_storage

    def track_bounty(self, room_id: int, user_id: int, bounty_id: int) -> bool:
        """Start tracking a bounty. Returns True if newly tracked.

        Returns:
            True if the bounty was newly tracked, False if the user already
            tracks it

        Raises:
            ValueError: If the bounty does not exist or is soft-deleted
        """
        bounty = self._room.get_bounty(room_id, bounty_id)
        # A soft-deleted bounty is treated as missing: get_tracked_bounties()
        # hides deleted bounties, so tracking one would have no visible effect.
        if not bounty or bounty.deleted_at is not None:
            raise ValueError("Bounty not found.")
        tracking_data = self._tracking.load(room_id, user_id)
        if tracking_data is not None and any(
            entry.bounty_id == bounty_id for entry in tracking_data.tracked
        ):
            return False
        new_entry = TrackedBounty(bounty_id=bounty_id, created_at=int(time.time()))
        self._tracking.track_bounty(room_id, user_id, new_entry)
        return True

    def untrack_bounty(self, room_id: int, user_id: int, bounty_id: int) -> bool:
        """Stop tracking a bounty. Returns True if was tracking."""
        tracking_data = self._tracking.load(room_id, user_id)
        if tracking_data is None:
            return False
        if not any(entry.bounty_id == bounty_id for entry in tracking_data.tracked):
            return False
        self._tracking.untrack_bounty(room_id, user_id, bounty_id)
        return True

    def get_tracked_bounties(self, room_id: int, user_id: int) -> list[Bounty]:
        """Get all bounties tracked by a user in a room.

        Results follow the order of the tracking records. Tracked ids whose
        bounty is soft-deleted or no longer present in the room are skipped.
        """
        tracking_data = self._tracking.load(room_id, user_id)
        if tracking_data is None:
            return []
        room_data = self._room.load(room_id)
        if room_data is None:
            return []
        active_by_id = {b.id: b for b in room_data.bounties if b.deleted_at is None}
        return [
            active_by_id[entry.bounty_id]
            for entry in tracking_data.tracked
            if entry.bounty_id in active_by_id
        ]