Compare commits
1 Commits
43d07eae92
...
feature/ca
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7c17bff110 |
230
core/services.py
230
core/services.py
@@ -3,7 +3,7 @@
|
||||
import time
|
||||
from typing import Optional
|
||||
|
||||
from core.models import Bounty, RoomData, TrackedBounty, TrackingData
|
||||
from core.models import Bounty, Category, RoomData, TrackedBounty, TrackingData
|
||||
from core.ports import RoomStorage, TrackingStorage
|
||||
|
||||
|
||||
@@ -289,6 +289,234 @@ class BountyService:
|
||||
results[bounty_id] = "deleted"
|
||||
return results
|
||||
|
||||
# --- Category Management ---
|
||||
|
||||
def add_category(
|
||||
self,
|
||||
room_id: int,
|
||||
slug: str,
|
||||
name: str,
|
||||
username: str | None,
|
||||
) -> Category:
|
||||
"""Create a new category. Admin only.
|
||||
|
||||
Args:
|
||||
room_id: Room identifier
|
||||
slug: Category ID (lowercase alphabetic, e.g., "bug")
|
||||
name: Display name (e.g., "Bug Report")
|
||||
username: Requesting admin's username
|
||||
|
||||
Returns:
|
||||
Created Category
|
||||
|
||||
Raises:
|
||||
PermissionError: If not admin
|
||||
ValueError: If slug already exists or invalid
|
||||
"""
|
||||
if not self.is_admin(room_id, username):
|
||||
raise PermissionError("Only admins can add categories.")
|
||||
|
||||
# Validate slug format (lowercase alphabetic only)
|
||||
if not slug or not slug.isalpha() or not slug.islower():
|
||||
raise ValueError(
|
||||
"Category slug must be lowercase alphabetic only (e.g., 'bug', 'feature')."
|
||||
)
|
||||
|
||||
room_data = self._storage.load(room_id)
|
||||
if room_data is None:
|
||||
room_data = RoomData(
|
||||
room_id=room_id, bounties=[], next_id=1, admin_usernames=[], categories=[]
|
||||
)
|
||||
|
||||
# Check for duplicate slug
|
||||
for cat in room_data.categories:
|
||||
if cat.id == slug and cat.deleted_at is None:
|
||||
raise ValueError(f"Category '{slug}' already exists.")
|
||||
|
||||
category = Category(
|
||||
id=slug,
|
||||
name=name,
|
||||
created_at=int(time.time()),
|
||||
deleted_at=None,
|
||||
)
|
||||
room_data.categories.append(category)
|
||||
self._storage.save(room_data)
|
||||
return category
|
||||
|
||||
def delete_category(
|
||||
self,
|
||||
room_id: int,
|
||||
slug: str,
|
||||
username: str | None,
|
||||
) -> bool:
|
||||
"""Soft delete a category. Admin only.
|
||||
|
||||
Args:
|
||||
room_id: Room identifier
|
||||
slug: Category slug to delete
|
||||
username: Requesting admin's username
|
||||
|
||||
Returns:
|
||||
True if deleted, False if not found
|
||||
"""
|
||||
if not self.is_admin(room_id, username):
|
||||
raise PermissionError("Only admins can delete categories.")
|
||||
|
||||
room_data = self._storage.load(room_id)
|
||||
if room_data is None:
|
||||
return False
|
||||
|
||||
for cat in room_data.categories:
|
||||
if cat.id == slug and cat.deleted_at is None:
|
||||
cat.deleted_at = int(time.time())
|
||||
self._storage.save(room_data)
|
||||
return True
|
||||
return False
|
||||
|
||||
def list_categories(self, room_id: int) -> list[Category]:
|
||||
"""List active categories (excludes soft-deleted).
|
||||
|
||||
Args:
|
||||
room_id: Room identifier
|
||||
|
||||
Returns:
|
||||
List of active categories
|
||||
"""
|
||||
room_data = self._storage.load(room_id)
|
||||
if room_data is None:
|
||||
return []
|
||||
return [c for c in room_data.categories if c.deleted_at is None]
|
||||
|
||||
def get_category(self, room_id: int, slug: str) -> Category | None:
|
||||
"""Get a category by slug (excludes soft-deleted).
|
||||
|
||||
Args:
|
||||
room_id: Room identifier
|
||||
slug: Category slug
|
||||
|
||||
Returns:
|
||||
Category or None if not found
|
||||
"""
|
||||
room_data = self._storage.load(room_id)
|
||||
if room_data is None:
|
||||
return None
|
||||
for cat in room_data.categories:
|
||||
if cat.id == slug and cat.deleted_at is None:
|
||||
return cat
|
||||
return None
|
||||
|
||||
def _validate_category_exists(self, room_id: int, slug: str) -> None:
|
||||
"""Validate that a category exists (and is not deleted). Raises ValueError if not found."""
|
||||
if not self.get_category(room_id, slug):
|
||||
raise ValueError(f"Category '{slug}' not found.")
|
||||
|
||||
def add_category_to_bounty(
|
||||
self,
|
||||
room_id: int,
|
||||
bounty_id: int,
|
||||
category_slug: str,
|
||||
username: str | None,
|
||||
) -> bool:
|
||||
"""Add category to a bounty. Admin only.
|
||||
|
||||
Args:
|
||||
room_id: Room identifier
|
||||
bounty_id: Bounty ID
|
||||
category_slug: Category slug to add
|
||||
username: Requesting admin's username
|
||||
|
||||
Returns:
|
||||
True if newly added, False if already exists
|
||||
|
||||
Raises:
|
||||
PermissionError: If not admin
|
||||
ValueError: If bounty or category not found
|
||||
"""
|
||||
if not self.is_admin(room_id, username):
|
||||
raise PermissionError("Only admins can manage bounty categories.")
|
||||
|
||||
bounty = self.get_bounty(room_id, bounty_id)
|
||||
if not bounty:
|
||||
raise ValueError("Bounty not found.")
|
||||
|
||||
self._validate_category_exists(room_id, category_slug)
|
||||
|
||||
if category_slug in bounty.category_ids:
|
||||
return False # Already exists
|
||||
|
||||
bounty.category_ids.append(category_slug)
|
||||
self._storage.update_bounty(room_id, bounty)
|
||||
return True
|
||||
|
||||
def remove_category_from_bounty(
|
||||
self,
|
||||
room_id: int,
|
||||
bounty_id: int,
|
||||
category_slug: str,
|
||||
username: str | None,
|
||||
) -> bool:
|
||||
"""Remove category from a bounty. Admin only.
|
||||
|
||||
Args:
|
||||
room_id: Room identifier
|
||||
bounty_id: Bounty ID
|
||||
category_slug: Category slug to remove
|
||||
username: Requesting admin's username
|
||||
|
||||
Returns:
|
||||
True if removed, False if not found
|
||||
"""
|
||||
if not self.is_admin(room_id, username):
|
||||
raise PermissionError("Only admins can manage bounty categories.")
|
||||
|
||||
bounty = self.get_bounty(room_id, bounty_id)
|
||||
if not bounty:
|
||||
raise ValueError("Bounty not found.")
|
||||
|
||||
if category_slug not in bounty.category_ids:
|
||||
return False
|
||||
|
||||
bounty.category_ids.remove(category_slug)
|
||||
self._storage.update_bounty(room_id, bounty)
|
||||
return True
|
||||
|
||||
def update_bounty_categories(
|
||||
self,
|
||||
room_id: int,
|
||||
bounty_id: int,
|
||||
category_slugs: list[str],
|
||||
username: str | None,
|
||||
) -> bool:
|
||||
"""Replace all categories on a bounty. Admin only.
|
||||
|
||||
Args:
|
||||
room_id: Room identifier
|
||||
bounty_id: Bounty ID
|
||||
category_slugs: New list of category slugs
|
||||
username: Requesting admin's username
|
||||
|
||||
Returns:
|
||||
True if updated
|
||||
|
||||
Raises:
|
||||
PermissionError: If not admin
|
||||
ValueError: If bounty or any category not found
|
||||
"""
|
||||
if not self.is_admin(room_id, username):
|
||||
raise PermissionError("Only admins can manage bounty categories.")
|
||||
|
||||
bounty = self.get_bounty(room_id, bounty_id)
|
||||
if not bounty:
|
||||
raise ValueError("Bounty not found.")
|
||||
|
||||
# Validate all categories exist
|
||||
for slug in category_slugs:
|
||||
self._validate_category_exists(room_id, slug)
|
||||
|
||||
bounty.category_ids = category_slugs
|
||||
self._storage.update_bounty(room_id, bounty)
|
||||
return True
|
||||
|
||||
|
||||
class TrackingService:
|
||||
"""Service for tracking bounty operations."""
|
||||
|
||||
Reference in New Issue
Block a user