Compare commits
4 Commits
fix/issue-
...
fix/issue-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6312d94c0b | ||
| 42ed551554 | |||
|
|
af7774ef03 | ||
| 0a64b4f310 |
41
cli/main.py
41
cli/main.py
@@ -127,6 +127,40 @@ def cmd_delete(args):
|
|||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
|
def cmd_recover(args):
|
||||||
|
"""List or recover soft-deleted bounties."""
|
||||||
|
bounty_service, _ = create_services()
|
||||||
|
room_id = args.group_id or args.user_id
|
||||||
|
user_id = args.user_id or 0
|
||||||
|
|
||||||
|
if not args.bounty_ids:
|
||||||
|
deleted = bounty_service.list_deleted_bounties(room_id)
|
||||||
|
if not deleted:
|
||||||
|
print("No recoverable bounties")
|
||||||
|
return
|
||||||
|
print("Recoverable bounties:")
|
||||||
|
for b in deleted:
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
deleted_str = datetime.fromtimestamp(b.deleted_at).strftime("%d %b %Y")
|
||||||
|
print(f" [#{b.id}] {b.text or '(no text)'} | Deleted {deleted_str}")
|
||||||
|
return
|
||||||
|
|
||||||
|
if not bounty_service.is_admin(room_id, user_id):
|
||||||
|
print("Error: Only admins can recover bounties.", file=sys.stderr)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
for bounty_id in args.bounty_ids:
|
||||||
|
try:
|
||||||
|
success, msg = bounty_service.recover_bounty(
|
||||||
|
room_id=room_id, bounty_id=bounty_id, user_id=user_id
|
||||||
|
)
|
||||||
|
print(msg)
|
||||||
|
except PermissionError as e:
|
||||||
|
print(f"Error: {e}", file=sys.stderr)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
def cmd_track(args):
|
def cmd_track(args):
|
||||||
"""Track a bounty."""
|
"""Track a bounty."""
|
||||||
_, tracking_service = create_services()
|
_, tracking_service = create_services()
|
||||||
@@ -196,6 +230,9 @@ def main():
|
|||||||
parser_untrack = subparsers.add_parser("untrack", help="Untrack a bounty")
|
parser_untrack = subparsers.add_parser("untrack", help="Untrack a bounty")
|
||||||
parser_untrack.add_argument("bounty_id", type=int, help="Bounty ID to untrack")
|
parser_untrack.add_argument("bounty_id", type=int, help="Bounty ID to untrack")
|
||||||
|
|
||||||
|
parser_recover = subparsers.add_parser("recover", help="List or recover soft-deleted bounties")
|
||||||
|
parser_recover.add_argument("bounty_ids", nargs="*", type=int, help="Bounty ID(s) to recover (optional)")
|
||||||
|
|
||||||
for sp in [
|
for sp in [
|
||||||
parser_add,
|
parser_add,
|
||||||
parser_list,
|
parser_list,
|
||||||
@@ -204,6 +241,7 @@ def main():
|
|||||||
parser_delete,
|
parser_delete,
|
||||||
parser_track,
|
parser_track,
|
||||||
parser_untrack,
|
parser_untrack,
|
||||||
|
parser_recover,
|
||||||
]:
|
]:
|
||||||
sp.add_argument(
|
sp.add_argument(
|
||||||
"--group-id", type=int, help="Group context (use group room ID)"
|
"--group-id", type=int, help="Group context (use group room ID)"
|
||||||
@@ -222,7 +260,7 @@ def main():
|
|||||||
print("Error: either --group-id or --user-id is required", file=sys.stderr)
|
print("Error: either --group-id or --user-id is required", file=sys.stderr)
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
if args.command in ("add", "list", "my", "update", "delete"):
|
if args.command in ("add", "list", "my", "update", "delete", "recover"):
|
||||||
if not (args.group_id or args.user_id):
|
if not (args.group_id or args.user_id):
|
||||||
print("Error: --group-id or --user-id required", file=sys.stderr)
|
print("Error: --group-id or --user-id required", file=sys.stderr)
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
@@ -238,6 +276,7 @@ def main():
|
|||||||
"delete": cmd_delete,
|
"delete": cmd_delete,
|
||||||
"track": cmd_track,
|
"track": cmd_track,
|
||||||
"untrack": cmd_untrack,
|
"untrack": cmd_untrack,
|
||||||
|
"recover": cmd_recover,
|
||||||
}
|
}
|
||||||
|
|
||||||
if args.command in command_map:
|
if args.command in command_map:
|
||||||
|
|||||||
170
core/services.py
170
core/services.py
@@ -15,11 +15,103 @@ class BountyService:
|
|||||||
- Positive room_id: DM/personal context (user's Telegram ID)
|
- Positive room_id: DM/personal context (user's Telegram ID)
|
||||||
|
|
||||||
This service handles both group and personal bounties through room_id.
|
This service handles both group and personal bounties through room_id.
|
||||||
|
|
||||||
|
Permissions:
|
||||||
|
- /add, /edit, /delete: admin only
|
||||||
|
- /admin, /admin add, /admin remove: admin only
|
||||||
|
- /bounty, /show, /track, /untrack, /my: everyone
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, storage: RoomStorage):
|
def __init__(self, storage: RoomStorage):
|
||||||
self._storage = storage
|
self._storage = storage
|
||||||
|
|
||||||
|
def is_admin(self, room_id: int, user_id: int) -> bool:
|
||||||
|
"""Check if user is admin in a room."""
|
||||||
|
room_data = self._storage.load(room_id)
|
||||||
|
if room_data is None:
|
||||||
|
return False
|
||||||
|
return user_id in (room_data.admin_user_ids or [])
|
||||||
|
|
||||||
|
def add_admin(
|
||||||
|
self, room_id: int, admin_user_id: int, requesting_user_id: int
|
||||||
|
) -> None:
|
||||||
|
"""Add an admin to a room. Requires admin permission."""
|
||||||
|
if not self.is_admin(room_id, requesting_user_id):
|
||||||
|
raise PermissionError("Only admins can add admins.")
|
||||||
|
|
||||||
|
room_data = self._storage.load(room_id)
|
||||||
|
if room_data is None:
|
||||||
|
room_data = RoomData(
|
||||||
|
room_id=room_id, bounties=[], next_id=1, admin_user_ids=[]
|
||||||
|
)
|
||||||
|
|
||||||
|
if admin_user_id not in (room_data.admin_user_ids or []):
|
||||||
|
(room_data.admin_user_ids or []).append(admin_user_id)
|
||||||
|
self._storage.save(room_data)
|
||||||
|
|
||||||
|
def remove_admin(
|
||||||
|
self, room_id: int, admin_user_id: int, requesting_user_id: int
|
||||||
|
) -> None:
|
||||||
|
"""Remove an admin from a room. Requires admin permission."""
|
||||||
|
if not self.is_admin(room_id, requesting_user_id):
|
||||||
|
raise PermissionError("Only admins can remove admins.")
|
||||||
|
|
||||||
|
room_data = self._storage.load(room_id)
|
||||||
|
if room_data is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
if admin_user_id in (room_data.admin_user_ids or []):
|
||||||
|
(room_data.admin_user_ids or []).remove(admin_user_id)
|
||||||
|
self._storage.save(room_data)
|
||||||
|
|
||||||
|
def list_admins(self, room_id: int) -> list[int]:
|
||||||
|
"""List all admin user IDs in a room."""
|
||||||
|
room_data = self._storage.load(room_id)
|
||||||
|
if room_data is None:
|
||||||
|
return []
|
||||||
|
return list(room_data.admin_user_ids or [])
|
||||||
|
|
||||||
|
def set_timezone(
|
||||||
|
self, room_id: int, timezone: str, requesting_user_id: int
|
||||||
|
) -> None:
|
||||||
|
"""Set the timezone for a room. Requires admin permission."""
|
||||||
|
if not self.is_admin(room_id, requesting_user_id):
|
||||||
|
raise PermissionError("Only admins can set timezone.")
|
||||||
|
|
||||||
|
room_data = self._storage.load(room_id)
|
||||||
|
if room_data is None:
|
||||||
|
room_data = RoomData(
|
||||||
|
room_id=room_id, bounties=[], next_id=1, admin_user_ids=[]
|
||||||
|
)
|
||||||
|
|
||||||
|
room_data.timezone = timezone
|
||||||
|
self._storage.save(room_data)
|
||||||
|
|
||||||
|
def get_timezone(self, room_id: int) -> str:
|
||||||
|
"""Get the timezone for a room. Returns UTC+0 if not set."""
|
||||||
|
room_data = self._storage.load(room_id)
|
||||||
|
if room_data is None:
|
||||||
|
return "UTC+0"
|
||||||
|
return room_data.timezone or "UTC+0"
|
||||||
|
|
||||||
|
def check_link_unique(
|
||||||
|
self, room_id: int, link: str | None, exclude_bounty_id: int | None = None
|
||||||
|
) -> bool:
|
||||||
|
"""Check if a link is unique within a room (not used by another bounty)."""
|
||||||
|
if not link:
|
||||||
|
return True
|
||||||
|
|
||||||
|
room_data = self._storage.load(room_id)
|
||||||
|
if room_data is None:
|
||||||
|
return True
|
||||||
|
|
||||||
|
for bounty in room_data.bounties:
|
||||||
|
if bounty.deleted_at is not None:
|
||||||
|
continue
|
||||||
|
if bounty.link == link and bounty.id != exclude_bounty_id:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
def add_bounty(
|
def add_bounty(
|
||||||
self,
|
self,
|
||||||
room_id: int,
|
room_id: int,
|
||||||
@@ -28,7 +120,13 @@ class BountyService:
|
|||||||
link: Optional[str] = None,
|
link: Optional[str] = None,
|
||||||
due_date_ts: Optional[int] = None,
|
due_date_ts: Optional[int] = None,
|
||||||
) -> Bounty:
|
) -> Bounty:
|
||||||
"""Add a new bounty to the room."""
|
"""Add a new bounty to the room. Requires admin permission."""
|
||||||
|
if not self.is_admin(room_id, user_id):
|
||||||
|
raise PermissionError("Only admins can add bounties.")
|
||||||
|
|
||||||
|
if not self.check_link_unique(room_id, link):
|
||||||
|
raise ValueError("A bounty with this link already exists in this room.")
|
||||||
|
|
||||||
room_data = self._storage.load(room_id)
|
room_data = self._storage.load(room_id)
|
||||||
if room_data is None:
|
if room_data is None:
|
||||||
room_data = RoomData(room_id=room_id, bounties=[], next_id=1)
|
room_data = RoomData(room_id=room_id, bounties=[], next_id=1)
|
||||||
@@ -47,15 +145,20 @@ class BountyService:
|
|||||||
return bounty
|
return bounty
|
||||||
|
|
||||||
def list_bounties(self, room_id: int) -> list[Bounty]:
|
def list_bounties(self, room_id: int) -> list[Bounty]:
|
||||||
"""List all bounties in a room."""
|
"""List all non-deleted bounties in a room."""
|
||||||
room_data = self._storage.load(room_id)
|
return self._storage.list_bounties(room_id)
|
||||||
if room_data is None:
|
|
||||||
return []
|
def list_deleted_bounties(self, room_id: int) -> list[Bounty]:
|
||||||
return room_data.bounties
|
"""List all soft-deleted bounties in a room. For /recover functionality."""
|
||||||
|
all_bounties = self._storage.list_all_bounties(room_id, include_deleted=True)
|
||||||
|
return [b for b in all_bounties if b.deleted_at is not None]
|
||||||
|
|
||||||
def get_bounty(self, room_id: int, bounty_id: int) -> Bounty | None:
|
def get_bounty(self, room_id: int, bounty_id: int) -> Bounty | None:
|
||||||
"""Get a specific bounty by ID."""
|
"""Get a specific bounty by ID. Excludes soft-deleted bounties."""
|
||||||
return self._storage.get_bounty(room_id, bounty_id)
|
bounty = self._storage.get_bounty(room_id, bounty_id)
|
||||||
|
if bounty and bounty.deleted_at is not None:
|
||||||
|
return None
|
||||||
|
return bounty
|
||||||
|
|
||||||
def update_bounty(
|
def update_bounty(
|
||||||
self,
|
self,
|
||||||
@@ -68,12 +171,17 @@ class BountyService:
|
|||||||
clear_link: bool = False,
|
clear_link: bool = False,
|
||||||
clear_due: bool = False,
|
clear_due: bool = False,
|
||||||
) -> bool:
|
) -> bool:
|
||||||
"""Update a bounty. Only creator can update."""
|
"""Update a bounty. Only admins can update."""
|
||||||
bounty = self._storage.get_bounty(room_id, bounty_id)
|
bounty = self._storage.get_bounty(room_id, bounty_id)
|
||||||
if not bounty:
|
if not bounty:
|
||||||
return False
|
return False
|
||||||
if bounty.created_by_user_id != user_id:
|
if not self.is_admin(room_id, user_id):
|
||||||
raise PermissionError("Only the creator can edit this bounty.")
|
raise PermissionError("Only admins can edit bounties.")
|
||||||
|
|
||||||
|
if link and not self.check_link_unique(
|
||||||
|
room_id, link, exclude_bounty_id=bounty_id
|
||||||
|
):
|
||||||
|
raise ValueError("A bounty with this link already exists in this room.")
|
||||||
|
|
||||||
updated = Bounty(
|
updated = Bounty(
|
||||||
id=bounty.id,
|
id=bounty.id,
|
||||||
@@ -84,21 +192,51 @@ class BountyService:
|
|||||||
if clear_due
|
if clear_due
|
||||||
else (due_date_ts if due_date_ts is not None else bounty.due_date_ts),
|
else (due_date_ts if due_date_ts is not None else bounty.due_date_ts),
|
||||||
created_at=bounty.created_at,
|
created_at=bounty.created_at,
|
||||||
|
deleted_at=bounty.deleted_at,
|
||||||
|
created_by_username=bounty.created_by_username,
|
||||||
)
|
)
|
||||||
self._storage.update_bounty(room_id, updated)
|
self._storage.update_bounty(room_id, updated)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def delete_bounty(self, room_id: int, bounty_id: int, user_id: int) -> bool:
|
def delete_bounty(self, room_id: int, bounty_id: int, user_id: int) -> bool:
|
||||||
"""Delete a bounty. Only creator can delete."""
|
"""Soft delete a bounty. Only admins can delete."""
|
||||||
bounty = self._storage.get_bounty(room_id, bounty_id)
|
bounty = self._storage.get_bounty(room_id, bounty_id)
|
||||||
if not bounty:
|
if not bounty:
|
||||||
return False
|
return False
|
||||||
if bounty.created_by_user_id != user_id:
|
if not self.is_admin(room_id, user_id):
|
||||||
raise PermissionError("Only the creator can delete this bounty.")
|
raise PermissionError("Only admins can delete bounties.")
|
||||||
|
|
||||||
self._storage.delete_bounty(room_id, bounty_id)
|
bounty.deleted_at = int(time.time())
|
||||||
|
self._storage.update_bounty(room_id, bounty)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
def recover_bounty(
|
||||||
|
self, room_id: int, bounty_id: int, user_id: int
|
||||||
|
) -> tuple[bool, str]:
|
||||||
|
"""Recover a soft-deleted bounty. Only admins can recover.
|
||||||
|
|
||||||
|
Returns (success, message) tuple.
|
||||||
|
"""
|
||||||
|
all_bounties = self._storage.list_all_bounties(room_id, include_deleted=True)
|
||||||
|
bounty = None
|
||||||
|
for b in all_bounties:
|
||||||
|
if b.id == bounty_id:
|
||||||
|
bounty = b
|
||||||
|
break
|
||||||
|
|
||||||
|
if not bounty:
|
||||||
|
return False, f"Bounty #{bounty_id} not found."
|
||||||
|
|
||||||
|
if bounty.deleted_at is None:
|
||||||
|
return False, f"Bounty #{bounty_id} is not deleted."
|
||||||
|
|
||||||
|
if not self.is_admin(room_id, user_id):
|
||||||
|
raise PermissionError("Only admins can recover bounties.")
|
||||||
|
|
||||||
|
bounty.deleted_at = None
|
||||||
|
self._storage.update_bounty(room_id, bounty)
|
||||||
|
return True, f"Recovered bounty #{bounty_id}."
|
||||||
|
|
||||||
|
|
||||||
class TrackingService:
|
class TrackingService:
|
||||||
"""Service for tracking bounty operations."""
|
"""Service for tracking bounty operations."""
|
||||||
@@ -147,7 +285,7 @@ class TrackingService:
|
|||||||
if room_data is None:
|
if room_data is None:
|
||||||
return []
|
return []
|
||||||
|
|
||||||
bounty_map = {b.id: b for b in room_data.bounties}
|
bounty_map = {b.id: b for b in room_data.bounties if b.deleted_at is None}
|
||||||
return [
|
return [
|
||||||
bounty_map[t.bounty_id]
|
bounty_map[t.bounty_id]
|
||||||
for t in tracking_data.tracked
|
for t in tracking_data.tracked
|
||||||
|
|||||||
@@ -15,18 +15,32 @@ class TestBountyService:
|
|||||||
"""Set up fresh storage and service for each test."""
|
"""Set up fresh storage and service for each test."""
|
||||||
self.storage = MockRoomStorage()
|
self.storage = MockRoomStorage()
|
||||||
self.service = BountyService(self.storage)
|
self.service = BountyService(self.storage)
|
||||||
|
self.admin_user_id = 123
|
||||||
|
self._make_admin(-1001, self.admin_user_id)
|
||||||
|
|
||||||
|
def _make_admin(self, room_id: int, user_id: int):
|
||||||
|
"""Helper to set up a room with an admin user."""
|
||||||
|
room_data = self.storage.load(room_id)
|
||||||
|
if room_data is None:
|
||||||
|
room_data = RoomData(
|
||||||
|
room_id=room_id, bounties=[], next_id=0, admin_user_ids=[]
|
||||||
|
)
|
||||||
|
if user_id not in (room_data.admin_user_ids or []):
|
||||||
|
room_data.admin_user_ids = room_data.admin_user_ids or []
|
||||||
|
room_data.admin_user_ids.append(user_id)
|
||||||
|
self.storage.save(room_data)
|
||||||
|
|
||||||
def test_add_bounty_creates_room_if_not_exists(self):
|
def test_add_bounty_creates_room_if_not_exists(self):
|
||||||
"""Test that add_bounty creates a new room if it doesn't exist."""
|
"""Test that add_bounty creates a new room if it doesn't exist."""
|
||||||
bounty = self.service.add_bounty(
|
bounty = self.service.add_bounty(
|
||||||
room_id=-1001,
|
room_id=-1001,
|
||||||
user_id=123,
|
user_id=self.admin_user_id,
|
||||||
text="Fix bug",
|
text="Fix bug",
|
||||||
link="https://github.com/issue/1",
|
link="https://github.com/issue/1",
|
||||||
)
|
)
|
||||||
assert bounty.id == 1
|
assert bounty.id == 1
|
||||||
assert bounty.text == "Fix bug"
|
assert bounty.text == "Fix bug"
|
||||||
assert bounty.created_by_user_id == 123
|
assert bounty.created_by_user_id == self.admin_user_id
|
||||||
|
|
||||||
room = self.storage.load(-1001)
|
room = self.storage.load(-1001)
|
||||||
assert room is not None
|
assert room is not None
|
||||||
@@ -34,14 +48,25 @@ class TestBountyService:
|
|||||||
|
|
||||||
def test_add_bounty_increments_id(self):
|
def test_add_bounty_increments_id(self):
|
||||||
"""Test that add_bounty increments bounty ID for each new bounty."""
|
"""Test that add_bounty increments bounty ID for each new bounty."""
|
||||||
b1 = self.service.add_bounty(room_id=-1001, user_id=123, text="First")
|
b1 = self.service.add_bounty(
|
||||||
b2 = self.service.add_bounty(room_id=-1001, user_id=123, text="Second")
|
room_id=-1001, user_id=self.admin_user_id, text="First"
|
||||||
b3 = self.service.add_bounty(room_id=-1001, user_id=123, text="Third")
|
)
|
||||||
|
b2 = self.service.add_bounty(
|
||||||
|
room_id=-1001, user_id=self.admin_user_id, text="Second"
|
||||||
|
)
|
||||||
|
b3 = self.service.add_bounty(
|
||||||
|
room_id=-1001, user_id=self.admin_user_id, text="Third"
|
||||||
|
)
|
||||||
|
|
||||||
assert b1.id == 1
|
assert b1.id == 1
|
||||||
assert b2.id == 2
|
assert b2.id == 2
|
||||||
assert b3.id == 3
|
assert b3.id == 3
|
||||||
|
|
||||||
|
def test_add_bounty_requires_admin(self):
|
||||||
|
"""Test that add_bounty raises PermissionError when non-admin tries to add."""
|
||||||
|
with pytest.raises(PermissionError, match="Only admins can add bounties"):
|
||||||
|
self.service.add_bounty(room_id=-1001, user_id=999, text="Not admin")
|
||||||
|
|
||||||
def test_list_bounties_empty_room(self):
|
def test_list_bounties_empty_room(self):
|
||||||
"""Test list_bounties returns empty list for non-existent room."""
|
"""Test list_bounties returns empty list for non-existent room."""
|
||||||
bounties = self.service.list_bounties(-1001)
|
bounties = self.service.list_bounties(-1001)
|
||||||
@@ -49,9 +74,15 @@ class TestBountyService:
|
|||||||
|
|
||||||
def test_list_bounties_returns_all_bounties(self):
|
def test_list_bounties_returns_all_bounties(self):
|
||||||
"""Test list_bounties returns all bounties in a room."""
|
"""Test list_bounties returns all bounties in a room."""
|
||||||
self.service.add_bounty(room_id=-1001, user_id=123, text="First")
|
self.service.add_bounty(room_id=-1001, user_id=self.admin_user_id, text="First")
|
||||||
self.service.add_bounty(room_id=-1001, user_id=123, text="Second")
|
self.service.add_bounty(
|
||||||
self.service.add_bounty(room_id=-999, user_id=123, text="Other room")
|
room_id=-1001, user_id=self.admin_user_id, text="Second"
|
||||||
|
)
|
||||||
|
# Add bounty to different room to verify isolation
|
||||||
|
self._make_admin(-999, self.admin_user_id)
|
||||||
|
self.service.add_bounty(
|
||||||
|
room_id=-999, user_id=self.admin_user_id, text="Other room"
|
||||||
|
)
|
||||||
|
|
||||||
bounties = self.service.list_bounties(-1001)
|
bounties = self.service.list_bounties(-1001)
|
||||||
assert len(bounties) == 2
|
assert len(bounties) == 2
|
||||||
@@ -59,7 +90,9 @@ class TestBountyService:
|
|||||||
|
|
||||||
def test_get_bounty_found(self):
|
def test_get_bounty_found(self):
|
||||||
"""Test get_bounty returns bounty when it exists."""
|
"""Test get_bounty returns bounty when it exists."""
|
||||||
created = self.service.add_bounty(room_id=-1001, user_id=123, text="Test")
|
created = self.service.add_bounty(
|
||||||
|
room_id=-1001, user_id=self.admin_user_id, text="Test"
|
||||||
|
)
|
||||||
found = self.service.get_bounty(-1001, created.id)
|
found = self.service.get_bounty(-1001, created.id)
|
||||||
assert found is not None
|
assert found is not None
|
||||||
assert found.text == "Test"
|
assert found.text == "Test"
|
||||||
@@ -71,31 +104,35 @@ class TestBountyService:
|
|||||||
|
|
||||||
def test_get_bounty_wrong_room(self):
|
def test_get_bounty_wrong_room(self):
|
||||||
"""Test get_bounty returns None when bounty is in different room."""
|
"""Test get_bounty returns None when bounty is in different room."""
|
||||||
self.service.add_bounty(room_id=-1001, user_id=123, text="Test")
|
self.service.add_bounty(room_id=-1001, user_id=self.admin_user_id, text="Test")
|
||||||
found = self.service.get_bounty(-999, 1) # room -999 doesn't have bounty 1
|
found = self.service.get_bounty(-999, 1) # room -999 doesn't have bounty 1
|
||||||
assert found is None
|
assert found is None
|
||||||
|
|
||||||
def test_update_bounty_success(self):
|
def test_update_bounty_success(self):
|
||||||
"""Test update_bounty succeeds when creator updates their bounty."""
|
"""Test update_bounty succeeds when admin updates their bounty."""
|
||||||
bounty = self.service.add_bounty(room_id=-1001, user_id=123, text="Original")
|
bounty = self.service.add_bounty(
|
||||||
|
room_id=-1001, user_id=self.admin_user_id, text="Original"
|
||||||
|
)
|
||||||
result = self.service.update_bounty(
|
result = self.service.update_bounty(
|
||||||
room_id=-1001,
|
room_id=-1001,
|
||||||
bounty_id=bounty.id,
|
bounty_id=bounty.id,
|
||||||
user_id=123,
|
user_id=self.admin_user_id,
|
||||||
text="Updated",
|
text="Updated",
|
||||||
)
|
)
|
||||||
assert result is True
|
assert result is True
|
||||||
updated = self.service.get_bounty(-1001, bounty.id)
|
updated = self.service.get_bounty(-1001, bounty.id)
|
||||||
assert updated.text == "Updated"
|
assert updated.text == "Updated"
|
||||||
|
|
||||||
def test_update_bounty_not_creator_raises_permission_error(self):
|
def test_update_bounty_not_admin_raises_permission_error(self):
|
||||||
"""Test update_bounty raises PermissionError when non-creator tries to update."""
|
"""Test update_bounty raises PermissionError when non-admin tries to update."""
|
||||||
bounty = self.service.add_bounty(room_id=-1001, user_id=123, text="Original")
|
bounty = self.service.add_bounty(
|
||||||
with pytest.raises(PermissionError, match="Only the creator can edit"):
|
room_id=-1001, user_id=self.admin_user_id, text="Original"
|
||||||
|
)
|
||||||
|
with pytest.raises(PermissionError, match="Only admins can edit bounties"):
|
||||||
self.service.update_bounty(
|
self.service.update_bounty(
|
||||||
room_id=-1001,
|
room_id=-1001,
|
||||||
bounty_id=bounty.id,
|
bounty_id=bounty.id,
|
||||||
user_id=999, # different user
|
user_id=999, # different user, not admin
|
||||||
text="Hacked",
|
text="Hacked",
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -104,7 +141,7 @@ class TestBountyService:
|
|||||||
result = self.service.update_bounty(
|
result = self.service.update_bounty(
|
||||||
room_id=-1001,
|
room_id=-1001,
|
||||||
bounty_id=999,
|
bounty_id=999,
|
||||||
user_id=123,
|
user_id=self.admin_user_id,
|
||||||
text="Updated",
|
text="Updated",
|
||||||
)
|
)
|
||||||
assert result is False
|
assert result is False
|
||||||
@@ -113,14 +150,14 @@ class TestBountyService:
|
|||||||
"""Test update_bounty only updates provided fields."""
|
"""Test update_bounty only updates provided fields."""
|
||||||
bounty = self.service.add_bounty(
|
bounty = self.service.add_bounty(
|
||||||
room_id=-1001,
|
room_id=-1001,
|
||||||
user_id=123,
|
user_id=self.admin_user_id,
|
||||||
text="Original",
|
text="Original",
|
||||||
link="https://original.link",
|
link="https://original.link",
|
||||||
)
|
)
|
||||||
self.service.update_bounty(
|
self.service.update_bounty(
|
||||||
room_id=-1001,
|
room_id=-1001,
|
||||||
bounty_id=bounty.id,
|
bounty_id=bounty.id,
|
||||||
user_id=123,
|
user_id=self.admin_user_id,
|
||||||
text="Updated only text",
|
text="Updated only text",
|
||||||
)
|
)
|
||||||
updated = self.service.get_bounty(-1001, bounty.id)
|
updated = self.service.get_bounty(-1001, bounty.id)
|
||||||
@@ -131,35 +168,46 @@ class TestBountyService:
|
|||||||
"""Test update_bounty can clear link."""
|
"""Test update_bounty can clear link."""
|
||||||
bounty = self.service.add_bounty(
|
bounty = self.service.add_bounty(
|
||||||
room_id=-1001,
|
room_id=-1001,
|
||||||
user_id=123,
|
user_id=self.admin_user_id,
|
||||||
text="Test",
|
text="Test",
|
||||||
link="https://original.link",
|
link="https://original.link",
|
||||||
)
|
)
|
||||||
self.service.update_bounty(
|
self.service.update_bounty(
|
||||||
room_id=-1001,
|
room_id=-1001,
|
||||||
bounty_id=bounty.id,
|
bounty_id=bounty.id,
|
||||||
user_id=123,
|
user_id=self.admin_user_id,
|
||||||
clear_link=True,
|
clear_link=True,
|
||||||
)
|
)
|
||||||
updated = self.service.get_bounty(-1001, bounty.id)
|
updated = self.service.get_bounty(-1001, bounty.id)
|
||||||
assert updated.link is None
|
assert updated.link is None
|
||||||
|
|
||||||
def test_delete_bounty_success(self):
|
def test_delete_bounty_success(self):
|
||||||
"""Test delete_bounty succeeds when creator deletes their bounty."""
|
"""Test delete_bounty soft deletes when admin deletes their bounty."""
|
||||||
bounty = self.service.add_bounty(room_id=-1001, user_id=123, text="To delete")
|
bounty = self.service.add_bounty(
|
||||||
result = self.service.delete_bounty(-1001, bounty.id, 123)
|
room_id=-1001, user_id=self.admin_user_id, text="To delete"
|
||||||
|
)
|
||||||
|
result = self.service.delete_bounty(-1001, bounty.id, self.admin_user_id)
|
||||||
assert result is True
|
assert result is True
|
||||||
|
# Soft delete - bounty should not be found via get_bounty
|
||||||
assert self.service.get_bounty(-1001, bounty.id) is None
|
assert self.service.get_bounty(-1001, bounty.id) is None
|
||||||
|
# But still exists in list_deleted_bounties
|
||||||
|
deleted = self.service.list_deleted_bounties(-1001)
|
||||||
|
assert len(deleted) == 1
|
||||||
|
assert deleted[0].id == bounty.id
|
||||||
|
|
||||||
def test_delete_bounty_not_creator_raises_permission_error(self):
|
def test_delete_bounty_not_admin_raises_permission_error(self):
|
||||||
"""Test delete_bounty raises PermissionError when non-creator tries to delete."""
|
"""Test delete_bounty raises PermissionError when non-admin tries to delete."""
|
||||||
bounty = self.service.add_bounty(room_id=-1001, user_id=123, text="To delete")
|
bounty = self.service.add_bounty(
|
||||||
with pytest.raises(PermissionError, match="Only the creator can delete"):
|
room_id=-1001, user_id=self.admin_user_id, text="To delete"
|
||||||
self.service.delete_bounty(-1001, bounty.id, 999) # different user
|
)
|
||||||
|
with pytest.raises(PermissionError, match="Only admins can delete bounties"):
|
||||||
|
self.service.delete_bounty(
|
||||||
|
-1001, bounty.id, 999
|
||||||
|
) # different user, not admin
|
||||||
|
|
||||||
def test_delete_bounty_not_found(self):
|
def test_delete_bounty_not_found(self):
|
||||||
"""Test delete_bounty returns False when bounty doesn't exist."""
|
"""Test delete_bounty returns False when bounty doesn't exist."""
|
||||||
result = self.service.delete_bounty(-1001, 999, 123)
|
result = self.service.delete_bounty(-1001, 999, self.admin_user_id)
|
||||||
assert result is False
|
assert result is False
|
||||||
|
|
||||||
|
|
||||||
@@ -171,9 +219,27 @@ class TestTrackingService:
|
|||||||
self.room_storage = MockRoomStorage()
|
self.room_storage = MockRoomStorage()
|
||||||
self.tracking_storage = MockTrackingStorage()
|
self.tracking_storage = MockTrackingStorage()
|
||||||
self.service = TrackingService(self.tracking_storage, self.room_storage)
|
self.service = TrackingService(self.tracking_storage, self.room_storage)
|
||||||
|
self.admin_user_id = 123
|
||||||
|
self._make_admin(-1001, self.admin_user_id)
|
||||||
|
|
||||||
|
def _make_admin(self, room_id: int, user_id: int):
|
||||||
|
"""Helper to set up a room with an admin user."""
|
||||||
|
room_data = self.room_storage.load(room_id)
|
||||||
|
if room_data is None:
|
||||||
|
room_data = RoomData(
|
||||||
|
room_id=room_id, bounties=[], next_id=0, admin_user_ids=[]
|
||||||
|
)
|
||||||
|
if user_id not in (room_data.admin_user_ids or []):
|
||||||
|
room_data.admin_user_ids = room_data.admin_user_ids or []
|
||||||
|
room_data.admin_user_ids.append(user_id)
|
||||||
|
self.room_storage.save(room_data)
|
||||||
|
|
||||||
def _add_bounty(self, room_id=-1001, user_id=123, text="Test bounty"):
|
def _add_bounty(self, room_id=-1001, user_id=123, text="Test bounty"):
|
||||||
"""Helper to add a bounty for tracking tests."""
|
"""Helper to add a bounty for tracking tests."""
|
||||||
|
if self.room_storage.load(room_id) is None or user_id not in (
|
||||||
|
self.room_storage.load(room_id).admin_user_ids or []
|
||||||
|
):
|
||||||
|
self._make_admin(room_id, user_id)
|
||||||
bounty_service = BountyService(self.room_storage)
|
bounty_service = BountyService(self.room_storage)
|
||||||
return bounty_service.add_bounty(room_id=room_id, user_id=user_id, text=text)
|
return bounty_service.add_bounty(room_id=room_id, user_id=user_id, text=text)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user