Compare commits
9 Commits
235a89653f
...
fix/my-sor
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3743dc6a45 | ||
| 7e0bc1f8a3 | |||
|
|
961adf103b | ||
| e9d7ba0c8e | |||
|
|
e43c36b84f | ||
| 44680dcb4c | |||
|
|
7c17bff110 | ||
| 43d07eae92 | |||
|
|
66d2a9fb86 |
@@ -11,7 +11,7 @@ import os
|
|||||||
import tempfile
|
import tempfile
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
from core.models import Bounty, RoomData, TrackingData, TrackedBounty
|
from core.models import Bounty, Category, RoomData, TrackingData, TrackedBounty
|
||||||
|
|
||||||
|
|
||||||
class JsonFileRoomStorage:
|
class JsonFileRoomStorage:
|
||||||
@@ -57,16 +57,28 @@ class JsonFileRoomStorage:
|
|||||||
created_by_user_id=b["created_by_user_id"],
|
created_by_user_id=b["created_by_user_id"],
|
||||||
deleted_at=b.get("deleted_at"),
|
deleted_at=b.get("deleted_at"),
|
||||||
created_by_username=b.get("created_by_username"),
|
created_by_username=b.get("created_by_username"),
|
||||||
|
category_ids=b.get("category_ids", []),
|
||||||
)
|
)
|
||||||
for b in data.get("bounties", [])
|
for b in data.get("bounties", [])
|
||||||
]
|
]
|
||||||
|
|
||||||
|
categories = [
|
||||||
|
Category(
|
||||||
|
id=c["id"],
|
||||||
|
name=c["name"],
|
||||||
|
created_at=c["created_at"],
|
||||||
|
deleted_at=c.get("deleted_at"),
|
||||||
|
)
|
||||||
|
for c in data.get("categories", [])
|
||||||
|
]
|
||||||
|
|
||||||
return RoomData(
|
return RoomData(
|
||||||
room_id=data["room_id"],
|
room_id=data["room_id"],
|
||||||
bounties=bounties,
|
bounties=bounties,
|
||||||
next_id=data["next_id"],
|
next_id=data["next_id"],
|
||||||
timezone=data.get("timezone"),
|
timezone=data.get("timezone"),
|
||||||
admin_usernames=data.get("admin_usernames", []),
|
admin_usernames=data.get("admin_usernames", []),
|
||||||
|
categories=categories,
|
||||||
)
|
)
|
||||||
|
|
||||||
def save(self, room_data: RoomData) -> None:
|
def save(self, room_data: RoomData) -> None:
|
||||||
@@ -76,6 +88,15 @@ class JsonFileRoomStorage:
|
|||||||
"next_id": room_data.next_id,
|
"next_id": room_data.next_id,
|
||||||
"timezone": room_data.timezone,
|
"timezone": room_data.timezone,
|
||||||
"admin_usernames": room_data.admin_usernames or [],
|
"admin_usernames": room_data.admin_usernames or [],
|
||||||
|
"categories": [
|
||||||
|
{
|
||||||
|
"id": c.id,
|
||||||
|
"name": c.name,
|
||||||
|
"created_at": c.created_at,
|
||||||
|
"deleted_at": c.deleted_at,
|
||||||
|
}
|
||||||
|
for c in room_data.categories
|
||||||
|
],
|
||||||
"bounties": [
|
"bounties": [
|
||||||
{
|
{
|
||||||
"id": b.id,
|
"id": b.id,
|
||||||
@@ -86,6 +107,7 @@ class JsonFileRoomStorage:
|
|||||||
"created_by_user_id": b.created_by_user_id,
|
"created_by_user_id": b.created_by_user_id,
|
||||||
"deleted_at": b.deleted_at,
|
"deleted_at": b.deleted_at,
|
||||||
"created_by_username": b.created_by_username,
|
"created_by_username": b.created_by_username,
|
||||||
|
"category_ids": b.category_ids,
|
||||||
}
|
}
|
||||||
for b in room_data.bounties
|
for b in room_data.bounties
|
||||||
],
|
],
|
||||||
|
|||||||
@@ -15,6 +15,7 @@ from commands import (
|
|||||||
cmd_add,
|
cmd_add,
|
||||||
cmd_admin,
|
cmd_admin,
|
||||||
cmd_bounty,
|
cmd_bounty,
|
||||||
|
cmd_category,
|
||||||
cmd_delete,
|
cmd_delete,
|
||||||
cmd_delete_message,
|
cmd_delete_message,
|
||||||
cmd_edit,
|
cmd_edit,
|
||||||
@@ -61,6 +62,7 @@ def build_app() -> Application:
|
|||||||
app.add_handler(CommandHandler("timezone", cmd_timezone))
|
app.add_handler(CommandHandler("timezone", cmd_timezone))
|
||||||
app.add_handler(CommandHandler("admin", cmd_admin))
|
app.add_handler(CommandHandler("admin", cmd_admin))
|
||||||
app.add_handler(CommandHandler("recover", cmd_recover))
|
app.add_handler(CommandHandler("recover", cmd_recover))
|
||||||
|
app.add_handler(CommandHandler("category", cmd_category))
|
||||||
|
|
||||||
app.add_handler(CallbackQueryHandler(cmd_delete_message))
|
app.add_handler(CallbackQueryHandler(cmd_delete_message))
|
||||||
|
|
||||||
@@ -82,6 +84,7 @@ async def post_init(app: Application) -> None:
|
|||||||
("timezone", "Get/set room timezone"),
|
("timezone", "Get/set room timezone"),
|
||||||
("admin", "Manage admins"),
|
("admin", "Manage admins"),
|
||||||
("recover", "Recover deleted bounties"),
|
("recover", "Recover deleted bounties"),
|
||||||
|
("category", "Manage categories"),
|
||||||
("help", "Show help"),
|
("help", "Show help"),
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -60,12 +60,19 @@ def extract_args(text: str) -> list[str]:
|
|||||||
def parse_args(
|
def parse_args(
|
||||||
args: list[str],
|
args: list[str],
|
||||||
timezone_str: str = "UTC",
|
timezone_str: str = "UTC",
|
||||||
) -> tuple[Optional[str], Optional[str], Optional[int], bool, bool]:
|
) -> tuple[Optional[str], Optional[str], Optional[int], bool, bool, list[str], bool]:
|
||||||
|
"""Parse command arguments.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
(text, link, due_date_ts, clear_link, clear_date, category_slugs, clear_categories)
|
||||||
|
"""
|
||||||
text = None
|
text = None
|
||||||
link = None
|
link = None
|
||||||
due_date_ts = None
|
due_date_ts = None
|
||||||
clear_link = False
|
clear_link = False
|
||||||
clear_date = False
|
clear_date = False
|
||||||
|
category_slugs: list[str] = []
|
||||||
|
clear_categories = False
|
||||||
|
|
||||||
try:
|
try:
|
||||||
tz = ZoneInfo(timezone_str)
|
tz = ZoneInfo(timezone_str)
|
||||||
@@ -109,6 +116,10 @@ def parse_args(
|
|||||||
return int(localized.timestamp())
|
return int(localized.timestamp())
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def parse_category_slugs(cat_str: str) -> list[str]:
|
||||||
|
"""Parse comma-separated category slugs."""
|
||||||
|
return [s.strip().lower() for s in cat_str.split(",") if s.strip()]
|
||||||
|
|
||||||
i = 0
|
i = 0
|
||||||
while i < len(args):
|
while i < len(args):
|
||||||
arg = args[i]
|
arg = args[i]
|
||||||
@@ -137,6 +148,16 @@ def parse_args(
|
|||||||
else:
|
else:
|
||||||
clear_date = True
|
clear_date = True
|
||||||
i += 1
|
i += 1
|
||||||
|
elif arg == "-cat":
|
||||||
|
if i + 1 < len(args) and args[i + 1] == "-":
|
||||||
|
clear_categories = True
|
||||||
|
i += 2
|
||||||
|
elif i + 1 < len(args) and not args[i + 1].startswith("-"):
|
||||||
|
category_slugs = parse_category_slugs(args[i + 1])
|
||||||
|
i += 2
|
||||||
|
else:
|
||||||
|
clear_categories = True
|
||||||
|
i += 1
|
||||||
elif not link and is_url(arg):
|
elif not link and is_url(arg):
|
||||||
link = normalize_url(arg)
|
link = normalize_url(arg)
|
||||||
i += 1
|
i += 1
|
||||||
@@ -161,7 +182,7 @@ def parse_args(
|
|||||||
else:
|
else:
|
||||||
text = text + " " + arg
|
text = text + " " + arg
|
||||||
|
|
||||||
return text, link, due_date_ts, clear_link, clear_date
|
return text, link, due_date_ts, clear_link, clear_date, category_slugs, clear_categories
|
||||||
|
|
||||||
|
|
||||||
def format_bounty(
|
def format_bounty(
|
||||||
@@ -267,8 +288,23 @@ async def cmd_bounty(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
|
|||||||
show_all = "all" in args
|
show_all = "all" in args
|
||||||
args = [a for a in args if a != "all"]
|
args = [a for a in args if a != "all"]
|
||||||
|
|
||||||
|
# Parse -c flag for category filter
|
||||||
|
category_slugs: list[str] = []
|
||||||
|
filtered_args = []
|
||||||
|
i = 0
|
||||||
|
while i < len(args):
|
||||||
|
if args[i] == "-c":
|
||||||
|
if i + 1 < len(args) and not args[i + 1].startswith("-"):
|
||||||
|
category_slugs = [s.strip().lower() for s in args[i + 1].split(",")]
|
||||||
|
i += 2
|
||||||
|
else:
|
||||||
|
i += 1
|
||||||
|
else:
|
||||||
|
filtered_args.append(args[i])
|
||||||
|
i += 1
|
||||||
|
|
||||||
try:
|
try:
|
||||||
limit = int(args[0]) if args else 5
|
limit = int(filtered_args[0]) if filtered_args else 5
|
||||||
except (ValueError, IndexError):
|
except (ValueError, IndexError):
|
||||||
limit = 5
|
limit = 5
|
||||||
|
|
||||||
@@ -286,6 +322,13 @@ async def cmd_bounty(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
|
|||||||
return (1, b.created_at)
|
return (1, b.created_at)
|
||||||
|
|
||||||
filtered_bounties = [b for b in all_bounties if not is_expired(b) or show_all]
|
filtered_bounties = [b for b in all_bounties if not is_expired(b) or show_all]
|
||||||
|
|
||||||
|
# Filter by category if specified
|
||||||
|
if category_slugs:
|
||||||
|
filtered_bounties = [
|
||||||
|
b for b in filtered_bounties if any(cat in b.category_ids for cat in category_slugs)
|
||||||
|
]
|
||||||
|
|
||||||
filtered_bounties.sort(key=sort_key)
|
filtered_bounties.sort(key=sort_key)
|
||||||
|
|
||||||
total_count = len(filtered_bounties)
|
total_count = len(filtered_bounties)
|
||||||
@@ -296,7 +339,12 @@ async def cmd_bounty(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
|
|||||||
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
|
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
|
||||||
]
|
]
|
||||||
reply_markup = InlineKeyboardMarkup(keyboard)
|
reply_markup = InlineKeyboardMarkup(keyboard)
|
||||||
if show_all:
|
if category_slugs:
|
||||||
|
await update.message.reply_text(
|
||||||
|
f"No bounties with category: {', '.join(category_slugs)}",
|
||||||
|
reply_markup=reply_markup,
|
||||||
|
)
|
||||||
|
elif show_all:
|
||||||
await update.message.reply_text(
|
await update.message.reply_text(
|
||||||
"No bounties yet.",
|
"No bounties yet.",
|
||||||
reply_markup=reply_markup,
|
reply_markup=reply_markup,
|
||||||
@@ -309,6 +357,9 @@ async def cmd_bounty(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
|
|||||||
return
|
return
|
||||||
|
|
||||||
lines = []
|
lines = []
|
||||||
|
if category_slugs:
|
||||||
|
lines.append(f"📂 Filtering with {', '.join(category_slugs)} categories:")
|
||||||
|
|
||||||
if limit < total_count:
|
if limit < total_count:
|
||||||
lines.append(f"Showing {limit} of {total_count} bounties:")
|
lines.append(f"Showing {limit} of {total_count} bounties:")
|
||||||
slice_length = 40
|
slice_length = 40
|
||||||
@@ -335,6 +386,17 @@ async def cmd_bounty(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
|
|||||||
|
|
||||||
async def cmd_my(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
|
async def cmd_my(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
|
||||||
user_id = get_user_id(update)
|
user_id = get_user_id(update)
|
||||||
|
args = extract_args(update.message.text)
|
||||||
|
|
||||||
|
# Parse show_all flag
|
||||||
|
show_all = "all" in args
|
||||||
|
args = [a for a in args if a != "all"]
|
||||||
|
|
||||||
|
# Parse optional limit
|
||||||
|
try:
|
||||||
|
limit = int(args[0]) if args else 5
|
||||||
|
except (ValueError, IndexError):
|
||||||
|
limit = 5
|
||||||
|
|
||||||
if is_group(update):
|
if is_group(update):
|
||||||
group_id = get_group_id(update)
|
group_id = get_group_id(update)
|
||||||
@@ -346,7 +408,25 @@ async def cmd_my(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
|
|||||||
|
|
||||||
timezone_str = BOUNTY_SERVICE.get_timezone(room_id)
|
timezone_str = BOUNTY_SERVICE.get_timezone(room_id)
|
||||||
|
|
||||||
if not bounties:
|
now = int(time.time())
|
||||||
|
cutoff_24h = now - 86400
|
||||||
|
|
||||||
|
def is_expired(b) -> bool:
|
||||||
|
return b.due_date_ts is not None and b.due_date_ts < cutoff_24h
|
||||||
|
|
||||||
|
def sort_key(b):
|
||||||
|
if b.due_date_ts is not None:
|
||||||
|
return (0, b.due_date_ts)
|
||||||
|
return (1, b.created_at)
|
||||||
|
|
||||||
|
# Filter expired and sort
|
||||||
|
filtered_bounties = [b for b in bounties if not is_expired(b) or show_all]
|
||||||
|
filtered_bounties.sort(key=sort_key)
|
||||||
|
|
||||||
|
total_count = len(filtered_bounties)
|
||||||
|
displayed_bounties = filtered_bounties[:limit]
|
||||||
|
|
||||||
|
if not displayed_bounties:
|
||||||
msg = (
|
msg = (
|
||||||
"You are not tracking any bounties."
|
"You are not tracking any bounties."
|
||||||
if is_group(update)
|
if is_group(update)
|
||||||
@@ -355,18 +435,35 @@ async def cmd_my(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
|
|||||||
await update.message.reply_text(msg)
|
await update.message.reply_text(msg)
|
||||||
return
|
return
|
||||||
|
|
||||||
lines = [
|
lines = []
|
||||||
format_bounty(b, show_id=True, timezone_str=timezone_str) for b in bounties
|
if limit < total_count:
|
||||||
]
|
lines.append(f"Showing {limit} of {total_count} bounties:")
|
||||||
await update.message.reply_text("\n".join(lines), disable_web_page_preview=True)
|
slice_length = 40
|
||||||
|
elif show_all and total_count > limit:
|
||||||
|
lines.append(f"Showing {limit} of {total_count} bounties (including expired):")
|
||||||
|
slice_length = 40
|
||||||
|
else:
|
||||||
|
lines.append(f"Showing {total_count} bounties:")
|
||||||
|
slice_length = 0
|
||||||
|
|
||||||
|
for b in displayed_bounties:
|
||||||
|
lines.append(
|
||||||
|
format_bounty(b, show_id=True, slice_length=slice_length, timezone_str=timezone_str)
|
||||||
|
)
|
||||||
|
|
||||||
|
keyboard = [[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]]
|
||||||
|
reply_markup = InlineKeyboardMarkup(keyboard)
|
||||||
|
await update.message.reply_text(
|
||||||
|
"\n".join(lines), disable_web_page_preview=True, reply_markup=reply_markup
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
async def cmd_add(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
|
async def cmd_add(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
|
||||||
args = extract_args(update.message.text)
|
args = extract_args(update.message.text)
|
||||||
if not args:
|
if not args:
|
||||||
await update.message.reply_text(
|
await update.message.reply_text(
|
||||||
"Usage: /add <text> [link] [due_date]\n"
|
"Usage: /add <text> [link] [due_date] [-cat <slug>[,<slug>]]\n"
|
||||||
"Example: /add Fix the bug https://github.com/foo/bar tomorrow"
|
"Example: /add Fix the bug https://github.com/foo/bar tomorrow -cat bug,urgent"
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
@@ -377,7 +474,7 @@ async def cmd_add(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
|
|||||||
effective_user = update.effective_user
|
effective_user = update.effective_user
|
||||||
username = effective_user.username or effective_user.first_name or None
|
username = effective_user.username or effective_user.first_name or None
|
||||||
|
|
||||||
text, link, due_date_ts, _, _ = parse_args(args, timezone_str)
|
text, link, due_date_ts, _, _, category_slugs, _ = parse_args(args, timezone_str)
|
||||||
if not text and not link:
|
if not text and not link:
|
||||||
await update.message.reply_text("A bounty needs at least text or a link.")
|
await update.message.reply_text("A bounty needs at least text or a link.")
|
||||||
return
|
return
|
||||||
@@ -392,6 +489,18 @@ async def cmd_add(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
|
|||||||
due_date_ts=due_date_ts,
|
due_date_ts=due_date_ts,
|
||||||
created_by_username=username,
|
created_by_username=username,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Add categories if specified
|
||||||
|
for slug in category_slugs:
|
||||||
|
try:
|
||||||
|
BOUNTY_SERVICE.add_category_to_bounty(
|
||||||
|
room_id, bounty.id, slug, username
|
||||||
|
)
|
||||||
|
except ValueError:
|
||||||
|
pass # Category doesn't exist, skip it
|
||||||
|
except PermissionError:
|
||||||
|
pass
|
||||||
|
|
||||||
except PermissionError as e:
|
except PermissionError as e:
|
||||||
keyboard = [
|
keyboard = [
|
||||||
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
|
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
|
||||||
@@ -411,10 +520,15 @@ async def cmd_add(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
|
|||||||
if due_date_ts:
|
if due_date_ts:
|
||||||
timezone_str = BOUNTY_SERVICE.get_timezone(room_id)
|
timezone_str = BOUNTY_SERVICE.get_timezone(room_id)
|
||||||
due_str = f" | Due: {format_due_date(due_date_ts, timezone_str)}"
|
due_str = f" | Due: {format_due_date(due_date_ts, timezone_str)}"
|
||||||
|
|
||||||
|
cat_str = ""
|
||||||
|
if category_slugs:
|
||||||
|
cat_str = f" | 📂 {', '.join(category_slugs)}"
|
||||||
|
|
||||||
keyboard = [[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]]
|
keyboard = [[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]]
|
||||||
reply_markup = InlineKeyboardMarkup(keyboard)
|
reply_markup = InlineKeyboardMarkup(keyboard)
|
||||||
await update.message.reply_text(
|
await update.message.reply_text(
|
||||||
f"✅ Bounty added (#{bounty.id}){due_str}",
|
f"✅ Bounty added (#{bounty.id}){due_str}{cat_str}",
|
||||||
disable_web_page_preview=True,
|
disable_web_page_preview=True,
|
||||||
reply_markup=reply_markup,
|
reply_markup=reply_markup,
|
||||||
)
|
)
|
||||||
@@ -443,11 +557,15 @@ async def cmd_update(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
|
|||||||
"Usage: /update <bounty_id> [text] [link] [due_date]\n"
|
"Usage: /update <bounty_id> [text] [link] [due_date]\n"
|
||||||
" /update <bounty_id> -link [<url>] - clear or set link\n"
|
" /update <bounty_id> -link [<url>] - clear or set link\n"
|
||||||
" /update <bounty_id> -date [<date>] - clear or set date\n"
|
" /update <bounty_id> -date [<date>] - clear or set date\n"
|
||||||
|
" /update <bounty_id> -cat <slug>[,<slug>] - add/replace categories\n"
|
||||||
|
" /update <bounty_id> -cat - - clear categories\n"
|
||||||
|
" /update <bounty_id> -remove-cat <slug> - remove category\n"
|
||||||
"Examples:\n"
|
"Examples:\n"
|
||||||
" /update 1 new text - update text only\n"
|
" /update 1 new text - update text only\n"
|
||||||
" /update 1 -link - clear link\n"
|
" /update 1 -link - clear link\n"
|
||||||
" /update 1 -link https://... - set link\n"
|
" /update 1 -link https://... - set link\n"
|
||||||
" /update 1 -link -date - clear both link and date"
|
" /update 1 -cat bug,feature - set categories\n"
|
||||||
|
" /update 1 -remove-cat bug - remove category"
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
@@ -466,23 +584,38 @@ async def cmd_update(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
|
|||||||
|
|
||||||
timezone_str = BOUNTY_SERVICE.get_timezone(room_id)
|
timezone_str = BOUNTY_SERVICE.get_timezone(room_id)
|
||||||
|
|
||||||
text, link, due_date_ts, clear_link, clear_date = parse_args(args[1:], timezone_str)
|
# Parse flags separately to handle -cat and -remove-cat specially
|
||||||
if (
|
remaining_args = args[1:]
|
||||||
not text
|
category_slugs: list[str] = []
|
||||||
and not link
|
clear_categories = False
|
||||||
and due_date_ts is None
|
remove_category_slug: str | None = None
|
||||||
and not clear_link
|
|
||||||
and not clear_date
|
# Process remaining args for special flags
|
||||||
):
|
filtered_args = []
|
||||||
keyboard = [
|
i = 0
|
||||||
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
|
while i < len(remaining_args):
|
||||||
]
|
arg = remaining_args[i]
|
||||||
reply_markup = InlineKeyboardMarkup(keyboard)
|
if arg == "-cat":
|
||||||
await update.message.reply_text(
|
if i + 1 < len(remaining_args) and remaining_args[i + 1] == "-":
|
||||||
"Nothing to update.",
|
clear_categories = True
|
||||||
reply_markup=reply_markup,
|
i += 2
|
||||||
)
|
elif i + 1 < len(remaining_args) and not remaining_args[i + 1].startswith("-"):
|
||||||
return
|
category_slugs = [s.strip().lower() for s in remaining_args[i + 1].split(",")]
|
||||||
|
i += 2
|
||||||
|
else:
|
||||||
|
clear_categories = True
|
||||||
|
i += 1
|
||||||
|
elif arg == "-remove-cat":
|
||||||
|
if i + 1 < len(remaining_args) and not remaining_args[i + 1].startswith("-"):
|
||||||
|
remove_category_slug = remaining_args[i + 1].lower()
|
||||||
|
i += 2
|
||||||
|
else:
|
||||||
|
i += 1
|
||||||
|
else:
|
||||||
|
filtered_args.append(arg)
|
||||||
|
i += 1
|
||||||
|
|
||||||
|
text, link, due_date_ts, clear_link, clear_date, _, _ = parse_args(filtered_args, timezone_str)
|
||||||
|
|
||||||
old_bounty = BOUNTY_SERVICE.get_bounty(room_id, bounty_id)
|
old_bounty = BOUNTY_SERVICE.get_bounty(room_id, bounty_id)
|
||||||
if not old_bounty:
|
if not old_bounty:
|
||||||
@@ -496,8 +629,34 @@ async def cmd_update(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
|
|||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
changes = []
|
||||||
|
|
||||||
|
# Handle category operations
|
||||||
|
if clear_categories:
|
||||||
try:
|
try:
|
||||||
success = BOUNTY_SERVICE.update_bounty(
|
BOUNTY_SERVICE.update_bounty_categories(room_id, bounty_id, [], username)
|
||||||
|
changes.append("Categories cleared")
|
||||||
|
except (PermissionError, ValueError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
if category_slugs:
|
||||||
|
try:
|
||||||
|
BOUNTY_SERVICE.update_bounty_categories(room_id, bounty_id, category_slugs, username)
|
||||||
|
changes.append(f"Categories set: {', '.join(category_slugs)}")
|
||||||
|
except (PermissionError, ValueError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
if remove_category_slug:
|
||||||
|
try:
|
||||||
|
BOUNTY_SERVICE.remove_category_from_bounty(room_id, bounty_id, remove_category_slug, username)
|
||||||
|
changes.append(f"Removed category: {remove_category_slug}")
|
||||||
|
except (PermissionError, ValueError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Only update other fields if something was provided
|
||||||
|
if text is not None or link is not None or due_date_ts is not None or clear_link or clear_date:
|
||||||
|
try:
|
||||||
|
BOUNTY_SERVICE.update_bounty(
|
||||||
room_id=room_id,
|
room_id=room_id,
|
||||||
bounty_id=bounty_id,
|
bounty_id=bounty_id,
|
||||||
username=username,
|
username=username,
|
||||||
@@ -507,23 +666,6 @@ async def cmd_update(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
|
|||||||
clear_link=clear_link,
|
clear_link=clear_link,
|
||||||
clear_due=clear_date,
|
clear_due=clear_date,
|
||||||
)
|
)
|
||||||
except PermissionError as e:
|
|
||||||
keyboard = [
|
|
||||||
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
|
|
||||||
]
|
|
||||||
reply_markup = InlineKeyboardMarkup(keyboard)
|
|
||||||
await update.message.reply_text(f"⛔ {e}", reply_markup=reply_markup)
|
|
||||||
return
|
|
||||||
except ValueError as e:
|
|
||||||
keyboard = [
|
|
||||||
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
|
|
||||||
]
|
|
||||||
reply_markup = InlineKeyboardMarkup(keyboard)
|
|
||||||
await update.message.reply_text(f"⛔ {e}", reply_markup=reply_markup)
|
|
||||||
return
|
|
||||||
|
|
||||||
if success:
|
|
||||||
changes = []
|
|
||||||
if text is not None:
|
if text is not None:
|
||||||
old_text = old_bounty.text or "(none)"
|
old_text = old_bounty.text or "(none)"
|
||||||
changes.append(f"Text: {old_text} → {text}")
|
changes.append(f"Text: {old_text} → {text}")
|
||||||
@@ -548,6 +690,20 @@ async def cmd_update(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
|
|||||||
else "(none)"
|
else "(none)"
|
||||||
)
|
)
|
||||||
changes.append(f"Date: {old_date} → (cleared)")
|
changes.append(f"Date: {old_date} → (cleared)")
|
||||||
|
except PermissionError as e:
|
||||||
|
keyboard = [
|
||||||
|
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
|
||||||
|
]
|
||||||
|
reply_markup = InlineKeyboardMarkup(keyboard)
|
||||||
|
await update.message.reply_text(f"⛔ {e}", reply_markup=reply_markup)
|
||||||
|
return
|
||||||
|
except ValueError as e:
|
||||||
|
keyboard = [
|
||||||
|
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
|
||||||
|
]
|
||||||
|
reply_markup = InlineKeyboardMarkup(keyboard)
|
||||||
|
await update.message.reply_text(f"⛔ {e}", reply_markup=reply_markup)
|
||||||
|
return
|
||||||
|
|
||||||
if changes:
|
if changes:
|
||||||
keyboard = [
|
keyboard = [
|
||||||
@@ -567,8 +723,6 @@ async def cmd_update(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
|
|||||||
f"✅ Bounty #{bounty_id} updated.",
|
f"✅ Bounty #{bounty_id} updated.",
|
||||||
reply_markup=reply_markup,
|
reply_markup=reply_markup,
|
||||||
)
|
)
|
||||||
else:
|
|
||||||
await update.message.reply_text("Bounty not found.")
|
|
||||||
|
|
||||||
|
|
||||||
cmd_edit = cmd_update
|
cmd_edit = cmd_update
|
||||||
@@ -842,6 +996,9 @@ async def cmd_show(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
|
|||||||
due_str = dt_due.strftime("%d %B %Y %H:%M")
|
due_str = dt_due.strftime("%d %B %Y %H:%M")
|
||||||
lines.append(f"📅 {due_str} ({timezone_str})")
|
lines.append(f"📅 {due_str} ({timezone_str})")
|
||||||
|
|
||||||
|
if bounty.category_ids:
|
||||||
|
lines.append(f"📂 Categories: {' | '.join(bounty.category_ids)}")
|
||||||
|
|
||||||
if bounty.created_by_username:
|
if bounty.created_by_username:
|
||||||
lines.append(
|
lines.append(
|
||||||
f'👤 <a href="tg://user?id={bounty.created_by_user_id}">{bounty.created_by_username}</a>'
|
f'👤 <a href="tg://user?id={bounty.created_by_user_id}">{bounty.created_by_username}</a>'
|
||||||
@@ -1101,6 +1258,162 @@ async def cmd_admin(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
|
|||||||
await update.message.reply_text(f"⛔ {e}", reply_markup=reply_markup)
|
await update.message.reply_text(f"⛔ {e}", reply_markup=reply_markup)
|
||||||
|
|
||||||
|
|
||||||
|
async def cmd_category(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
|
||||||
|
"""Handle /category command for listing, adding, and deleting categories."""
|
||||||
|
user_id = get_user_id(update)
|
||||||
|
room_id = get_room_id(update)
|
||||||
|
effective_user = update.effective_user
|
||||||
|
username = effective_user.username or effective_user.first_name or None
|
||||||
|
|
||||||
|
args = extract_args(update.message.text)
|
||||||
|
|
||||||
|
if not args:
|
||||||
|
# List categories
|
||||||
|
categories = BOUNTY_SERVICE.list_categories(room_id)
|
||||||
|
if not categories:
|
||||||
|
keyboard = [
|
||||||
|
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
|
||||||
|
]
|
||||||
|
reply_markup = InlineKeyboardMarkup(keyboard)
|
||||||
|
await update.message.reply_text(
|
||||||
|
"No categories yet.\n"
|
||||||
|
"Usage: /category add <slug> <name>",
|
||||||
|
reply_markup=reply_markup,
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
lines = ["Categories:"]
|
||||||
|
for cat in categories:
|
||||||
|
lines.append(f"- {cat.id} → {cat.name}")
|
||||||
|
|
||||||
|
keyboard = [
|
||||||
|
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
|
||||||
|
]
|
||||||
|
reply_markup = InlineKeyboardMarkup(keyboard)
|
||||||
|
await update.message.reply_text(
|
||||||
|
"\n".join(lines),
|
||||||
|
reply_markup=reply_markup,
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
subcommand = args[0]
|
||||||
|
|
||||||
|
if subcommand == "add":
|
||||||
|
# Add category
|
||||||
|
if not BOUNTY_SERVICE.is_admin(room_id, username):
|
||||||
|
keyboard = [
|
||||||
|
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
|
||||||
|
]
|
||||||
|
reply_markup = InlineKeyboardMarkup(keyboard)
|
||||||
|
await update.message.reply_text(
|
||||||
|
"⛔ Only admins can add categories.",
|
||||||
|
reply_markup=reply_markup,
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
if len(args) < 3:
|
||||||
|
keyboard = [
|
||||||
|
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
|
||||||
|
]
|
||||||
|
reply_markup = InlineKeyboardMarkup(keyboard)
|
||||||
|
await update.message.reply_text(
|
||||||
|
"Usage: /category add <slug> <name>\n"
|
||||||
|
"Example: /category add bug 'Bug Report'",
|
||||||
|
reply_markup=reply_markup,
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
slug = args[1].lower()
|
||||||
|
name = " ".join(args[2:])
|
||||||
|
|
||||||
|
try:
|
||||||
|
category = BOUNTY_SERVICE.add_category(room_id, slug, name, username)
|
||||||
|
keyboard = [
|
||||||
|
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
|
||||||
|
]
|
||||||
|
reply_markup = InlineKeyboardMarkup(keyboard)
|
||||||
|
await update.message.reply_text(
|
||||||
|
f"✅ Category added: {category.id} → {category.name}",
|
||||||
|
reply_markup=reply_markup,
|
||||||
|
)
|
||||||
|
except PermissionError as e:
|
||||||
|
keyboard = [
|
||||||
|
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
|
||||||
|
]
|
||||||
|
reply_markup = InlineKeyboardMarkup(keyboard)
|
||||||
|
await update.message.reply_text(f"⛔ {e}", reply_markup=reply_markup)
|
||||||
|
except ValueError as e:
|
||||||
|
keyboard = [
|
||||||
|
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
|
||||||
|
]
|
||||||
|
reply_markup = InlineKeyboardMarkup(keyboard)
|
||||||
|
await update.message.reply_text(f"⛔ {e}", reply_markup=reply_markup)
|
||||||
|
return
|
||||||
|
|
||||||
|
elif subcommand == "delete":
|
||||||
|
# Delete category
|
||||||
|
if not BOUNTY_SERVICE.is_admin(room_id, username):
|
||||||
|
keyboard = [
|
||||||
|
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
|
||||||
|
]
|
||||||
|
reply_markup = InlineKeyboardMarkup(keyboard)
|
||||||
|
await update.message.reply_text(
|
||||||
|
"⛔ Only admins can delete categories.",
|
||||||
|
reply_markup=reply_markup,
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
if len(args) < 2:
|
||||||
|
keyboard = [
|
||||||
|
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
|
||||||
|
]
|
||||||
|
reply_markup = InlineKeyboardMarkup(keyboard)
|
||||||
|
await update.message.reply_text(
|
||||||
|
"Usage: /category delete <slug>",
|
||||||
|
reply_markup=reply_markup,
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
slug = args[1].lower()
|
||||||
|
|
||||||
|
try:
|
||||||
|
success = BOUNTY_SERVICE.delete_category(room_id, slug, username)
|
||||||
|
keyboard = [
|
||||||
|
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
|
||||||
|
]
|
||||||
|
reply_markup = InlineKeyboardMarkup(keyboard)
|
||||||
|
if success:
|
||||||
|
await update.message.reply_text(
|
||||||
|
f"✅ Category '{slug}' deleted.",
|
||||||
|
reply_markup=reply_markup,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
await update.message.reply_text(
|
||||||
|
f"⛔ Category '{slug}' not found.",
|
||||||
|
reply_markup=reply_markup,
|
||||||
|
)
|
||||||
|
except PermissionError as e:
|
||||||
|
keyboard = [
|
||||||
|
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
|
||||||
|
]
|
||||||
|
reply_markup = InlineKeyboardMarkup(keyboard)
|
||||||
|
await update.message.reply_text(f"⛔ {e}", reply_markup=reply_markup)
|
||||||
|
return
|
||||||
|
|
||||||
|
else:
|
||||||
|
keyboard = [
|
||||||
|
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
|
||||||
|
]
|
||||||
|
reply_markup = InlineKeyboardMarkup(keyboard)
|
||||||
|
await update.message.reply_text(
|
||||||
|
"Usage:\n"
|
||||||
|
"/category - list categories\n"
|
||||||
|
"/category add <slug> <name> - add category (admin)\n"
|
||||||
|
"/category delete <slug> - delete category (admin)",
|
||||||
|
reply_markup=reply_markup,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
async def cmd_help(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
|
async def cmd_help(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
|
||||||
user_id = get_user_id(update)
|
user_id = get_user_id(update)
|
||||||
room_id = get_room_id(update)
|
room_id = get_room_id(update)
|
||||||
@@ -1118,6 +1431,11 @@ async def cmd_help(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
|
|||||||
"/delete — delete bounty",
|
"/delete — delete bounty",
|
||||||
"/recover — recover deleted bounties",
|
"/recover — recover deleted bounties",
|
||||||
"",
|
"",
|
||||||
|
"📂 Categories:",
|
||||||
|
"/category — list categories",
|
||||||
|
"/category add — add category",
|
||||||
|
"/category delete — delete category",
|
||||||
|
"",
|
||||||
"🔗 Tracking:",
|
"🔗 Tracking:",
|
||||||
"/track — track bounty",
|
"/track — track bounty",
|
||||||
"/untrack — stop tracking",
|
"/untrack — stop tracking",
|
||||||
|
|||||||
@@ -1,6 +1,20 @@
|
|||||||
"""Domain dataclasses for JIGAIDO bounty tracker."""
|
"""Domain dataclasses for JIGAIDO bounty tracker."""
|
||||||
|
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass, field
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Category:
|
||||||
|
"""A category for organizing bounties in a room.
|
||||||
|
|
||||||
|
Categories are per-room and support soft delete.
|
||||||
|
The id (slug) must be lowercase alphabetic only (e.g., "bug", "feature").
|
||||||
|
"""
|
||||||
|
|
||||||
|
id: str # slug: lowercase alphabetic only, e.g., "bug", "feature"
|
||||||
|
name: str # display name: e.g., "Bug", "Feature"
|
||||||
|
created_at: int
|
||||||
|
deleted_at: int | None = None # soft delete
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
@@ -12,6 +26,8 @@ class Bounty:
|
|||||||
|
|
||||||
The deleted_at field indicates soft-delete: None means not deleted,
|
The deleted_at field indicates soft-delete: None means not deleted,
|
||||||
a value means deleted at that Unix timestamp.
|
a value means deleted at that Unix timestamp.
|
||||||
|
|
||||||
|
The category_ids field lists category slugs associated with this bounty.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
id: int
|
id: int
|
||||||
@@ -22,6 +38,7 @@ class Bounty:
|
|||||||
created_by_user_id: int
|
created_by_user_id: int
|
||||||
deleted_at: int | None = None
|
deleted_at: int | None = None
|
||||||
created_by_username: str | None = None
|
created_by_username: str | None = None
|
||||||
|
category_ids: list[str] = field(default_factory=list)
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
@@ -45,6 +62,7 @@ class RoomData:
|
|||||||
|
|
||||||
The timezone field stores the room's timezone (e.g., "Asia/Jakarta"), default UTC+0.
|
The timezone field stores the room's timezone (e.g., "Asia/Jakarta"), default UTC+0.
|
||||||
The admin_usernames field lists usernames who have admin privileges in this room.
|
The admin_usernames field lists usernames who have admin privileges in this room.
|
||||||
|
The categories field contains all categories for organizing bounties in this room.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
room_id: int
|
room_id: int
|
||||||
@@ -52,10 +70,13 @@ class RoomData:
|
|||||||
next_id: int
|
next_id: int
|
||||||
timezone: str | None = None
|
timezone: str | None = None
|
||||||
admin_usernames: list[str] | None = None
|
admin_usernames: list[str] | None = None
|
||||||
|
categories: list[Category] = field(default_factory=list)
|
||||||
|
|
||||||
def __post_init__(self):
|
def __post_init__(self):
|
||||||
if self.admin_usernames is None:
|
if self.admin_usernames is None:
|
||||||
self.admin_usernames = []
|
self.admin_usernames = []
|
||||||
|
if self.categories is None:
|
||||||
|
self.categories = []
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
|
|||||||
230
core/services.py
230
core/services.py
@@ -3,7 +3,7 @@
|
|||||||
import time
|
import time
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
from core.models import Bounty, RoomData, TrackedBounty, TrackingData
|
from core.models import Bounty, Category, RoomData, TrackedBounty, TrackingData
|
||||||
from core.ports import RoomStorage, TrackingStorage
|
from core.ports import RoomStorage, TrackingStorage
|
||||||
|
|
||||||
|
|
||||||
@@ -289,6 +289,234 @@ class BountyService:
|
|||||||
results[bounty_id] = "deleted"
|
results[bounty_id] = "deleted"
|
||||||
return results
|
return results
|
||||||
|
|
||||||
|
# --- Category Management ---
|
||||||
|
|
||||||
|
def add_category(
|
||||||
|
self,
|
||||||
|
room_id: int,
|
||||||
|
slug: str,
|
||||||
|
name: str,
|
||||||
|
username: str | None,
|
||||||
|
) -> Category:
|
||||||
|
"""Create a new category. Admin only.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
room_id: Room identifier
|
||||||
|
slug: Category ID (lowercase alphabetic, e.g., "bug")
|
||||||
|
name: Display name (e.g., "Bug Report")
|
||||||
|
username: Requesting admin's username
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Created Category
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
PermissionError: If not admin
|
||||||
|
ValueError: If slug already exists or invalid
|
||||||
|
"""
|
||||||
|
if not self.is_admin(room_id, username):
|
||||||
|
raise PermissionError("Only admins can add categories.")
|
||||||
|
|
||||||
|
# Validate slug format (lowercase alphabetic only)
|
||||||
|
if not slug or not slug.isalpha() or not slug.islower():
|
||||||
|
raise ValueError(
|
||||||
|
"Category slug must be lowercase alphabetic only (e.g., 'bug', 'feature')."
|
||||||
|
)
|
||||||
|
|
||||||
|
room_data = self._storage.load(room_id)
|
||||||
|
if room_data is None:
|
||||||
|
room_data = RoomData(
|
||||||
|
room_id=room_id, bounties=[], next_id=1, admin_usernames=[], categories=[]
|
||||||
|
)
|
||||||
|
|
||||||
|
# Check for duplicate slug
|
||||||
|
for cat in room_data.categories:
|
||||||
|
if cat.id == slug and cat.deleted_at is None:
|
||||||
|
raise ValueError(f"Category '{slug}' already exists.")
|
||||||
|
|
||||||
|
category = Category(
|
||||||
|
id=slug,
|
||||||
|
name=name,
|
||||||
|
created_at=int(time.time()),
|
||||||
|
deleted_at=None,
|
||||||
|
)
|
||||||
|
room_data.categories.append(category)
|
||||||
|
self._storage.save(room_data)
|
||||||
|
return category
|
||||||
|
|
||||||
|
def delete_category(
|
||||||
|
self,
|
||||||
|
room_id: int,
|
||||||
|
slug: str,
|
||||||
|
username: str | None,
|
||||||
|
) -> bool:
|
||||||
|
"""Soft delete a category. Admin only.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
room_id: Room identifier
|
||||||
|
slug: Category slug to delete
|
||||||
|
username: Requesting admin's username
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if deleted, False if not found
|
||||||
|
"""
|
||||||
|
if not self.is_admin(room_id, username):
|
||||||
|
raise PermissionError("Only admins can delete categories.")
|
||||||
|
|
||||||
|
room_data = self._storage.load(room_id)
|
||||||
|
if room_data is None:
|
||||||
|
return False
|
||||||
|
|
||||||
|
for cat in room_data.categories:
|
||||||
|
if cat.id == slug and cat.deleted_at is None:
|
||||||
|
cat.deleted_at = int(time.time())
|
||||||
|
self._storage.save(room_data)
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def list_categories(self, room_id: int) -> list[Category]:
|
||||||
|
"""List active categories (excludes soft-deleted).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
room_id: Room identifier
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of active categories
|
||||||
|
"""
|
||||||
|
room_data = self._storage.load(room_id)
|
||||||
|
if room_data is None:
|
||||||
|
return []
|
||||||
|
return [c for c in room_data.categories if c.deleted_at is None]
|
||||||
|
|
||||||
|
def get_category(self, room_id: int, slug: str) -> Category | None:
|
||||||
|
"""Get a category by slug (excludes soft-deleted).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
room_id: Room identifier
|
||||||
|
slug: Category slug
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Category or None if not found
|
||||||
|
"""
|
||||||
|
room_data = self._storage.load(room_id)
|
||||||
|
if room_data is None:
|
||||||
|
return None
|
||||||
|
for cat in room_data.categories:
|
||||||
|
if cat.id == slug and cat.deleted_at is None:
|
||||||
|
return cat
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _validate_category_exists(self, room_id: int, slug: str) -> None:
|
||||||
|
"""Validate that a category exists (and is not deleted). Raises ValueError if not found."""
|
||||||
|
if not self.get_category(room_id, slug):
|
||||||
|
raise ValueError(f"Category '{slug}' not found.")
|
||||||
|
|
||||||
|
def add_category_to_bounty(
|
||||||
|
self,
|
||||||
|
room_id: int,
|
||||||
|
bounty_id: int,
|
||||||
|
category_slug: str,
|
||||||
|
username: str | None,
|
||||||
|
) -> bool:
|
||||||
|
"""Add category to a bounty. Admin only.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
room_id: Room identifier
|
||||||
|
bounty_id: Bounty ID
|
||||||
|
category_slug: Category slug to add
|
||||||
|
username: Requesting admin's username
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if newly added, False if already exists
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
PermissionError: If not admin
|
||||||
|
ValueError: If bounty or category not found
|
||||||
|
"""
|
||||||
|
if not self.is_admin(room_id, username):
|
||||||
|
raise PermissionError("Only admins can manage bounty categories.")
|
||||||
|
|
||||||
|
bounty = self.get_bounty(room_id, bounty_id)
|
||||||
|
if not bounty:
|
||||||
|
raise ValueError("Bounty not found.")
|
||||||
|
|
||||||
|
self._validate_category_exists(room_id, category_slug)
|
||||||
|
|
||||||
|
if category_slug in bounty.category_ids:
|
||||||
|
return False # Already exists
|
||||||
|
|
||||||
|
bounty.category_ids.append(category_slug)
|
||||||
|
self._storage.update_bounty(room_id, bounty)
|
||||||
|
return True
|
||||||
|
|
||||||
|
def remove_category_from_bounty(
|
||||||
|
self,
|
||||||
|
room_id: int,
|
||||||
|
bounty_id: int,
|
||||||
|
category_slug: str,
|
||||||
|
username: str | None,
|
||||||
|
) -> bool:
|
||||||
|
"""Remove category from a bounty. Admin only.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
room_id: Room identifier
|
||||||
|
bounty_id: Bounty ID
|
||||||
|
category_slug: Category slug to remove
|
||||||
|
username: Requesting admin's username
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if removed, False if not found
|
||||||
|
"""
|
||||||
|
if not self.is_admin(room_id, username):
|
||||||
|
raise PermissionError("Only admins can manage bounty categories.")
|
||||||
|
|
||||||
|
bounty = self.get_bounty(room_id, bounty_id)
|
||||||
|
if not bounty:
|
||||||
|
raise ValueError("Bounty not found.")
|
||||||
|
|
||||||
|
if category_slug not in bounty.category_ids:
|
||||||
|
return False
|
||||||
|
|
||||||
|
bounty.category_ids.remove(category_slug)
|
||||||
|
self._storage.update_bounty(room_id, bounty)
|
||||||
|
return True
|
||||||
|
|
||||||
|
def update_bounty_categories(
|
||||||
|
self,
|
||||||
|
room_id: int,
|
||||||
|
bounty_id: int,
|
||||||
|
category_slugs: list[str],
|
||||||
|
username: str | None,
|
||||||
|
) -> bool:
|
||||||
|
"""Replace all categories on a bounty. Admin only.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
room_id: Room identifier
|
||||||
|
bounty_id: Bounty ID
|
||||||
|
category_slugs: New list of category slugs
|
||||||
|
username: Requesting admin's username
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if updated
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
PermissionError: If not admin
|
||||||
|
ValueError: If bounty or any category not found
|
||||||
|
"""
|
||||||
|
if not self.is_admin(room_id, username):
|
||||||
|
raise PermissionError("Only admins can manage bounty categories.")
|
||||||
|
|
||||||
|
bounty = self.get_bounty(room_id, bounty_id)
|
||||||
|
if not bounty:
|
||||||
|
raise ValueError("Bounty not found.")
|
||||||
|
|
||||||
|
# Validate all categories exist
|
||||||
|
for slug in category_slugs:
|
||||||
|
self._validate_category_exists(room_id, slug)
|
||||||
|
|
||||||
|
bounty.category_ids = category_slugs
|
||||||
|
self._storage.update_bounty(room_id, bounty)
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
class TrackingService:
|
class TrackingService:
|
||||||
"""Service for tracking bounty operations."""
|
"""Service for tracking bounty operations."""
|
||||||
|
|||||||
@@ -404,3 +404,283 @@ class TestTrackingService:
|
|||||||
assert len(tracked_room2) == 1
|
assert len(tracked_room2) == 1
|
||||||
assert tracked_room1[0].text == "Room 1"
|
assert tracked_room1[0].text == "Room 1"
|
||||||
assert tracked_room2[0].text == "Room 2"
|
assert tracked_room2[0].text == "Room 2"
|
||||||
|
|
||||||
|
|
||||||
|
class TestCategoryService:
|
||||||
|
"""Unit tests for category management."""
|
||||||
|
|
||||||
|
def setup_method(self):
|
||||||
|
"""Set up fresh storage and service for each test."""
|
||||||
|
self.storage = MockRoomStorage()
|
||||||
|
self.service = BountyService(self.storage)
|
||||||
|
self.admin_username = "admin"
|
||||||
|
self._make_admin(-1001, self.admin_username)
|
||||||
|
|
||||||
|
def _make_admin(self, room_id: int, username: str):
|
||||||
|
"""Helper to set up a room with an admin user."""
|
||||||
|
room_data = self.storage.load(room_id)
|
||||||
|
if room_data is None:
|
||||||
|
room_data = RoomData(
|
||||||
|
room_id=room_id, bounties=[], next_id=0, admin_usernames=[]
|
||||||
|
)
|
||||||
|
if username not in (room_data.admin_usernames or []):
|
||||||
|
room_data.admin_usernames = room_data.admin_usernames or []
|
||||||
|
room_data.admin_usernames.append(username)
|
||||||
|
self.storage.save(room_data)
|
||||||
|
|
||||||
|
def _add_bounty(self, text="Test bounty"):
|
||||||
|
"""Helper to add a bounty for category tests."""
|
||||||
|
return self.service.add_bounty(
|
||||||
|
room_id=-1001, user_id=123, username=self.admin_username, text=text
|
||||||
|
)
|
||||||
|
|
||||||
|
# Category Management Tests
|
||||||
|
|
||||||
|
def test_add_category_requires_admin(self):
|
||||||
|
"""Test that add_category raises PermissionError for non-admin."""
|
||||||
|
with pytest.raises(PermissionError, match="Only admins can add categories"):
|
||||||
|
self.service.add_category(-1001, "bug", "Bug Report", "nonadmin")
|
||||||
|
|
||||||
|
def test_add_category_duplicate_slug_fails(self):
|
||||||
|
"""Test that adding a duplicate category slug raises ValueError."""
|
||||||
|
self.service.add_category(-1001, "bug", "Bug Report", self.admin_username)
|
||||||
|
with pytest.raises(ValueError, match="already exists"):
|
||||||
|
self.service.add_category(-1001, "bug", "Bug Report 2", self.admin_username)
|
||||||
|
|
||||||
|
def test_add_category_invalid_slug_fails_uppercase(self):
|
||||||
|
"""Test that uppercase slug raises ValueError."""
|
||||||
|
with pytest.raises(ValueError, match="lowercase alphabetic only"):
|
||||||
|
self.service.add_category(-1001, "Bug", "Bug Report", self.admin_username)
|
||||||
|
|
||||||
|
def test_add_category_invalid_slug_fails_with_numbers(self):
|
||||||
|
"""Test that slug with numbers raises ValueError."""
|
||||||
|
with pytest.raises(ValueError, match="lowercase alphabetic only"):
|
||||||
|
self.service.add_category(-1001, "bug1", "Bug 1", self.admin_username)
|
||||||
|
|
||||||
|
def test_add_category_invalid_slug_fails_with_symbols(self):
|
||||||
|
"""Test that slug with symbols raises ValueError."""
|
||||||
|
with pytest.raises(ValueError, match="lowercase alphabetic only"):
|
||||||
|
self.service.add_category(-1001, "bug-fix", "Bug Fix", self.admin_username)
|
||||||
|
|
||||||
|
def test_add_category_invalid_slug_fails_empty(self):
|
||||||
|
"""Test that empty slug raises ValueError."""
|
||||||
|
with pytest.raises(ValueError, match="lowercase alphabetic only"):
|
||||||
|
self.service.add_category(-1001, "", "Empty", self.admin_username)
|
||||||
|
|
||||||
|
def test_add_category_valid(self):
|
||||||
|
"""Test that valid category can be created."""
|
||||||
|
category = self.service.add_category(
|
||||||
|
-1001, "bug", "Bug Report", self.admin_username
|
||||||
|
)
|
||||||
|
assert category.id == "bug"
|
||||||
|
assert category.name == "Bug Report"
|
||||||
|
assert category.created_at > 0
|
||||||
|
assert category.deleted_at is None
|
||||||
|
|
||||||
|
def test_delete_category_soft_deletes(self):
|
||||||
|
"""Test that delete_category performs soft delete."""
|
||||||
|
self.service.add_category(-1001, "bug", "Bug Report", self.admin_username)
|
||||||
|
result = self.service.delete_category(-1001, "bug", self.admin_username)
|
||||||
|
assert result is True
|
||||||
|
|
||||||
|
# Category should not be found via get_category
|
||||||
|
assert self.service.get_category(-1001, "bug") is None
|
||||||
|
|
||||||
|
# But should still be in raw room data (soft delete)
|
||||||
|
room_data = self.storage.load(-1001)
|
||||||
|
for cat in room_data.categories:
|
||||||
|
if cat.id == "bug":
|
||||||
|
assert cat.deleted_at is not None
|
||||||
|
return
|
||||||
|
assert False, "Category should still exist in storage"
|
||||||
|
|
||||||
|
def test_deleted_category_not_listed(self):
|
||||||
|
"""Test that list_categories excludes soft-deleted categories."""
|
||||||
|
self.service.add_category(-1001, "bug", "Bug Report", self.admin_username)
|
||||||
|
self.service.delete_category(-1001, "bug", self.admin_username)
|
||||||
|
|
||||||
|
categories = self.service.list_categories(-1001)
|
||||||
|
assert len(categories) == 0
|
||||||
|
assert not any(cat.id == "bug" for cat in categories)
|
||||||
|
|
||||||
|
def test_list_categories_empty(self):
|
||||||
|
"""Test that list_categories returns empty list for room with no categories."""
|
||||||
|
categories = self.service.list_categories(-1001)
|
||||||
|
assert categories == []
|
||||||
|
|
||||||
|
def test_list_categories_returns_active(self):
|
||||||
|
"""Test that list_categories returns only active categories."""
|
||||||
|
self.service.add_category(-1001, "bug", "Bug Report", self.admin_username)
|
||||||
|
self.service.add_category(-1001, "feature", "Feature Request", self.admin_username)
|
||||||
|
self.service.add_category(-1001, "docs", "Documentation", self.admin_username)
|
||||||
|
self.service.delete_category(-1001, "bug", self.admin_username)
|
||||||
|
|
||||||
|
categories = self.service.list_categories(-1001)
|
||||||
|
assert len(categories) == 2
|
||||||
|
category_ids = [c.id for c in categories]
|
||||||
|
assert "feature" in category_ids
|
||||||
|
assert "docs" in category_ids
|
||||||
|
assert "bug" not in category_ids
|
||||||
|
|
||||||
|
def test_get_category_not_found(self):
|
||||||
|
"""Test that get_category returns None for non-existent category."""
|
||||||
|
category = self.service.get_category(-1001, "nonexistent")
|
||||||
|
assert category is None
|
||||||
|
|
||||||
|
def test_get_category_deleted_returns_none(self):
|
||||||
|
"""Test that get_category returns None for soft-deleted category."""
|
||||||
|
self.service.add_category(-1001, "bug", "Bug Report", self.admin_username)
|
||||||
|
self.service.delete_category(-1001, "bug", self.admin_username)
|
||||||
|
|
||||||
|
category = self.service.get_category(-1001, "bug")
|
||||||
|
assert category is None
|
||||||
|
|
||||||
|
def test_add_category_requires_admin_non_existent_room(self):
|
||||||
|
"""Test that add_category works for non-existent room (creates it)."""
|
||||||
|
self._make_admin(-9999, self.admin_username)
|
||||||
|
category = self.service.add_category(
|
||||||
|
-9999, "bug", "Bug Report", self.admin_username
|
||||||
|
)
|
||||||
|
assert category.id == "bug"
|
||||||
|
|
||||||
|
# Category-to-Bounty Association Tests
|
||||||
|
|
||||||
|
def test_add_category_to_bounty(self):
|
||||||
|
"""Test adding category to a bounty."""
|
||||||
|
self.service.add_category(-1001, "bug", "Bug Report", self.admin_username)
|
||||||
|
bounty = self._add_bounty()
|
||||||
|
|
||||||
|
result = self.service.add_category_to_bounty(
|
||||||
|
-1001, bounty.id, "bug", self.admin_username
|
||||||
|
)
|
||||||
|
assert result is True
|
||||||
|
|
||||||
|
updated_bounty = self.service.get_bounty(-1001, bounty.id)
|
||||||
|
assert "bug" in updated_bounty.category_ids
|
||||||
|
|
||||||
|
def test_add_duplicate_category_to_bounty_noop(self):
|
||||||
|
"""Test that adding duplicate category returns False."""
|
||||||
|
self.service.add_category(-1001, "bug", "Bug Report", self.admin_username)
|
||||||
|
bounty = self._add_bounty()
|
||||||
|
|
||||||
|
self.service.add_category_to_bounty(-1001, bounty.id, "bug", self.admin_username)
|
||||||
|
result = self.service.add_category_to_bounty(
|
||||||
|
-1001, bounty.id, "bug", self.admin_username
|
||||||
|
)
|
||||||
|
assert result is False
|
||||||
|
|
||||||
|
def test_add_category_to_bounty_invalid_bounty(self):
|
||||||
|
"""Test that adding category to non-existent bounty raises ValueError."""
|
||||||
|
self.service.add_category(-1001, "bug", "Bug Report", self.admin_username)
|
||||||
|
|
||||||
|
with pytest.raises(ValueError, match="Bounty not found"):
|
||||||
|
self.service.add_category_to_bounty(-1001, 999, "bug", self.admin_username)
|
||||||
|
|
||||||
|
def test_add_category_to_bounty_invalid_category(self):
|
||||||
|
"""Test that adding non-existent category raises ValueError."""
|
||||||
|
bounty = self._add_bounty()
|
||||||
|
|
||||||
|
with pytest.raises(ValueError, match="not found"):
|
||||||
|
self.service.add_category_to_bounty(
|
||||||
|
-1001, bounty.id, "nonexistent", self.admin_username
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_remove_category_from_bounty(self):
|
||||||
|
"""Test removing category from a bounty."""
|
||||||
|
self.service.add_category(-1001, "bug", "Bug Report", self.admin_username)
|
||||||
|
bounty = self._add_bounty()
|
||||||
|
self.service.add_category_to_bounty(-1001, bounty.id, "bug", self.admin_username)
|
||||||
|
|
||||||
|
result = self.service.remove_category_from_bounty(
|
||||||
|
-1001, bounty.id, "bug", self.admin_username
|
||||||
|
)
|
||||||
|
assert result is True
|
||||||
|
|
||||||
|
updated_bounty = self.service.get_bounty(-1001, bounty.id)
|
||||||
|
assert "bug" not in updated_bounty.category_ids
|
||||||
|
|
||||||
|
def test_remove_category_not_on_bounty_returns_false(self):
|
||||||
|
"""Test that removing category not on bounty returns False."""
|
||||||
|
self.service.add_category(-1001, "bug", "Bug Report", self.admin_username)
|
||||||
|
bounty = self._add_bounty()
|
||||||
|
|
||||||
|
result = self.service.remove_category_from_bounty(
|
||||||
|
-1001, bounty.id, "bug", self.admin_username
|
||||||
|
)
|
||||||
|
assert result is False
|
||||||
|
|
||||||
|
def test_update_bounty_categories_replace_all(self):
|
||||||
|
"""Test that update_bounty_categories replaces all categories."""
|
||||||
|
self.service.add_category(-1001, "bug", "Bug Report", self.admin_username)
|
||||||
|
self.service.add_category(-1001, "feature", "Feature Request", self.admin_username)
|
||||||
|
self.service.add_category(-1001, "docs", "Documentation", self.admin_username)
|
||||||
|
bounty = self._add_bounty()
|
||||||
|
|
||||||
|
# Add initial categories
|
||||||
|
self.service.add_category_to_bounty(-1001, bounty.id, "bug", self.admin_username)
|
||||||
|
|
||||||
|
# Replace with different categories
|
||||||
|
self.service.update_bounty_categories(
|
||||||
|
-1001, bounty.id, ["feature", "docs"], self.admin_username
|
||||||
|
)
|
||||||
|
|
||||||
|
updated_bounty = self.service.get_bounty(-1001, bounty.id)
|
||||||
|
assert updated_bounty.category_ids == ["feature", "docs"]
|
||||||
|
|
||||||
|
def test_update_bounty_categories_clear_all(self):
|
||||||
|
"""Test that update_bounty_categories can clear all categories."""
|
||||||
|
self.service.add_category(-1001, "bug", "Bug Report", self.admin_username)
|
||||||
|
bounty = self._add_bounty()
|
||||||
|
self.service.add_category_to_bounty(-1001, bounty.id, "bug", self.admin_username)
|
||||||
|
|
||||||
|
self.service.update_bounty_categories(-1001, bounty.id, [], self.admin_username)
|
||||||
|
|
||||||
|
updated_bounty = self.service.get_bounty(-1001, bounty.id)
|
||||||
|
assert updated_bounty.category_ids == []
|
||||||
|
|
||||||
|
def test_update_bounty_categories_validates(self):
|
||||||
|
"""Test that update_bounty_categories validates all slugs."""
|
||||||
|
self.service.add_category(-1001, "bug", "Bug Report", self.admin_username)
|
||||||
|
bounty = self._add_bounty()
|
||||||
|
|
||||||
|
with pytest.raises(ValueError, match="not found"):
|
||||||
|
self.service.update_bounty_categories(
|
||||||
|
-1001, bounty.id, ["bug", "nonexistent"], self.admin_username
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_add_category_to_bounty_requires_admin(self):
|
||||||
|
"""Test that adding category to bounty requires admin."""
|
||||||
|
self.service.add_category(-1001, "bug", "Bug Report", self.admin_username)
|
||||||
|
bounty = self._add_bounty()
|
||||||
|
|
||||||
|
with pytest.raises(PermissionError, match="Only admins"):
|
||||||
|
self.service.add_category_to_bounty(
|
||||||
|
-1001, bounty.id, "bug", "nonadmin"
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_remove_category_from_bounty_requires_admin(self):
|
||||||
|
"""Test that removing category from bounty requires admin."""
|
||||||
|
self.service.add_category(-1001, "bug", "Bug Report", self.admin_username)
|
||||||
|
bounty = self._add_bounty()
|
||||||
|
self.service.add_category_to_bounty(-1001, bounty.id, "bug", self.admin_username)
|
||||||
|
|
||||||
|
with pytest.raises(PermissionError, match="Only admins"):
|
||||||
|
self.service.remove_category_from_bounty(
|
||||||
|
-1001, bounty.id, "bug", "nonadmin"
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_update_bounty_categories_requires_admin(self):
|
||||||
|
"""Test that updating bounty categories requires admin."""
|
||||||
|
bounty = self._add_bounty()
|
||||||
|
|
||||||
|
with pytest.raises(PermissionError, match="Only admins"):
|
||||||
|
self.service.update_bounty_categories(
|
||||||
|
-1001, bounty.id, ["bug"], "nonadmin"
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_delete_category_requires_admin(self):
|
||||||
|
"""Test that deleting category requires admin."""
|
||||||
|
self.service.add_category(-1001, "bug", "Bug Report", self.admin_username)
|
||||||
|
|
||||||
|
with pytest.raises(PermissionError, match="Only admins"):
|
||||||
|
self.service.delete_category(-1001, "bug", "nonadmin")
|
||||||
|
|||||||
Reference in New Issue
Block a user