Compare commits

..

7 Commits

Author SHA1 Message Date
43d07eae92 Merge pull request 'feat: Category Feature - Models & Storage (#85)' (#90) from feature/category-models-storage into main 2026-04-09 12:35:50 +02:00
shokollm
66d2a9fb86 feat: add Category model and update storage for categories
Models & Storage layer for category feature (Issue #85):

- Add Category dataclass to core/models.py
  - id (slug): lowercase alphabetic only
  - name: display name
  - created_at: Unix timestamp
  - deleted_at: soft delete timestamp (None if active)

- Add category_ids field to Bounty dataclass
  - list[str] for multiple categories per bounty
  - Default empty list for backward compatibility

- Add categories field to RoomData dataclass
  - list[Category] for room-level categories
  - Default empty list

- Update JsonFileRoomStorage to serialize/deserialize:
  - Category fields (id, name, created_at, deleted_at)
  - Bounty.category_ids
  - RoomData.categories

Backward compatible: existing data without categories works fine.
2026-04-09 10:23:33 +00:00
shokollm
235a89653f test: add pytest.ini for easier test running
- Set testpaths = tests
- Set pythonpath = . (no need for PYTHONPATH=. prefix)
- Set asyncio_default_fixture_loop_scope = function
2026-04-09 10:15:27 +00:00
shokollm
2158f71fd0 feat: normalize URLs without scheme to https://
- Add normalize_url() helper function in commands.py
- Automatically prefix URLs without scheme (e.g. github.com → https://github.com)
- Applies to both -link flag and auto-detected URLs
- Add 5 new tests for URL normalization
- Fix existing tests to handle 5-value return from parse_args()

Examples:
  /add Fix bug github.com/user/repo
  → stored as: https://github.com/user/repo
2026-04-09 10:10:06 +00:00
shokollm
4885be0752 fix: cleanup codebase and sync SPEC with actual permissions
Phase 1: Ruff lint fixes
- Remove unused imports across all files
- Remove unused variables (now_utc, tz, ctx)
- Fix f-string without placeholders
- Fix E402 import order with noqa comments

Phase 2: Remove confusing hard delete from storage
- Removed delete_bounty() from RoomStorage Protocol (never used by app)
- Removed delete_bounty() from JsonFileRoomStorage (was hard delete)
- Removed corresponding tests (hard delete was never used)

Phase 3: Sync SPEC.md with actual code behavior
- Updated overview: admins can add/edit/delete (not 'anyone' + 'creator')
- Updated command table: /add, /edit, /delete are admin only
- Updated error handling messages

Test results: 96 passed (2 hard delete tests removed)
2026-04-09 10:01:02 +00:00
shokollm
75122b3ee2 docs: add audit and category spec from feynman 2026-04-09 09:30:42 +00:00
shokollm
8494b4621c feat: switch admin identification from user_id to username
- Replace admin_user_ids (list[int]) with admin_usernames (list[str])
- Update all service methods to use username for permission checks
- Add delete button to bot responses for message cleanup
- Update tests to match new implementation

Note: Breaking change - existing data files need fresh start
2026-04-09 08:02:36 +00:00
19 changed files with 1126 additions and 310 deletions

10
SPEC.md
View File

@@ -8,7 +8,7 @@
JIGAIDO is a Telegram bot that lets groups and individuals track bounties — tasks, obligations, and deadlines — with optional due dates and personal tracking.
- **Group mode**: Each Telegram group has its own bounty list. Anyone can add bounties. Only creator can edit/delete.
- **Group mode**: Each Telegram group has its own bounty list. Admins can add/edit/delete bounties. Anyone can track.
- **DM mode**: Personal bounty list. Anyone can manage their own bounties.
- **Tracking**: Users can track any bounty (group or personal) to their tracking list.
- **Due dates**: Free-form text (`"april 15"`, `"in 3 days"`, `"tomorrow"`) parsed at add time, stored as Unix timestamp. If unparseable, stored as `NULL`.
@@ -122,9 +122,9 @@ Data is stored at `~/.jigaido/` (home directory), NOT inside the repository.
|---|---|---|
| `/bounty` | anyone | List all bounties in this group |
| `/my` | anyone | List bounties tracked by you in this group |
| `/add <text> [link] [due date]` | anyone | Add a new bounty to the group |
| `/edit <bounty_id> [text] [link] [due_date]` | creator only | Edit an existing bounty |
| `/delete <bounty_id>` | creator only | Delete a bounty |
| `/add <text> [link] [due date]` | admin only | Add a new bounty to the group |
| `/edit <bounty_id> [text] [link] [due_date]` | admin only | Edit an existing bounty |
| `/delete <bounty_id>` | admin only | Delete a bounty |
| `/track <bounty_id>` | anyone | Track a group bounty |
| `/untrack <bounty_id>` | anyone | Stop tracking a bounty |
@@ -170,7 +170,7 @@ Stored as Unix timestamp. User-facing display can be localized/converted to any
## Error Handling
- Unknown command → help text with available commands
- `/edit`/`/delete` by non-creator → "⛔ Only the creator can edit/delete this bounty."
- `/add`/`/edit`/`/delete` by non-admin → "⛔ Only admins can add/edit/delete bounties."
- `/track` already tracked → "Already tracking" (idempotent)
- `/untrack` not tracked → "Not tracking" (idempotent)
- Bounty not found → "Bounty not found"

View File

@@ -11,8 +11,7 @@ import os
import tempfile
from pathlib import Path
from core.models import Bounty, RoomData, TrackingData, TrackedBounty
from core.ports import RoomStorage, TrackingStorage
from core.models import Bounty, Category, RoomData, TrackingData, TrackedBounty
class JsonFileRoomStorage:
@@ -58,16 +57,28 @@ class JsonFileRoomStorage:
created_by_user_id=b["created_by_user_id"],
deleted_at=b.get("deleted_at"),
created_by_username=b.get("created_by_username"),
category_ids=b.get("category_ids", []),
)
for b in data.get("bounties", [])
]
categories = [
Category(
id=c["id"],
name=c["name"],
created_at=c["created_at"],
deleted_at=c.get("deleted_at"),
)
for c in data.get("categories", [])
]
return RoomData(
room_id=data["room_id"],
bounties=bounties,
next_id=data["next_id"],
timezone=data.get("timezone"),
admin_user_ids=data.get("admin_user_ids", []),
admin_usernames=data.get("admin_usernames", []),
categories=categories,
)
def save(self, room_data: RoomData) -> None:
@@ -76,7 +87,16 @@ class JsonFileRoomStorage:
"room_id": room_data.room_id,
"next_id": room_data.next_id,
"timezone": room_data.timezone,
"admin_user_ids": room_data.admin_user_ids or [],
"admin_usernames": room_data.admin_usernames or [],
"categories": [
{
"id": c.id,
"name": c.name,
"created_at": c.created_at,
"deleted_at": c.deleted_at,
}
for c in room_data.categories
],
"bounties": [
{
"id": b.id,
@@ -87,6 +107,7 @@ class JsonFileRoomStorage:
"created_by_user_id": b.created_by_user_id,
"deleted_at": b.deleted_at,
"created_by_username": b.created_by_username,
"category_ids": b.category_ids,
}
for b in room_data.bounties
],
@@ -119,15 +140,6 @@ class JsonFileRoomStorage:
self.save(room_data)
def delete_bounty(self, room_id: int, bounty_id: int) -> None:
"""Delete a bounty from a room."""
room_data = self.load(room_id)
if room_data is None:
return
room_data.bounties = [b for b in room_data.bounties if b.id != bounty_id]
self.save(room_data)
def get_bounty(self, room_id: int, bounty_id: int) -> Bounty | None:
"""Get a specific bounty from a room by ID."""
room_data = self.load(room_id)

View File

@@ -1,7 +1,6 @@
"""JIGAIDO Telegram bot entrypoint."""
import logging
import os
import sys
sys.path.insert(0, "/home/shoko/repositories/jigaido")
@@ -9,9 +8,7 @@ sys.path.insert(0, "/home/shoko/repositories/jigaido")
from telegram.ext import (
Application,
CommandHandler,
MessageHandler,
CallbackQueryHandler,
filters,
)
from commands import (
@@ -38,7 +35,7 @@ logging.basicConfig(
)
log = logging.getLogger(__name__)
from config import config
from config import config # noqa: E402
BOT_TOKEN = config.bot_token or ""

View File

@@ -1,8 +1,7 @@
"""Telegram command handlers for JIGAIDO - Thin wrappers around core services."""
import time
from datetime import datetime, timezone
from functools import wraps
from datetime import datetime
from typing import Optional
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
@@ -82,6 +81,15 @@ def parse_args(
return True
return False
def normalize_url(url: str) -> str:
"""Normalize URL by adding https:// prefix if missing."""
if not url:
return url
if url.startswith("http://") or url.startswith("https://"):
return url
# Add https:// for URLs without scheme
return f"https://{url}"
def is_time(s: str) -> bool:
if not s or ":" not in s:
return False
@@ -107,7 +115,7 @@ def parse_args(
if arg == "-link":
if i + 1 < len(args) and not args[i + 1].startswith("-"):
link = args[i + 1]
link = normalize_url(args[i + 1])
i += 2
else:
clear_link = True
@@ -130,7 +138,7 @@ def parse_args(
clear_date = True
i += 1
elif not link and is_url(arg):
link = arg
link = normalize_url(arg)
i += 1
elif due_date_ts is None:
due_date_ts = parse_date_with_tz(arg)
@@ -175,7 +183,6 @@ def format_bounty(
except (KeyError, ZoneInfoNotFoundError):
tz = ZoneInfo("UTC")
now_utc = int(time.time())
dt_now = datetime.now(tz)
dt_due = datetime.fromtimestamp(b.due_date_ts, tz=tz)
@@ -285,11 +292,19 @@ async def cmd_bounty(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
displayed_bounties = filtered_bounties[:limit]
if not displayed_bounties:
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
if show_all:
await update.message.reply_text("No bounties yet.")
await update.message.reply_text(
"No bounties yet.",
reply_markup=reply_markup,
)
else:
await update.message.reply_text(
"No active bounties. Use /bounty all to show expired."
"No active bounties. Use /bounty all to show expired.",
reply_markup=reply_markup,
)
return
@@ -371,29 +386,57 @@ async def cmd_add(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
bounty = BOUNTY_SERVICE.add_bounty(
room_id=room_id,
user_id=user_id,
username=username,
text=text,
link=link,
due_date_ts=due_date_ts,
created_by_username=username,
)
except PermissionError as e:
await update.message.reply_text(f"{e}")
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(f"{e}", reply_markup=reply_markup)
return
except ValueError as e:
await update.message.reply_text(f"{e}")
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(f"{e}", reply_markup=reply_markup)
return
due_str = ""
if due_date_ts:
timezone_str = BOUNTY_SERVICE.get_timezone(room_id)
due_str = f" | Due: {format_due_date(due_date_ts, timezone_str)}"
keyboard = [[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"✅ Bounty added (#{bounty.id}){due_str}",
disable_web_page_preview=True,
reply_markup=reply_markup,
)
async def cmd_update(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
user_id = get_user_id(update)
room_id = get_room_id(update)
effective_user = update.effective_user
username = effective_user.username or effective_user.first_name or None
if not BOUNTY_SERVICE.is_admin(room_id, username):
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"⛔ Only admins can edit bounties.",
reply_markup=reply_markup,
)
return
args = extract_args(update.message.text)
if len(args) < 1:
await update.message.reply_text(
@@ -411,18 +454,18 @@ async def cmd_update(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
try:
bounty_id = int(args[0])
except ValueError:
await update.message.reply_text("Invalid bounty ID.")
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"Invalid bounty ID.",
reply_markup=reply_markup,
)
return
user_id = get_user_id(update)
room_id = get_room_id(update)
timezone_str = BOUNTY_SERVICE.get_timezone(room_id)
try:
tz = ZoneInfo(timezone_str)
except (KeyError, ZoneInfoNotFoundError):
tz = ZoneInfo("UTC")
text, link, due_date_ts, clear_link, clear_date = parse_args(args[1:], timezone_str)
if (
not text
@@ -431,19 +474,33 @@ async def cmd_update(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
and not clear_link
and not clear_date
):
await update.message.reply_text("Nothing to update.")
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"Nothing to update.",
reply_markup=reply_markup,
)
return
old_bounty = BOUNTY_SERVICE.get_bounty(room_id, bounty_id)
if not old_bounty:
await update.message.reply_text("Bounty not found.")
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"Bounty not found.",
reply_markup=reply_markup,
)
return
try:
success = BOUNTY_SERVICE.update_bounty(
room_id=room_id,
bounty_id=bounty_id,
user_id=user_id,
username=username,
text=text,
link=link,
due_date_ts=due_date_ts,
@@ -451,10 +508,18 @@ async def cmd_update(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
clear_due=clear_date,
)
except PermissionError as e:
await update.message.reply_text(f"{e}")
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(f"{e}", reply_markup=reply_markup)
return
except ValueError as e:
await update.message.reply_text(f"{e}")
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(f"{e}", reply_markup=reply_markup)
return
if success:
@@ -485,11 +550,23 @@ async def cmd_update(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
changes.append(f"Date: {old_date} → (cleared)")
if changes:
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"✅ Bounty #{bounty_id} updated:\n" + "\n".join(changes)
f"✅ Bounty #{bounty_id} updated:\n" + "\n".join(changes),
reply_markup=reply_markup,
)
else:
await update.message.reply_text(f"✅ Bounty #{bounty_id} updated.")
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"✅ Bounty #{bounty_id} updated.",
reply_markup=reply_markup,
)
else:
await update.message.reply_text("Bounty not found.")
@@ -498,6 +575,22 @@ cmd_edit = cmd_update
async def cmd_delete(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
user_id = get_user_id(update)
room_id = get_room_id(update)
effective_user = update.effective_user
username = effective_user.username or effective_user.first_name or None
if not BOUNTY_SERVICE.is_admin(room_id, username):
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"⛔ Only admins can delete bounties.",
reply_markup=reply_markup,
)
return
args = extract_args(update.message.text)
if not args:
await update.message.reply_text("Usage: /delete <bounty_id> [bounty_id ...]")
@@ -506,20 +599,28 @@ async def cmd_delete(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
try:
bounty_ids = [int(arg) for arg in args]
except ValueError:
await update.message.reply_text("Invalid bounty ID.")
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"Invalid bounty ID.",
reply_markup=reply_markup,
)
return
user_id = get_user_id(update)
room_id = get_room_id(update)
try:
results = BOUNTY_SERVICE.delete_bounties(
room_id=room_id,
bounty_ids=bounty_ids,
user_id=user_id,
username=username,
)
except PermissionError as e:
await update.message.reply_text(f"{e}")
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(f"{e}", reply_markup=reply_markup)
return
response_lines = []
@@ -531,12 +632,24 @@ async def cmd_delete(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
elif result == "permission_denied":
response_lines.append(f"⛔ Bounty #{bounty_id} permission denied.")
await update.message.reply_text("\n".join(response_lines))
keyboard = [[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"\n".join(response_lines), reply_markup=reply_markup
)
async def cmd_track(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
user_id = get_user_id(update)
if not is_group(update):
await update.message.reply_text("⛔ /track is only available in groups.")
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"⛔ /track is only available in groups.",
reply_markup=reply_markup,
)
return
args = extract_args(update.message.text)
@@ -555,16 +668,38 @@ async def cmd_track(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
try:
if TRACKING_SERVICE.track_bounty(room_id, user_id, bounty_id):
await update.message.reply_text(f"✅ Now tracking bounty #{bounty_id}.")
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"✅ Now tracking bounty #{bounty_id}.",
reply_markup=reply_markup,
)
else:
await update.message.reply_text(f"Already tracking bounty #{bounty_id}.")
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"Already tracking bounty #{bounty_id}.",
reply_markup=reply_markup,
)
except ValueError as e:
await update.message.reply_text(str(e))
async def cmd_untrack(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
user_id = get_user_id(update)
if not is_group(update):
await update.message.reply_text("⛔ /untrack is only available in groups.")
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"⛔ /untrack is only available in groups.",
reply_markup=reply_markup,
)
return
args = extract_args(update.message.text)
@@ -582,22 +717,46 @@ async def cmd_untrack(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
room_id = get_room_id(update)
if TRACKING_SERVICE.untrack_bounty(room_id, user_id, bounty_id):
await update.message.reply_text(f"✅ Untracked bounty #{bounty_id}.")
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"✅ Untracked bounty #{bounty_id}.",
reply_markup=reply_markup,
)
else:
await update.message.reply_text("Not tracking bounty #{bounty_id}.")
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"Not tracking bounty #{bounty_id}.",
reply_markup=reply_markup,
)
async def cmd_start(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
user_id = get_user_id(update)
room_id = get_room_id(update)
effective_user = update.effective_user
username = effective_user.username or effective_user.first_name or None
if is_group(update):
try:
chat_member = await ctx.bot.get_chat_member(room_id, user_id)
if chat_member.status == "creator" and not BOUNTY_SERVICE.is_admin(
room_id, user_id
room_id, username
):
BOUNTY_SERVICE.add_admin(room_id, user_id, user_id)
BOUNTY_SERVICE.add_admin(room_id, username, username)
keyboard = [
[
InlineKeyboardButton(
"🗑️ Delete", callback_data=f"del_msg:{user_id}"
)
]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"👻 JIGAIDO is watching.\n\n"
"This group's bounty list is now active.\n"
@@ -605,34 +764,46 @@ async def cmd_start(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
"/bounty — list bounties\n"
"/add — create a bounty\n"
"/track — track a bounty\n"
"/my — your tracked bounties"
"/my — your tracked bounties",
reply_markup=reply_markup,
)
return
except Exception:
pass
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"👻 JIGAIDO is watching.\n\n"
"This group's bounty list is now active.\n"
"/bounty — list bounties\n"
"/add — create a bounty\n"
"/track — track a bounty\n"
"/my — your tracked bounties"
"/my — your tracked bounties",
reply_markup=reply_markup,
)
else:
if not BOUNTY_SERVICE.is_admin(room_id, user_id):
BOUNTY_SERVICE.add_admin(room_id, user_id, user_id)
if not BOUNTY_SERVICE.is_admin(room_id, username):
BOUNTY_SERVICE.add_admin(room_id, username, username)
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"👻 JIGAIDO activated.\n\n"
"Personal bounty list ready.\n"
"/bounty — list your bounties\n"
"/add — create a bounty\n"
"/my — your tracked bounties"
"/my — your tracked bounties",
reply_markup=reply_markup,
)
async def cmd_show(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
user_id = get_user_id(update)
args = extract_args(update.message.text)
if not args:
await update.message.reply_text("Usage: /show <bounty_id>")
@@ -680,19 +851,44 @@ async def cmd_show(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
created_str = dt_created.strftime("%Y-%m-%d %H:%M")
lines.append(f"📌 Created: {created_str}")
keyboard = [[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"\n".join(lines), disable_web_page_preview=True, parse_mode=ParseMode.HTML
"\n".join(lines),
disable_web_page_preview=True,
parse_mode=ParseMode.HTML,
reply_markup=reply_markup,
)
async def cmd_timezone(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
user_id = get_user_id(update)
args = extract_args(update.message.text)
room_id = get_room_id(update)
user_id = get_user_id(update)
effective_user = update.effective_user
username = effective_user.username or effective_user.first_name or None
if not BOUNTY_SERVICE.is_admin(room_id, username):
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"⛔ Only admins can change timezone.",
reply_markup=reply_markup,
)
return
if not args:
current_tz = BOUNTY_SERVICE.get_timezone(room_id)
await update.message.reply_text(f"Current timezone: {current_tz}")
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"Current timezone: {current_tz}",
reply_markup=reply_markup,
)
return
timezone_str = args[0]
@@ -706,21 +902,39 @@ async def cmd_timezone(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
return
try:
BOUNTY_SERVICE.set_timezone(room_id, timezone_str, user_id)
BOUNTY_SERVICE.set_timezone(room_id, timezone_str, username)
except PermissionError as e:
await update.message.reply_text(f"{e}")
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(f"{e}", reply_markup=reply_markup)
return
await update.message.reply_text(f"✅ Timezone set to {timezone_str}.")
keyboard = [[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"✅ Timezone set to {timezone_str}.",
reply_markup=reply_markup,
)
async def cmd_recover(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
user_id = get_user_id(update)
args = extract_args(update.message.text)
room_id = get_room_id(update)
user_id = get_user_id(update)
effective_user = update.effective_user
username = effective_user.username or effective_user.first_name or None
if not BOUNTY_SERVICE.is_admin(room_id, user_id):
await update.message.reply_text("⛔ Only admins can perform this action.")
if not BOUNTY_SERVICE.is_admin(room_id, username):
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"⛔ Only admins can perform this action.",
reply_markup=reply_markup,
)
return
if not args:
@@ -746,7 +960,16 @@ async def cmd_recover(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
)
lines.append(f"[#{b.id}] {text}{link_str} | 🗑️ Deleted {deleted_str}")
await update.message.reply_text("\n".join(lines), disable_web_page_preview=True)
user_id = get_user_id(update)
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"\n".join(lines),
disable_web_page_preview=True,
reply_markup=reply_markup,
)
return
try:
@@ -755,7 +978,7 @@ async def cmd_recover(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
await update.message.reply_text("Invalid bounty ID.")
return
results = BOUNTY_SERVICE.recover_bounties(room_id, bounty_ids, user_id)
results = BOUNTY_SERVICE.recover_bounties(room_id, bounty_ids, username)
response_lines = []
for bounty_id, result in results.items():
@@ -768,55 +991,54 @@ async def cmd_recover(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
elif result == "permission_denied":
response_lines.append(f"⛔ Bounty #{bounty_id} permission denied.")
await update.message.reply_text("\n".join(response_lines))
async def _find_user_id_by_username(
ctx: ContextTypes.DEFAULT_TYPE, username: str
) -> int | None:
"""Find user_id by username using Telegram API."""
import logging
log = logging.getLogger(__name__)
try:
chat = await ctx.bot.get_chat(f"@{username}")
log.info(f"Found user {username}: {chat.id}")
return chat.id
except Exception as e:
log.error(f"Failed to find user @{username}: {e}")
return None
def _find_username_by_user_id(room_id: int, user_id: int) -> str | None:
"""Find username by user_id from bounty creators in the room."""
bounties = BOUNTY_SERVICE.list_bounties(room_id)
for bounty in bounties:
if bounty.created_by_user_id == user_id:
return bounty.created_by_username
return None
user_id = get_user_id(update)
keyboard = [[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"\n".join(response_lines), reply_markup=reply_markup
)
async def cmd_admin(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
args = extract_args(update.message.text)
room_id = get_room_id(update)
effective_user = update.effective_user
requesting_username = effective_user.username or effective_user.first_name or None
if not args:
admins = BOUNTY_SERVICE.list_admins(get_room_id(update))
if not admins:
await update.message.reply_text("No admins in this room.")
return
admin_mentions = []
for admin_id in admins:
username = _find_username_by_user_id(get_room_id(update), admin_id)
if username:
admin_mentions.append(
f'<a href="tg://user?id={admin_id}">@{username}</a>'
)
else:
admin_mentions.append(
f'<a href="tg://user?id={admin_id}">{admin_id}</a>'
)
if not BOUNTY_SERVICE.is_admin(room_id, requesting_username):
user_id = get_user_id(update)
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"Room Admins:\n" + "\n".join(f"- {m}" for m in admin_mentions),
"⛔ Only admins can perform this action.",
reply_markup=reply_markup,
)
return
admins = BOUNTY_SERVICE.list_admins(room_id)
if not admins:
user_id = get_user_id(update)
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"No admins in this room.",
reply_markup=reply_markup,
)
return
user_id = get_user_id(update)
admin_mentions = [f"@{admin}" for admin in admins]
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"Room Admins:\n" + "\n".join(f"- {m}" for m in admin_mentions),
parse_mode=ParseMode.HTML,
reply_markup=reply_markup,
)
return
@@ -840,36 +1062,51 @@ async def cmd_admin(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
await update.message.reply_text(f"Usage: /admin {subcommand} @username")
return
username = raw_username[1:]
target_username = raw_username[1:]
user_id = get_user_id(update)
room_id = get_room_id(update)
if not BOUNTY_SERVICE.is_admin(room_id, user_id):
await update.message.reply_text("⛔ Only admins can perform this action.")
return
target_user_id = await _find_user_id_by_username(ctx, username)
if target_user_id is None:
await update.message.reply_text(f"⛔ User @{username} not found.")
if not BOUNTY_SERVICE.is_admin(room_id, requesting_username):
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"⛔ Only admins can perform this action.",
reply_markup=reply_markup,
)
return
try:
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
if subcommand == "add":
BOUNTY_SERVICE.add_admin(room_id, target_user_id, user_id)
await update.message.reply_text(f"✅ @{username} is now an admin.")
BOUNTY_SERVICE.add_admin(room_id, target_username, requesting_username)
await update.message.reply_text(
f"✅ @{target_username} is now an admin.",
reply_markup=reply_markup,
)
elif subcommand == "remove":
BOUNTY_SERVICE.remove_admin(room_id, target_user_id, user_id)
await update.message.reply_text(f"✅ @{username} is no longer an admin.")
except PermissionError as e:
await update.message.reply_text(f"{e}")
BOUNTY_SERVICE.remove_admin(room_id, target_username, requesting_username)
await update.message.reply_text(
f"✅ @{target_username} is no longer an admin.",
reply_markup=reply_markup,
)
except (PermissionError, ValueError) as e:
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(f"{e}", reply_markup=reply_markup)
async def cmd_help(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
user_id = get_user_id(update)
room_id = get_room_id(update)
is_admin = BOUNTY_SERVICE.is_admin(room_id, user_id)
effective_user = update.effective_user
username = effective_user.username or effective_user.first_name or None
is_admin = BOUNTY_SERVICE.is_admin(room_id, username)
if is_admin:
lines = [
@@ -906,4 +1143,10 @@ async def cmd_help(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
"/help — show this message",
]
await update.message.reply_text("\n".join(lines), disable_web_page_preview=True)
keyboard = [[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"\n".join(lines),
disable_web_page_preview=True,
reply_markup=reply_markup,
)

View File

@@ -8,7 +8,7 @@ os.chdir("/home/shoko/repositories/jigaido/apps/telegram-bot")
sys.path.insert(0, "/home/shoko/repositories/jigaido")
# Import main from the local bot module
import bot as bot_module
import bot as bot_module # noqa: E402
if __name__ == "__main__":
if not bot_module.BOT_TOKEN:

View File

@@ -1,10 +1,8 @@
"""Pytest fixtures for telegram-bot tests."""
import sys
import tempfile
from pathlib import Path
import pytest
# Add the app directory to path so imports work when running pytest
sys.path.insert(0, str(Path(__file__).parent.parent))

View File

@@ -1,11 +1,11 @@
"""Tests for commands.py — parsing, formatting, and command handlers."""
import time
from unittest.mock import MagicMock, patch, AsyncMock, sentinel
from unittest.mock import MagicMock, patch, AsyncMock
import pytest
from telegram import Update, Message, User, Chat, CallbackQuery
from telegram import Update, Message, User, Chat
from telegram.ext import ContextTypes
from commands import (
@@ -52,75 +52,97 @@ class TestExtractArgs:
class TestParseArgs:
def test_text_only(self):
text, link, due = parse_args(["hello", "world"])
text, link, due, _, _ = parse_args(["hello", "world"])
assert text == "hello world"
assert link is None
assert due is None
def test_link_extracted(self):
text, link, due = parse_args(["hello", "https://example.com"])
text, link, due, _, _ = parse_args(["hello", "https://example.com"])
# "hello" is non-link non-date → becomes text; only the URL becomes link
assert text == "hello"
assert link == "https://example.com"
assert due is None
def test_text_and_link(self):
text, link, due = parse_args(["hello", "world", "https://example.com"])
text, link, due, _, _ = parse_args(["hello", "world", "https://example.com"])
assert text == "hello world"
assert link == "https://example.com"
def test_due_date_parsed(self):
text, link, due = parse_args(["hello", "tomorrow"])
text, link, due, _, _ = parse_args(["hello", "tomorrow"])
assert text == "hello"
assert due is not None
# Should be some time in the future
assert due > int(time.time())
def test_all_three(self):
text, link, due = parse_args(["hello", "https://example.com", "tomorrow"])
text, link, due, _, _ = parse_args(["hello", "https://example.com", "tomorrow"])
assert text == "hello"
assert link == "https://example.com"
assert due is not None
def test_http_and_https_both_detected(self):
_, link1, _ = parse_args(["http://example.com"])
_, link2, _ = parse_args(["https://example.com"])
_, link1, _, _, _ = parse_args(["http://example.com"])
_, link2, _, _, _ = parse_args(["https://example.com"])
assert link1 == "http://example.com"
assert link2 == "https://example.com"
def test_non_url_non_date_becomes_text(self):
text, link, due = parse_args(["fix", "the", "bug"])
text, link, due, _, _ = parse_args(["fix", "the", "bug"])
assert text == "fix the bug"
assert link is None
assert due is None
def test_multiple_links_first_only(self):
_, link, _ = parse_args(["text", "https://first.com", "https://second.com"])
_, link, _, _, _ = parse_args(["text", "https://first.com", "https://second.com"])
assert link == "https://first.com"
def test_due_date_after_link(self):
text, link, due = parse_args(["task", "https://example.com", "in 5 days"])
text, link, due, _, _ = parse_args(["task", "https://example.com", "in 5 days"])
assert text == "task"
assert link == "https://example.com"
assert due is not None
def test_empty_args(self):
text, link, due = parse_args([])
text, link, due, _, _ = parse_args([])
assert text is None
assert link is None
assert due is None
def test_date_parser_failure_returns_none(self):
# "asdfjkl" is not parseable → goes to text
text, link, due = parse_args(["hello", "asdfjkl"])
text, link, due, _, _ = parse_args(["hello", "asdfjkl"])
assert text == "hello asdfjkl"
assert due is None
def test_link_takes_first_match(self):
# Even if it's not a valid URL, starts with https://
_, link, _ = parse_args(["skip", "https://not-real.but-still-a-link"])
_, link, _, _, _ = parse_args(["skip", "https://not-real.but-still-a-link"])
assert link == "https://not-real.but-still-a-link"
def test_url_without_scheme_normalized_to_https(self):
"""URLs without scheme should get https:// prefix."""
_, link, _, _, _ = parse_args(["github.com/user/repo"])
assert link == "https://github.com/user/repo"
def test_url_without_scheme_github_normalized(self):
text, link, _, _, _ = parse_args(["Fix bug", "github.com/owner/repo"])
assert text == "Fix bug"
assert link == "https://github.com/owner/repo"
def test_url_with_explicit_https_unchanged(self):
_, link, _, _, _ = parse_args(["task", "https://example.com/page"])
assert link == "https://example.com/page"
def test_url_with_http_unchanged(self):
_, link, _, _, _ = parse_args(["task", "http://example.com/page"])
assert link == "http://example.com/page"
def test_url_link_flag_without_scheme_normalized(self):
_, link, _, _, _ = parse_args(["-link", "example.com/path"])
assert link == "https://example.com/path"
class TestFormatBounty:
def _row(
@@ -368,7 +390,6 @@ class TestCmdAdd:
@pytest.mark.asyncio
async def test_add_needs_text_or_link(self):
update = create_mock_update(message_text="/add")
ctx = MagicMock(spec=ContextTypes.DEFAULT_TYPE)
_, link, _ = parse_args([])
if not "test" and not link:

View File

@@ -2,7 +2,6 @@
import argparse
import sys
from pathlib import Path
import dateparser

View File

@@ -1,6 +1,20 @@
"""Domain dataclasses for JIGAIDO bounty tracker."""
from dataclasses import dataclass
from dataclasses import dataclass, field
@dataclass
class Category:
"""A category for organizing bounties in a room.
Categories are per-room and support soft delete.
The id (slug) must be lowercase alphabetic only (e.g., "bug", "feature").
"""
id: str # slug: lowercase alphabetic only, e.g., "bug", "feature"
name: str # display name: e.g., "Bug", "Feature"
created_at: int
deleted_at: int | None = None # soft delete
@dataclass
@@ -12,6 +26,8 @@ class Bounty:
The deleted_at field indicates soft-delete: None means not deleted,
a value means deleted at that Unix timestamp.
The category_ids field lists category slugs associated with this bounty.
"""
id: int
@@ -22,6 +38,7 @@ class Bounty:
created_by_user_id: int
deleted_at: int | None = None
created_by_username: str | None = None
category_ids: list[str] = field(default_factory=list)
@dataclass
@@ -44,18 +61,22 @@ class RoomData:
The next_id field is used to generate unique bounty IDs within this room.
The timezone field stores the room's timezone (e.g., "Asia/Jakarta"), default UTC+0.
The admin_user_ids field lists users who have admin privileges in this room.
The admin_usernames field lists usernames who have admin privileges in this room.
The categories field contains all categories for organizing bounties in this room.
"""
room_id: int
bounties: list[Bounty]
next_id: int
timezone: str | None = None
admin_user_ids: list[int] | None = None
admin_usernames: list[str] | None = None
categories: list[Category] = field(default_factory=list)
def __post_init__(self):
if self.admin_user_ids is None:
self.admin_user_ids = []
if self.admin_usernames is None:
self.admin_usernames = []
if self.categories is None:
self.categories = []
@dataclass

View File

@@ -32,10 +32,6 @@ class RoomStorage(Protocol):
"""Update an existing bounty in a room."""
...
def delete_bounty(self, room_id: int, bounty_id: int) -> None:
"""Delete a bounty from a room."""
...
def get_bounty(self, room_id: int, bounty_id: int) -> Bounty | None:
"""Get a specific bounty from a room by ID."""
...

View File

@@ -25,68 +25,79 @@ class BountyService:
def __init__(self, storage: RoomStorage):
self._storage = storage
def is_admin(self, room_id: int, user_id: int) -> bool:
"""Check if user is admin in a room."""
def is_admin(self, room_id: int, username: str | None) -> bool:
"""Check if user is admin in a room by username."""
if not username:
return False
room_data = self._storage.load(room_id)
if room_data is None:
return False
return user_id in (room_data.admin_user_ids or [])
return username in (room_data.admin_usernames or [])
def add_admin(
self, room_id: int, admin_user_id: int, requesting_user_id: int
self, room_id: int, username: str, requesting_username: str | None
) -> None:
"""Add an admin to a room. Requires admin permission, or self-promotion if first admin."""
room_data = self._storage.load(room_id)
has_no_admins = room_data is None or not room_data.admin_user_ids
is_self_promotion = requesting_user_id == admin_user_id
has_no_admins = room_data is None or not room_data.admin_usernames
is_self_promotion = requesting_username == username
if not self.is_admin(room_id, requesting_user_id):
if not self.is_admin(room_id, requesting_username):
if not (has_no_admins and is_self_promotion):
raise PermissionError("Only admins can add admins.")
if room_data is None or room_data.admin_user_ids is None:
if room_data is None:
room_data = RoomData(
room_id=room_id, bounties=[], next_id=1, admin_user_ids=[]
room_id=room_id, bounties=[], next_id=1, admin_usernames=[]
)
if admin_user_id not in (room_data.admin_user_ids or []):
room_data.admin_user_ids.append(admin_user_id)
admin_usernames = room_data.admin_usernames
if admin_usernames is None:
admin_usernames = []
room_data.admin_usernames = []
if username in admin_usernames:
raise ValueError(f"@{username} is already an admin.")
admin_usernames.append(username)
self._storage.save(room_data)
def remove_admin(
self, room_id: int, admin_user_id: int, requesting_user_id: int
self, room_id: int, username: str, requesting_username: str | None
) -> None:
"""Remove an admin from a room. Requires admin permission."""
if not self.is_admin(room_id, requesting_user_id):
if not self.is_admin(room_id, requesting_username):
raise PermissionError("Only admins can remove admins.")
room_data = self._storage.load(room_id)
if room_data is None:
return
if room_data is None or not (room_data.admin_usernames or []):
raise ValueError(f"@{username} is not an admin.")
if admin_user_id in (room_data.admin_user_ids or []):
(room_data.admin_user_ids or []).remove(admin_user_id)
if username not in (room_data.admin_usernames or []):
raise ValueError(f"@{username} is not an admin.")
(room_data.admin_usernames or []).remove(username)
self._storage.save(room_data)
def list_admins(self, room_id: int) -> list[int]:
"""List all admin user IDs in a room."""
def list_admins(self, room_id: int) -> list[str]:
"""List all admin usernames in a room."""
room_data = self._storage.load(room_id)
if room_data is None:
return []
return list(room_data.admin_user_ids or [])
return list(room_data.admin_usernames or [])
def set_timezone(
self, room_id: int, timezone: str, requesting_user_id: int
self, room_id: int, timezone: str, requesting_username: str | None
) -> None:
"""Set the timezone for a room. Requires admin permission."""
if not self.is_admin(room_id, requesting_user_id):
if not self.is_admin(room_id, requesting_username):
raise PermissionError("Only admins can set timezone.")
room_data = self._storage.load(room_id)
if room_data is None:
room_data = RoomData(
room_id=room_id, bounties=[], next_id=1, admin_user_ids=[]
room_id=room_id, bounties=[], next_id=1, admin_usernames=[]
)
room_data.timezone = timezone
@@ -121,13 +132,14 @@ class BountyService:
self,
room_id: int,
user_id: int,
username: str | None,
text: Optional[str] = None,
link: Optional[str] = None,
due_date_ts: Optional[int] = None,
created_by_username: Optional[str] = None,
) -> Bounty:
"""Add a new bounty to the room. Requires admin permission."""
if not self.is_admin(room_id, user_id):
if not self.is_admin(room_id, username):
raise PermissionError("Only admins can add bounties.")
if not self.check_link_unique(room_id, link):
@@ -168,12 +180,12 @@ class BountyService:
return b
return None
def recover_bounty(self, room_id: int, bounty_id: int, user_id: int) -> str:
def recover_bounty(self, room_id: int, bounty_id: int, username: str | None) -> str:
"""Recover a soft-deleted bounty. Admin only.
Returns: 'recovered', 'not_found', 'not_deleted', 'permission_denied'
"""
if not self.is_admin(room_id, user_id):
if not self.is_admin(room_id, username):
return "permission_denied"
bounty = self.get_deleted_bounty(room_id, bounty_id)
@@ -187,7 +199,7 @@ class BountyService:
return "recovered"
def recover_bounties(
self, room_id: int, bounty_ids: list[int], user_id: int
self, room_id: int, bounty_ids: list[int], username: str | None
) -> dict[int, str]:
"""Recover multiple soft-deleted bounties. Admin only.
@@ -195,7 +207,7 @@ class BountyService:
"""
results = {}
for bounty_id in bounty_ids:
results[bounty_id] = self.recover_bounty(room_id, bounty_id, user_id)
results[bounty_id] = self.recover_bounty(room_id, bounty_id, username)
return results
def get_bounty(self, room_id: int, bounty_id: int) -> Bounty | None:
@@ -209,7 +221,7 @@ class BountyService:
self,
room_id: int,
bounty_id: int,
user_id: int,
username: str | None,
text: Optional[str] = None,
link: Optional[str] = None,
due_date_ts: Optional[int] = None,
@@ -220,7 +232,7 @@ class BountyService:
bounty = self._storage.get_bounty(room_id, bounty_id)
if not bounty:
return False
if not self.is_admin(room_id, user_id):
if not self.is_admin(room_id, username):
raise PermissionError("Only admins can edit bounties.")
if link and not self.check_link_unique(
@@ -243,12 +255,12 @@ class BountyService:
self._storage.update_bounty(room_id, updated)
return True
def delete_bounty(self, room_id: int, bounty_id: int, user_id: int) -> bool:
def delete_bounty(self, room_id: int, bounty_id: int, username: str | None) -> bool:
"""Soft delete a bounty. Only admins can delete."""
bounty = self._storage.get_bounty(room_id, bounty_id)
if not bounty:
return False
if not self.is_admin(room_id, user_id):
if not self.is_admin(room_id, username):
raise PermissionError("Only admins can delete bounties.")
bounty.deleted_at = int(time.time())
@@ -256,7 +268,7 @@ class BountyService:
return True
def delete_bounties(
self, room_id: int, bounty_ids: list[int], user_id: int
self, room_id: int, bounty_ids: list[int], username: str | None
) -> dict[int, str]:
"""Soft delete multiple bounties. Returns dict of bounty_id -> result.
@@ -268,7 +280,7 @@ class BountyService:
if not bounty:
results[bounty_id] = "not_found"
continue
if not self.is_admin(room_id, user_id):
if not self.is_admin(room_id, username):
results[bounty_id] = "permission_denied"
continue

544
docs/AUDIT_AND_SPEC.md Normal file
View File

@@ -0,0 +1,544 @@
# JIGAIDO Audit & Feature Specification
> Document created: 2026-04-09
> Purpose: Repository audit findings and category feature specification
---
# Part I: Repository Audit
## 1.1 Current Architecture
JIGAIDO follows **hexagonal architecture** with clear separation:
```
jigaido/
├── core/ # Domain layer (pure Python, no deps)
│ ├── models.py # Domain dataclasses
│ ├── ports.py # Storage interfaces
│ └── services.py # Business logic
├── adapters/
│ └── storage/
│ └── json_file.py # JSON file persistence
├── apps/
│ └── telegram-bot/
│ ├── bot.py # Bot entrypoint
│ └── commands.py # Command handlers
├── tests/ # Unit tests (98 tests passing)
├── config.py # Configuration
├── SPEC.md # Original design spec
└── README.md # Overview
```
## 1.2 Features Implemented
| Feature | Status | Location |
|---------|--------|----------|
| Group bounty management | ✅ Done | `BountyService` |
| Personal DM bounties | ✅ Done | Same service, different room_id |
| Admin management | ✅ Done | `add_admin`, `remove_admin`, `list_admins` |
| Soft delete (recoverable) | ✅ Done | `delete_bounty`, `recover_bounty` |
| Due date with timezone | ✅ Done | `dateparser` + `ZoneInfo` |
| Link deduplication | ✅ Done | `check_link_unique` |
| Tracking/untracking | ✅ Done | `TrackingService` |
| `/track` in groups only | ✅ Done | Command handler |
| Expired bounty filtering | ✅ Done | 24h cutoff logic |
| Timezone per room | ✅ Done | `set_timezone`, `get_timezone` |
## 1.3 Bugs & Issues Found
### Bug 1: Admin Promotion Logic Edge Case
**Location**: `core/services.py` - `add_admin()`
**Issue**: The first admin can self-promote, but if the Telegram group creator joins later, they won't be recognized as admin since they're not in `admin_usernames`.
**Code**:
```python
# core/services.py:44-49
has_no_admins = room_data is None or not room_data.admin_usernames
is_self_promotion = requesting_username == username
if not self.is_admin(room_id, requesting_username):
if not (has_no_admins and is_self_promotion):
raise PermissionError("Only admins can add admins.")
```
**Recommendation**: Document this behavior or enhance to auto-detect Telegram group creator.
---
### Bug 2: Hard Delete Method in Storage
**Location**: `adapters/storage/json_file.py:113-119`
**Issue**: `delete_bounty()` in storage does hard delete, while `BountyService.delete_bounty()` does soft delete. The storage method is unused but confusing.
**Code**:
```python
def delete_bounty(self, room_id: int, bounty_id: int) -> None:
"""Delete a bounty from a room."""
room_data = self.load(room_id)
if room_data is None:
return
room_data.bounties = [b for b in room_data.bounties if b.id != bounty_id]
self.save(room_data)
```
**Recommendation**: Remove or mark as deprecated.
---
### Issue 3: Spec vs Code Inconsistency
**Location**: `SPEC.md` vs `commands.py`
**Issue**: SPEC.md says anyone can `/add` in groups, but code requires admin.
| Command | SPEC.md | Code |
|---------|---------|------|
| `/add` | anyone | admin only |
| `/edit` | creator only | admin only |
| `/delete` | creator only | admin only |
**Recommendation**: Update SPEC.md to reflect actual implementation.
---
### Issue 4: No Input Sanitization
**Location**: `commands.py` - `parse_args()`
**Issue**: Links without proper scheme (e.g., `github.com/user/repo`) are accepted as-is.
**Recommendation**: Consider normalizing URL scheme or validating format.
---
### Issue 5: No Rate Limiting
**Location**: `/add` command
**Issue**: No limit on:
- Number of bounties per room
- Text length
- Request rate
**Recommendation**: Add rate limiting for production use.
---
## 1.4 Test Status
```bash
$ PYTHONPATH=. python -m pytest tests/
======================== 98 passed, 1 warning ========================
```
**Note**: Tests require `PYTHONPATH=.` to run. Consider adding `pytest.ini` or `pyproject.toml`.
---
# Part II: Category Feature Specification
## 2.1 Overview
Add category support to JIGAIDO to allow filtering and organizing bounties.
**Goals**:
- Admin-only category management (create, delete)
- Multiple categories per bounty (no duplicates)
- Filter bounties by category
- Show categories on `/show` command
- Backward compatibility (existing bounties work without categories)
## 2.2 Data Model
### Category
```python
@dataclass
class Category:
"""A category for organizing bounties in a room."""
id: str # slug: lowercase alphabetic only, e.g., "bug", "feature"
name: str # display name: e.g., "Bug", "Feature"
created_at: int
deleted_at: int | None = None # soft delete
```
**Constraints**:
- `id` (slug): lowercase alphabetic only, no symbols, e.g., `^[a-z]+$`
- `name`: human-readable display
- Unique within room (slug must be unique)
- Soft delete preserves data
### Bounty (Modified)
```python
@dataclass
class Bounty:
# ... existing fields ...
id: int
text: str | None
link: str | None
due_date_ts: int | None
created_at: int
created_by_user_id: int
deleted_at: int | None = None
created_by_username: str | None = None
category_ids: list[str] = field(default_factory=list) # NEW
```
### RoomData (Modified)
```python
@dataclass
class RoomData:
room_id: int
bounties: list[Bounty]
next_id: int
timezone: str | None = None
admin_usernames: list[str] | None = None
categories: list[Category] = field(default_factory=list) # NEW
```
## 2.3 Category Scope
- **Per room**: Same as bounties, each room (group/DM) has independent categories
- **Admin only**: Only admins can create/delete categories
- **User access**: Regular users can only filter by category
## 2.4 Service Layer API
All methods require admin permission unless specified otherwise.
### Category Management
```python
class BountyService:
# ... existing methods ...
# --- Category Management ---
def add_category(
self,
room_id: int,
slug: str,
name: str,
username: str | None
) -> Category:
"""Create a new category. Admin only.
Args:
room_id: Room identifier
slug: Category ID (lowercase alphabetic, e.g., "bug")
name: Display name (e.g., "Bug Report")
username: Requesting admin's username
Returns:
Created Category
Raises:
PermissionError: If not admin
ValueError: If slug already exists or invalid
"""
...
def delete_category(
self,
room_id: int,
slug: str,
username: str | None
) -> bool:
"""Soft delete a category. Admin only.
Args:
room_id: Room identifier
slug: Category slug to delete
username: Requesting admin's username
Returns:
True if deleted, False if not found
"""
...
def list_categories(self, room_id: int) -> list[Category]:
"""List active categories (excludes soft-deleted).
Args:
room_id: Room identifier
Returns:
List of active categories
"""
...
def get_category(self, room_id: int, slug: str) -> Category | None:
"""Get a category by slug (excludes soft-deleted).
Args:
room_id: Room identifier
slug: Category slug
Returns:
Category or None if not found
"""
...
```
### Category-to-Bounty Association
```python
def add_category_to_bounty(
self,
room_id: int,
bounty_id: int,
category_slug: str,
username: str | None
) -> bool:
"""Add category to a bounty. Admin only.
Args:
room_id: Room identifier
bounty_id: Bounty ID
category_slug: Category slug to add
username: Requesting admin's username
Returns:
True if newly added, False if already exists
Raises:
PermissionError: If not admin
ValueError: If bounty or category not found
"""
...
def remove_category_from_bounty(
self,
room_id: int,
bounty_id: int,
category_slug: str,
username: str | None
) -> bool:
"""Remove category from a bounty. Admin only.
Args:
room_id: Room identifier
bounty_id: Bounty ID
category_slug: Category slug to remove
username: Requesting admin's username
Returns:
True if removed, False if not found
"""
...
def update_bounty_categories(
self,
room_id: int,
bounty_id: int,
category_slugs: list[str],
username: str | None
) -> bool:
"""Replace all categories on a bounty. Admin only.
Args:
room_id: Room identifier
bounty_id: Bounty ID
category_slugs: New list of category slugs
username: Requesting admin's username
Returns:
True if updated
Raises:
PermissionError: If not admin
ValueError: If bounty or any category not found
"""
...
```
### Bounty Listing with Category Filter
```python
def list_bounties(
self,
room_id: int,
category_slugs: list[str] | None = None,
include_expired: bool = False
) -> list[Bounty]:
"""List bounties with optional category filtering.
Args:
room_id: Room identifier
category_slugs: If provided, filter by ANY of these categories (OR)
include_expired: If True, include bounties past due date
Returns:
List of non-deleted bounties, sorted by due date
"""
...
```
## 2.5 Filtering Logic
- **Single category**: `/bounty -c bug` → bounties with "bug"
- **Multiple categories (OR)**: `/bounty -c bug,feature` → bounties with "bug" OR "feature"
- **No filter**: `/bounty` → all bounties (current behavior)
## 2.6 Command Syntax
### Category Management
```
/category - list categories
/category add <slug> <name> - create category (admin)
/category delete <slug> - soft delete category (admin)
```
### Bounty with Category
```
/add <text> [link] [date] -cat <slug> - add with category
/add <text> [link] [date] -cat <slug1>,<slug2> - add with multiple categories
/update <id> -cat <slug> - add category to bounty
/update <id> -cat <slug1>,<slug2> - set categories (replace all)
/update <id> -cat - - clear all categories
/update <id> -remove-cat <slug> - remove specific category
```
### Bounty Listing with Filter
```
/bounty - all bounties (current)
/bounty -c <slug> - filter by category
/bounty -c <slug1>,<slug2> - filter by multiple categories (OR)
/bounty all - show expired (current)
/bounty all -c <slug> - show expired + filter by category
```
### Show Bounty
```
/show <id> - show bounty details with categories
```
## 2.7 Display Format
### `/category` output
```
Categories:
- bug → Bug Report
- feature → Feature Request
- docs → Documentation
```
### `/show <id>` output (with categories)
```
[#1] Fix login bug
🔗 https://github.com/...
📅 15 April 2026 14:30 (Asia/Jakarta)
📂 Categories: bug | feature
👤 @username
📌 Created: 2026-04-01 10:00
```
### `/bounty -c bug` output
```
Filtering with 🐛 bug category:
Showing 3 of 10 bounties:
[#5] ...
[#1] ...
[#3] ...
```
### `/bounty -c bug,feature` output
```
Filtering with 🐛 bug, ✨ feature categories:
Showing 5 of 10 bounties:
[#5] ...
[#1] ...
```
## 2.8 Edge Cases
| Scenario | Behavior |
|----------|----------|
| Delete category | Soft delete - existing bounties keep category in data, but filter won't find it |
| Filter by deleted category | Show "No bounties with this category" or error |
| Add duplicate category to bounty | No-op, return False |
| Add invalid slug (uppercase/symbols) | Reject with validation error |
| Category slug conflict | Reject with "Category already exists" |
| Bounty without categories | `category_ids = []` (backward compatible) |
## 2.9 Test Cases to Add
```python
# Category Management
def test_add_category_requires_admin():
def test_add_category_duplicate_slug_fails():
def test_add_category_invalid_slug_fails():
def test_add_category_valid():
def test_delete_category_soft_deletes():
def test_deleted_category_not_listed():
def test_deleted_category_still_in_bounty_data():
def test_list_categories_empty():
def test_list_categories_returns_active():
def test_get_category_not_found():
def test_get_category_deleted_returns_none():
# Category-to-Bounty
def test_add_category_to_bounty():
def test_add_duplicate_category_to_bounty_noop():
def test_add_category_to_bounty_invalid_category():
def test_remove_category_from_bounty():
def test_remove_category_not_on_bounty_returns_false():
def test_update_bounty_categories_replace_all():
def test_update_bounty_categories_validates():
# Bounty Listing with Filter
def test_list_bounties_filter_by_single_category():
def test_list_bounties_filter_by_multiple_categories_or():
def test_list_bounties_no_category_returns_all():
def test_list_bounties_category_excludes_deleted_bounties():
```
---
# Part III: Implementation Checklist
## Models Layer
- [ ] Add `Category` dataclass to `core/models.py`
- [ ] Add `category_ids` field to `Bounty` dataclass
- [ ] Add `categories` field to `RoomData` dataclass
## Storage Layer
- [ ] Update `JsonFileRoomStorage.load()` to deserialize categories
- [ ] Update `JsonFileRoomStorage.save()` to serialize categories
## Service Layer
- [ ] Implement `add_category()`
- [ ] Implement `delete_category()`
- [ ] Implement `list_categories()`
- [ ] Implement `get_category()`
- [ ] Implement `add_category_to_bounty()`
- [ ] Implement `remove_category_from_bounty()`
- [ ] Implement `update_bounty_categories()`
- [ ] Update `list_bounties()` to support category filter
## Command Layer
- [ ] Add `/category` command handler
- [ ] Add `-cat` flag parsing to `/add`
- [ ] Add `-cat` and `-remove-cat` flags to `/update`
- [ ] Add `-c` flag to `/bounty` for category filter
- [ ] Update `/show` to display categories
## Tests
- [ ] Add category management tests
- [ ] Add category-to-bounty tests
- [ ] Add category filter tests
## Documentation
- [ ] Update README.md with category feature
- [ ] Update command help text
---
# Part IV: Open Questions
1. **Category icon/emoji**: Should categories have optional emoji? (Not in initial spec, can add later)
2. **Category reactivation**: Should soft-deleted categories be reactable? (Not in initial spec)
3. **Bulk category operations**: Should we support `/category add bulk`? (Not in initial spec)
---
*End of Audit & Specification Document*

4
pytest.ini Normal file
View File

@@ -0,0 +1,4 @@
[pytest]
testpaths = tests
pythonpath = .
asyncio_default_fixture_loop_scope = function

View File

@@ -3,7 +3,6 @@
import pytest
from unittest.mock import patch, MagicMock
from io import StringIO
import sys
from core.models import Bounty
from core.ports import RoomStorage, TrackingStorage
@@ -237,7 +236,7 @@ class TestCLIValidation:
main()
mock_bounty_service.update_bounty.assert_called_once()
call_kwargs = mock_bounty_service.update_bounty.call_args
assert call_kwargs.kwargs.get("clear_link") == True
assert call_kwargs.kwargs.get("clear_link") is True
def test_update_clear_due_flag(self):
"""Test update with --clear-due flag."""
@@ -255,7 +254,7 @@ class TestCLIValidation:
main()
mock_bounty_service.update_bounty.assert_called_once()
call_kwargs = mock_bounty_service.update_bounty.call_args
assert call_kwargs.kwargs.get("clear_due") == True
assert call_kwargs.kwargs.get("clear_due") is True
class TestCLIOutput:

View File

@@ -2,11 +2,9 @@
import json
import os
import tempfile
from pathlib import Path
from unittest.mock import patch
import pytest
from config import Config, DEFAULT_DATA_DIR

View File

@@ -1,11 +1,9 @@
"""Tests for adapters/storage/json_file.py — JSON file storage adapter."""
import json
import os
import tempfile
from pathlib import Path
import pytest
from adapters.storage.json_file import JsonFileRoomStorage, JsonFileTrackingStorage
from core.models import Bounty, RoomData, TrackingData, TrackedBounty
@@ -99,15 +97,6 @@ class TestJsonFileRoomStorage:
assert self.storage.load(-1001) is None
def test_delete_bounty(self):
"""Test that delete_bounty removes a bounty."""
bounty = self._create_bounty(id=1)
self.storage.add_bounty(-1001, bounty)
self.storage.delete_bounty(-1001, 1)
loaded = self.storage.load(-1001)
assert len(loaded.bounties) == 0
def test_get_bounty_found(self):
"""Test that get_bounty returns the bounty when found."""
bounty = self._create_bounty(id=1)

View File

@@ -1,8 +1,6 @@
"""Tests for core/models.py — domain dataclasses."""
import time
import pytest
from core.models import (
Bounty,
@@ -122,7 +120,7 @@ class TestRoomData:
assert rd.bounties == []
assert rd.next_id == 1
assert rd.timezone is None
assert rd.admin_user_ids == []
assert rd.admin_usernames == []
def test_create_dm_room_data(self):
rd = RoomData(
@@ -134,7 +132,7 @@ class TestRoomData:
assert rd.bounties == []
assert rd.next_id == 1
assert rd.timezone is None
assert rd.admin_user_ids == []
assert rd.admin_usernames == []
def test_room_data_with_bounties(self):
b = Bounty(
@@ -156,18 +154,18 @@ class TestRoomData:
bounties=[],
next_id=1,
timezone="Asia/Jakarta",
admin_user_ids=[123, 456],
admin_usernames=["alice", "bob"],
)
assert rd.timezone == "Asia/Jakarta"
assert rd.admin_user_ids == [123, 456]
assert rd.admin_usernames == ["alice", "bob"]
def test_room_data_admin_user_ids_defaults_to_empty_list(self):
def test_room_data_admin_usernames_defaults_to_empty_list(self):
rd = RoomData(
room_id=-1001,
bounties=[],
next_id=1,
)
assert rd.admin_user_ids == []
assert rd.admin_usernames == []
class TestTrackingData:

View File

@@ -1,6 +1,5 @@
"""Tests for core/ports.py — storage interfaces."""
import pytest
from core.models import Bounty, RoomData, TrackingData, TrackedBounty
from core.ports import RoomStorage, TrackingStorage
@@ -35,12 +34,6 @@ class SimpleRoomStorage:
self._rooms[room_id].bounties[i] = bounty
break
def delete_bounty(self, room_id: int, bounty_id: int) -> None:
if room_id in self._rooms:
self._rooms[room_id].bounties = [
b for b in self._rooms[room_id].bounties if b.id != bounty_id
]
def get_bounty(self, room_id: int, bounty_id: int) -> Bounty | None:
if room_id in self._rooms:
for b in self._rooms[room_id].bounties:
@@ -120,12 +113,6 @@ class MockRoomStorage:
self._rooms[room_id].bounties[i] = bounty
break
def delete_bounty(self, room_id: int, bounty_id: int) -> None:
if room_id in self._rooms:
self._rooms[room_id].bounties = [
b for b in self._rooms[room_id].bounties if b.id != bounty_id
]
def get_bounty(self, room_id: int, bounty_id: int) -> Bounty | None:
if room_id in self._rooms:
for b in self._rooms[room_id].bounties:
@@ -243,19 +230,6 @@ class TestRoomStorage:
assert result is not None
assert result.text == "Updated"
def test_delete_bounty(self):
storage = MockRoomStorage()
bounty = Bounty(
id=1,
text="Test",
link=None,
due_date_ts=None,
created_at=0,
created_by_user_id=123,
)
storage.add_bounty(-1001, bounty)
storage.delete_bounty(-1001, 1)
assert storage.get_bounty(-1001, 1) is None
class TestTrackingStorage:

View File

@@ -1,9 +1,8 @@
"""Tests for core/services.py — business logic services."""
import pytest
from unittest.mock import MagicMock
from core.models import Bounty, RoomData, TrackingData, TrackedBounty
from core.models import RoomData
from core.services import BountyService, TrackingService
from tests.test_ports import MockRoomStorage, MockTrackingStorage
@@ -15,32 +14,34 @@ class TestBountyService:
"""Set up fresh storage and service for each test."""
self.storage = MockRoomStorage()
self.service = BountyService(self.storage)
self.admin_user_id = 123
self._make_admin(-1001, self.admin_user_id)
self.admin_username = "admin"
self._make_admin(-1001, self.admin_username)
def _make_admin(self, room_id: int, user_id: int):
def _make_admin(self, room_id: int, username: str):
"""Helper to set up a room with an admin user."""
room_data = self.storage.load(room_id)
if room_data is None:
room_data = RoomData(
room_id=room_id, bounties=[], next_id=0, admin_user_ids=[]
room_id=room_id, bounties=[], next_id=0, admin_usernames=[]
)
if user_id not in (room_data.admin_user_ids or []):
room_data.admin_user_ids = room_data.admin_user_ids or []
room_data.admin_user_ids.append(user_id)
if username not in (room_data.admin_usernames or []):
room_data.admin_usernames = room_data.admin_usernames or []
room_data.admin_usernames.append(username)
self.storage.save(room_data)
def test_add_bounty_creates_room_if_not_exists(self):
"""Test that add_bounty creates a new room if it doesn't exist."""
bounty = self.service.add_bounty(
room_id=-1001,
user_id=self.admin_user_id,
user_id=123,
username=self.admin_username,
text="Fix bug",
link="https://github.com/issue/1",
created_by_username=self.admin_username,
)
assert bounty.id == 1
assert bounty.text == "Fix bug"
assert bounty.created_by_user_id == self.admin_user_id
assert bounty.created_by_user_id == 123
room = self.storage.load(-1001)
assert room is not None
@@ -49,13 +50,13 @@ class TestBountyService:
def test_add_bounty_increments_id(self):
"""Test that add_bounty increments bounty ID for each new bounty."""
b1 = self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="First"
room_id=-1001, user_id=123, username=self.admin_username, text="First"
)
b2 = self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="Second"
room_id=-1001, user_id=123, username=self.admin_username, text="Second"
)
b3 = self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="Third"
room_id=-1001, user_id=123, username=self.admin_username, text="Third"
)
assert b1.id == 1
@@ -65,7 +66,9 @@ class TestBountyService:
def test_add_bounty_requires_admin(self):
"""Test that add_bounty raises PermissionError when non-admin tries to add."""
with pytest.raises(PermissionError, match="Only admins can add bounties"):
self.service.add_bounty(room_id=-1001, user_id=999, text="Not admin")
self.service.add_bounty(
room_id=-1001, user_id=999, username="nonadmin", text="Not admin"
)
def test_list_bounties_empty_room(self):
"""Test list_bounties returns empty list for non-existent room."""
@@ -74,14 +77,16 @@ class TestBountyService:
def test_list_bounties_returns_all_bounties(self):
"""Test list_bounties returns all bounties in a room."""
self.service.add_bounty(room_id=-1001, user_id=self.admin_user_id, text="First")
self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="Second"
room_id=-1001, user_id=123, username=self.admin_username, text="First"
)
self.service.add_bounty(
room_id=-1001, user_id=123, username=self.admin_username, text="Second"
)
# Add bounty to different room to verify isolation
self._make_admin(-999, self.admin_user_id)
self._make_admin(-999, "otheradmin")
self.service.add_bounty(
room_id=-999, user_id=self.admin_user_id, text="Other room"
room_id=-999, user_id=456, username="otheradmin", text="Other room"
)
bounties = self.service.list_bounties(-1001)
@@ -91,7 +96,7 @@ class TestBountyService:
def test_get_bounty_found(self):
"""Test get_bounty returns bounty when it exists."""
created = self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="Test"
room_id=-1001, user_id=123, username=self.admin_username, text="Test"
)
found = self.service.get_bounty(-1001, created.id)
assert found is not None
@@ -104,19 +109,21 @@ class TestBountyService:
def test_get_bounty_wrong_room(self):
"""Test get_bounty returns None when bounty is in different room."""
self.service.add_bounty(room_id=-1001, user_id=self.admin_user_id, text="Test")
self.service.add_bounty(
room_id=-1001, user_id=123, username=self.admin_username, text="Test"
)
found = self.service.get_bounty(-999, 1) # room -999 doesn't have bounty 1
assert found is None
def test_update_bounty_success(self):
"""Test update_bounty succeeds when admin updates their bounty."""
bounty = self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="Original"
room_id=-1001, user_id=123, username=self.admin_username, text="Original"
)
result = self.service.update_bounty(
room_id=-1001,
bounty_id=bounty.id,
user_id=self.admin_user_id,
username=self.admin_username,
text="Updated",
)
assert result is True
@@ -126,13 +133,13 @@ class TestBountyService:
def test_update_bounty_not_admin_raises_permission_error(self):
"""Test update_bounty raises PermissionError when non-admin tries to update."""
bounty = self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="Original"
room_id=-1001, user_id=123, username=self.admin_username, text="Original"
)
with pytest.raises(PermissionError, match="Only admins can edit bounties"):
self.service.update_bounty(
room_id=-1001,
bounty_id=bounty.id,
user_id=999, # different user, not admin
username="nonadmin", # different user, not admin
text="Hacked",
)
@@ -141,7 +148,7 @@ class TestBountyService:
result = self.service.update_bounty(
room_id=-1001,
bounty_id=999,
user_id=self.admin_user_id,
username=self.admin_username,
text="Updated",
)
assert result is False
@@ -150,14 +157,15 @@ class TestBountyService:
"""Test update_bounty only updates provided fields."""
bounty = self.service.add_bounty(
room_id=-1001,
user_id=self.admin_user_id,
user_id=123,
username=self.admin_username,
text="Original",
link="https://original.link",
)
self.service.update_bounty(
room_id=-1001,
bounty_id=bounty.id,
user_id=self.admin_user_id,
username=self.admin_username,
text="Updated only text",
)
updated = self.service.get_bounty(-1001, bounty.id)
@@ -168,14 +176,15 @@ class TestBountyService:
"""Test update_bounty can clear link."""
bounty = self.service.add_bounty(
room_id=-1001,
user_id=self.admin_user_id,
user_id=123,
username=self.admin_username,
text="Test",
link="https://original.link",
)
self.service.update_bounty(
room_id=-1001,
bounty_id=bounty.id,
user_id=self.admin_user_id,
username=self.admin_username,
clear_link=True,
)
updated = self.service.get_bounty(-1001, bounty.id)
@@ -184,9 +193,9 @@ class TestBountyService:
def test_delete_bounty_success(self):
"""Test delete_bounty soft deletes when admin deletes their bounty."""
bounty = self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="To delete"
room_id=-1001, user_id=123, username=self.admin_username, text="To delete"
)
result = self.service.delete_bounty(-1001, bounty.id, self.admin_user_id)
result = self.service.delete_bounty(-1001, bounty.id, self.admin_username)
assert result is True
# Soft delete - bounty should not be found via get_bounty
assert self.service.get_bounty(-1001, bounty.id) is None
@@ -198,28 +207,26 @@ class TestBountyService:
def test_delete_bounty_not_admin_raises_permission_error(self):
"""Test delete_bounty raises PermissionError when non-admin tries to delete."""
bounty = self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="To delete"
room_id=-1001, user_id=123, username=self.admin_username, text="To delete"
)
with pytest.raises(PermissionError, match="Only admins can delete bounties"):
self.service.delete_bounty(
-1001, bounty.id, 999
) # different user, not admin
self.service.delete_bounty(-1001, bounty.id, "nonadmin")
def test_delete_bounty_not_found(self):
"""Test delete_bounty returns False when bounty doesn't exist."""
result = self.service.delete_bounty(-1001, 999, self.admin_user_id)
result = self.service.delete_bounty(-1001, 999, self.admin_username)
assert result is False
def test_delete_bounties_multi_id_success(self):
"""Test delete_bounties returns individual results for multiple bounties."""
bounty1 = self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="To delete 1"
room_id=-1001, user_id=123, username=self.admin_username, text="To delete 1"
)
bounty2 = self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="To delete 2"
room_id=-1001, user_id=123, username=self.admin_username, text="To delete 2"
)
results = self.service.delete_bounties(
-1001, [bounty1.id, bounty2.id], self.admin_user_id
-1001, [bounty1.id, bounty2.id], self.admin_username
)
assert results == {bounty1.id: "deleted", bounty2.id: "deleted"}
# Verify both are soft deleted
@@ -229,22 +236,22 @@ class TestBountyService:
def test_delete_bounties_mixed_results(self):
"""Test delete_bounties returns not_found for non-existent bounties."""
bounty = self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="To delete"
room_id=-1001, user_id=123, username=self.admin_username, text="To delete"
)
results = self.service.delete_bounties(
-1001, [bounty.id, 999, 888], self.admin_user_id
-1001, [bounty.id, 999, 888], self.admin_username
)
assert results == {bounty.id: "deleted", 999: "not_found", 888: "not_found"}
def test_delete_bounties_permission_denied(self):
"""Test delete_bounties returns permission_denied for non-admin users."""
bounty = self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="To delete"
room_id=-1001, user_id=123, username=self.admin_username, text="To delete"
)
results = self.service.delete_bounties(
-1001,
[bounty.id],
999, # non-admin user
"nonadmin", # non-admin user
)
assert results == {bounty.id: "permission_denied"}
# Verify bounty was NOT deleted
@@ -259,29 +266,31 @@ class TestTrackingService:
self.room_storage = MockRoomStorage()
self.tracking_storage = MockTrackingStorage()
self.service = TrackingService(self.tracking_storage, self.room_storage)
self.admin_user_id = 123
self._make_admin(-1001, self.admin_user_id)
self.admin_username = "admin"
self._make_admin(-1001, self.admin_username)
def _make_admin(self, room_id: int, user_id: int):
def _make_admin(self, room_id: int, username: str):
"""Helper to set up a room with an admin user."""
room_data = self.room_storage.load(room_id)
if room_data is None:
room_data = RoomData(
room_id=room_id, bounties=[], next_id=0, admin_user_ids=[]
room_id=room_id, bounties=[], next_id=0, admin_usernames=[]
)
if user_id not in (room_data.admin_user_ids or []):
room_data.admin_user_ids = room_data.admin_user_ids or []
room_data.admin_user_ids.append(user_id)
if username not in (room_data.admin_usernames or []):
room_data.admin_usernames = room_data.admin_usernames or []
room_data.admin_usernames.append(username)
self.room_storage.save(room_data)
def _add_bounty(self, room_id=-1001, user_id=123, text="Test bounty"):
def _add_bounty(self, room_id=-1001, username="admin", text="Test bounty"):
"""Helper to add a bounty for tracking tests."""
if self.room_storage.load(room_id) is None or user_id not in (
self.room_storage.load(room_id).admin_user_ids or []
if self.room_storage.load(room_id) is None or username not in (
self.room_storage.load(room_id).admin_usernames or []
):
self._make_admin(room_id, user_id)
self._make_admin(room_id, username)
bounty_service = BountyService(self.room_storage)
return bounty_service.add_bounty(room_id=room_id, user_id=user_id, text=text)
return bounty_service.add_bounty(
room_id=room_id, user_id=123, username=username, text=text
)
def test_track_bounty_success(self):
"""Test track_bounty successfully tracks a bounty."""
@@ -369,11 +378,13 @@ class TestTrackingService:
def test_get_tracked_bounties_ignores_deleted_bounties(self):
"""Test get_tracked_bounties ignores bounties that were deleted."""
bounty_service = BountyService(self.room_storage)
bounty = bounty_service.add_bounty(room_id=-1001, user_id=123, text="To delete")
bounty = bounty_service.add_bounty(
room_id=-1001, user_id=123, username="admin", text="To delete"
)
self.service.track_bounty(-1001, 123456, bounty.id)
# Delete the bounty
bounty_service.delete_bounty(-1001, bounty.id, 123)
bounty_service.delete_bounty(-1001, bounty.id, "admin")
tracked = self.service.get_tracked_bounties(-1001, 123456)
assert len(tracked) == 0 # deleted bounty not returned