Compare commits

..

1 Commits

Author SHA1 Message Date
shokollm
5dee75d7d0 feat: implement /admin add @username command
- Add cmd_admin handler for /admin add|remove @username
- Add _find_user_id_by_username helper to resolve usernames from bounty creators
- Register admin command handler in bot.py
- Add 'admin' to bot command list
- Fix pre-existing duplicate due_str bug in cmd_add
- Fixes #51
2026-04-04 08:15:20 +00:00
20 changed files with 335 additions and 2228 deletions

10
SPEC.md
View File

@@ -8,7 +8,7 @@
JIGAIDO is a Telegram bot that lets groups and individuals track bounties — tasks, obligations, and deadlines — with optional due dates and personal tracking.
- **Group mode**: Each Telegram group has its own bounty list. Admins can add/edit/delete bounties. Anyone can track.
- **Group mode**: Each Telegram group has its own bounty list. Anyone can add bounties. Only creator can edit/delete.
- **DM mode**: Personal bounty list. Anyone can manage their own bounties.
- **Tracking**: Users can track any bounty (group or personal) to their tracking list.
- **Due dates**: Free-form text (`"april 15"`, `"in 3 days"`, `"tomorrow"`) parsed at add time, stored as Unix timestamp. If unparseable, stored as `NULL`.
@@ -122,9 +122,9 @@ Data is stored at `~/.jigaido/` (home directory), NOT inside the repository.
|---|---|---|
| `/bounty` | anyone | List all bounties in this group |
| `/my` | anyone | List bounties tracked by you in this group |
| `/add <text> [link] [due date]` | admin only | Add a new bounty to the group |
| `/edit <bounty_id> [text] [link] [due_date]` | admin only | Edit an existing bounty |
| `/delete <bounty_id>` | admin only | Delete a bounty |
| `/add <text> [link] [due date]` | anyone | Add a new bounty to the group |
| `/edit <bounty_id> [text] [link] [due_date]` | creator only | Edit an existing bounty |
| `/delete <bounty_id>` | creator only | Delete a bounty |
| `/track <bounty_id>` | anyone | Track a group bounty |
| `/untrack <bounty_id>` | anyone | Stop tracking a bounty |
@@ -170,7 +170,7 @@ Stored as Unix timestamp. User-facing display can be localized/converted to any
## Error Handling
- Unknown command → help text with available commands
- `/add`/`/edit`/`/delete` by non-admin → "⛔ Only admins can add/edit/delete bounties."
- `/edit`/`/delete` by non-creator → "⛔ Only the creator can edit/delete this bounty."
- `/track` already tracked → "Already tracking" (idempotent)
- `/untrack` not tracked → "Not tracking" (idempotent)
- Bounty not found → "Bounty not found"

View File

@@ -11,7 +11,8 @@ import os
import tempfile
from pathlib import Path
from core.models import Bounty, Category, RoomData, TrackingData, TrackedBounty
from core.models import Bounty, RoomData, TrackingData, TrackedBounty
from core.ports import RoomStorage, TrackingStorage
class JsonFileRoomStorage:
@@ -57,28 +58,16 @@ class JsonFileRoomStorage:
created_by_user_id=b["created_by_user_id"],
deleted_at=b.get("deleted_at"),
created_by_username=b.get("created_by_username"),
category_ids=b.get("category_ids", []),
)
for b in data.get("bounties", [])
]
categories = [
Category(
id=c["id"],
name=c["name"],
created_at=c["created_at"],
deleted_at=c.get("deleted_at"),
)
for c in data.get("categories", [])
]
return RoomData(
room_id=data["room_id"],
bounties=bounties,
next_id=data["next_id"],
timezone=data.get("timezone"),
admin_usernames=data.get("admin_usernames", []),
categories=categories,
admin_user_ids=data.get("admin_user_ids", []),
)
def save(self, room_data: RoomData) -> None:
@@ -87,16 +76,7 @@ class JsonFileRoomStorage:
"room_id": room_data.room_id,
"next_id": room_data.next_id,
"timezone": room_data.timezone,
"admin_usernames": room_data.admin_usernames or [],
"categories": [
{
"id": c.id,
"name": c.name,
"created_at": c.created_at,
"deleted_at": c.deleted_at,
}
for c in room_data.categories
],
"admin_user_ids": room_data.admin_user_ids or [],
"bounties": [
{
"id": b.id,
@@ -107,7 +87,6 @@ class JsonFileRoomStorage:
"created_by_user_id": b.created_by_user_id,
"deleted_at": b.deleted_at,
"created_by_username": b.created_by_username,
"category_ids": b.category_ids,
}
for b in room_data.bounties
],
@@ -140,6 +119,15 @@ class JsonFileRoomStorage:
self.save(room_data)
def delete_bounty(self, room_id: int, bounty_id: int) -> None:
"""Delete a bounty from a room."""
room_data = self.load(room_id)
if room_data is None:
return
room_data.bounties = [b for b in room_data.bounties if b.id != bounty_id]
self.save(room_data)
def get_bounty(self, room_id: int, bounty_id: int) -> Bounty | None:
"""Get a specific bounty from a room by ID."""
room_data = self.load(room_id)

View File

@@ -1,30 +1,21 @@
"""JIGAIDO Telegram bot entrypoint."""
import logging
import os
import sys
sys.path.insert(0, "/home/shoko/repositories/jigaido")
from telegram.ext import (
Application,
CommandHandler,
CallbackQueryHandler,
)
from telegram.ext import Application, CommandHandler, MessageHandler, filters
from commands import (
cmd_add,
cmd_admin,
cmd_bounty,
cmd_category,
cmd_delete,
cmd_delete_message,
cmd_edit,
cmd_help,
cmd_my,
cmd_recover,
cmd_show,
cmd_start,
cmd_timezone,
cmd_track,
cmd_untrack,
cmd_update,
@@ -36,13 +27,7 @@ logging.basicConfig(
)
log = logging.getLogger(__name__)
from config import config # noqa: E402
BOT_TOKEN = config.bot_token or ""
async def error_handler(update, context):
log.error(f"Error: {context.error}")
BOT_TOKEN = os.environ.get("JIGAIDO_BOT_TOKEN", "")
def build_app() -> Application:
@@ -59,14 +44,9 @@ def build_app() -> Application:
app.add_handler(CommandHandler("track", cmd_track))
app.add_handler(CommandHandler("untrack", cmd_untrack))
app.add_handler(CommandHandler("show", cmd_show))
app.add_handler(CommandHandler("timezone", cmd_timezone))
app.add_handler(CommandHandler("admin", cmd_admin))
app.add_handler(CommandHandler("recover", cmd_recover))
app.add_handler(CommandHandler("category", cmd_category))
app.add_handler(CallbackQueryHandler(cmd_delete_message))
app.add_error_handler(error_handler)
app.add_handler(MessageHandler(filters.COMMAND, cmd_help))
return app
@@ -81,18 +61,13 @@ async def post_init(app: Application) -> None:
("track", "Track a bounty"),
("untrack", "Stop tracking"),
("show", "Show bounty details"),
("timezone", "Get/set room timezone"),
("admin", "Manage admins"),
("recover", "Recover deleted bounties"),
("category", "Manage categories"),
("help", "Show help"),
]
)
def main() -> None:
import asyncio
if not BOT_TOKEN:
log.error("JIGAIDO_BOT_TOKEN environment variable not set.")
sys.exit(1)
@@ -101,11 +76,6 @@ def main() -> None:
app.post_init = post_init
log.info("JIGAIDO starting...")
# Python 3.14 compatibility: ensure event loop exists
try:
asyncio.get_event_loop()
except RuntimeError:
asyncio.set_event_loop(asyncio.new_event_loop())
app.run_polling(drop_pending_updates=True)

File diff suppressed because it is too large Load Diff

View File

@@ -1,28 +0,0 @@
#!/usr/bin/env python3
import asyncio
import os
import sys
# Run from the telegram-bot directory so local imports work
os.chdir("/home/shoko/repositories/jigaido/apps/telegram-bot")
sys.path.insert(0, "/home/shoko/repositories/jigaido")
# Import main from the local bot module
import bot as bot_module # noqa: E402
if __name__ == "__main__":
if not bot_module.BOT_TOKEN:
bot_module.log.error("JIGAIDO_BOT_TOKEN environment variable not set.")
sys.exit(1)
app = bot_module.build_app()
app.post_init = bot_module.post_init
bot_module.log.info("JIGAIDO starting...")
# PTB v20+ app.run_polling() is async - use asyncio.get_event_loop() + run_until_complete
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(app.run_polling(drop_pending_updates=True))
finally:
loop.close()

View File

@@ -1,8 +1,10 @@
"""Pytest fixtures for telegram-bot tests."""
import sys
import tempfile
from pathlib import Path
import pytest
# Add the app directory to path so imports work when running pytest
sys.path.insert(0, str(Path(__file__).parent.parent))

View File

@@ -1,11 +1,11 @@
"""Tests for commands.py — parsing, formatting, and command handlers."""
import time
from unittest.mock import MagicMock, patch, AsyncMock
from unittest.mock import MagicMock, patch, AsyncMock, sentinel
import pytest
from telegram import Update, Message, User, Chat
from telegram import Update, Message, User, Chat, CallbackQuery
from telegram.ext import ContextTypes
from commands import (
@@ -52,97 +52,75 @@ class TestExtractArgs:
class TestParseArgs:
def test_text_only(self):
text, link, due, _, _ = parse_args(["hello", "world"])
text, link, due = parse_args(["hello", "world"])
assert text == "hello world"
assert link is None
assert due is None
def test_link_extracted(self):
text, link, due, _, _ = parse_args(["hello", "https://example.com"])
text, link, due = parse_args(["hello", "https://example.com"])
# "hello" is non-link non-date → becomes text; only the URL becomes link
assert text == "hello"
assert link == "https://example.com"
assert due is None
def test_text_and_link(self):
text, link, due, _, _ = parse_args(["hello", "world", "https://example.com"])
text, link, due = parse_args(["hello", "world", "https://example.com"])
assert text == "hello world"
assert link == "https://example.com"
def test_due_date_parsed(self):
text, link, due, _, _ = parse_args(["hello", "tomorrow"])
text, link, due = parse_args(["hello", "tomorrow"])
assert text == "hello"
assert due is not None
# Should be some time in the future
assert due > int(time.time())
def test_all_three(self):
text, link, due, _, _ = parse_args(["hello", "https://example.com", "tomorrow"])
text, link, due = parse_args(["hello", "https://example.com", "tomorrow"])
assert text == "hello"
assert link == "https://example.com"
assert due is not None
def test_http_and_https_both_detected(self):
_, link1, _, _, _ = parse_args(["http://example.com"])
_, link2, _, _, _ = parse_args(["https://example.com"])
_, link1, _ = parse_args(["http://example.com"])
_, link2, _ = parse_args(["https://example.com"])
assert link1 == "http://example.com"
assert link2 == "https://example.com"
def test_non_url_non_date_becomes_text(self):
text, link, due, _, _ = parse_args(["fix", "the", "bug"])
text, link, due = parse_args(["fix", "the", "bug"])
assert text == "fix the bug"
assert link is None
assert due is None
def test_multiple_links_first_only(self):
_, link, _, _, _ = parse_args(["text", "https://first.com", "https://second.com"])
_, link, _ = parse_args(["text", "https://first.com", "https://second.com"])
assert link == "https://first.com"
def test_due_date_after_link(self):
text, link, due, _, _ = parse_args(["task", "https://example.com", "in 5 days"])
text, link, due = parse_args(["task", "https://example.com", "in 5 days"])
assert text == "task"
assert link == "https://example.com"
assert due is not None
def test_empty_args(self):
text, link, due, _, _ = parse_args([])
text, link, due = parse_args([])
assert text is None
assert link is None
assert due is None
def test_date_parser_failure_returns_none(self):
# "asdfjkl" is not parseable → goes to text
text, link, due, _, _ = parse_args(["hello", "asdfjkl"])
text, link, due = parse_args(["hello", "asdfjkl"])
assert text == "hello asdfjkl"
assert due is None
def test_link_takes_first_match(self):
# Even if it's not a valid URL, starts with https://
_, link, _, _, _ = parse_args(["skip", "https://not-real.but-still-a-link"])
_, link, _ = parse_args(["skip", "https://not-real.but-still-a-link"])
assert link == "https://not-real.but-still-a-link"
def test_url_without_scheme_normalized_to_https(self):
"""URLs without scheme should get https:// prefix."""
_, link, _, _, _ = parse_args(["github.com/user/repo"])
assert link == "https://github.com/user/repo"
def test_url_without_scheme_github_normalized(self):
text, link, _, _, _ = parse_args(["Fix bug", "github.com/owner/repo"])
assert text == "Fix bug"
assert link == "https://github.com/owner/repo"
def test_url_with_explicit_https_unchanged(self):
_, link, _, _, _ = parse_args(["task", "https://example.com/page"])
assert link == "https://example.com/page"
def test_url_with_http_unchanged(self):
_, link, _, _, _ = parse_args(["task", "http://example.com/page"])
assert link == "http://example.com/page"
def test_url_link_flag_without_scheme_normalized(self):
_, link, _, _, _ = parse_args(["-link", "example.com/path"])
assert link == "https://example.com/path"
class TestFormatBounty:
def _row(
@@ -390,6 +368,7 @@ class TestCmdAdd:
@pytest.mark.asyncio
async def test_add_needs_text_or_link(self):
update = create_mock_update(message_text="/add")
ctx = MagicMock(spec=ContextTypes.DEFAULT_TYPE)
_, link, _ = parse_args([])
if not "test" and not link:

View File

@@ -2,6 +2,7 @@
import argparse
import sys
from pathlib import Path
import dateparser

View File

@@ -13,21 +13,7 @@ class Config:
def __init__(self):
self.data_dir: Path = self._resolve_data_dir()
self.bot_token: Optional[str] = self._resolve_bot_token()
def _resolve_bot_token(self) -> Optional[str]:
env_token = os.environ.get("JIGAIDO_BOT_TOKEN")
if env_token:
return env_token
config_file = Path("~/.jigaido/config.json").expanduser()
if config_file.exists():
with open(config_file) as f:
config_data = json.load(f)
if "JIGAIDO_BOT_TOKEN" in config_data:
return config_data["JIGAIDO_BOT_TOKEN"]
return None
self.bot_token: Optional[str] = os.environ.get("JIGAIDO_BOT_TOKEN")
def _resolve_data_dir(self) -> Path:
env_dir = os.environ.get("JIGAIDO_DATA_DIR")
@@ -49,6 +35,3 @@ class Config:
config = Config()
config = Config()

View File

@@ -1,20 +1,6 @@
"""Domain dataclasses for JIGAIDO bounty tracker."""
from dataclasses import dataclass, field
@dataclass
class Category:
"""A category for organizing bounties in a room.
Categories are per-room and support soft delete.
The id (slug) must be lowercase alphabetic only (e.g., "bug", "feature").
"""
id: str # slug: lowercase alphabetic only, e.g., "bug", "feature"
name: str # display name: e.g., "Bug", "Feature"
created_at: int
deleted_at: int | None = None # soft delete
from dataclasses import dataclass
@dataclass
@@ -26,8 +12,6 @@ class Bounty:
The deleted_at field indicates soft-delete: None means not deleted,
a value means deleted at that Unix timestamp.
The category_ids field lists category slugs associated with this bounty.
"""
id: int
@@ -38,7 +22,6 @@ class Bounty:
created_by_user_id: int
deleted_at: int | None = None
created_by_username: str | None = None
category_ids: list[str] = field(default_factory=list)
@dataclass
@@ -61,22 +44,18 @@ class RoomData:
The next_id field is used to generate unique bounty IDs within this room.
The timezone field stores the room's timezone (e.g., "Asia/Jakarta"), default UTC+0.
The admin_usernames field lists usernames who have admin privileges in this room.
The categories field contains all categories for organizing bounties in this room.
The admin_user_ids field lists users who have admin privileges in this room.
"""
room_id: int
bounties: list[Bounty]
next_id: int
timezone: str | None = None
admin_usernames: list[str] | None = None
categories: list[Category] = field(default_factory=list)
admin_user_ids: list[int] | None = None
def __post_init__(self):
if self.admin_usernames is None:
self.admin_usernames = []
if self.categories is None:
self.categories = []
if self.admin_user_ids is None:
self.admin_user_ids = []
@dataclass

View File

@@ -32,6 +32,10 @@ class RoomStorage(Protocol):
"""Update an existing bounty in a room."""
...
def delete_bounty(self, room_id: int, bounty_id: int) -> None:
"""Delete a bounty from a room."""
...
def get_bounty(self, room_id: int, bounty_id: int) -> Bounty | None:
"""Get a specific bounty from a room by ID."""
...

View File

@@ -3,7 +3,7 @@
import time
from typing import Optional
from core.models import Bounty, Category, RoomData, TrackedBounty, TrackingData
from core.models import Bounty, RoomData, TrackedBounty, TrackingData
from core.ports import RoomStorage, TrackingStorage
@@ -25,79 +25,63 @@ class BountyService:
def __init__(self, storage: RoomStorage):
self._storage = storage
def is_admin(self, room_id: int, username: str | None) -> bool:
"""Check if user is admin in a room by username."""
if not username:
return False
def is_admin(self, room_id: int, user_id: int) -> bool:
"""Check if user is admin in a room."""
room_data = self._storage.load(room_id)
if room_data is None:
return False
return username in (room_data.admin_usernames or [])
return user_id in (room_data.admin_user_ids or [])
def add_admin(
self, room_id: int, username: str, requesting_username: str | None
self, room_id: int, admin_user_id: int, requesting_user_id: int
) -> None:
"""Add an admin to a room. Requires admin permission, or self-promotion if first admin."""
room_data = self._storage.load(room_id)
has_no_admins = room_data is None or not room_data.admin_usernames
is_self_promotion = requesting_username == username
if not self.is_admin(room_id, requesting_username):
if not (has_no_admins and is_self_promotion):
"""Add an admin to a room. Requires admin permission."""
if not self.is_admin(room_id, requesting_user_id):
raise PermissionError("Only admins can add admins.")
room_data = self._storage.load(room_id)
if room_data is None:
room_data = RoomData(
room_id=room_id, bounties=[], next_id=1, admin_usernames=[]
room_id=room_id, bounties=[], next_id=1, admin_user_ids=[]
)
admin_usernames = room_data.admin_usernames
if admin_usernames is None:
admin_usernames = []
room_data.admin_usernames = []
if username in admin_usernames:
raise ValueError(f"@{username} is already an admin.")
admin_usernames.append(username)
if admin_user_id not in (room_data.admin_user_ids or []):
(room_data.admin_user_ids or []).append(admin_user_id)
self._storage.save(room_data)
def remove_admin(
self, room_id: int, username: str, requesting_username: str | None
self, room_id: int, admin_user_id: int, requesting_user_id: int
) -> None:
"""Remove an admin from a room. Requires admin permission."""
if not self.is_admin(room_id, requesting_username):
if not self.is_admin(room_id, requesting_user_id):
raise PermissionError("Only admins can remove admins.")
room_data = self._storage.load(room_id)
if room_data is None or not (room_data.admin_usernames or []):
raise ValueError(f"@{username} is not an admin.")
if room_data is None:
return
if username not in (room_data.admin_usernames or []):
raise ValueError(f"@{username} is not an admin.")
(room_data.admin_usernames or []).remove(username)
if admin_user_id in (room_data.admin_user_ids or []):
(room_data.admin_user_ids or []).remove(admin_user_id)
self._storage.save(room_data)
def list_admins(self, room_id: int) -> list[str]:
"""List all admin usernames in a room."""
def list_admins(self, room_id: int) -> list[int]:
"""List all admin user IDs in a room."""
room_data = self._storage.load(room_id)
if room_data is None:
return []
return list(room_data.admin_usernames or [])
return list(room_data.admin_user_ids or [])
def set_timezone(
self, room_id: int, timezone: str, requesting_username: str | None
self, room_id: int, timezone: str, requesting_user_id: int
) -> None:
"""Set the timezone for a room. Requires admin permission."""
if not self.is_admin(room_id, requesting_username):
if not self.is_admin(room_id, requesting_user_id):
raise PermissionError("Only admins can set timezone.")
room_data = self._storage.load(room_id)
if room_data is None:
room_data = RoomData(
room_id=room_id, bounties=[], next_id=1, admin_usernames=[]
room_id=room_id, bounties=[], next_id=1, admin_user_ids=[]
)
room_data.timezone = timezone
@@ -132,14 +116,12 @@ class BountyService:
self,
room_id: int,
user_id: int,
username: str | None,
text: Optional[str] = None,
link: Optional[str] = None,
due_date_ts: Optional[int] = None,
created_by_username: Optional[str] = None,
) -> Bounty:
"""Add a new bounty to the room. Requires admin permission."""
if not self.is_admin(room_id, username):
if not self.is_admin(room_id, user_id):
raise PermissionError("Only admins can add bounties.")
if not self.check_link_unique(room_id, link):
@@ -154,7 +136,6 @@ class BountyService:
bounty = Bounty(
id=room_data.next_id,
created_by_user_id=user_id,
created_by_username=created_by_username,
text=text,
link=link,
due_date_ts=due_date_ts,
@@ -172,44 +153,6 @@ class BountyService:
all_bounties = self._storage.list_all_bounties(room_id, include_deleted=True)
return [b for b in all_bounties if b.deleted_at is not None]
def get_deleted_bounty(self, room_id: int, bounty_id: int) -> Bounty | None:
"""Get a specific soft-deleted bounty by ID."""
all_bounties = self._storage.list_all_bounties(room_id, include_deleted=True)
for b in all_bounties:
if b.id == bounty_id and b.deleted_at is not None:
return b
return None
def recover_bounty(self, room_id: int, bounty_id: int, username: str | None) -> str:
"""Recover a soft-deleted bounty. Admin only.
Returns: 'recovered', 'not_found', 'not_deleted', 'permission_denied'
"""
if not self.is_admin(room_id, username):
return "permission_denied"
bounty = self.get_deleted_bounty(room_id, bounty_id)
if not bounty:
return "not_found"
if bounty.deleted_at is None:
return "not_deleted"
bounty.deleted_at = None
self._storage.update_bounty(room_id, bounty)
return "recovered"
def recover_bounties(
self, room_id: int, bounty_ids: list[int], username: str | None
) -> dict[int, str]:
"""Recover multiple soft-deleted bounties. Admin only.
Returns dict of bounty_id -> result ('recovered', 'not_found', 'not_deleted', 'permission_denied')
"""
results = {}
for bounty_id in bounty_ids:
results[bounty_id] = self.recover_bounty(room_id, bounty_id, username)
return results
def get_bounty(self, room_id: int, bounty_id: int) -> Bounty | None:
"""Get a specific bounty by ID. Excludes soft-deleted bounties."""
bounty = self._storage.get_bounty(room_id, bounty_id)
@@ -221,7 +164,7 @@ class BountyService:
self,
room_id: int,
bounty_id: int,
username: str | None,
user_id: int,
text: Optional[str] = None,
link: Optional[str] = None,
due_date_ts: Optional[int] = None,
@@ -232,7 +175,7 @@ class BountyService:
bounty = self._storage.get_bounty(room_id, bounty_id)
if not bounty:
return False
if not self.is_admin(room_id, username):
if not self.is_admin(room_id, user_id):
raise PermissionError("Only admins can edit bounties.")
if link and not self.check_link_unique(
@@ -255,268 +198,18 @@ class BountyService:
self._storage.update_bounty(room_id, updated)
return True
def delete_bounty(self, room_id: int, bounty_id: int, username: str | None) -> bool:
def delete_bounty(self, room_id: int, bounty_id: int, user_id: int) -> bool:
"""Soft delete a bounty. Only admins can delete."""
bounty = self._storage.get_bounty(room_id, bounty_id)
if not bounty:
return False
if not self.is_admin(room_id, username):
if not self.is_admin(room_id, user_id):
raise PermissionError("Only admins can delete bounties.")
bounty.deleted_at = int(time.time())
self._storage.update_bounty(room_id, bounty)
return True
def delete_bounties(
self, room_id: int, bounty_ids: list[int], username: str | None
) -> dict[int, str]:
"""Soft delete multiple bounties. Returns dict of bounty_id -> result.
Results can be: 'deleted', 'not_found', 'permission_denied'
"""
results = {}
for bounty_id in bounty_ids:
bounty = self._storage.get_bounty(room_id, bounty_id)
if not bounty:
results[bounty_id] = "not_found"
continue
if not self.is_admin(room_id, username):
results[bounty_id] = "permission_denied"
continue
bounty.deleted_at = int(time.time())
self._storage.update_bounty(room_id, bounty)
results[bounty_id] = "deleted"
return results
# --- Category Management ---
def add_category(
self,
room_id: int,
slug: str,
name: str,
username: str | None,
) -> Category:
"""Create a new category. Admin only.
Args:
room_id: Room identifier
slug: Category ID (lowercase alphabetic, e.g., "bug")
name: Display name (e.g., "Bug Report")
username: Requesting admin's username
Returns:
Created Category
Raises:
PermissionError: If not admin
ValueError: If slug already exists or invalid
"""
if not self.is_admin(room_id, username):
raise PermissionError("Only admins can add categories.")
# Validate slug format (lowercase alphabetic only)
if not slug or not slug.isalpha() or not slug.islower():
raise ValueError(
"Category slug must be lowercase alphabetic only (e.g., 'bug', 'feature')."
)
room_data = self._storage.load(room_id)
if room_data is None:
room_data = RoomData(
room_id=room_id, bounties=[], next_id=1, admin_usernames=[], categories=[]
)
# Check for duplicate slug
for cat in room_data.categories:
if cat.id == slug and cat.deleted_at is None:
raise ValueError(f"Category '{slug}' already exists.")
category = Category(
id=slug,
name=name,
created_at=int(time.time()),
deleted_at=None,
)
room_data.categories.append(category)
self._storage.save(room_data)
return category
def delete_category(
self,
room_id: int,
slug: str,
username: str | None,
) -> bool:
"""Soft delete a category. Admin only.
Args:
room_id: Room identifier
slug: Category slug to delete
username: Requesting admin's username
Returns:
True if deleted, False if not found
"""
if not self.is_admin(room_id, username):
raise PermissionError("Only admins can delete categories.")
room_data = self._storage.load(room_id)
if room_data is None:
return False
for cat in room_data.categories:
if cat.id == slug and cat.deleted_at is None:
cat.deleted_at = int(time.time())
self._storage.save(room_data)
return True
return False
def list_categories(self, room_id: int) -> list[Category]:
"""List active categories (excludes soft-deleted).
Args:
room_id: Room identifier
Returns:
List of active categories
"""
room_data = self._storage.load(room_id)
if room_data is None:
return []
return [c for c in room_data.categories if c.deleted_at is None]
def get_category(self, room_id: int, slug: str) -> Category | None:
"""Get a category by slug (excludes soft-deleted).
Args:
room_id: Room identifier
slug: Category slug
Returns:
Category or None if not found
"""
room_data = self._storage.load(room_id)
if room_data is None:
return None
for cat in room_data.categories:
if cat.id == slug and cat.deleted_at is None:
return cat
return None
def _validate_category_exists(self, room_id: int, slug: str) -> None:
"""Validate that a category exists (and is not deleted). Raises ValueError if not found."""
if not self.get_category(room_id, slug):
raise ValueError(f"Category '{slug}' not found.")
def add_category_to_bounty(
self,
room_id: int,
bounty_id: int,
category_slug: str,
username: str | None,
) -> bool:
"""Add category to a bounty. Admin only.
Args:
room_id: Room identifier
bounty_id: Bounty ID
category_slug: Category slug to add
username: Requesting admin's username
Returns:
True if newly added, False if already exists
Raises:
PermissionError: If not admin
ValueError: If bounty or category not found
"""
if not self.is_admin(room_id, username):
raise PermissionError("Only admins can manage bounty categories.")
bounty = self.get_bounty(room_id, bounty_id)
if not bounty:
raise ValueError("Bounty not found.")
self._validate_category_exists(room_id, category_slug)
if category_slug in bounty.category_ids:
return False # Already exists
bounty.category_ids.append(category_slug)
self._storage.update_bounty(room_id, bounty)
return True
def remove_category_from_bounty(
self,
room_id: int,
bounty_id: int,
category_slug: str,
username: str | None,
) -> bool:
"""Remove category from a bounty. Admin only.
Args:
room_id: Room identifier
bounty_id: Bounty ID
category_slug: Category slug to remove
username: Requesting admin's username
Returns:
True if removed, False if not found
"""
if not self.is_admin(room_id, username):
raise PermissionError("Only admins can manage bounty categories.")
bounty = self.get_bounty(room_id, bounty_id)
if not bounty:
raise ValueError("Bounty not found.")
if category_slug not in bounty.category_ids:
return False
bounty.category_ids.remove(category_slug)
self._storage.update_bounty(room_id, bounty)
return True
def update_bounty_categories(
self,
room_id: int,
bounty_id: int,
category_slugs: list[str],
username: str | None,
) -> bool:
"""Replace all categories on a bounty. Admin only.
Args:
room_id: Room identifier
bounty_id: Bounty ID
category_slugs: New list of category slugs
username: Requesting admin's username
Returns:
True if updated
Raises:
PermissionError: If not admin
ValueError: If bounty or any category not found
"""
if not self.is_admin(room_id, username):
raise PermissionError("Only admins can manage bounty categories.")
bounty = self.get_bounty(room_id, bounty_id)
if not bounty:
raise ValueError("Bounty not found.")
# Validate all categories exist
for slug in category_slugs:
self._validate_category_exists(room_id, slug)
bounty.category_ids = category_slugs
self._storage.update_bounty(room_id, bounty)
return True
class TrackingService:
"""Service for tracking bounty operations."""

View File

@@ -1,544 +0,0 @@
# JIGAIDO Audit & Feature Specification
> Document created: 2026-04-09
> Purpose: Repository audit findings and category feature specification
---
# Part I: Repository Audit
## 1.1 Current Architecture
JIGAIDO follows **hexagonal architecture** with clear separation:
```
jigaido/
├── core/ # Domain layer (pure Python, no deps)
│ ├── models.py # Domain dataclasses
│ ├── ports.py # Storage interfaces
│ └── services.py # Business logic
├── adapters/
│ └── storage/
│ └── json_file.py # JSON file persistence
├── apps/
│ └── telegram-bot/
│ ├── bot.py # Bot entrypoint
│ └── commands.py # Command handlers
├── tests/ # Unit tests (98 tests passing)
├── config.py # Configuration
├── SPEC.md # Original design spec
└── README.md # Overview
```
## 1.2 Features Implemented
| Feature | Status | Location |
|---------|--------|----------|
| Group bounty management | ✅ Done | `BountyService` |
| Personal DM bounties | ✅ Done | Same service, different room_id |
| Admin management | ✅ Done | `add_admin`, `remove_admin`, `list_admins` |
| Soft delete (recoverable) | ✅ Done | `delete_bounty`, `recover_bounty` |
| Due date with timezone | ✅ Done | `dateparser` + `ZoneInfo` |
| Link deduplication | ✅ Done | `check_link_unique` |
| Tracking/untracking | ✅ Done | `TrackingService` |
| `/track` in groups only | ✅ Done | Command handler |
| Expired bounty filtering | ✅ Done | 24h cutoff logic |
| Timezone per room | ✅ Done | `set_timezone`, `get_timezone` |
## 1.3 Bugs & Issues Found
### Bug 1: Admin Promotion Logic Edge Case
**Location**: `core/services.py` - `add_admin()`
**Issue**: The first admin can self-promote, but if the Telegram group creator joins later, they won't be recognized as admin since they're not in `admin_usernames`.
**Code**:
```python
# core/services.py:44-49
has_no_admins = room_data is None or not room_data.admin_usernames
is_self_promotion = requesting_username == username
if not self.is_admin(room_id, requesting_username):
if not (has_no_admins and is_self_promotion):
raise PermissionError("Only admins can add admins.")
```
**Recommendation**: Document this behavior or enhance to auto-detect Telegram group creator.
---
### Bug 2: Hard Delete Method in Storage
**Location**: `adapters/storage/json_file.py:113-119`
**Issue**: `delete_bounty()` in storage does hard delete, while `BountyService.delete_bounty()` does soft delete. The storage method is unused but confusing.
**Code**:
```python
def delete_bounty(self, room_id: int, bounty_id: int) -> None:
"""Delete a bounty from a room."""
room_data = self.load(room_id)
if room_data is None:
return
room_data.bounties = [b for b in room_data.bounties if b.id != bounty_id]
self.save(room_data)
```
**Recommendation**: Remove or mark as deprecated.
---
### Issue 3: Spec vs Code Inconsistency
**Location**: `SPEC.md` vs `commands.py`
**Issue**: SPEC.md says anyone can `/add` in groups, but code requires admin.
| Command | SPEC.md | Code |
|---------|---------|------|
| `/add` | anyone | admin only |
| `/edit` | creator only | admin only |
| `/delete` | creator only | admin only |
**Recommendation**: Update SPEC.md to reflect actual implementation.
---
### Issue 4: No Input Sanitization
**Location**: `commands.py` - `parse_args()`
**Issue**: Links without proper scheme (e.g., `github.com/user/repo`) are accepted as-is.
**Recommendation**: Consider normalizing URL scheme or validating format.
---
### Issue 5: No Rate Limiting
**Location**: `/add` command
**Issue**: No limit on:
- Number of bounties per room
- Text length
- Request rate
**Recommendation**: Add rate limiting for production use.
---
## 1.4 Test Status
```bash
$ PYTHONPATH=. python -m pytest tests/
======================== 98 passed, 1 warning ========================
```
**Note**: Tests require `PYTHONPATH=.` to run. Consider adding `pytest.ini` or `pyproject.toml`.
---
# Part II: Category Feature Specification
## 2.1 Overview
Add category support to JIGAIDO to allow filtering and organizing bounties.
**Goals**:
- Admin-only category management (create, delete)
- Multiple categories per bounty (no duplicates)
- Filter bounties by category
- Show categories on `/show` command
- Backward compatibility (existing bounties work without categories)
## 2.2 Data Model
### Category
```python
@dataclass
class Category:
"""A category for organizing bounties in a room."""
id: str # slug: lowercase alphabetic only, e.g., "bug", "feature"
name: str # display name: e.g., "Bug", "Feature"
created_at: int
deleted_at: int | None = None # soft delete
```
**Constraints**:
- `id` (slug): lowercase alphabetic only, no symbols, e.g., `^[a-z]+$`
- `name`: human-readable display
- Unique within room (slug must be unique)
- Soft delete preserves data
### Bounty (Modified)
```python
@dataclass
class Bounty:
# ... existing fields ...
id: int
text: str | None
link: str | None
due_date_ts: int | None
created_at: int
created_by_user_id: int
deleted_at: int | None = None
created_by_username: str | None = None
category_ids: list[str] = field(default_factory=list) # NEW
```
### RoomData (Modified)
```python
@dataclass
class RoomData:
room_id: int
bounties: list[Bounty]
next_id: int
timezone: str | None = None
admin_usernames: list[str] | None = None
categories: list[Category] = field(default_factory=list) # NEW
```
## 2.3 Category Scope
- **Per room**: Same as bounties, each room (group/DM) has independent categories
- **Admin only**: Only admins can create/delete categories
- **User access**: Regular users can only filter by category
## 2.4 Service Layer API
All methods require admin permission unless specified otherwise.
### Category Management
```python
class BountyService:
# ... existing methods ...
# --- Category Management ---
def add_category(
self,
room_id: int,
slug: str,
name: str,
username: str | None
) -> Category:
"""Create a new category. Admin only.
Args:
room_id: Room identifier
slug: Category ID (lowercase alphabetic, e.g., "bug")
name: Display name (e.g., "Bug Report")
username: Requesting admin's username
Returns:
Created Category
Raises:
PermissionError: If not admin
ValueError: If slug already exists or invalid
"""
...
def delete_category(
self,
room_id: int,
slug: str,
username: str | None
) -> bool:
"""Soft delete a category. Admin only.
Args:
room_id: Room identifier
slug: Category slug to delete
username: Requesting admin's username
Returns:
True if deleted, False if not found
"""
...
def list_categories(self, room_id: int) -> list[Category]:
"""List active categories (excludes soft-deleted).
Args:
room_id: Room identifier
Returns:
List of active categories
"""
...
def get_category(self, room_id: int, slug: str) -> Category | None:
"""Get a category by slug (excludes soft-deleted).
Args:
room_id: Room identifier
slug: Category slug
Returns:
Category or None if not found
"""
...
```
### Category-to-Bounty Association
```python
def add_category_to_bounty(
self,
room_id: int,
bounty_id: int,
category_slug: str,
username: str | None
) -> bool:
"""Add category to a bounty. Admin only.
Args:
room_id: Room identifier
bounty_id: Bounty ID
category_slug: Category slug to add
username: Requesting admin's username
Returns:
True if newly added, False if already exists
Raises:
PermissionError: If not admin
ValueError: If bounty or category not found
"""
...
def remove_category_from_bounty(
self,
room_id: int,
bounty_id: int,
category_slug: str,
username: str | None
) -> bool:
"""Remove category from a bounty. Admin only.
Args:
room_id: Room identifier
bounty_id: Bounty ID
category_slug: Category slug to remove
username: Requesting admin's username
Returns:
True if removed, False if not found
"""
...
def update_bounty_categories(
self,
room_id: int,
bounty_id: int,
category_slugs: list[str],
username: str | None
) -> bool:
"""Replace all categories on a bounty. Admin only.
Args:
room_id: Room identifier
bounty_id: Bounty ID
category_slugs: New list of category slugs
username: Requesting admin's username
Returns:
True if updated
Raises:
PermissionError: If not admin
ValueError: If bounty or any category not found
"""
...
```
### Bounty Listing with Category Filter
```python
def list_bounties(
self,
room_id: int,
category_slugs: list[str] | None = None,
include_expired: bool = False
) -> list[Bounty]:
"""List bounties with optional category filtering.
Args:
room_id: Room identifier
category_slugs: If provided, filter by ANY of these categories (OR)
include_expired: If True, include bounties past due date
Returns:
List of non-deleted bounties, sorted by due date
"""
...
```
## 2.5 Filtering Logic
- **Single category**: `/bounty -c bug` → bounties with "bug"
- **Multiple categories (OR)**: `/bounty -c bug,feature` → bounties with "bug" OR "feature"
- **No filter**: `/bounty` → all bounties (current behavior)
## 2.6 Command Syntax
### Category Management
```
/category - list categories
/category add <slug> <name> - create category (admin)
/category delete <slug> - soft delete category (admin)
```
### Bounty with Category
```
/add <text> [link] [date] -cat <slug> - add with category
/add <text> [link] [date] -cat <slug1>,<slug2> - add with multiple categories
/update <id> -cat <slug> - add category to bounty
/update <id> -cat <slug1>,<slug2> - set categories (replace all)
/update <id> -cat - - clear all categories
/update <id> -remove-cat <slug> - remove specific category
```
### Bounty Listing with Filter
```
/bounty - all bounties (current)
/bounty -c <slug> - filter by category
/bounty -c <slug1>,<slug2> - filter by multiple categories (OR)
/bounty all - show expired (current)
/bounty all -c <slug> - show expired + filter by category
```
### Show Bounty
```
/show <id> - show bounty details with categories
```
## 2.7 Display Format
### `/category` output
```
Categories:
- bug → Bug Report
- feature → Feature Request
- docs → Documentation
```
### `/show <id>` output (with categories)
```
[#1] Fix login bug
🔗 https://github.com/...
📅 15 April 2026 14:30 (Asia/Jakarta)
📂 Categories: bug | feature
👤 @username
📌 Created: 2026-04-01 10:00
```
### `/bounty -c bug` output
```
Filtering with 🐛 bug category:
Showing 3 of 10 bounties:
[#5] ...
[#1] ...
[#3] ...
```
### `/bounty -c bug,feature` output
```
Filtering with 🐛 bug, ✨ feature categories:
Showing 5 of 10 bounties:
[#5] ...
[#1] ...
```
## 2.8 Edge Cases
| Scenario | Behavior |
|----------|----------|
| Delete category | Soft delete - existing bounties keep category in data, but filter won't find it |
| Filter by deleted category | Show "No bounties with this category" or error |
| Add duplicate category to bounty | No-op, return False |
| Add invalid slug (uppercase/symbols) | Reject with validation error |
| Category slug conflict | Reject with "Category already exists" |
| Bounty without categories | `category_ids = []` (backward compatible) |
## 2.9 Test Cases to Add
```python
# Category Management
def test_add_category_requires_admin():
def test_add_category_duplicate_slug_fails():
def test_add_category_invalid_slug_fails():
def test_add_category_valid():
def test_delete_category_soft_deletes():
def test_deleted_category_not_listed():
def test_deleted_category_still_in_bounty_data():
def test_list_categories_empty():
def test_list_categories_returns_active():
def test_get_category_not_found():
def test_get_category_deleted_returns_none():
# Category-to-Bounty
def test_add_category_to_bounty():
def test_add_duplicate_category_to_bounty_noop():
def test_add_category_to_bounty_invalid_category():
def test_remove_category_from_bounty():
def test_remove_category_not_on_bounty_returns_false():
def test_update_bounty_categories_replace_all():
def test_update_bounty_categories_validates():
# Bounty Listing with Filter
def test_list_bounties_filter_by_single_category():
def test_list_bounties_filter_by_multiple_categories_or():
def test_list_bounties_no_category_returns_all():
def test_list_bounties_category_excludes_deleted_bounties():
```
---
# Part III: Implementation Checklist
## Models Layer
- [ ] Add `Category` dataclass to `core/models.py`
- [ ] Add `category_ids` field to `Bounty` dataclass
- [ ] Add `categories` field to `RoomData` dataclass
## Storage Layer
- [ ] Update `JsonFileRoomStorage.load()` to deserialize categories
- [ ] Update `JsonFileRoomStorage.save()` to serialize categories
## Service Layer
- [ ] Implement `add_category()`
- [ ] Implement `delete_category()`
- [ ] Implement `list_categories()`
- [ ] Implement `get_category()`
- [ ] Implement `add_category_to_bounty()`
- [ ] Implement `remove_category_from_bounty()`
- [ ] Implement `update_bounty_categories()`
- [ ] Update `list_bounties()` to support category filter
## Command Layer
- [ ] Add `/category` command handler
- [ ] Add `-cat` flag parsing to `/add`
- [ ] Add `-cat` and `-remove-cat` flags to `/update`
- [ ] Add `-c` flag to `/bounty` for category filter
- [ ] Update `/show` to display categories
## Tests
- [ ] Add category management tests
- [ ] Add category-to-bounty tests
- [ ] Add category filter tests
## Documentation
- [ ] Update README.md with category feature
- [ ] Update command help text
---
# Part IV: Open Questions
1. **Category icon/emoji**: Should categories have optional emoji? (Not in initial spec, can add later)
2. **Category reactivation**: Should soft-deleted categories be reactable? (Not in initial spec)
3. **Bulk category operations**: Should we support `/category add bulk`? (Not in initial spec)
---
*End of Audit & Specification Document*

View File

@@ -1,4 +0,0 @@
[pytest]
testpaths = tests
pythonpath = .
asyncio_default_fixture_loop_scope = function

View File

@@ -3,6 +3,7 @@
import pytest
from unittest.mock import patch, MagicMock
from io import StringIO
import sys
from core.models import Bounty
from core.ports import RoomStorage, TrackingStorage
@@ -236,7 +237,7 @@ class TestCLIValidation:
main()
mock_bounty_service.update_bounty.assert_called_once()
call_kwargs = mock_bounty_service.update_bounty.call_args
assert call_kwargs.kwargs.get("clear_link") is True
assert call_kwargs.kwargs.get("clear_link") == True
def test_update_clear_due_flag(self):
"""Test update with --clear-due flag."""
@@ -254,7 +255,7 @@ class TestCLIValidation:
main()
mock_bounty_service.update_bounty.assert_called_once()
call_kwargs = mock_bounty_service.update_bounty.call_args
assert call_kwargs.kwargs.get("clear_due") is True
assert call_kwargs.kwargs.get("clear_due") == True
class TestCLIOutput:

View File

@@ -2,9 +2,11 @@
import json
import os
import tempfile
from pathlib import Path
from unittest.mock import patch
import pytest
from config import Config, DEFAULT_DATA_DIR
@@ -45,28 +47,11 @@ class TestConfigDataDir:
assert cfg.bot_token == "test_token_123"
def test_bot_token_none_when_not_set(self):
"""Test that bot_token is None when JIGAIDO_BOT_TOKEN not set and no config file."""
"""Test that bot_token is None when JIGAIDO_BOT_TOKEN not set."""
with patch.dict(os.environ, {}, clear=True):
with patch("pathlib.Path.exists", return_value=False):
cfg = Config()
assert cfg.bot_token is None
def test_bot_token_from_config_file(self):
"""Test that bot_token is read from config file when env var not set."""
config_dir = Path.home() / ".jigaido"
config_file = config_dir / "config.json"
with patch.dict(os.environ, {}, clear=True):
with patch("pathlib.Path.expanduser", return_value=config_file):
with patch("pathlib.Path.exists", return_value=True):
with patch("builtins.open", create=True) as mock_open:
mock_open.return_value.__enter__ = lambda s: s
mock_open.return_value.__exit__ = lambda *a: None
mock_open.return_value.read = lambda: (
'{"JIGAIDO_BOT_TOKEN": "config_token"}'
)
cfg = Config()
assert cfg.bot_token == "config_token"
class TestConfigEnsureDataDir:
def test_ensure_data_dir_creates_directory(self, tmp_path):

View File

@@ -1,9 +1,11 @@
"""Tests for adapters/storage/json_file.py — JSON file storage adapter."""
import json
import os
import tempfile
from pathlib import Path
import pytest
from adapters.storage.json_file import JsonFileRoomStorage, JsonFileTrackingStorage
from core.models import Bounty, RoomData, TrackingData, TrackedBounty
@@ -97,6 +99,15 @@ class TestJsonFileRoomStorage:
assert self.storage.load(-1001) is None
def test_delete_bounty(self):
"""Test that delete_bounty removes a bounty."""
bounty = self._create_bounty(id=1)
self.storage.add_bounty(-1001, bounty)
self.storage.delete_bounty(-1001, 1)
loaded = self.storage.load(-1001)
assert len(loaded.bounties) == 0
def test_get_bounty_found(self):
"""Test that get_bounty returns the bounty when found."""
bounty = self._create_bounty(id=1)

View File

@@ -1,6 +1,8 @@
"""Tests for core/models.py — domain dataclasses."""
import time
import pytest
from core.models import (
Bounty,
@@ -120,7 +122,7 @@ class TestRoomData:
assert rd.bounties == []
assert rd.next_id == 1
assert rd.timezone is None
assert rd.admin_usernames == []
assert rd.admin_user_ids == []
def test_create_dm_room_data(self):
rd = RoomData(
@@ -132,7 +134,7 @@ class TestRoomData:
assert rd.bounties == []
assert rd.next_id == 1
assert rd.timezone is None
assert rd.admin_usernames == []
assert rd.admin_user_ids == []
def test_room_data_with_bounties(self):
b = Bounty(
@@ -154,18 +156,18 @@ class TestRoomData:
bounties=[],
next_id=1,
timezone="Asia/Jakarta",
admin_usernames=["alice", "bob"],
admin_user_ids=[123, 456],
)
assert rd.timezone == "Asia/Jakarta"
assert rd.admin_usernames == ["alice", "bob"]
assert rd.admin_user_ids == [123, 456]
def test_room_data_admin_usernames_defaults_to_empty_list(self):
def test_room_data_admin_user_ids_defaults_to_empty_list(self):
rd = RoomData(
room_id=-1001,
bounties=[],
next_id=1,
)
assert rd.admin_usernames == []
assert rd.admin_user_ids == []
class TestTrackingData:

View File

@@ -1,5 +1,6 @@
"""Tests for core/ports.py — storage interfaces."""
import pytest
from core.models import Bounty, RoomData, TrackingData, TrackedBounty
from core.ports import RoomStorage, TrackingStorage
@@ -34,6 +35,12 @@ class SimpleRoomStorage:
self._rooms[room_id].bounties[i] = bounty
break
def delete_bounty(self, room_id: int, bounty_id: int) -> None:
if room_id in self._rooms:
self._rooms[room_id].bounties = [
b for b in self._rooms[room_id].bounties if b.id != bounty_id
]
def get_bounty(self, room_id: int, bounty_id: int) -> Bounty | None:
if room_id in self._rooms:
for b in self._rooms[room_id].bounties:
@@ -113,6 +120,12 @@ class MockRoomStorage:
self._rooms[room_id].bounties[i] = bounty
break
def delete_bounty(self, room_id: int, bounty_id: int) -> None:
if room_id in self._rooms:
self._rooms[room_id].bounties = [
b for b in self._rooms[room_id].bounties if b.id != bounty_id
]
def get_bounty(self, room_id: int, bounty_id: int) -> Bounty | None:
if room_id in self._rooms:
for b in self._rooms[room_id].bounties:
@@ -230,6 +243,19 @@ class TestRoomStorage:
assert result is not None
assert result.text == "Updated"
def test_delete_bounty(self):
storage = MockRoomStorage()
bounty = Bounty(
id=1,
text="Test",
link=None,
due_date_ts=None,
created_at=0,
created_by_user_id=123,
)
storage.add_bounty(-1001, bounty)
storage.delete_bounty(-1001, 1)
assert storage.get_bounty(-1001, 1) is None
class TestTrackingStorage:

View File

@@ -1,8 +1,9 @@
"""Tests for core/services.py — business logic services."""
import pytest
from unittest.mock import MagicMock
from core.models import RoomData
from core.models import Bounty, RoomData, TrackingData, TrackedBounty
from core.services import BountyService, TrackingService
from tests.test_ports import MockRoomStorage, MockTrackingStorage
@@ -14,34 +15,32 @@ class TestBountyService:
"""Set up fresh storage and service for each test."""
self.storage = MockRoomStorage()
self.service = BountyService(self.storage)
self.admin_username = "admin"
self._make_admin(-1001, self.admin_username)
self.admin_user_id = 123
self._make_admin(-1001, self.admin_user_id)
def _make_admin(self, room_id: int, username: str):
def _make_admin(self, room_id: int, user_id: int):
"""Helper to set up a room with an admin user."""
room_data = self.storage.load(room_id)
if room_data is None:
room_data = RoomData(
room_id=room_id, bounties=[], next_id=0, admin_usernames=[]
room_id=room_id, bounties=[], next_id=0, admin_user_ids=[]
)
if username not in (room_data.admin_usernames or []):
room_data.admin_usernames = room_data.admin_usernames or []
room_data.admin_usernames.append(username)
if user_id not in (room_data.admin_user_ids or []):
room_data.admin_user_ids = room_data.admin_user_ids or []
room_data.admin_user_ids.append(user_id)
self.storage.save(room_data)
def test_add_bounty_creates_room_if_not_exists(self):
"""Test that add_bounty creates a new room if it doesn't exist."""
bounty = self.service.add_bounty(
room_id=-1001,
user_id=123,
username=self.admin_username,
user_id=self.admin_user_id,
text="Fix bug",
link="https://github.com/issue/1",
created_by_username=self.admin_username,
)
assert bounty.id == 1
assert bounty.text == "Fix bug"
assert bounty.created_by_user_id == 123
assert bounty.created_by_user_id == self.admin_user_id
room = self.storage.load(-1001)
assert room is not None
@@ -50,13 +49,13 @@ class TestBountyService:
def test_add_bounty_increments_id(self):
"""Test that add_bounty increments bounty ID for each new bounty."""
b1 = self.service.add_bounty(
room_id=-1001, user_id=123, username=self.admin_username, text="First"
room_id=-1001, user_id=self.admin_user_id, text="First"
)
b2 = self.service.add_bounty(
room_id=-1001, user_id=123, username=self.admin_username, text="Second"
room_id=-1001, user_id=self.admin_user_id, text="Second"
)
b3 = self.service.add_bounty(
room_id=-1001, user_id=123, username=self.admin_username, text="Third"
room_id=-1001, user_id=self.admin_user_id, text="Third"
)
assert b1.id == 1
@@ -66,9 +65,7 @@ class TestBountyService:
def test_add_bounty_requires_admin(self):
"""Test that add_bounty raises PermissionError when non-admin tries to add."""
with pytest.raises(PermissionError, match="Only admins can add bounties"):
self.service.add_bounty(
room_id=-1001, user_id=999, username="nonadmin", text="Not admin"
)
self.service.add_bounty(room_id=-1001, user_id=999, text="Not admin")
def test_list_bounties_empty_room(self):
"""Test list_bounties returns empty list for non-existent room."""
@@ -77,16 +74,14 @@ class TestBountyService:
def test_list_bounties_returns_all_bounties(self):
"""Test list_bounties returns all bounties in a room."""
self.service.add_bounty(room_id=-1001, user_id=self.admin_user_id, text="First")
self.service.add_bounty(
room_id=-1001, user_id=123, username=self.admin_username, text="First"
)
self.service.add_bounty(
room_id=-1001, user_id=123, username=self.admin_username, text="Second"
room_id=-1001, user_id=self.admin_user_id, text="Second"
)
# Add bounty to different room to verify isolation
self._make_admin(-999, "otheradmin")
self._make_admin(-999, self.admin_user_id)
self.service.add_bounty(
room_id=-999, user_id=456, username="otheradmin", text="Other room"
room_id=-999, user_id=self.admin_user_id, text="Other room"
)
bounties = self.service.list_bounties(-1001)
@@ -96,7 +91,7 @@ class TestBountyService:
def test_get_bounty_found(self):
"""Test get_bounty returns bounty when it exists."""
created = self.service.add_bounty(
room_id=-1001, user_id=123, username=self.admin_username, text="Test"
room_id=-1001, user_id=self.admin_user_id, text="Test"
)
found = self.service.get_bounty(-1001, created.id)
assert found is not None
@@ -109,21 +104,19 @@ class TestBountyService:
def test_get_bounty_wrong_room(self):
"""Test get_bounty returns None when bounty is in different room."""
self.service.add_bounty(
room_id=-1001, user_id=123, username=self.admin_username, text="Test"
)
self.service.add_bounty(room_id=-1001, user_id=self.admin_user_id, text="Test")
found = self.service.get_bounty(-999, 1) # room -999 doesn't have bounty 1
assert found is None
def test_update_bounty_success(self):
"""Test update_bounty succeeds when admin updates their bounty."""
bounty = self.service.add_bounty(
room_id=-1001, user_id=123, username=self.admin_username, text="Original"
room_id=-1001, user_id=self.admin_user_id, text="Original"
)
result = self.service.update_bounty(
room_id=-1001,
bounty_id=bounty.id,
username=self.admin_username,
user_id=self.admin_user_id,
text="Updated",
)
assert result is True
@@ -133,13 +126,13 @@ class TestBountyService:
def test_update_bounty_not_admin_raises_permission_error(self):
"""Test update_bounty raises PermissionError when non-admin tries to update."""
bounty = self.service.add_bounty(
room_id=-1001, user_id=123, username=self.admin_username, text="Original"
room_id=-1001, user_id=self.admin_user_id, text="Original"
)
with pytest.raises(PermissionError, match="Only admins can edit bounties"):
self.service.update_bounty(
room_id=-1001,
bounty_id=bounty.id,
username="nonadmin", # different user, not admin
user_id=999, # different user, not admin
text="Hacked",
)
@@ -148,7 +141,7 @@ class TestBountyService:
result = self.service.update_bounty(
room_id=-1001,
bounty_id=999,
username=self.admin_username,
user_id=self.admin_user_id,
text="Updated",
)
assert result is False
@@ -157,15 +150,14 @@ class TestBountyService:
"""Test update_bounty only updates provided fields."""
bounty = self.service.add_bounty(
room_id=-1001,
user_id=123,
username=self.admin_username,
user_id=self.admin_user_id,
text="Original",
link="https://original.link",
)
self.service.update_bounty(
room_id=-1001,
bounty_id=bounty.id,
username=self.admin_username,
user_id=self.admin_user_id,
text="Updated only text",
)
updated = self.service.get_bounty(-1001, bounty.id)
@@ -176,15 +168,14 @@ class TestBountyService:
"""Test update_bounty can clear link."""
bounty = self.service.add_bounty(
room_id=-1001,
user_id=123,
username=self.admin_username,
user_id=self.admin_user_id,
text="Test",
link="https://original.link",
)
self.service.update_bounty(
room_id=-1001,
bounty_id=bounty.id,
username=self.admin_username,
user_id=self.admin_user_id,
clear_link=True,
)
updated = self.service.get_bounty(-1001, bounty.id)
@@ -193,9 +184,9 @@ class TestBountyService:
def test_delete_bounty_success(self):
"""Test delete_bounty soft deletes when admin deletes their bounty."""
bounty = self.service.add_bounty(
room_id=-1001, user_id=123, username=self.admin_username, text="To delete"
room_id=-1001, user_id=self.admin_user_id, text="To delete"
)
result = self.service.delete_bounty(-1001, bounty.id, self.admin_username)
result = self.service.delete_bounty(-1001, bounty.id, self.admin_user_id)
assert result is True
# Soft delete - bounty should not be found via get_bounty
assert self.service.get_bounty(-1001, bounty.id) is None
@@ -207,56 +198,18 @@ class TestBountyService:
def test_delete_bounty_not_admin_raises_permission_error(self):
"""Test delete_bounty raises PermissionError when non-admin tries to delete."""
bounty = self.service.add_bounty(
room_id=-1001, user_id=123, username=self.admin_username, text="To delete"
room_id=-1001, user_id=self.admin_user_id, text="To delete"
)
with pytest.raises(PermissionError, match="Only admins can delete bounties"):
self.service.delete_bounty(-1001, bounty.id, "nonadmin")
self.service.delete_bounty(
-1001, bounty.id, 999
) # different user, not admin
def test_delete_bounty_not_found(self):
"""Test delete_bounty returns False when bounty doesn't exist."""
result = self.service.delete_bounty(-1001, 999, self.admin_username)
result = self.service.delete_bounty(-1001, 999, self.admin_user_id)
assert result is False
def test_delete_bounties_multi_id_success(self):
"""Test delete_bounties returns individual results for multiple bounties."""
bounty1 = self.service.add_bounty(
room_id=-1001, user_id=123, username=self.admin_username, text="To delete 1"
)
bounty2 = self.service.add_bounty(
room_id=-1001, user_id=123, username=self.admin_username, text="To delete 2"
)
results = self.service.delete_bounties(
-1001, [bounty1.id, bounty2.id], self.admin_username
)
assert results == {bounty1.id: "deleted", bounty2.id: "deleted"}
# Verify both are soft deleted
assert self.service.get_bounty(-1001, bounty1.id) is None
assert self.service.get_bounty(-1001, bounty2.id) is None
def test_delete_bounties_mixed_results(self):
"""Test delete_bounties returns not_found for non-existent bounties."""
bounty = self.service.add_bounty(
room_id=-1001, user_id=123, username=self.admin_username, text="To delete"
)
results = self.service.delete_bounties(
-1001, [bounty.id, 999, 888], self.admin_username
)
assert results == {bounty.id: "deleted", 999: "not_found", 888: "not_found"}
def test_delete_bounties_permission_denied(self):
"""Test delete_bounties returns permission_denied for non-admin users."""
bounty = self.service.add_bounty(
room_id=-1001, user_id=123, username=self.admin_username, text="To delete"
)
results = self.service.delete_bounties(
-1001,
[bounty.id],
"nonadmin", # non-admin user
)
assert results == {bounty.id: "permission_denied"}
# Verify bounty was NOT deleted
assert self.service.get_bounty(-1001, bounty.id) is not None
class TestTrackingService:
"""Unit tests for TrackingService."""
@@ -266,31 +219,29 @@ class TestTrackingService:
self.room_storage = MockRoomStorage()
self.tracking_storage = MockTrackingStorage()
self.service = TrackingService(self.tracking_storage, self.room_storage)
self.admin_username = "admin"
self._make_admin(-1001, self.admin_username)
self.admin_user_id = 123
self._make_admin(-1001, self.admin_user_id)
def _make_admin(self, room_id: int, username: str):
def _make_admin(self, room_id: int, user_id: int):
"""Helper to set up a room with an admin user."""
room_data = self.room_storage.load(room_id)
if room_data is None:
room_data = RoomData(
room_id=room_id, bounties=[], next_id=0, admin_usernames=[]
room_id=room_id, bounties=[], next_id=0, admin_user_ids=[]
)
if username not in (room_data.admin_usernames or []):
room_data.admin_usernames = room_data.admin_usernames or []
room_data.admin_usernames.append(username)
if user_id not in (room_data.admin_user_ids or []):
room_data.admin_user_ids = room_data.admin_user_ids or []
room_data.admin_user_ids.append(user_id)
self.room_storage.save(room_data)
def _add_bounty(self, room_id=-1001, username="admin", text="Test bounty"):
def _add_bounty(self, room_id=-1001, user_id=123, text="Test bounty"):
"""Helper to add a bounty for tracking tests."""
if self.room_storage.load(room_id) is None or username not in (
self.room_storage.load(room_id).admin_usernames or []
if self.room_storage.load(room_id) is None or user_id not in (
self.room_storage.load(room_id).admin_user_ids or []
):
self._make_admin(room_id, username)
self._make_admin(room_id, user_id)
bounty_service = BountyService(self.room_storage)
return bounty_service.add_bounty(
room_id=room_id, user_id=123, username=username, text=text
)
return bounty_service.add_bounty(room_id=room_id, user_id=user_id, text=text)
def test_track_bounty_success(self):
"""Test track_bounty successfully tracks a bounty."""
@@ -378,13 +329,11 @@ class TestTrackingService:
def test_get_tracked_bounties_ignores_deleted_bounties(self):
"""Test get_tracked_bounties ignores bounties that were deleted."""
bounty_service = BountyService(self.room_storage)
bounty = bounty_service.add_bounty(
room_id=-1001, user_id=123, username="admin", text="To delete"
)
bounty = bounty_service.add_bounty(room_id=-1001, user_id=123, text="To delete")
self.service.track_bounty(-1001, 123456, bounty.id)
# Delete the bounty
bounty_service.delete_bounty(-1001, bounty.id, "admin")
bounty_service.delete_bounty(-1001, bounty.id, 123)
tracked = self.service.get_tracked_bounties(-1001, 123456)
assert len(tracked) == 0 # deleted bounty not returned