Service layer for category feature (Issue #86):

Category Management:
- add_category() - Create category (admin only, validates slug format)
- delete_category() - Soft delete category (admin only)
- list_categories() - List active categories
- get_category() - Get category by slug

Category-to-Bounty Association:
- add_category_to_bounty() - Add category to bounty (admin only)
- remove_category_from_bounty() - Remove category from bounty (admin only)
- update_bounty_categories() - Replace all categories on bounty (admin only)

All methods validate permissions, slug format, and existence. Soft delete preserves category data for bounties that reference it.
574 lines
19 KiB
Python
574 lines
19 KiB
Python
"""Pure business logic services for JIGAIDO."""
|
|
|
|
import time
|
|
from typing import Optional
|
|
|
|
from core.models import Bounty, Category, RoomData, TrackedBounty, TrackingData
|
|
from core.ports import RoomStorage, TrackingStorage
|
|
|
|
|
|
class BountyService:
    """Service for bounty, admin, timezone and category operations in a room.

    A room is identified by room_id:
    - Negative room_id: Telegram group (e.g., -1001)
    - Positive room_id: DM/personal context (user's Telegram ID)

    This service handles both group and personal bounties through room_id.

    Permissions:
    - /add, /edit, /delete: admin only
    - /admin, /admin add, /admin remove: admin only
    - /bounty, /show, /track, /untrack, /my: everyone
    """

    def __init__(self, storage: RoomStorage):
        """Bind the service to the storage backend used for all room data."""
        self._storage = storage

    # --- Admin Management ---

    def is_admin(self, room_id: int, username: str | None) -> bool:
        """Check if user is admin in a room by username.

        Returns False for a missing/empty username or an unknown room.
        """
        if not username:
            return False
        room_data = self._storage.load(room_id)
        if room_data is None:
            return False
        return username in (room_data.admin_usernames or [])

    def add_admin(
        self, room_id: int, username: str, requesting_username: str | None
    ) -> None:
        """Add an admin to a room.

        Requires admin permission, except that a user may promote themselves
        when the room has no admins yet.

        Raises:
            PermissionError: If the requester may not add admins.
            ValueError: If ``username`` is already an admin.
        """
        room_data = self._storage.load(room_id)

        has_no_admins = room_data is None or not room_data.admin_usernames
        is_self_promotion = requesting_username == username

        if not self.is_admin(room_id, requesting_username):
            if not (has_no_admins and is_self_promotion):
                raise PermissionError("Only admins can add admins.")

        if room_data is None:
            room_data = RoomData(
                room_id=room_id, bounties=[], next_id=1, admin_usernames=[]
            )

        # Normalise a missing list in place so that the list we append to is
        # the very list that gets saved (previously a detached local list was
        # mutated and the new admin was silently lost).
        if room_data.admin_usernames is None:
            room_data.admin_usernames = []

        if username in room_data.admin_usernames:
            raise ValueError(f"@{username} is already an admin.")

        room_data.admin_usernames.append(username)
        self._storage.save(room_data)

    def remove_admin(
        self, room_id: int, username: str, requesting_username: str | None
    ) -> None:
        """Remove an admin from a room. Requires admin permission.

        Raises:
            PermissionError: If the requester is not an admin.
            ValueError: If ``username`` is not an admin of the room.
        """
        if not self.is_admin(room_id, requesting_username):
            raise PermissionError("Only admins can remove admins.")

        room_data = self._storage.load(room_id)
        admins = room_data.admin_usernames if room_data is not None else None
        if not admins or username not in admins:
            raise ValueError(f"@{username} is not an admin.")

        admins.remove(username)
        self._storage.save(room_data)

    def list_admins(self, room_id: int) -> list[str]:
        """List all admin usernames in a room (a copy; empty if room unknown)."""
        room_data = self._storage.load(room_id)
        if room_data is None:
            return []
        return list(room_data.admin_usernames or [])

    # --- Timezone ---

    def set_timezone(
        self, room_id: int, timezone: str, requesting_username: str | None
    ) -> None:
        """Set the timezone for a room. Requires admin permission.

        Raises:
            PermissionError: If the requester is not an admin.
        """
        if not self.is_admin(room_id, requesting_username):
            raise PermissionError("Only admins can set timezone.")

        room_data = self._storage.load(room_id)
        if room_data is None:
            room_data = RoomData(
                room_id=room_id, bounties=[], next_id=1, admin_usernames=[]
            )

        room_data.timezone = timezone
        self._storage.save(room_data)

    def get_timezone(self, room_id: int) -> str:
        """Get the timezone for a room. Returns UTC+0 if not set."""
        room_data = self._storage.load(room_id)
        if room_data is None:
            return "UTC+0"
        return room_data.timezone or "UTC+0"

    # --- Bounties ---

    def check_link_unique(
        self, room_id: int, link: str | None, exclude_bounty_id: int | None = None
    ) -> bool:
        """Check if a link is unique within a room (not used by another bounty).

        Soft-deleted bounties are ignored, an empty/missing link is always
        considered unique, and ``exclude_bounty_id`` lets a bounty keep its
        own link when it is being edited.
        """
        if not link:
            return True

        room_data = self._storage.load(room_id)
        if room_data is None:
            return True

        return not any(
            bounty.deleted_at is None
            and bounty.link == link
            and bounty.id != exclude_bounty_id
            for bounty in room_data.bounties
        )

    def add_bounty(
        self,
        room_id: int,
        user_id: int,
        username: str | None,
        text: Optional[str] = None,
        link: Optional[str] = None,
        due_date_ts: Optional[int] = None,
        created_by_username: Optional[str] = None,
    ) -> Bounty:
        """Add a new bounty to the room. Requires admin permission.

        Raises:
            PermissionError: If ``username`` is not an admin.
            ValueError: If another active bounty already uses ``link``.
        """
        if not self.is_admin(room_id, username):
            raise PermissionError("Only admins can add bounties.")

        if not self.check_link_unique(room_id, link):
            raise ValueError("A bounty with this link already exists in this room.")

        room_data = self._storage.load(room_id)
        if room_data is None:
            room_data = RoomData(room_id=room_id, bounties=[], next_id=1)
        else:
            room_data.next_id += 1
        # NOTE(review): room_data is not saved here, so the incremented
        # next_id is only used to pick this bounty's id; persisting it is
        # presumably handled by storage.add_bounty - confirm.

        bounty = Bounty(
            id=room_data.next_id,
            created_by_user_id=user_id,
            created_by_username=created_by_username,
            text=text,
            link=link,
            due_date_ts=due_date_ts,
            created_at=int(time.time()),
        )
        self._storage.add_bounty(room_id, bounty)
        return bounty

    def list_bounties(self, room_id: int) -> list[Bounty]:
        """List all non-deleted bounties in a room."""
        return self._storage.list_bounties(room_id)

    def list_deleted_bounties(self, room_id: int) -> list[Bounty]:
        """List all soft-deleted bounties in a room. For /recover functionality."""
        all_bounties = self._storage.list_all_bounties(room_id, include_deleted=True)
        return [b for b in all_bounties if b.deleted_at is not None]

    def get_deleted_bounty(self, room_id: int, bounty_id: int) -> Bounty | None:
        """Get a specific soft-deleted bounty by ID, or None if there is none."""
        all_bounties = self._storage.list_all_bounties(room_id, include_deleted=True)
        for b in all_bounties:
            if b.id == bounty_id and b.deleted_at is not None:
                return b
        return None

    def recover_bounty(self, room_id: int, bounty_id: int, username: str | None) -> str:
        """Recover a soft-deleted bounty. Admin only.

        Returns: 'recovered', 'not_found', 'not_deleted', 'permission_denied'
        """
        if not self.is_admin(room_id, username):
            return "permission_denied"

        # Look at every bounty with this id (deleted or not) so that an
        # existing, active bounty is reported as 'not_deleted' rather than
        # 'not_found'.
        all_bounties = self._storage.list_all_bounties(room_id, include_deleted=True)
        matches = [b for b in all_bounties if b.id == bounty_id]
        if not matches:
            return "not_found"

        bounty = next((b for b in matches if b.deleted_at is not None), None)
        if bounty is None:
            return "not_deleted"

        bounty.deleted_at = None
        self._storage.update_bounty(room_id, bounty)
        return "recovered"

    def recover_bounties(
        self, room_id: int, bounty_ids: list[int], username: str | None
    ) -> dict[int, str]:
        """Recover multiple soft-deleted bounties. Admin only.

        Returns dict of bounty_id -> result ('recovered', 'not_found', 'not_deleted', 'permission_denied')
        """
        return {
            bounty_id: self.recover_bounty(room_id, bounty_id, username)
            for bounty_id in bounty_ids
        }

    def get_bounty(self, room_id: int, bounty_id: int) -> Bounty | None:
        """Get a specific bounty by ID. Excludes soft-deleted bounties."""
        bounty = self._storage.get_bounty(room_id, bounty_id)
        if bounty and bounty.deleted_at is not None:
            return None
        return bounty

    def update_bounty(
        self,
        room_id: int,
        bounty_id: int,
        username: str | None,
        text: Optional[str] = None,
        link: Optional[str] = None,
        due_date_ts: Optional[int] = None,
        clear_link: bool = False,
        clear_due: bool = False,
    ) -> bool:
        """Update a bounty. Only admins can update.

        Fields passed as None are left unchanged; ``clear_link`` and
        ``clear_due`` explicitly reset the link / due date and take precedence
        over a supplied value.

        Returns:
            True if updated, False if the bounty does not exist (soft-deleted
            bounties count as not existing).

        Raises:
            PermissionError: If ``username`` is not an admin.
            ValueError: If another active bounty already uses ``link``.
        """
        # Go through get_bounty so a soft-deleted bounty cannot be edited.
        bounty = self.get_bounty(room_id, bounty_id)
        if not bounty:
            return False
        if not self.is_admin(room_id, username):
            raise PermissionError("Only admins can edit bounties.")

        if link and not self.check_link_unique(
            room_id, link, exclude_bounty_id=bounty_id
        ):
            raise ValueError("A bounty with this link already exists in this room.")

        if clear_link:
            new_link = None
        else:
            new_link = link if link is not None else bounty.link

        if clear_due:
            new_due = None
        else:
            new_due = due_date_ts if due_date_ts is not None else bounty.due_date_ts

        updated = Bounty(
            id=bounty.id,
            created_by_user_id=bounty.created_by_user_id,
            text=text if text is not None else bounty.text,
            link=new_link,
            due_date_ts=new_due,
            created_at=bounty.created_at,
            deleted_at=bounty.deleted_at,
            created_by_username=bounty.created_by_username,
        )
        # Carry the category assignments over; rebuilding the bounty without
        # them would silently strip its categories on every edit.
        updated.category_ids = list(getattr(bounty, "category_ids", None) or [])
        self._storage.update_bounty(room_id, updated)
        return True

    def delete_bounty(self, room_id: int, bounty_id: int, username: str | None) -> bool:
        """Soft delete a bounty. Only admins can delete.

        Returns:
            True if deleted, False if the bounty does not exist or is already
            soft-deleted (its original deletion timestamp is kept).

        Raises:
            PermissionError: If ``username`` is not an admin.
        """
        bounty = self.get_bounty(room_id, bounty_id)
        if not bounty:
            return False
        if not self.is_admin(room_id, username):
            raise PermissionError("Only admins can delete bounties.")

        bounty.deleted_at = int(time.time())
        self._storage.update_bounty(room_id, bounty)
        return True

    def delete_bounties(
        self, room_id: int, bounty_ids: list[int], username: str | None
    ) -> dict[int, str]:
        """Soft delete multiple bounties. Returns dict of bounty_id -> result.

        Results can be: 'deleted', 'not_found', 'permission_denied'
        """
        # The requester's permission does not change while we loop, so look
        # it up once instead of reloading the room for every bounty.
        requester_is_admin = self.is_admin(room_id, username)

        results: dict[int, str] = {}
        for bounty_id in bounty_ids:
            bounty = self.get_bounty(room_id, bounty_id)
            if not bounty:
                results[bounty_id] = "not_found"
                continue
            if not requester_is_admin:
                results[bounty_id] = "permission_denied"
                continue

            bounty.deleted_at = int(time.time())
            self._storage.update_bounty(room_id, bounty)
            results[bounty_id] = "deleted"
        return results

    # --- Category Management ---

    def add_category(
        self,
        room_id: int,
        slug: str,
        name: str,
        username: str | None,
    ) -> Category:
        """Create a new category. Admin only.

        Args:
            room_id: Room identifier
            slug: Category ID (lowercase alphabetic, e.g., "bug")
            name: Display name (e.g., "Bug Report")
            username: Requesting admin's username

        Returns:
            Created Category

        Raises:
            PermissionError: If not admin
            ValueError: If slug already exists or invalid
        """
        if not self.is_admin(room_id, username):
            raise PermissionError("Only admins can add categories.")

        # Validate slug format (lowercase alphabetic only)
        if not slug or not slug.isalpha() or not slug.islower():
            raise ValueError(
                "Category slug must be lowercase alphabetic only (e.g., 'bug', 'feature')."
            )

        room_data = self._storage.load(room_id)
        if room_data is None:
            room_data = RoomData(
                room_id=room_id, bounties=[], next_id=1, admin_usernames=[], categories=[]
            )

        # Only active categories block the slug; a soft-deleted one may be
        # re-created.
        if any(
            cat.id == slug and cat.deleted_at is None for cat in room_data.categories
        ):
            raise ValueError(f"Category '{slug}' already exists.")

        category = Category(
            id=slug,
            name=name,
            created_at=int(time.time()),
            deleted_at=None,
        )
        room_data.categories.append(category)
        self._storage.save(room_data)
        return category

    def delete_category(
        self,
        room_id: int,
        slug: str,
        username: str | None,
    ) -> bool:
        """Soft delete a category. Admin only.

        Args:
            room_id: Room identifier
            slug: Category slug to delete
            username: Requesting admin's username

        Returns:
            True if deleted, False if not found

        Raises:
            PermissionError: If not admin
        """
        if not self.is_admin(room_id, username):
            raise PermissionError("Only admins can delete categories.")

        room_data = self._storage.load(room_id)
        if room_data is None:
            return False

        for cat in room_data.categories:
            if cat.id == slug and cat.deleted_at is None:
                cat.deleted_at = int(time.time())
                self._storage.save(room_data)
                return True
        return False

    def list_categories(self, room_id: int) -> list[Category]:
        """List active categories (excludes soft-deleted).

        Args:
            room_id: Room identifier

        Returns:
            List of active categories
        """
        room_data = self._storage.load(room_id)
        if room_data is None:
            return []
        return [c for c in room_data.categories if c.deleted_at is None]

    def get_category(self, room_id: int, slug: str) -> Category | None:
        """Get a category by slug (excludes soft-deleted).

        Args:
            room_id: Room identifier
            slug: Category slug

        Returns:
            Category or None if not found
        """
        room_data = self._storage.load(room_id)
        if room_data is None:
            return None
        for cat in room_data.categories:
            if cat.id == slug and cat.deleted_at is None:
                return cat
        return None

    def _validate_category_exists(self, room_id: int, slug: str) -> None:
        """Validate that a category exists (and is not deleted). Raises ValueError if not found."""
        if not self.get_category(room_id, slug):
            raise ValueError(f"Category '{slug}' not found.")

    # --- Category-to-Bounty Association ---

    def add_category_to_bounty(
        self,
        room_id: int,
        bounty_id: int,
        category_slug: str,
        username: str | None,
    ) -> bool:
        """Add category to a bounty. Admin only.

        Args:
            room_id: Room identifier
            bounty_id: Bounty ID
            category_slug: Category slug to add
            username: Requesting admin's username

        Returns:
            True if newly added, False if already exists

        Raises:
            PermissionError: If not admin
            ValueError: If bounty or category not found
        """
        if not self.is_admin(room_id, username):
            raise PermissionError("Only admins can manage bounty categories.")

        bounty = self.get_bounty(room_id, bounty_id)
        if not bounty:
            raise ValueError("Bounty not found.")

        self._validate_category_exists(room_id, category_slug)

        if category_slug in bounty.category_ids:
            return False  # Already exists

        bounty.category_ids.append(category_slug)
        self._storage.update_bounty(room_id, bounty)
        return True

    def remove_category_from_bounty(
        self,
        room_id: int,
        bounty_id: int,
        category_slug: str,
        username: str | None,
    ) -> bool:
        """Remove category from a bounty. Admin only.

        The category itself is not validated, so a slug whose category was
        soft-deleted can still be detached from a bounty.

        Args:
            room_id: Room identifier
            bounty_id: Bounty ID
            category_slug: Category slug to remove
            username: Requesting admin's username

        Returns:
            True if removed, False if not found

        Raises:
            PermissionError: If not admin
            ValueError: If bounty not found
        """
        if not self.is_admin(room_id, username):
            raise PermissionError("Only admins can manage bounty categories.")

        bounty = self.get_bounty(room_id, bounty_id)
        if not bounty:
            raise ValueError("Bounty not found.")

        if category_slug not in bounty.category_ids:
            return False

        bounty.category_ids.remove(category_slug)
        self._storage.update_bounty(room_id, bounty)
        return True

    def update_bounty_categories(
        self,
        room_id: int,
        bounty_id: int,
        category_slugs: list[str],
        username: str | None,
    ) -> bool:
        """Replace all categories on a bounty. Admin only.

        Duplicate slugs are collapsed (first occurrence wins) and the bounty
        receives its own list, so later changes to ``category_slugs`` by the
        caller do not leak into the stored bounty.

        Args:
            room_id: Room identifier
            bounty_id: Bounty ID
            category_slugs: New list of category slugs
            username: Requesting admin's username

        Returns:
            True if updated

        Raises:
            PermissionError: If not admin
            ValueError: If bounty or any category not found
        """
        if not self.is_admin(room_id, username):
            raise PermissionError("Only admins can manage bounty categories.")

        bounty = self.get_bounty(room_id, bounty_id)
        if not bounty:
            raise ValueError("Bounty not found.")

        # Validate all categories exist, loading the active set only once.
        active_slugs = {cat.id for cat in self.list_categories(room_id)}
        for slug in category_slugs:
            if slug not in active_slugs:
                raise ValueError(f"Category '{slug}' not found.")

        # dict.fromkeys de-duplicates while preserving the caller's order.
        bounty.category_ids = list(dict.fromkeys(category_slugs))
        self._storage.update_bounty(room_id, bounty)
        return True
|
|
|
|
|
|
class TrackingService:
    """Service for tracking bounty operations.

    Tracking records are kept per (room_id, user_id) in ``tracking_storage``;
    ``room_storage`` is consulted to resolve the bounties themselves.
    """

    def __init__(self, tracking_storage: TrackingStorage, room_storage: RoomStorage):
        """Bind the service to its tracking and room storage backends."""
        self._tracking = tracking_storage
        self._room = room_storage

    def track_bounty(self, room_id: int, user_id: int, bounty_id: int) -> bool:
        """Start tracking a bounty.

        Returns:
            True if newly tracked, False if the user already tracks it.

        Raises:
            ValueError: If the bounty does not exist or has been soft-deleted.
        """
        bounty = self._room.get_bounty(room_id, bounty_id)
        # A soft-deleted bounty is treated like a missing one: it would never
        # show up in get_tracked_bounties anyway.
        if not bounty or bounty.deleted_at is not None:
            raise ValueError("Bounty not found.")

        tracking_data = self._tracking.load(room_id, user_id)
        if tracking_data is not None and any(
            entry.bounty_id == bounty_id for entry in tracking_data.tracked
        ):
            return False

        new_entry = TrackedBounty(bounty_id=bounty_id, created_at=int(time.time()))
        self._tracking.track_bounty(room_id, user_id, new_entry)
        return True

    def untrack_bounty(self, room_id: int, user_id: int, bounty_id: int) -> bool:
        """Stop tracking a bounty. Returns True if was tracking."""
        tracking_data = self._tracking.load(room_id, user_id)
        if tracking_data is None:
            return False

        if not any(entry.bounty_id == bounty_id for entry in tracking_data.tracked):
            return False

        self._tracking.untrack_bounty(room_id, user_id, bounty_id)
        return True

    def get_tracked_bounties(self, room_id: int, user_id: int) -> list[Bounty]:
        """Get all bounties tracked by a user in a room.

        Bounties that were soft-deleted or no longer exist are skipped; the
        result follows the order of the user's tracking records.
        """
        tracking_data = self._tracking.load(room_id, user_id)
        if tracking_data is None:
            return []

        room_data = self._room.load(room_id)
        if room_data is None:
            return []

        # Index active bounties by id for O(1) lookups per tracking record.
        bounty_map = {b.id: b for b in room_data.bounties if b.deleted_at is None}
        return [
            bounty_map[t.bounty_id]
            for t in tracking_data.tracked
            if t.bounty_id in bounty_map
        ]
|