Compare commits
6 Commits
fix/issue-
...
fix/issue-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6312d94c0b | ||
| 42ed551554 | |||
|
|
af7774ef03 | ||
| 0a64b4f310 | |||
|
|
ed0d31bc04 | ||
| d413f6ce13 |
@@ -140,6 +140,35 @@ class JsonFileRoomStorage:
|
||||
|
||||
return None
|
||||
|
||||
def list_bounties(self, room_id: int) -> list[Bounty]:
|
||||
"""List all non-deleted bounties in a room.
|
||||
|
||||
This is the default method for normal queries - soft-deleted bounties
|
||||
are excluded from results.
|
||||
"""
|
||||
room_data = self.load(room_id)
|
||||
if room_data is None:
|
||||
return []
|
||||
return [b for b in room_data.bounties if b.deleted_at is None]
|
||||
|
||||
def list_all_bounties(
|
||||
self, room_id: int, include_deleted: bool = True
|
||||
) -> list[Bounty]:
|
||||
"""List all bounties including or excluding soft-deleted.
|
||||
|
||||
Args:
|
||||
room_id: The room ID
|
||||
include_deleted: If True, return all bounties including soft-deleted.
|
||||
If False, return only non-deleted bounties.
|
||||
Defaults to True for /recover functionality.
|
||||
"""
|
||||
room_data = self.load(room_id)
|
||||
if room_data is None:
|
||||
return []
|
||||
if include_deleted:
|
||||
return room_data.bounties
|
||||
return [b for b in room_data.bounties if b.deleted_at is None]
|
||||
|
||||
|
||||
class JsonFileTrackingStorage:
|
||||
"""TrackingStorage implementation using JSON files.
|
||||
|
||||
41
cli/main.py
41
cli/main.py
@@ -127,6 +127,40 @@ def cmd_delete(args):
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def cmd_recover(args):
|
||||
"""List or recover soft-deleted bounties."""
|
||||
bounty_service, _ = create_services()
|
||||
room_id = args.group_id or args.user_id
|
||||
user_id = args.user_id or 0
|
||||
|
||||
if not args.bounty_ids:
|
||||
deleted = bounty_service.list_deleted_bounties(room_id)
|
||||
if not deleted:
|
||||
print("No recoverable bounties")
|
||||
return
|
||||
print("Recoverable bounties:")
|
||||
for b in deleted:
|
||||
from datetime import datetime
|
||||
|
||||
deleted_str = datetime.fromtimestamp(b.deleted_at).strftime("%d %b %Y")
|
||||
print(f" [#{b.id}] {b.text or '(no text)'} | Deleted {deleted_str}")
|
||||
return
|
||||
|
||||
if not bounty_service.is_admin(room_id, user_id):
|
||||
print("Error: Only admins can recover bounties.", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
for bounty_id in args.bounty_ids:
|
||||
try:
|
||||
success, msg = bounty_service.recover_bounty(
|
||||
room_id=room_id, bounty_id=bounty_id, user_id=user_id
|
||||
)
|
||||
print(msg)
|
||||
except PermissionError as e:
|
||||
print(f"Error: {e}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def cmd_track(args):
|
||||
"""Track a bounty."""
|
||||
_, tracking_service = create_services()
|
||||
@@ -196,6 +230,9 @@ def main():
|
||||
parser_untrack = subparsers.add_parser("untrack", help="Untrack a bounty")
|
||||
parser_untrack.add_argument("bounty_id", type=int, help="Bounty ID to untrack")
|
||||
|
||||
parser_recover = subparsers.add_parser("recover", help="List or recover soft-deleted bounties")
|
||||
parser_recover.add_argument("bounty_ids", nargs="*", type=int, help="Bounty ID(s) to recover (optional)")
|
||||
|
||||
for sp in [
|
||||
parser_add,
|
||||
parser_list,
|
||||
@@ -204,6 +241,7 @@ def main():
|
||||
parser_delete,
|
||||
parser_track,
|
||||
parser_untrack,
|
||||
parser_recover,
|
||||
]:
|
||||
sp.add_argument(
|
||||
"--group-id", type=int, help="Group context (use group room ID)"
|
||||
@@ -222,7 +260,7 @@ def main():
|
||||
print("Error: either --group-id or --user-id is required", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
if args.command in ("add", "list", "my", "update", "delete"):
|
||||
if args.command in ("add", "list", "my", "update", "delete", "recover"):
|
||||
if not (args.group_id or args.user_id):
|
||||
print("Error: --group-id or --user-id required", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
@@ -238,6 +276,7 @@ def main():
|
||||
"delete": cmd_delete,
|
||||
"track": cmd_track,
|
||||
"untrack": cmd_untrack,
|
||||
"recover": cmd_recover,
|
||||
}
|
||||
|
||||
if args.command in command_map:
|
||||
|
||||
@@ -40,6 +40,25 @@ class RoomStorage(Protocol):
|
||||
"""Get a specific bounty from a room by ID."""
|
||||
...
|
||||
|
||||
def list_bounties(self, room_id: int) -> list[Bounty]:
|
||||
"""List all non-deleted bounties in a room.
|
||||
|
||||
Soft-deleted bounties (where deleted_at is not None) are excluded.
|
||||
"""
|
||||
...
|
||||
|
||||
def list_all_bounties(
|
||||
self, room_id: int, include_deleted: bool = True
|
||||
) -> list[Bounty]:
|
||||
"""List all bounties including or excluding soft-deleted.
|
||||
|
||||
Args:
|
||||
room_id: The room ID
|
||||
include_deleted: If True, return all bounties including soft-deleted.
|
||||
If False, return only non-deleted bounties.
|
||||
"""
|
||||
...
|
||||
|
||||
|
||||
@runtime_checkable
|
||||
class TrackingStorage(Protocol):
|
||||
|
||||
170
core/services.py
170
core/services.py
@@ -15,11 +15,103 @@ class BountyService:
|
||||
- Positive room_id: DM/personal context (user's Telegram ID)
|
||||
|
||||
This service handles both group and personal bounties through room_id.
|
||||
|
||||
Permissions:
|
||||
- /add, /edit, /delete: admin only
|
||||
- /admin, /admin add, /admin remove: admin only
|
||||
- /bounty, /show, /track, /untrack, /my: everyone
|
||||
"""
|
||||
|
||||
def __init__(self, storage: RoomStorage):
|
||||
self._storage = storage
|
||||
|
||||
def is_admin(self, room_id: int, user_id: int) -> bool:
|
||||
"""Check if user is admin in a room."""
|
||||
room_data = self._storage.load(room_id)
|
||||
if room_data is None:
|
||||
return False
|
||||
return user_id in (room_data.admin_user_ids or [])
|
||||
|
||||
def add_admin(
|
||||
self, room_id: int, admin_user_id: int, requesting_user_id: int
|
||||
) -> None:
|
||||
"""Add an admin to a room. Requires admin permission."""
|
||||
if not self.is_admin(room_id, requesting_user_id):
|
||||
raise PermissionError("Only admins can add admins.")
|
||||
|
||||
room_data = self._storage.load(room_id)
|
||||
if room_data is None:
|
||||
room_data = RoomData(
|
||||
room_id=room_id, bounties=[], next_id=1, admin_user_ids=[]
|
||||
)
|
||||
|
||||
if admin_user_id not in (room_data.admin_user_ids or []):
|
||||
(room_data.admin_user_ids or []).append(admin_user_id)
|
||||
self._storage.save(room_data)
|
||||
|
||||
def remove_admin(
|
||||
self, room_id: int, admin_user_id: int, requesting_user_id: int
|
||||
) -> None:
|
||||
"""Remove an admin from a room. Requires admin permission."""
|
||||
if not self.is_admin(room_id, requesting_user_id):
|
||||
raise PermissionError("Only admins can remove admins.")
|
||||
|
||||
room_data = self._storage.load(room_id)
|
||||
if room_data is None:
|
||||
return
|
||||
|
||||
if admin_user_id in (room_data.admin_user_ids or []):
|
||||
(room_data.admin_user_ids or []).remove(admin_user_id)
|
||||
self._storage.save(room_data)
|
||||
|
||||
def list_admins(self, room_id: int) -> list[int]:
|
||||
"""List all admin user IDs in a room."""
|
||||
room_data = self._storage.load(room_id)
|
||||
if room_data is None:
|
||||
return []
|
||||
return list(room_data.admin_user_ids or [])
|
||||
|
||||
def set_timezone(
|
||||
self, room_id: int, timezone: str, requesting_user_id: int
|
||||
) -> None:
|
||||
"""Set the timezone for a room. Requires admin permission."""
|
||||
if not self.is_admin(room_id, requesting_user_id):
|
||||
raise PermissionError("Only admins can set timezone.")
|
||||
|
||||
room_data = self._storage.load(room_id)
|
||||
if room_data is None:
|
||||
room_data = RoomData(
|
||||
room_id=room_id, bounties=[], next_id=1, admin_user_ids=[]
|
||||
)
|
||||
|
||||
room_data.timezone = timezone
|
||||
self._storage.save(room_data)
|
||||
|
||||
def get_timezone(self, room_id: int) -> str:
|
||||
"""Get the timezone for a room. Returns UTC+0 if not set."""
|
||||
room_data = self._storage.load(room_id)
|
||||
if room_data is None:
|
||||
return "UTC+0"
|
||||
return room_data.timezone or "UTC+0"
|
||||
|
||||
def check_link_unique(
|
||||
self, room_id: int, link: str | None, exclude_bounty_id: int | None = None
|
||||
) -> bool:
|
||||
"""Check if a link is unique within a room (not used by another bounty)."""
|
||||
if not link:
|
||||
return True
|
||||
|
||||
room_data = self._storage.load(room_id)
|
||||
if room_data is None:
|
||||
return True
|
||||
|
||||
for bounty in room_data.bounties:
|
||||
if bounty.deleted_at is not None:
|
||||
continue
|
||||
if bounty.link == link and bounty.id != exclude_bounty_id:
|
||||
return False
|
||||
return True
|
||||
|
||||
def add_bounty(
|
||||
self,
|
||||
room_id: int,
|
||||
@@ -28,7 +120,13 @@ class BountyService:
|
||||
link: Optional[str] = None,
|
||||
due_date_ts: Optional[int] = None,
|
||||
) -> Bounty:
|
||||
"""Add a new bounty to the room."""
|
||||
"""Add a new bounty to the room. Requires admin permission."""
|
||||
if not self.is_admin(room_id, user_id):
|
||||
raise PermissionError("Only admins can add bounties.")
|
||||
|
||||
if not self.check_link_unique(room_id, link):
|
||||
raise ValueError("A bounty with this link already exists in this room.")
|
||||
|
||||
room_data = self._storage.load(room_id)
|
||||
if room_data is None:
|
||||
room_data = RoomData(room_id=room_id, bounties=[], next_id=1)
|
||||
@@ -47,15 +145,20 @@ class BountyService:
|
||||
return bounty
|
||||
|
||||
def list_bounties(self, room_id: int) -> list[Bounty]:
|
||||
"""List all bounties in a room."""
|
||||
room_data = self._storage.load(room_id)
|
||||
if room_data is None:
|
||||
return []
|
||||
return room_data.bounties
|
||||
"""List all non-deleted bounties in a room."""
|
||||
return self._storage.list_bounties(room_id)
|
||||
|
||||
def list_deleted_bounties(self, room_id: int) -> list[Bounty]:
|
||||
"""List all soft-deleted bounties in a room. For /recover functionality."""
|
||||
all_bounties = self._storage.list_all_bounties(room_id, include_deleted=True)
|
||||
return [b for b in all_bounties if b.deleted_at is not None]
|
||||
|
||||
def get_bounty(self, room_id: int, bounty_id: int) -> Bounty | None:
|
||||
"""Get a specific bounty by ID."""
|
||||
return self._storage.get_bounty(room_id, bounty_id)
|
||||
"""Get a specific bounty by ID. Excludes soft-deleted bounties."""
|
||||
bounty = self._storage.get_bounty(room_id, bounty_id)
|
||||
if bounty and bounty.deleted_at is not None:
|
||||
return None
|
||||
return bounty
|
||||
|
||||
def update_bounty(
|
||||
self,
|
||||
@@ -68,12 +171,17 @@ class BountyService:
|
||||
clear_link: bool = False,
|
||||
clear_due: bool = False,
|
||||
) -> bool:
|
||||
"""Update a bounty. Only creator can update."""
|
||||
"""Update a bounty. Only admins can update."""
|
||||
bounty = self._storage.get_bounty(room_id, bounty_id)
|
||||
if not bounty:
|
||||
return False
|
||||
if bounty.created_by_user_id != user_id:
|
||||
raise PermissionError("Only the creator can edit this bounty.")
|
||||
if not self.is_admin(room_id, user_id):
|
||||
raise PermissionError("Only admins can edit bounties.")
|
||||
|
||||
if link and not self.check_link_unique(
|
||||
room_id, link, exclude_bounty_id=bounty_id
|
||||
):
|
||||
raise ValueError("A bounty with this link already exists in this room.")
|
||||
|
||||
updated = Bounty(
|
||||
id=bounty.id,
|
||||
@@ -84,21 +192,51 @@ class BountyService:
|
||||
if clear_due
|
||||
else (due_date_ts if due_date_ts is not None else bounty.due_date_ts),
|
||||
created_at=bounty.created_at,
|
||||
deleted_at=bounty.deleted_at,
|
||||
created_by_username=bounty.created_by_username,
|
||||
)
|
||||
self._storage.update_bounty(room_id, updated)
|
||||
return True
|
||||
|
||||
def delete_bounty(self, room_id: int, bounty_id: int, user_id: int) -> bool:
|
||||
"""Delete a bounty. Only creator can delete."""
|
||||
"""Soft delete a bounty. Only admins can delete."""
|
||||
bounty = self._storage.get_bounty(room_id, bounty_id)
|
||||
if not bounty:
|
||||
return False
|
||||
if bounty.created_by_user_id != user_id:
|
||||
raise PermissionError("Only the creator can delete this bounty.")
|
||||
if not self.is_admin(room_id, user_id):
|
||||
raise PermissionError("Only admins can delete bounties.")
|
||||
|
||||
self._storage.delete_bounty(room_id, bounty_id)
|
||||
bounty.deleted_at = int(time.time())
|
||||
self._storage.update_bounty(room_id, bounty)
|
||||
return True
|
||||
|
||||
def recover_bounty(
|
||||
self, room_id: int, bounty_id: int, user_id: int
|
||||
) -> tuple[bool, str]:
|
||||
"""Recover a soft-deleted bounty. Only admins can recover.
|
||||
|
||||
Returns (success, message) tuple.
|
||||
"""
|
||||
all_bounties = self._storage.list_all_bounties(room_id, include_deleted=True)
|
||||
bounty = None
|
||||
for b in all_bounties:
|
||||
if b.id == bounty_id:
|
||||
bounty = b
|
||||
break
|
||||
|
||||
if not bounty:
|
||||
return False, f"Bounty #{bounty_id} not found."
|
||||
|
||||
if bounty.deleted_at is None:
|
||||
return False, f"Bounty #{bounty_id} is not deleted."
|
||||
|
||||
if not self.is_admin(room_id, user_id):
|
||||
raise PermissionError("Only admins can recover bounties.")
|
||||
|
||||
bounty.deleted_at = None
|
||||
self._storage.update_bounty(room_id, bounty)
|
||||
return True, f"Recovered bounty #{bounty_id}."
|
||||
|
||||
|
||||
class TrackingService:
|
||||
"""Service for tracking bounty operations."""
|
||||
@@ -147,7 +285,7 @@ class TrackingService:
|
||||
if room_data is None:
|
||||
return []
|
||||
|
||||
bounty_map = {b.id: b for b in room_data.bounties}
|
||||
bounty_map = {b.id: b for b in room_data.bounties if b.deleted_at is None}
|
||||
return [
|
||||
bounty_map[t.bounty_id]
|
||||
for t in tracking_data.tracked
|
||||
|
||||
@@ -48,6 +48,20 @@ class SimpleRoomStorage:
|
||||
return b
|
||||
return None
|
||||
|
||||
def list_bounties(self, room_id: int) -> list[Bounty]:
|
||||
if room_id not in self._rooms:
|
||||
return []
|
||||
return [b for b in self._rooms[room_id].bounties if b.deleted_at is None]
|
||||
|
||||
def list_all_bounties(
|
||||
self, room_id: int, include_deleted: bool = True
|
||||
) -> list[Bounty]:
|
||||
if room_id not in self._rooms:
|
||||
return []
|
||||
if include_deleted:
|
||||
return self._rooms[room_id].bounties
|
||||
return [b for b in self._rooms[room_id].bounties if b.deleted_at is None]
|
||||
|
||||
|
||||
class SimpleTrackingStorage:
|
||||
"""Minimal mock without ensure_tracking - tests if track_bounty works without it.
|
||||
@@ -119,6 +133,20 @@ class MockRoomStorage:
|
||||
return b
|
||||
return None
|
||||
|
||||
def list_bounties(self, room_id: int) -> list[Bounty]:
|
||||
if room_id not in self._rooms:
|
||||
return []
|
||||
return [b for b in self._rooms[room_id].bounties if b.deleted_at is None]
|
||||
|
||||
def list_all_bounties(
|
||||
self, room_id: int, include_deleted: bool = True
|
||||
) -> list[Bounty]:
|
||||
if room_id not in self._rooms:
|
||||
return []
|
||||
if include_deleted:
|
||||
return self._rooms[room_id].bounties
|
||||
return [b for b in self._rooms[room_id].bounties if b.deleted_at is None]
|
||||
|
||||
|
||||
class MockTrackingStorage:
|
||||
"""Mock implementation of TrackingStorage for testing."""
|
||||
|
||||
@@ -15,18 +15,32 @@ class TestBountyService:
|
||||
"""Set up fresh storage and service for each test."""
|
||||
self.storage = MockRoomStorage()
|
||||
self.service = BountyService(self.storage)
|
||||
self.admin_user_id = 123
|
||||
self._make_admin(-1001, self.admin_user_id)
|
||||
|
||||
def _make_admin(self, room_id: int, user_id: int):
|
||||
"""Helper to set up a room with an admin user."""
|
||||
room_data = self.storage.load(room_id)
|
||||
if room_data is None:
|
||||
room_data = RoomData(
|
||||
room_id=room_id, bounties=[], next_id=0, admin_user_ids=[]
|
||||
)
|
||||
if user_id not in (room_data.admin_user_ids or []):
|
||||
room_data.admin_user_ids = room_data.admin_user_ids or []
|
||||
room_data.admin_user_ids.append(user_id)
|
||||
self.storage.save(room_data)
|
||||
|
||||
def test_add_bounty_creates_room_if_not_exists(self):
|
||||
"""Test that add_bounty creates a new room if it doesn't exist."""
|
||||
bounty = self.service.add_bounty(
|
||||
room_id=-1001,
|
||||
user_id=123,
|
||||
user_id=self.admin_user_id,
|
||||
text="Fix bug",
|
||||
link="https://github.com/issue/1",
|
||||
)
|
||||
assert bounty.id == 1
|
||||
assert bounty.text == "Fix bug"
|
||||
assert bounty.created_by_user_id == 123
|
||||
assert bounty.created_by_user_id == self.admin_user_id
|
||||
|
||||
room = self.storage.load(-1001)
|
||||
assert room is not None
|
||||
@@ -34,14 +48,25 @@ class TestBountyService:
|
||||
|
||||
def test_add_bounty_increments_id(self):
|
||||
"""Test that add_bounty increments bounty ID for each new bounty."""
|
||||
b1 = self.service.add_bounty(room_id=-1001, user_id=123, text="First")
|
||||
b2 = self.service.add_bounty(room_id=-1001, user_id=123, text="Second")
|
||||
b3 = self.service.add_bounty(room_id=-1001, user_id=123, text="Third")
|
||||
b1 = self.service.add_bounty(
|
||||
room_id=-1001, user_id=self.admin_user_id, text="First"
|
||||
)
|
||||
b2 = self.service.add_bounty(
|
||||
room_id=-1001, user_id=self.admin_user_id, text="Second"
|
||||
)
|
||||
b3 = self.service.add_bounty(
|
||||
room_id=-1001, user_id=self.admin_user_id, text="Third"
|
||||
)
|
||||
|
||||
assert b1.id == 1
|
||||
assert b2.id == 2
|
||||
assert b3.id == 3
|
||||
|
||||
def test_add_bounty_requires_admin(self):
|
||||
"""Test that add_bounty raises PermissionError when non-admin tries to add."""
|
||||
with pytest.raises(PermissionError, match="Only admins can add bounties"):
|
||||
self.service.add_bounty(room_id=-1001, user_id=999, text="Not admin")
|
||||
|
||||
def test_list_bounties_empty_room(self):
|
||||
"""Test list_bounties returns empty list for non-existent room."""
|
||||
bounties = self.service.list_bounties(-1001)
|
||||
@@ -49,9 +74,15 @@ class TestBountyService:
|
||||
|
||||
def test_list_bounties_returns_all_bounties(self):
|
||||
"""Test list_bounties returns all bounties in a room."""
|
||||
self.service.add_bounty(room_id=-1001, user_id=123, text="First")
|
||||
self.service.add_bounty(room_id=-1001, user_id=123, text="Second")
|
||||
self.service.add_bounty(room_id=-999, user_id=123, text="Other room")
|
||||
self.service.add_bounty(room_id=-1001, user_id=self.admin_user_id, text="First")
|
||||
self.service.add_bounty(
|
||||
room_id=-1001, user_id=self.admin_user_id, text="Second"
|
||||
)
|
||||
# Add bounty to different room to verify isolation
|
||||
self._make_admin(-999, self.admin_user_id)
|
||||
self.service.add_bounty(
|
||||
room_id=-999, user_id=self.admin_user_id, text="Other room"
|
||||
)
|
||||
|
||||
bounties = self.service.list_bounties(-1001)
|
||||
assert len(bounties) == 2
|
||||
@@ -59,7 +90,9 @@ class TestBountyService:
|
||||
|
||||
def test_get_bounty_found(self):
|
||||
"""Test get_bounty returns bounty when it exists."""
|
||||
created = self.service.add_bounty(room_id=-1001, user_id=123, text="Test")
|
||||
created = self.service.add_bounty(
|
||||
room_id=-1001, user_id=self.admin_user_id, text="Test"
|
||||
)
|
||||
found = self.service.get_bounty(-1001, created.id)
|
||||
assert found is not None
|
||||
assert found.text == "Test"
|
||||
@@ -71,31 +104,35 @@ class TestBountyService:
|
||||
|
||||
def test_get_bounty_wrong_room(self):
|
||||
"""Test get_bounty returns None when bounty is in different room."""
|
||||
self.service.add_bounty(room_id=-1001, user_id=123, text="Test")
|
||||
self.service.add_bounty(room_id=-1001, user_id=self.admin_user_id, text="Test")
|
||||
found = self.service.get_bounty(-999, 1) # room -999 doesn't have bounty 1
|
||||
assert found is None
|
||||
|
||||
def test_update_bounty_success(self):
|
||||
"""Test update_bounty succeeds when creator updates their bounty."""
|
||||
bounty = self.service.add_bounty(room_id=-1001, user_id=123, text="Original")
|
||||
"""Test update_bounty succeeds when admin updates their bounty."""
|
||||
bounty = self.service.add_bounty(
|
||||
room_id=-1001, user_id=self.admin_user_id, text="Original"
|
||||
)
|
||||
result = self.service.update_bounty(
|
||||
room_id=-1001,
|
||||
bounty_id=bounty.id,
|
||||
user_id=123,
|
||||
user_id=self.admin_user_id,
|
||||
text="Updated",
|
||||
)
|
||||
assert result is True
|
||||
updated = self.service.get_bounty(-1001, bounty.id)
|
||||
assert updated.text == "Updated"
|
||||
|
||||
def test_update_bounty_not_creator_raises_permission_error(self):
|
||||
"""Test update_bounty raises PermissionError when non-creator tries to update."""
|
||||
bounty = self.service.add_bounty(room_id=-1001, user_id=123, text="Original")
|
||||
with pytest.raises(PermissionError, match="Only the creator can edit"):
|
||||
def test_update_bounty_not_admin_raises_permission_error(self):
|
||||
"""Test update_bounty raises PermissionError when non-admin tries to update."""
|
||||
bounty = self.service.add_bounty(
|
||||
room_id=-1001, user_id=self.admin_user_id, text="Original"
|
||||
)
|
||||
with pytest.raises(PermissionError, match="Only admins can edit bounties"):
|
||||
self.service.update_bounty(
|
||||
room_id=-1001,
|
||||
bounty_id=bounty.id,
|
||||
user_id=999, # different user
|
||||
user_id=999, # different user, not admin
|
||||
text="Hacked",
|
||||
)
|
||||
|
||||
@@ -104,7 +141,7 @@ class TestBountyService:
|
||||
result = self.service.update_bounty(
|
||||
room_id=-1001,
|
||||
bounty_id=999,
|
||||
user_id=123,
|
||||
user_id=self.admin_user_id,
|
||||
text="Updated",
|
||||
)
|
||||
assert result is False
|
||||
@@ -113,14 +150,14 @@ class TestBountyService:
|
||||
"""Test update_bounty only updates provided fields."""
|
||||
bounty = self.service.add_bounty(
|
||||
room_id=-1001,
|
||||
user_id=123,
|
||||
user_id=self.admin_user_id,
|
||||
text="Original",
|
||||
link="https://original.link",
|
||||
)
|
||||
self.service.update_bounty(
|
||||
room_id=-1001,
|
||||
bounty_id=bounty.id,
|
||||
user_id=123,
|
||||
user_id=self.admin_user_id,
|
||||
text="Updated only text",
|
||||
)
|
||||
updated = self.service.get_bounty(-1001, bounty.id)
|
||||
@@ -131,35 +168,46 @@ class TestBountyService:
|
||||
"""Test update_bounty can clear link."""
|
||||
bounty = self.service.add_bounty(
|
||||
room_id=-1001,
|
||||
user_id=123,
|
||||
user_id=self.admin_user_id,
|
||||
text="Test",
|
||||
link="https://original.link",
|
||||
)
|
||||
self.service.update_bounty(
|
||||
room_id=-1001,
|
||||
bounty_id=bounty.id,
|
||||
user_id=123,
|
||||
user_id=self.admin_user_id,
|
||||
clear_link=True,
|
||||
)
|
||||
updated = self.service.get_bounty(-1001, bounty.id)
|
||||
assert updated.link is None
|
||||
|
||||
def test_delete_bounty_success(self):
|
||||
"""Test delete_bounty succeeds when creator deletes their bounty."""
|
||||
bounty = self.service.add_bounty(room_id=-1001, user_id=123, text="To delete")
|
||||
result = self.service.delete_bounty(-1001, bounty.id, 123)
|
||||
"""Test delete_bounty soft deletes when admin deletes their bounty."""
|
||||
bounty = self.service.add_bounty(
|
||||
room_id=-1001, user_id=self.admin_user_id, text="To delete"
|
||||
)
|
||||
result = self.service.delete_bounty(-1001, bounty.id, self.admin_user_id)
|
||||
assert result is True
|
||||
# Soft delete - bounty should not be found via get_bounty
|
||||
assert self.service.get_bounty(-1001, bounty.id) is None
|
||||
# But still exists in list_deleted_bounties
|
||||
deleted = self.service.list_deleted_bounties(-1001)
|
||||
assert len(deleted) == 1
|
||||
assert deleted[0].id == bounty.id
|
||||
|
||||
def test_delete_bounty_not_creator_raises_permission_error(self):
|
||||
"""Test delete_bounty raises PermissionError when non-creator tries to delete."""
|
||||
bounty = self.service.add_bounty(room_id=-1001, user_id=123, text="To delete")
|
||||
with pytest.raises(PermissionError, match="Only the creator can delete"):
|
||||
self.service.delete_bounty(-1001, bounty.id, 999) # different user
|
||||
def test_delete_bounty_not_admin_raises_permission_error(self):
|
||||
"""Test delete_bounty raises PermissionError when non-admin tries to delete."""
|
||||
bounty = self.service.add_bounty(
|
||||
room_id=-1001, user_id=self.admin_user_id, text="To delete"
|
||||
)
|
||||
with pytest.raises(PermissionError, match="Only admins can delete bounties"):
|
||||
self.service.delete_bounty(
|
||||
-1001, bounty.id, 999
|
||||
) # different user, not admin
|
||||
|
||||
def test_delete_bounty_not_found(self):
|
||||
"""Test delete_bounty returns False when bounty doesn't exist."""
|
||||
result = self.service.delete_bounty(-1001, 999, 123)
|
||||
result = self.service.delete_bounty(-1001, 999, self.admin_user_id)
|
||||
assert result is False
|
||||
|
||||
|
||||
@@ -171,9 +219,27 @@ class TestTrackingService:
|
||||
self.room_storage = MockRoomStorage()
|
||||
self.tracking_storage = MockTrackingStorage()
|
||||
self.service = TrackingService(self.tracking_storage, self.room_storage)
|
||||
self.admin_user_id = 123
|
||||
self._make_admin(-1001, self.admin_user_id)
|
||||
|
||||
def _make_admin(self, room_id: int, user_id: int):
|
||||
"""Helper to set up a room with an admin user."""
|
||||
room_data = self.room_storage.load(room_id)
|
||||
if room_data is None:
|
||||
room_data = RoomData(
|
||||
room_id=room_id, bounties=[], next_id=0, admin_user_ids=[]
|
||||
)
|
||||
if user_id not in (room_data.admin_user_ids or []):
|
||||
room_data.admin_user_ids = room_data.admin_user_ids or []
|
||||
room_data.admin_user_ids.append(user_id)
|
||||
self.room_storage.save(room_data)
|
||||
|
||||
def _add_bounty(self, room_id=-1001, user_id=123, text="Test bounty"):
|
||||
"""Helper to add a bounty for tracking tests."""
|
||||
if self.room_storage.load(room_id) is None or user_id not in (
|
||||
self.room_storage.load(room_id).admin_user_ids or []
|
||||
):
|
||||
self._make_admin(room_id, user_id)
|
||||
bounty_service = BountyService(self.room_storage)
|
||||
return bounty_service.add_bounty(room_id=room_id, user_id=user_id, text=text)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user