Compare commits

..

1 Commits

Author SHA1 Message Date
shokollm
2fe73cf2b2 Revert "Merge pull request 'feat: human-readable date format with timezone awareness' (#68) from fix/issue-54 into main"
This reverts commit 922858a81a, reversing
changes made to 015df15bd5.
2026-04-04 07:23:50 +00:00
5 changed files with 92 additions and 493 deletions

View File

@@ -8,14 +8,11 @@ from telegram.ext import Application, CommandHandler, MessageHandler, filters
from commands import ( from commands import (
cmd_add, cmd_add,
cmd_admin,
cmd_bounty, cmd_bounty,
cmd_delete, cmd_delete,
cmd_edit, cmd_edit,
cmd_help, cmd_help,
cmd_my, cmd_my,
cmd_recover,
cmd_show,
cmd_start, cmd_start,
cmd_timezone, cmd_timezone,
cmd_track, cmd_track,
@@ -45,10 +42,7 @@ def build_app() -> Application:
app.add_handler(CommandHandler("delete", cmd_delete)) app.add_handler(CommandHandler("delete", cmd_delete))
app.add_handler(CommandHandler("track", cmd_track)) app.add_handler(CommandHandler("track", cmd_track))
app.add_handler(CommandHandler("untrack", cmd_untrack)) app.add_handler(CommandHandler("untrack", cmd_untrack))
app.add_handler(CommandHandler("show", cmd_show))
app.add_handler(CommandHandler("timezone", cmd_timezone)) app.add_handler(CommandHandler("timezone", cmd_timezone))
app.add_handler(CommandHandler("admin", cmd_admin))
app.add_handler(CommandHandler("recover", cmd_recover))
app.add_handler(MessageHandler(filters.COMMAND, cmd_help)) app.add_handler(MessageHandler(filters.COMMAND, cmd_help))
@@ -64,10 +58,7 @@ async def post_init(app: Application) -> None:
("edit", "Edit a bounty"), ("edit", "Edit a bounty"),
("track", "Track a bounty"), ("track", "Track a bounty"),
("untrack", "Stop tracking"), ("untrack", "Stop tracking"),
("show", "Show bounty details"),
("timezone", "Get/set room timezone"), ("timezone", "Get/set room timezone"),
("admin", "Manage admins"),
("recover", "Recover deleted bounties"),
("help", "Show help"), ("help", "Show help"),
] ]
) )

View File

@@ -1,7 +1,6 @@
"""Telegram command handlers for JIGAIDO - Thin wrappers around core services.""" """Telegram command handlers for JIGAIDO - Thin wrappers around core services."""
import time import time
from datetime import datetime, timezone
from functools import wraps from functools import wraps
from typing import Optional from typing import Optional
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
@@ -21,34 +20,6 @@ TRACKING_SERVICE = TrackingService(TRACKING_STORAGE, ROOM_STORAGE)
TELEGRAM_BOT_USERNAME = "your_bot_username" TELEGRAM_BOT_USERNAME = "your_bot_username"
def format_due_date(due_date_ts: int | None, timezone_str: str) -> str:
"""Format due date as human-readable with timezone.
Examples:
No due date: (none shown)
Date only: 4 April 2026
Date + time: 4 April 2026 14:30
With timezone: 4 April 2026 14:30 (Asia/Jakarta)
"""
if not due_date_ts:
return ""
try:
tz = ZoneInfo(timezone_str)
except (KeyError, ZoneInfoNotFoundError):
tz = ZoneInfo("UTC")
dt = datetime.fromtimestamp(due_date_ts, tz=tz)
date_str = dt.strftime("%-d %B %Y")
if dt.hour != 0 or dt.minute != 0:
date_str += f" {dt.strftime('%H:%M')}"
date_str += f" ({timezone_str})"
return date_str
def extract_args(text: str) -> list[str]: def extract_args(text: str) -> list[str]:
if not text: if not text:
return [] return []
@@ -56,84 +27,47 @@ def extract_args(text: str) -> list[str]:
return tokens[1:] if len(tokens) > 1 else [] return tokens[1:] if len(tokens) > 1 else []
def parse_args( def parse_args(args: list[str]) -> tuple[Optional[str], Optional[str], Optional[int]]:
args: list[str],
) -> tuple[Optional[str], Optional[str], Optional[int], bool, bool]:
text = None text = None
link = None link = None
due_date_ts = None due_date_ts = None
clear_link = False
clear_date = False
i = 0 remaining = []
while i < len(args): for arg in args:
arg = args[i] if not link and (arg.startswith("http://") or arg.startswith("https://")):
if arg == "-link":
if i + 1 < len(args) and (
args[i + 1].startswith("http://") or args[i + 1].startswith("https://")
):
link = args[i + 1]
i += 2
else:
clear_link = True
i += 1
elif arg == "-date":
if i + 1 < len(args):
parsed = dateparser.parse(args[i + 1])
if parsed:
due_date_ts = int(parsed.timestamp())
i += 2
else:
clear_date = True
i += 1
else:
clear_date = True
i += 1
elif not link and (arg.startswith("http://") or arg.startswith("https://")):
link = arg link = arg
i += 1
elif due_date_ts is None: elif due_date_ts is None:
parsed = dateparser.parse(arg) parsed = dateparser.parse(arg)
if parsed: if parsed:
due_date_ts = int(parsed.timestamp()) due_date_ts = int(parsed.timestamp())
i += 1
else: else:
i += 1 remaining.append(arg)
if text is None:
text = arg
else: else:
text = text + " " + arg remaining.append(arg)
else:
i += 1
if text is None:
text = arg
else:
text = text + " " + arg
return text, link, due_date_ts, clear_link, clear_date text = " ".join(remaining) if remaining else None
return text, link, due_date_ts
def format_bounty(b, show_id: bool = True, slice_length: int = 0) -> str: def format_bounty(b, show_id: bool = True) -> str:
parts = [] parts = []
if show_id: if show_id:
parts.append(f"[#{b.id}]") parts.append(f"[#{b.id}]")
if b.text: if b.text:
text = b.text parts.append(b.text)
if slice_length > 0 and len(text) > slice_length:
text = text[:slice_length] + "..."
parts.append(text)
if b.link: if b.link:
parts.append(f"🔗 {b.link}") parts.append(f"🔗 {b.link}")
if b.due_date_ts: if b.due_date_ts:
due_str = time.strftime("%d %b %Y", time.localtime(b.due_date_ts)) due_str = time.strftime("%Y-%m-%d", time.localtime(b.due_date_ts))
days_left = (b.due_date_ts - int(time.time())) // 86400 days_left = (b.due_date_ts - int(time.time())) // 86400
if days_left < 0: if days_left < 0:
parts.append(f"{due_str} (OVERDUE)") parts.append(f"{due_str} (OVERDUE)")
elif days_left == 0: elif days_left == 0:
parts.append(f"Today (OVERDUE)") parts.append(f"{due_str} (TODAY)")
else: else:
parts.append(f"{due_str} ({days_left}d)") parts.append(f"{due_str} ({days_left}d)")
if b.created_by_user_id:
parts.append(f"by {b.created_by_user_id}")
return " | ".join(parts) return " | ".join(parts)
@@ -162,58 +96,13 @@ def get_room_id(update: Update) -> int:
async def cmd_bounty(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None: async def cmd_bounty(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
room_id = get_room_id(update) room_id = get_room_id(update)
args = extract_args(update.message.text) bounties = BOUNTY_SERVICE.list_bounties(room_id)
show_all = "all" in args if not bounties:
args = [a for a in args if a != "all"]
try:
limit = int(args[0]) if args else 5
except (ValueError, IndexError):
limit = 5
now = int(time.time())
cutoff_24h = now - 86400
all_bounties = BOUNTY_SERVICE.list_bounties(room_id)
def is_expired(b) -> bool:
return b.due_date_ts is not None and b.due_date_ts < cutoff_24h
def sort_key(b):
if b.due_date_ts is not None:
return (0, b.due_date_ts)
return (1, b.created_at)
filtered_bounties = [b for b in all_bounties if not is_expired(b) or show_all]
filtered_bounties.sort(key=sort_key)
total_count = len(filtered_bounties)
displayed_bounties = filtered_bounties[:limit]
if not displayed_bounties:
if show_all:
await update.message.reply_text("No bounties yet.") await update.message.reply_text("No bounties yet.")
else:
await update.message.reply_text(
"No active bounties. Use /bounty all to show expired."
)
return return
lines = [] lines = [format_bounty(b, show_id=True) for b in bounties]
if limit < total_count:
lines.append(f"Showing {limit} of {total_count} bounties:")
slice_length = 40
elif show_all and total_count > limit:
lines.append(f"Showing {limit} of {total_count} bounties (including expired):")
slice_length = 40
else:
lines.append(f"Showing {total_count} bounties:")
slice_length = 0
for b in displayed_bounties:
lines.append(format_bounty(b, show_id=True, slice_length=slice_length))
await update.message.reply_text("\n".join(lines), disable_web_page_preview=True) await update.message.reply_text("\n".join(lines), disable_web_page_preview=True)
@@ -223,7 +112,6 @@ async def cmd_my(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
if is_group(update): if is_group(update):
group_id = get_group_id(update) group_id = get_group_id(update)
bounties = TRACKING_SERVICE.get_tracked_bounties(group_id, user_id) bounties = TRACKING_SERVICE.get_tracked_bounties(group_id, user_id)
room_id = group_id
else: else:
room_id = get_room_id(update) room_id = get_room_id(update)
bounties = BOUNTY_SERVICE.list_bounties(room_id) bounties = BOUNTY_SERVICE.list_bounties(room_id)
@@ -250,7 +138,7 @@ async def cmd_add(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
) )
return return
text, link, due_date_ts, _, _ = parse_args(args) text, link, due_date_ts = parse_args(args)
if not text and not link: if not text and not link:
await update.message.reply_text("A bounty needs at least text or a link.") await update.message.reply_text("A bounty needs at least text or a link.")
return return
@@ -258,7 +146,6 @@ async def cmd_add(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
user_id = get_user_id(update) user_id = get_user_id(update)
room_id = get_room_id(update) room_id = get_room_id(update)
try:
bounty = BOUNTY_SERVICE.add_bounty( bounty = BOUNTY_SERVICE.add_bounty(
room_id=room_id, room_id=room_id,
user_id=user_id, user_id=user_id,
@@ -266,17 +153,11 @@ async def cmd_add(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
link=link, link=link,
due_date_ts=due_date_ts, due_date_ts=due_date_ts,
) )
except PermissionError as e:
await update.message.reply_text(f"{e}")
return
except ValueError as e:
await update.message.reply_text(f"{e}")
return
due_str = "" due_str = ""
if due_date_ts: if due_date_ts:
timezone_str = BOUNTY_SERVICE.get_timezone(room_id) due_str = f" | Due: {time.strftime('%Y-%m-%d', time.localtime(due_date_ts))}"
due_str = f" | Due: {format_due_date(due_date_ts, timezone_str)}"
await update.message.reply_text( await update.message.reply_text(
f"✅ Bounty added (#{bounty.id}){due_str}", f"✅ Bounty added (#{bounty.id}){due_str}",
disable_web_page_preview=True, disable_web_page_preview=True,
@@ -287,14 +168,7 @@ async def cmd_update(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
args = extract_args(update.message.text) args = extract_args(update.message.text)
if len(args) < 1: if len(args) < 1:
await update.message.reply_text( await update.message.reply_text(
"Usage: /update <bounty_id> [text] [link] [due_date]\n" "Usage: /update <bounty_id> [text] [link] [due_date]"
" /update <bounty_id> -link [<url>] - clear or set link\n"
" /update <bounty_id> -date [<date>] - clear or set date\n"
"Examples:\n"
" /update 1 new text - update text only\n"
" /update 1 -link - clear link\n"
" /update 1 -link https://... - set link\n"
" /update 1 -link -date - clear both link and date"
) )
return return
@@ -304,14 +178,8 @@ async def cmd_update(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
await update.message.reply_text("Invalid bounty ID.") await update.message.reply_text("Invalid bounty ID.")
return return
text, link, due_date_ts, clear_link, clear_date = parse_args(args[1:]) text, link, due_date_ts = parse_args(args[1:])
if ( if not text and not link and due_date_ts is None:
not text
and not link
and due_date_ts is None
and not clear_link
and not clear_date
):
await update.message.reply_text("Nothing to update.") await update.message.reply_text("Nothing to update.")
return return
@@ -326,15 +194,10 @@ async def cmd_update(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
text=text, text=text,
link=link, link=link,
due_date_ts=due_date_ts, due_date_ts=due_date_ts,
clear_link=clear_link,
clear_due=clear_date,
) )
except PermissionError as e: except PermissionError as e:
await update.message.reply_text(f"{e}") await update.message.reply_text(f"{e}")
return return
except ValueError as e:
await update.message.reply_text(f"{e}")
return
if success: if success:
await update.message.reply_text(f"✅ Bounty #{bounty_id} updated.") await update.message.reply_text(f"✅ Bounty #{bounty_id} updated.")
@@ -354,32 +217,28 @@ async def cmd_delete(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
try: try:
bounty_ids = [int(arg) for arg in args] bounty_ids = [int(arg) for arg in args]
except ValueError: except ValueError:
await update.message.reply_text("Invalid bounty ID.") await update.message.reply_text("Invalid bounty ID(s).")
return return
user_id = get_user_id(update) user_id = get_user_id(update)
room_id = get_room_id(update) room_id = get_room_id(update)
try:
results = BOUNTY_SERVICE.delete_bounties( results = BOUNTY_SERVICE.delete_bounties(
room_id=room_id, room_id=room_id,
bounty_ids=bounty_ids, bounty_ids=bounty_ids,
user_id=user_id, user_id=user_id,
) )
except PermissionError as e:
await update.message.reply_text(f"{e}")
return
response_lines = [] lines = []
for bounty_id, result in results.items(): for bounty_id, result in results.items():
if result == "deleted": if result == "deleted":
response_lines.append(f"✅ Bounty #{bounty_id} deleted.") lines.append(f"✅ Bounty #{bounty_id} deleted.")
elif result == "not_found": elif result == "not_found":
response_lines.append(f"⛔ Bounty #{bounty_id} not found.") lines.append(f"⛔ Bounty #{bounty_id} not found.")
elif result == "permission_denied": elif result == "permission_denied":
response_lines.append(f"⛔ Bounty #{bounty_id} permission denied.") lines.append(f"⛔ Bounty #{bounty_id} - only admins can delete.")
await update.message.reply_text("\n".join(response_lines)) await update.message.reply_text("\n".join(lines))
async def cmd_track(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None: async def cmd_track(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
@@ -455,46 +314,21 @@ async def cmd_start(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
) )
async def cmd_show(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None: async def cmd_help(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
args = extract_args(update.message.text) await update.message.reply_text(
if not args: "👻 JIGAIDO Commands:\n\n"
await update.message.reply_text("Usage: /show <bounty_id>") "/bounty — list all bounties\n"
return "/my — bounties you're tracking\n"
"/add <text> [link] [due] — add bounty\n"
try: "/update <id> [text> [link] [due] — update bounty\n"
bounty_id = int(args[0]) "/delete <id> — delete bounty\n"
except ValueError: "/track <id> — track a bounty (groups only)\n"
await update.message.reply_text("Invalid bounty ID.") "/untrack <id> — stop tracking (groups only)\n"
return "/timezone [tz] — get/set room timezone (admin only)\n"
"/start — re-initialize\n"
room_id = get_room_id(update) "/help — this message",
bounty = BOUNTY_SERVICE.get_bounty(room_id, bounty_id) disable_web_page_preview=True,
)
if not bounty:
await update.message.reply_text("Bounty not found.")
return
timezone = BOUNTY_SERVICE.get_timezone(room_id)
lines = []
title = bounty.text or "(no text)"
lines.append(f"[#{bounty.id}] {title}")
if bounty.link:
lines.append(f"🔗 {bounty.link}")
if bounty.due_date_ts:
due_str = time.strftime("%d %B %Y %H:%M", time.localtime(bounty.due_date_ts))
lines.append(f"📅 {due_str} ({timezone})")
username = bounty.created_by_username or f"user#{bounty.created_by_user_id}"
lines.append(f"👤 @{username}")
created_str = time.strftime("%Y-%m-%d %H:%M", time.localtime(bounty.created_at))
lines.append(f"📌 Created: {created_str}")
await update.message.reply_text("\n".join(lines), disable_web_page_preview=True)
async def cmd_timezone(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None: async def cmd_timezone(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
@@ -524,173 +358,3 @@ async def cmd_timezone(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
return return
await update.message.reply_text(f"✅ Timezone set to {timezone_str}.") await update.message.reply_text(f"✅ Timezone set to {timezone_str}.")
async def cmd_recover(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
args = extract_args(update.message.text)
room_id = get_room_id(update)
user_id = get_user_id(update)
if not BOUNTY_SERVICE.is_admin(room_id, user_id):
await update.message.reply_text("⛔ Only admins can perform this action.")
return
if not args:
deleted_bounties = BOUNTY_SERVICE.list_deleted_bounties(room_id)
if not deleted_bounties:
await update.message.reply_text("No recoverable bounties.")
return
deleted_bounties.sort(key=lambda b: b.deleted_at or 0, reverse=True)
lines = ["Recoverable bounties:"]
for b in deleted_bounties[:10]:
text = (
b.text[:40] + "..."
if b.text and len(b.text) > 40
else (b.text or "(no text)")
)
link_str = f" | 🔗 {b.link}" if b.link else ""
deleted_str = (
time.strftime("%d %b %Y", time.localtime(b.deleted_at))
if b.deleted_at
else "unknown"
)
lines.append(f"[#{b.id}] {text}{link_str} | 🗑️ Deleted {deleted_str}")
await update.message.reply_text("\n".join(lines), disable_web_page_preview=True)
return
try:
bounty_ids = [int(arg) for arg in args]
except ValueError:
await update.message.reply_text("Invalid bounty ID.")
return
results = BOUNTY_SERVICE.recover_bounties(room_id, bounty_ids, user_id)
response_lines = []
for bounty_id, result in results.items():
if result == "recovered":
response_lines.append(f"✅ Recovered bounty #{bounty_id}.")
elif result == "not_found":
response_lines.append(f"⛔ Bounty #{bounty_id} not found.")
elif result == "not_deleted":
response_lines.append(f"⛔ Bounty #{bounty_id} is not deleted.")
elif result == "permission_denied":
response_lines.append(f"⛔ Bounty #{bounty_id} permission denied.")
await update.message.reply_text("\n".join(response_lines))
def _find_user_id_by_username(room_id: int, username: str) -> int | None:
"""Find user_id by username from bounty creators in the room."""
bounties = BOUNTY_SERVICE.list_bounties(room_id)
for bounty in bounties:
if (
bounty.created_by_username
and bounty.created_by_username.lower() == username.lower()
):
return bounty.created_by_user_id
return None
def _find_username_by_user_id(room_id: int, user_id: int) -> str | None:
"""Find username by user_id from bounty creators in the room."""
bounties = BOUNTY_SERVICE.list_bounties(room_id)
for bounty in bounties:
if bounty.created_by_user_id == user_id:
return bounty.created_by_username
return None
async def cmd_admin(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
args = extract_args(update.message.text)
if not args:
admins = BOUNTY_SERVICE.list_admins(get_room_id(update))
if not admins:
await update.message.reply_text("No admins in this room.")
return
admin_mentions = []
for admin_id in admins:
username = _find_username_by_user_id(get_room_id(update), admin_id)
if username:
admin_mentions.append(f"@{username}")
else:
admin_mentions.append(f"user#{admin_id}")
await update.message.reply_text(
f"Room Admins:\n" + "\n".join(f"- {m}" for m in admin_mentions)
)
return
if args[0] not in ("add", "remove"):
await update.message.reply_text(
"Usage:\n"
"/admin — list admins\n"
"/admin add @username — add admin\n"
"/admin remove @username — remove admin"
)
return
subcommand = args[0]
if len(args) < 2:
await update.message.reply_text(f"Usage: /admin {subcommand} @username")
return
raw_username = args[1]
if not raw_username.startswith("@"):
await update.message.reply_text(f"Usage: /admin {subcommand} @username")
return
username = raw_username[1:]
user_id = get_user_id(update)
room_id = get_room_id(update)
if not BOUNTY_SERVICE.is_admin(room_id, user_id):
await update.message.reply_text("⛔ Only admins can perform this action.")
return
target_user_id = _find_user_id_by_username(room_id, username)
if target_user_id is None:
await update.message.reply_text(f"⛔ User @{username} not found.")
return
try:
if subcommand == "add":
BOUNTY_SERVICE.add_admin(room_id, target_user_id, user_id)
await update.message.reply_text(f"✅ @{username} is now an admin.")
elif subcommand == "remove":
BOUNTY_SERVICE.remove_admin(room_id, target_user_id, user_id)
await update.message.reply_text(f"✅ @{username} is no longer an admin.")
except PermissionError as e:
await update.message.reply_text(f"{e}")
async def cmd_help(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
await update.message.reply_text(
"👻 JIGAIDO Commands:\n\n"
"/bounty — list all bounties\n"
"/my — bounties you're tracking\n"
"/add <text> [link] [due] — add bounty (admin only)\n"
"/update <id> [text> [link] [due] — update bounty (admin only)\n"
"/edit <id> [text> [link] [due] — edit bounty (same as update)\n"
" /edit <id> -link [<url>] — clear or set link\n"
" /edit <id> -date [<date>] — clear or set date\n"
"/delete <id> [<id>...] — delete bounty (admin only)\n"
"/track <id> — track a bounty (groups only)\n"
"/untrack <id> — stop tracking (groups only)\n"
"/show <id> — show bounty details\n"
"/admin — list admins\n"
"/admin add @username — add admin (admin only)\n"
"/admin remove @username — remove admin (admin only)\n"
"/timezone — get room timezone\n"
"/timezone <tz> — set room timezone (admin only)\n"
"/recover — list recoverable bounties (admin only)\n"
"/recover <id> [<id>...] — recover bounty (admin only)\n"
"/start — re-initialize\n"
"/help — this message",
disable_web_page_preview=True,
)

View File

@@ -1,28 +0,0 @@
#!/usr/bin/env python3
import asyncio
import os
import sys
# Run from the telegram-bot directory so local imports work
os.chdir("/home/shoko/repositories/jigaido/apps/telegram-bot")
sys.path.insert(0, "/home/shoko/repositories/jigaido")
# Import main from the local bot module
import bot as bot_module
if __name__ == "__main__":
if not bot_module.BOT_TOKEN:
bot_module.log.error("JIGAIDO_BOT_TOKEN environment variable not set.")
sys.exit(1)
app = bot_module.build_app()
app.post_init = bot_module.post_init
bot_module.log.info("JIGAIDO starting...")
# PTB v20+ app.run_polling() is async - use asyncio.get_event_loop() + run_until_complete
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(app.run_polling(drop_pending_updates=True))
finally:
loop.close()

View File

@@ -153,44 +153,6 @@ class BountyService:
all_bounties = self._storage.list_all_bounties(room_id, include_deleted=True) all_bounties = self._storage.list_all_bounties(room_id, include_deleted=True)
return [b for b in all_bounties if b.deleted_at is not None] return [b for b in all_bounties if b.deleted_at is not None]
def get_deleted_bounty(self, room_id: int, bounty_id: int) -> Bounty | None:
"""Get a specific soft-deleted bounty by ID."""
all_bounties = self._storage.list_all_bounties(room_id, include_deleted=True)
for b in all_bounties:
if b.id == bounty_id and b.deleted_at is not None:
return b
return None
def recover_bounty(self, room_id: int, bounty_id: int, user_id: int) -> str:
"""Recover a soft-deleted bounty. Admin only.
Returns: 'recovered', 'not_found', 'not_deleted', 'permission_denied'
"""
if not self.is_admin(room_id, user_id):
return "permission_denied"
bounty = self.get_deleted_bounty(room_id, bounty_id)
if not bounty:
return "not_found"
if bounty.deleted_at is None:
return "not_deleted"
bounty.deleted_at = None
self._storage.update_bounty(room_id, bounty)
return "recovered"
def recover_bounties(
self, room_id: int, bounty_ids: list[int], user_id: int
) -> dict[int, str]:
"""Recover multiple soft-deleted bounties. Admin only.
Returns dict of bounty_id -> result ('recovered', 'not_found', 'not_deleted', 'permission_denied')
"""
results = {}
for bounty_id in bounty_ids:
results[bounty_id] = self.recover_bounty(room_id, bounty_id, user_id)
return results
def get_bounty(self, room_id: int, bounty_id: int) -> Bounty | None: def get_bounty(self, room_id: int, bounty_id: int) -> Bounty | None:
"""Get a specific bounty by ID. Excludes soft-deleted bounties.""" """Get a specific bounty by ID. Excludes soft-deleted bounties."""
bounty = self._storage.get_bounty(room_id, bounty_id) bounty = self._storage.get_bounty(room_id, bounty_id)
@@ -251,9 +213,12 @@ class BountyService:
def delete_bounties( def delete_bounties(
self, room_id: int, bounty_ids: list[int], user_id: int self, room_id: int, bounty_ids: list[int], user_id: int
) -> dict[int, str]: ) -> dict[int, str]:
"""Soft delete multiple bounties. Returns dict of bounty_id -> result. """Soft delete multiple bounties. Only admins can delete.
Results can be: 'deleted', 'not_found', 'permission_denied' Returns a dict mapping bounty_id to result:
- "deleted": Successfully soft-deleted
- "not_found": Bounty does not exist
- "permission_denied": User is not admin
""" """
results = {} results = {}
for bounty_id in bounty_ids: for bounty_id in bounty_ids:

View File

@@ -210,45 +210,52 @@ class TestBountyService:
result = self.service.delete_bounty(-1001, 999, self.admin_user_id) result = self.service.delete_bounty(-1001, 999, self.admin_user_id)
assert result is False assert result is False
def test_delete_bounties_multi_id_success(self): def test_delete_bounties_multiple_success(self):
"""Test delete_bounties returns individual results for multiple bounties.""" """Test delete_bounties soft deletes multiple bounties."""
bounty1 = self.service.add_bounty( b1 = self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="To delete 1" room_id=-1001, user_id=self.admin_user_id, text="First"
) )
bounty2 = self.service.add_bounty( b2 = self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="To delete 2" room_id=-1001, user_id=self.admin_user_id, text="Second"
)
b3 = self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="Third"
) )
results = self.service.delete_bounties( results = self.service.delete_bounties(
-1001, [bounty1.id, bounty2.id], self.admin_user_id -1001, [b1.id, b2.id, b3.id], self.admin_user_id
) )
assert results == {bounty1.id: "deleted", bounty2.id: "deleted"} assert results == {b1.id: "deleted", b2.id: "deleted", b3.id: "deleted"}
# Verify both are soft deleted assert self.service.get_bounty(-1001, b1.id) is None
assert self.service.get_bounty(-1001, bounty1.id) is None assert self.service.get_bounty(-1001, b2.id) is None
assert self.service.get_bounty(-1001, bounty2.id) is None assert self.service.get_bounty(-1001, b3.id) is None
def test_delete_bounties_mixed_results(self): def test_delete_bounties_mixed_results(self):
"""Test delete_bounties returns not_found for non-existent bounties.""" """Test delete_bounties returns individual results per ID."""
bounty = self.service.add_bounty( b1 = self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="To delete" room_id=-1001, user_id=self.admin_user_id, text="Exists"
) )
results = self.service.delete_bounties( results = self.service.delete_bounties(
-1001, [bounty.id, 999, 888], self.admin_user_id -1001, [b1.id, 999, 888], self.admin_user_id
) )
assert results == {bounty.id: "deleted", 999: "not_found", 888: "not_found"} assert results == {b1.id: "deleted", 999: "not_found", 888: "not_found"}
def test_delete_bounties_permission_denied(self): def test_delete_bounties_permission_denied(self):
"""Test delete_bounties returns permission_denied for non-admin users.""" """Test delete_bounties returns permission_denied for non-admin."""
bounty = self.service.add_bounty( b1 = self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="To delete" room_id=-1001, user_id=self.admin_user_id, text="First"
)
b2 = self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="Second"
) )
results = self.service.delete_bounties( results = self.service.delete_bounties(
-1001, -1001,
[bounty.id], [b1.id, b2.id],
999, # non-admin user 999, # non-admin user
) )
assert results == {bounty.id: "permission_denied"} assert results == {b1.id: "permission_denied", b2.id: "permission_denied"}
# Verify bounty was NOT deleted # Bounties should not be deleted
assert self.service.get_bounty(-1001, bounty.id) is not None assert self.service.get_bounty(-1001, b1.id) is not None
assert self.service.get_bounty(-1001, b2.id) is not None
class TestTrackingService: class TestTrackingService: