Compare commits

..

4 Commits

Author SHA1 Message Date
shokollm
d38d47fb79 feat(/bounty): add pagination, sorting, and filtering
- Default shows 5 bounties per page
- /bounty 10 - show 10 bounties
- /bounty all - show all active (exclude overdue >24h)
- /bounty all 10 - show 10 including expired

Filtering:
- Overdue >24h filtered out by default
- 'all' flag includes overdue

Sorting:
- Bounties with due date sorted by due_date_ts (earliest first)
- Bounties without due date shown last, sorted by created_at

Output format updated:
- Header shows 'Showing X of Y bounties'
- Description sliced to 40 chars when showing pagination info
- Date format changed to '4 Apr 2026' style

Fixes #48
2026-04-04 05:54:56 +00:00
42ed551554 Merge pull request 'feat: implement service layer for Phase 2' (#58) from fix/issue-43 into main 2026-04-04 07:36:09 +02:00
shokollm
af7774ef03 feat: implement service layer for Phase 2 - admin management, timezone, soft delete
BountyService:
- Add is_admin(), add_admin(), remove_admin(), list_admins()
- Add set_timezone(), get_timezone()
- Add check_link_unique(), list_deleted_bounties()
- Modify add_bounty() to check link uniqueness and require admin
- Modify update_bounty() to require admin permission (not creator)
- Modify delete_bounty() to perform soft delete (set deleted_at)
- get_bounty() now filters out soft-deleted bounties
- list_bounties() uses storage.list_bounties() which excludes soft-deleted

TrackingService:
- get_tracked_bounties() now filters out soft-deleted bounties

Tests updated to reflect new admin-only permissions and soft delete behavior.
2026-04-04 05:27:40 +00:00
0a64b4f310 Merge pull request 'feat: add list_bounties and list_all_bounties methods to storage adapter' (#57) from fix/issue-42 into main 2026-04-04 07:17:07 +02:00
3 changed files with 281 additions and 56 deletions

View File

@@ -48,21 +48,24 @@ def parse_args(args: list[str]) -> tuple[Optional[str], Optional[str], Optional[
return text, link, due_date_ts
def format_bounty(b, show_id: bool = True) -> str:
def format_bounty(b, show_id: bool = True, slice_length: int = 0) -> str:
parts = []
if show_id:
parts.append(f"[#{b.id}]")
if b.text:
parts.append(b.text)
text = b.text
if slice_length > 0 and len(text) > slice_length:
text = text[:slice_length] + "..."
parts.append(text)
if b.link:
parts.append(f"🔗 {b.link}")
if b.due_date_ts:
due_str = time.strftime("%Y-%m-%d", time.localtime(b.due_date_ts))
due_str = time.strftime("%d %b %Y", time.localtime(b.due_date_ts))
days_left = (b.due_date_ts - int(time.time())) // 86400
if days_left < 0:
parts.append(f"{due_str} (OVERDUE)")
elif days_left == 0:
parts.append(f"{due_str} (TODAY)")
parts.append(f"Today (OVERDUE)")
else:
parts.append(f"{due_str} ({days_left}d)")
if b.created_by_user_id:
@@ -95,13 +98,58 @@ def get_room_id(update: Update) -> int:
async def cmd_bounty(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
room_id = get_room_id(update)
bounties = BOUNTY_SERVICE.list_bounties(room_id)
args = extract_args(update.message.text)
if not bounties:
await update.message.reply_text("No bounties yet.")
show_all = "all" in args
args = [a for a in args if a != "all"]
try:
limit = int(args[0]) if args else 5
except (ValueError, IndexError):
limit = 5
now = int(time.time())
cutoff_24h = now - 86400
all_bounties = BOUNTY_SERVICE.list_bounties(room_id)
def is_expired(b) -> bool:
return b.due_date_ts is not None and b.due_date_ts < cutoff_24h
def sort_key(b):
if b.due_date_ts is not None:
return (0, b.due_date_ts)
return (1, b.created_at)
filtered_bounties = [b for b in all_bounties if not is_expired(b) or show_all]
filtered_bounties.sort(key=sort_key)
total_count = len(filtered_bounties)
displayed_bounties = filtered_bounties[:limit]
if not displayed_bounties:
if show_all:
await update.message.reply_text("No bounties yet.")
else:
await update.message.reply_text(
"No active bounties. Use /bounty all to show expired."
)
return
lines = [format_bounty(b, show_id=True) for b in bounties]
lines = []
if limit < total_count:
lines.append(f"Showing {limit} of {total_count} bounties:")
slice_length = 40
elif show_all and total_count > limit:
lines.append(f"Showing {limit} of {total_count} bounties (including expired):")
slice_length = 40
else:
lines.append(f"Showing {total_count} bounties:")
slice_length = 0
for b in displayed_bounties:
lines.append(format_bounty(b, show_id=True, slice_length=slice_length))
await update.message.reply_text("\n".join(lines), disable_web_page_preview=True)

View File

@@ -15,11 +15,103 @@ class BountyService:
- Positive room_id: DM/personal context (user's Telegram ID)
This service handles both group and personal bounties through room_id.
Permissions:
- /add, /edit, /delete: admin only
- /admin, /admin add, /admin remove: admin only
- /bounty, /show, /track, /untrack, /my: everyone
"""
def __init__(self, storage: RoomStorage):
self._storage = storage
def is_admin(self, room_id: int, user_id: int) -> bool:
"""Check if user is admin in a room."""
room_data = self._storage.load(room_id)
if room_data is None:
return False
return user_id in (room_data.admin_user_ids or [])
def add_admin(
self, room_id: int, admin_user_id: int, requesting_user_id: int
) -> None:
"""Add an admin to a room. Requires admin permission."""
if not self.is_admin(room_id, requesting_user_id):
raise PermissionError("Only admins can add admins.")
room_data = self._storage.load(room_id)
if room_data is None:
room_data = RoomData(
room_id=room_id, bounties=[], next_id=1, admin_user_ids=[]
)
if admin_user_id not in (room_data.admin_user_ids or []):
(room_data.admin_user_ids or []).append(admin_user_id)
self._storage.save(room_data)
def remove_admin(
self, room_id: int, admin_user_id: int, requesting_user_id: int
) -> None:
"""Remove an admin from a room. Requires admin permission."""
if not self.is_admin(room_id, requesting_user_id):
raise PermissionError("Only admins can remove admins.")
room_data = self._storage.load(room_id)
if room_data is None:
return
if admin_user_id in (room_data.admin_user_ids or []):
(room_data.admin_user_ids or []).remove(admin_user_id)
self._storage.save(room_data)
def list_admins(self, room_id: int) -> list[int]:
"""List all admin user IDs in a room."""
room_data = self._storage.load(room_id)
if room_data is None:
return []
return list(room_data.admin_user_ids or [])
def set_timezone(
self, room_id: int, timezone: str, requesting_user_id: int
) -> None:
"""Set the timezone for a room. Requires admin permission."""
if not self.is_admin(room_id, requesting_user_id):
raise PermissionError("Only admins can set timezone.")
room_data = self._storage.load(room_id)
if room_data is None:
room_data = RoomData(
room_id=room_id, bounties=[], next_id=1, admin_user_ids=[]
)
room_data.timezone = timezone
self._storage.save(room_data)
def get_timezone(self, room_id: int) -> str:
"""Get the timezone for a room. Returns UTC+0 if not set."""
room_data = self._storage.load(room_id)
if room_data is None:
return "UTC+0"
return room_data.timezone or "UTC+0"
def check_link_unique(
self, room_id: int, link: str | None, exclude_bounty_id: int | None = None
) -> bool:
"""Check if a link is unique within a room (not used by another bounty)."""
if not link:
return True
room_data = self._storage.load(room_id)
if room_data is None:
return True
for bounty in room_data.bounties:
if bounty.deleted_at is not None:
continue
if bounty.link == link and bounty.id != exclude_bounty_id:
return False
return True
def add_bounty(
self,
room_id: int,
@@ -28,7 +120,13 @@ class BountyService:
link: Optional[str] = None,
due_date_ts: Optional[int] = None,
) -> Bounty:
"""Add a new bounty to the room."""
"""Add a new bounty to the room. Requires admin permission."""
if not self.is_admin(room_id, user_id):
raise PermissionError("Only admins can add bounties.")
if not self.check_link_unique(room_id, link):
raise ValueError("A bounty with this link already exists in this room.")
room_data = self._storage.load(room_id)
if room_data is None:
room_data = RoomData(room_id=room_id, bounties=[], next_id=1)
@@ -47,15 +145,20 @@ class BountyService:
return bounty
def list_bounties(self, room_id: int) -> list[Bounty]:
"""List all bounties in a room."""
room_data = self._storage.load(room_id)
if room_data is None:
return []
return room_data.bounties
"""List all non-deleted bounties in a room."""
return self._storage.list_bounties(room_id)
def list_deleted_bounties(self, room_id: int) -> list[Bounty]:
"""List all soft-deleted bounties in a room. For /recover functionality."""
all_bounties = self._storage.list_all_bounties(room_id, include_deleted=True)
return [b for b in all_bounties if b.deleted_at is not None]
def get_bounty(self, room_id: int, bounty_id: int) -> Bounty | None:
"""Get a specific bounty by ID."""
return self._storage.get_bounty(room_id, bounty_id)
"""Get a specific bounty by ID. Excludes soft-deleted bounties."""
bounty = self._storage.get_bounty(room_id, bounty_id)
if bounty and bounty.deleted_at is not None:
return None
return bounty
def update_bounty(
self,
@@ -68,12 +171,17 @@ class BountyService:
clear_link: bool = False,
clear_due: bool = False,
) -> bool:
"""Update a bounty. Only creator can update."""
"""Update a bounty. Only admins can update."""
bounty = self._storage.get_bounty(room_id, bounty_id)
if not bounty:
return False
if bounty.created_by_user_id != user_id:
raise PermissionError("Only the creator can edit this bounty.")
if not self.is_admin(room_id, user_id):
raise PermissionError("Only admins can edit bounties.")
if link and not self.check_link_unique(
room_id, link, exclude_bounty_id=bounty_id
):
raise ValueError("A bounty with this link already exists in this room.")
updated = Bounty(
id=bounty.id,
@@ -84,19 +192,22 @@ class BountyService:
if clear_due
else (due_date_ts if due_date_ts is not None else bounty.due_date_ts),
created_at=bounty.created_at,
deleted_at=bounty.deleted_at,
created_by_username=bounty.created_by_username,
)
self._storage.update_bounty(room_id, updated)
return True
def delete_bounty(self, room_id: int, bounty_id: int, user_id: int) -> bool:
"""Delete a bounty. Only creator can delete."""
"""Soft delete a bounty. Only admins can delete."""
bounty = self._storage.get_bounty(room_id, bounty_id)
if not bounty:
return False
if bounty.created_by_user_id != user_id:
raise PermissionError("Only the creator can delete this bounty.")
if not self.is_admin(room_id, user_id):
raise PermissionError("Only admins can delete bounties.")
self._storage.delete_bounty(room_id, bounty_id)
bounty.deleted_at = int(time.time())
self._storage.update_bounty(room_id, bounty)
return True
@@ -147,7 +258,7 @@ class TrackingService:
if room_data is None:
return []
bounty_map = {b.id: b for b in room_data.bounties}
bounty_map = {b.id: b for b in room_data.bounties if b.deleted_at is None}
return [
bounty_map[t.bounty_id]
for t in tracking_data.tracked

View File

@@ -15,18 +15,32 @@ class TestBountyService:
"""Set up fresh storage and service for each test."""
self.storage = MockRoomStorage()
self.service = BountyService(self.storage)
self.admin_user_id = 123
self._make_admin(-1001, self.admin_user_id)
def _make_admin(self, room_id: int, user_id: int):
"""Helper to set up a room with an admin user."""
room_data = self.storage.load(room_id)
if room_data is None:
room_data = RoomData(
room_id=room_id, bounties=[], next_id=0, admin_user_ids=[]
)
if user_id not in (room_data.admin_user_ids or []):
room_data.admin_user_ids = room_data.admin_user_ids or []
room_data.admin_user_ids.append(user_id)
self.storage.save(room_data)
def test_add_bounty_creates_room_if_not_exists(self):
"""Test that add_bounty creates a new room if it doesn't exist."""
bounty = self.service.add_bounty(
room_id=-1001,
user_id=123,
user_id=self.admin_user_id,
text="Fix bug",
link="https://github.com/issue/1",
)
assert bounty.id == 1
assert bounty.text == "Fix bug"
assert bounty.created_by_user_id == 123
assert bounty.created_by_user_id == self.admin_user_id
room = self.storage.load(-1001)
assert room is not None
@@ -34,14 +48,25 @@ class TestBountyService:
def test_add_bounty_increments_id(self):
"""Test that add_bounty increments bounty ID for each new bounty."""
b1 = self.service.add_bounty(room_id=-1001, user_id=123, text="First")
b2 = self.service.add_bounty(room_id=-1001, user_id=123, text="Second")
b3 = self.service.add_bounty(room_id=-1001, user_id=123, text="Third")
b1 = self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="First"
)
b2 = self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="Second"
)
b3 = self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="Third"
)
assert b1.id == 1
assert b2.id == 2
assert b3.id == 3
def test_add_bounty_requires_admin(self):
"""Test that add_bounty raises PermissionError when non-admin tries to add."""
with pytest.raises(PermissionError, match="Only admins can add bounties"):
self.service.add_bounty(room_id=-1001, user_id=999, text="Not admin")
def test_list_bounties_empty_room(self):
"""Test list_bounties returns empty list for non-existent room."""
bounties = self.service.list_bounties(-1001)
@@ -49,9 +74,15 @@ class TestBountyService:
def test_list_bounties_returns_all_bounties(self):
"""Test list_bounties returns all bounties in a room."""
self.service.add_bounty(room_id=-1001, user_id=123, text="First")
self.service.add_bounty(room_id=-1001, user_id=123, text="Second")
self.service.add_bounty(room_id=-999, user_id=123, text="Other room")
self.service.add_bounty(room_id=-1001, user_id=self.admin_user_id, text="First")
self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="Second"
)
# Add bounty to different room to verify isolation
self._make_admin(-999, self.admin_user_id)
self.service.add_bounty(
room_id=-999, user_id=self.admin_user_id, text="Other room"
)
bounties = self.service.list_bounties(-1001)
assert len(bounties) == 2
@@ -59,7 +90,9 @@ class TestBountyService:
def test_get_bounty_found(self):
"""Test get_bounty returns bounty when it exists."""
created = self.service.add_bounty(room_id=-1001, user_id=123, text="Test")
created = self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="Test"
)
found = self.service.get_bounty(-1001, created.id)
assert found is not None
assert found.text == "Test"
@@ -71,31 +104,35 @@ class TestBountyService:
def test_get_bounty_wrong_room(self):
"""Test get_bounty returns None when bounty is in different room."""
self.service.add_bounty(room_id=-1001, user_id=123, text="Test")
self.service.add_bounty(room_id=-1001, user_id=self.admin_user_id, text="Test")
found = self.service.get_bounty(-999, 1) # room -999 doesn't have bounty 1
assert found is None
def test_update_bounty_success(self):
"""Test update_bounty succeeds when creator updates their bounty."""
bounty = self.service.add_bounty(room_id=-1001, user_id=123, text="Original")
"""Test update_bounty succeeds when admin updates their bounty."""
bounty = self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="Original"
)
result = self.service.update_bounty(
room_id=-1001,
bounty_id=bounty.id,
user_id=123,
user_id=self.admin_user_id,
text="Updated",
)
assert result is True
updated = self.service.get_bounty(-1001, bounty.id)
assert updated.text == "Updated"
def test_update_bounty_not_creator_raises_permission_error(self):
"""Test update_bounty raises PermissionError when non-creator tries to update."""
bounty = self.service.add_bounty(room_id=-1001, user_id=123, text="Original")
with pytest.raises(PermissionError, match="Only the creator can edit"):
def test_update_bounty_not_admin_raises_permission_error(self):
"""Test update_bounty raises PermissionError when non-admin tries to update."""
bounty = self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="Original"
)
with pytest.raises(PermissionError, match="Only admins can edit bounties"):
self.service.update_bounty(
room_id=-1001,
bounty_id=bounty.id,
user_id=999, # different user
user_id=999, # different user, not admin
text="Hacked",
)
@@ -104,7 +141,7 @@ class TestBountyService:
result = self.service.update_bounty(
room_id=-1001,
bounty_id=999,
user_id=123,
user_id=self.admin_user_id,
text="Updated",
)
assert result is False
@@ -113,14 +150,14 @@ class TestBountyService:
"""Test update_bounty only updates provided fields."""
bounty = self.service.add_bounty(
room_id=-1001,
user_id=123,
user_id=self.admin_user_id,
text="Original",
link="https://original.link",
)
self.service.update_bounty(
room_id=-1001,
bounty_id=bounty.id,
user_id=123,
user_id=self.admin_user_id,
text="Updated only text",
)
updated = self.service.get_bounty(-1001, bounty.id)
@@ -131,35 +168,46 @@ class TestBountyService:
"""Test update_bounty can clear link."""
bounty = self.service.add_bounty(
room_id=-1001,
user_id=123,
user_id=self.admin_user_id,
text="Test",
link="https://original.link",
)
self.service.update_bounty(
room_id=-1001,
bounty_id=bounty.id,
user_id=123,
user_id=self.admin_user_id,
clear_link=True,
)
updated = self.service.get_bounty(-1001, bounty.id)
assert updated.link is None
def test_delete_bounty_success(self):
"""Test delete_bounty succeeds when creator deletes their bounty."""
bounty = self.service.add_bounty(room_id=-1001, user_id=123, text="To delete")
result = self.service.delete_bounty(-1001, bounty.id, 123)
"""Test delete_bounty soft deletes when admin deletes their bounty."""
bounty = self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="To delete"
)
result = self.service.delete_bounty(-1001, bounty.id, self.admin_user_id)
assert result is True
# Soft delete - bounty should not be found via get_bounty
assert self.service.get_bounty(-1001, bounty.id) is None
# But still exists in list_deleted_bounties
deleted = self.service.list_deleted_bounties(-1001)
assert len(deleted) == 1
assert deleted[0].id == bounty.id
def test_delete_bounty_not_creator_raises_permission_error(self):
"""Test delete_bounty raises PermissionError when non-creator tries to delete."""
bounty = self.service.add_bounty(room_id=-1001, user_id=123, text="To delete")
with pytest.raises(PermissionError, match="Only the creator can delete"):
self.service.delete_bounty(-1001, bounty.id, 999) # different user
def test_delete_bounty_not_admin_raises_permission_error(self):
"""Test delete_bounty raises PermissionError when non-admin tries to delete."""
bounty = self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="To delete"
)
with pytest.raises(PermissionError, match="Only admins can delete bounties"):
self.service.delete_bounty(
-1001, bounty.id, 999
) # different user, not admin
def test_delete_bounty_not_found(self):
"""Test delete_bounty returns False when bounty doesn't exist."""
result = self.service.delete_bounty(-1001, 999, 123)
result = self.service.delete_bounty(-1001, 999, self.admin_user_id)
assert result is False
@@ -171,9 +219,27 @@ class TestTrackingService:
self.room_storage = MockRoomStorage()
self.tracking_storage = MockTrackingStorage()
self.service = TrackingService(self.tracking_storage, self.room_storage)
self.admin_user_id = 123
self._make_admin(-1001, self.admin_user_id)
def _make_admin(self, room_id: int, user_id: int):
"""Helper to set up a room with an admin user."""
room_data = self.room_storage.load(room_id)
if room_data is None:
room_data = RoomData(
room_id=room_id, bounties=[], next_id=0, admin_user_ids=[]
)
if user_id not in (room_data.admin_user_ids or []):
room_data.admin_user_ids = room_data.admin_user_ids or []
room_data.admin_user_ids.append(user_id)
self.room_storage.save(room_data)
def _add_bounty(self, room_id=-1001, user_id=123, text="Test bounty"):
"""Helper to add a bounty for tracking tests."""
if self.room_storage.load(room_id) is None or user_id not in (
self.room_storage.load(room_id).admin_user_ids or []
):
self._make_admin(room_id, user_id)
bounty_service = BountyService(self.room_storage)
return bounty_service.add_bounty(room_id=room_id, user_id=user_id, text=text)