Compare commits

..

1 Commits

Author SHA1 Message Date
shokollm
e937cc85b9 feat: implement /recover command for listing and recovering soft-deleted bounties
- Add recover_bounty method to BountyService (admin-only)
- Add cmd_recover handler for listing and recovering deleted bounties
- Register /recover command in bot.py
- Add /recover to bot command list

/recover - list all recoverable bounties (sorted by deleted_at desc)
/recover <id...> - recover specific bounty(ies)

Output formats:
List: [#1] Deleted bounty | 🗑️ Deleted 2 Apr 2026
Recover:  Recovered bounty #1. or  Bounty #5 not found or not deleted.

Fixes #49
2026-04-04 13:12:13 +00:00
4 changed files with 96 additions and 201 deletions

View File

@@ -14,6 +14,7 @@ from commands import (
cmd_edit, cmd_edit,
cmd_help, cmd_help,
cmd_my, cmd_my,
cmd_recover,
cmd_show, cmd_show,
cmd_start, cmd_start,
cmd_timezone, cmd_timezone,
@@ -47,7 +48,7 @@ def build_app() -> Application:
app.add_handler(CommandHandler("show", cmd_show)) app.add_handler(CommandHandler("show", cmd_show))
app.add_handler(CommandHandler("timezone", cmd_timezone)) app.add_handler(CommandHandler("timezone", cmd_timezone))
app.add_handler(CommandHandler("admin", cmd_admin)) app.add_handler(CommandHandler("admin", cmd_admin))
app.add_handler(CommandHandler("admin", cmd_admin)) app.add_handler(CommandHandler("recover", cmd_recover))
app.add_handler(MessageHandler(filters.COMMAND, cmd_help)) app.add_handler(MessageHandler(filters.COMMAND, cmd_help))
@@ -66,6 +67,7 @@ async def post_init(app: Application) -> None:
("show", "Show bounty details"), ("show", "Show bounty details"),
("timezone", "Get/set room timezone"), ("timezone", "Get/set room timezone"),
("admin", "Manage admins"), ("admin", "Manage admins"),
("recover", "Recover deleted bounties"),
("help", "Show help"), ("help", "Show help"),
] ]
) )

View File

@@ -1,8 +1,6 @@
"""Telegram command handlers for JIGAIDO - Thin wrappers around core services.""" """Telegram command handlers for JIGAIDO - Thin wrappers around core services."""
import time import time
from datetime import datetime, timezone
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
from functools import wraps from functools import wraps
from typing import Optional from typing import Optional
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
@@ -22,36 +20,6 @@ TRACKING_SERVICE = TrackingService(TRACKING_STORAGE, ROOM_STORAGE)
TELEGRAM_BOT_USERNAME = "your_bot_username" TELEGRAM_BOT_USERNAME = "your_bot_username"
def format_due_date(due_date_ts: int | None, timezone_str: str) -> str:
"""Format due date as human-readable with timezone.
Examples:
No due date: (none shown)
Date only: 4 April 2026
Date + time: 4 April 2026 14:30
With timezone: 4 April 2026 14:30 (Asia/Jakarta)
"""
if not due_date_ts:
return ""
try:
tz = ZoneInfo(timezone_str)
except (KeyError, ZoneInfoNotFoundError):
tz = ZoneInfo("UTC")
dt = datetime.fromtimestamp(due_date_ts, tz=tz)
date_str = dt.strftime("%-d %B %Y")
if dt.hour != 0 or dt.minute != 0:
date_str += f" {dt.strftime('%H:%M')}"
date_str += f" ({timezone_str})"
return date_str
def extract_args(text: str) -> list[str]: def extract_args(text: str) -> list[str]:
if not text: if not text:
return [] return []
@@ -278,8 +246,7 @@ async def cmd_add(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
due_str = "" due_str = ""
if due_date_ts: if due_date_ts:
timezone_str = BOUNTY_SERVICE.get_timezone(room_id) due_str = f" | Due: {time.strftime('%Y-%m-%d', time.localtime(due_date_ts))}"
due_str = f" | Due: {format_due_date(due_date_ts, timezone_str)}"
await update.message.reply_text( await update.message.reply_text(
f"✅ Bounty added (#{bounty.id}){due_str}", f"✅ Bounty added (#{bounty.id}){due_str}",
disable_web_page_preview=True, disable_web_page_preview=True,
@@ -351,11 +318,11 @@ cmd_edit = cmd_update
async def cmd_delete(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None: async def cmd_delete(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
args = extract_args(update.message.text) args = extract_args(update.message.text)
if not args: if not args:
await update.message.reply_text("Usage: /delete <bounty_id> [bounty_id ...]") await update.message.reply_text("Usage: /delete <bounty_id>")
return return
try: try:
bounty_ids = [int(arg) for arg in args] bounty_id = int(args[0])
except ValueError: except ValueError:
await update.message.reply_text("Invalid bounty ID.") await update.message.reply_text("Invalid bounty ID.")
return return
@@ -364,25 +331,81 @@ async def cmd_delete(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
room_id = get_room_id(update) room_id = get_room_id(update)
try: try:
results = BOUNTY_SERVICE.delete_bounties( success = BOUNTY_SERVICE.delete_bounty(
room_id=room_id, room_id=room_id,
bounty_ids=bounty_ids, bounty_id=bounty_id,
user_id=user_id, user_id=user_id,
) )
except PermissionError as e: except PermissionError as e:
await update.message.reply_text(f"{e}") await update.message.reply_text(f"{e}")
return return
response_lines = [] if success:
for bounty_id, result in results.items(): await update.message.reply_text(f"✅ Bounty #{bounty_id} deleted.")
if result == "deleted": else:
response_lines.append(f"Bounty #{bounty_id} deleted.") await update.message.reply_text("Bounty not found.")
elif result == "not_found":
response_lines.append(f"⛔ Bounty #{bounty_id} not found.")
elif result == "permission_denied":
response_lines.append(f"⛔ Bounty #{bounty_id} permission denied.")
await update.message.reply_text("\n".join(response_lines))
async def cmd_recover(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
args = extract_args(update.message.text)
room_id = get_room_id(update)
user_id = get_user_id(update)
try:
if not BOUNTY_SERVICE.is_admin(room_id, user_id):
await update.message.reply_text("⛔ Only admins can recover bounties.")
return
except PermissionError as e:
await update.message.reply_text(f"{e}")
return
if not args:
deleted_bounties = BOUNTY_SERVICE.list_deleted_bounties(room_id)
if not deleted_bounties:
await update.message.reply_text("No recoverable bounties.")
return
deleted_bounties.sort(key=lambda b: b.deleted_at or 0, reverse=True)
lines = ["Recoverable bounties:"]
for b in deleted_bounties:
deleted_str = time.strftime("%d %b %Y", time.localtime(b.deleted_at))
text_part = (
b.text[:40] + "..."
if b.text and len(b.text) > 40
else (b.text or "(no text)")
)
lines.append(f"[#{b.id}] {text_part} | 🗑️ Deleted {deleted_str}")
await update.message.reply_text("\n".join(lines), disable_web_page_preview=True)
return
bounty_ids = []
for arg in args:
try:
bounty_ids.append(int(arg))
except ValueError:
await update.message.reply_text(f"Invalid bounty ID: {arg}")
return
results = []
for bounty_id in bounty_ids:
try:
success, message = BOUNTY_SERVICE.recover_bounty(
room_id=room_id,
bounty_id=bounty_id,
user_id=user_id,
)
except PermissionError as e:
results.append(f"{e}")
continue
if success:
results.append(f"{message}")
else:
results.append(f"{message}")
await update.message.reply_text("\n".join(results))
async def cmd_track(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None: async def cmd_track(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
@@ -438,101 +461,6 @@ async def cmd_untrack(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
await update.message.reply_text("Not tracking bounty #{bounty_id}.") await update.message.reply_text("Not tracking bounty #{bounty_id}.")
async def cmd_admin(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
args = extract_args(update.message.text)
if not args:
await update.message.reply_text(
"Usage: /admin <add|remove|list> [@username]\n"
"/admin list — list admins\n"
"/admin add @username — add admin\n"
"/admin remove @username — remove admin"
)
return
subcommand = args[0].lower()
username = args[1] if len(args) > 1 else None
requesting_user_id = get_user_id(update)
room_id = get_room_id(update)
if subcommand == "list":
admins = BOUNTY_SERVICE.list_admins(room_id)
if not admins:
await update.message.reply_text("No admins in this room.")
return
admin_mentions = " ".join(f"admin_id:{uid}" for uid in admins)
await update.message.reply_text(f"Admins: {admin_mentions}")
return
if subcommand == "remove":
if not username:
await update.message.reply_text("Usage: /admin remove @username")
return
if not username.startswith("@"):
await update.message.reply_text(
f"{username} is not a valid username (must start with @)."
)
return
target_username = username[1:]
try:
chat = await ctx.bot.get_chat(target_username)
target_user_id = chat.id
except Exception:
await update.message.reply_text(
f"⛔ Could not find user @{target_username}."
)
return
try:
BOUNTY_SERVICE.remove_admin(room_id, target_user_id, requesting_user_id)
await update.message.reply_text(
f"✅ @{target_username} is no longer an admin."
)
except PermissionError as e:
await update.message.reply_text(f"{e}")
except ValueError:
await update.message.reply_text(f"⛔ @{target_username} is not an admin.")
return
if subcommand == "add":
if not username:
await update.message.reply_text("Usage: /admin add @username")
return
if not username.startswith("@"):
await update.message.reply_text(
f"{username} is not a valid username (must start with @)."
)
return
target_username = username[1:]
try:
chat = await ctx.bot.get_chat(target_username)
target_user_id = chat.id
except Exception:
await update.message.reply_text(
f"⛔ Could not find user @{target_username}."
)
return
try:
BOUNTY_SERVICE.add_admin(room_id, target_user_id, requesting_user_id)
await update.message.reply_text(f"✅ @{target_username} is now an admin.")
except PermissionError as e:
await update.message.reply_text(f"{e}")
except ValueError:
await update.message.reply_text(
f"⛔ @{target_username} is already an admin."
)
return
await update.message.reply_text(
"Unknown subcommand. Use: /admin <add|remove|list> [@username]"
)
async def cmd_start(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None: async def cmd_start(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
if is_group(update): if is_group(update):
await update.message.reply_text( await update.message.reply_text(

View File

@@ -210,27 +210,32 @@ class BountyService:
self._storage.update_bounty(room_id, bounty) self._storage.update_bounty(room_id, bounty)
return True return True
def delete_bounties( def recover_bounty(
self, room_id: int, bounty_ids: list[int], user_id: int self, room_id: int, bounty_id: int, user_id: int
) -> dict[int, str]: ) -> tuple[bool, str]:
"""Soft delete multiple bounties. Returns dict of bounty_id -> result. """Recover a soft-deleted bounty. Only admins can recover.
Results can be: 'deleted', 'not_found', 'permission_denied' Returns (success, message) tuple.
""" """
results = {}
for bounty_id in bounty_ids:
bounty = self._storage.get_bounty(room_id, bounty_id)
if not bounty:
results[bounty_id] = "not_found"
continue
if not self.is_admin(room_id, user_id): if not self.is_admin(room_id, user_id):
results[bounty_id] = "permission_denied" raise PermissionError("Only admins can recover bounties.")
continue
bounty.deleted_at = int(time.time()) all_bounties = self._storage.list_all_bounties(room_id, include_deleted=True)
bounty = None
for b in all_bounties:
if b.id == bounty_id:
bounty = b
break
if not bounty:
return False, f"Bounty #{bounty_id} not found."
if bounty.deleted_at is None:
return False, f"Bounty #{bounty_id} is not deleted."
bounty.deleted_at = None
self._storage.update_bounty(room_id, bounty) self._storage.update_bounty(room_id, bounty)
results[bounty_id] = "deleted" return True, f"Recovered bounty #{bounty_id}."
return results
class TrackingService: class TrackingService:

View File

@@ -210,46 +210,6 @@ class TestBountyService:
result = self.service.delete_bounty(-1001, 999, self.admin_user_id) result = self.service.delete_bounty(-1001, 999, self.admin_user_id)
assert result is False assert result is False
def test_delete_bounties_multi_id_success(self):
"""Test delete_bounties returns individual results for multiple bounties."""
bounty1 = self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="To delete 1"
)
bounty2 = self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="To delete 2"
)
results = self.service.delete_bounties(
-1001, [bounty1.id, bounty2.id], self.admin_user_id
)
assert results == {bounty1.id: "deleted", bounty2.id: "deleted"}
# Verify both are soft deleted
assert self.service.get_bounty(-1001, bounty1.id) is None
assert self.service.get_bounty(-1001, bounty2.id) is None
def test_delete_bounties_mixed_results(self):
"""Test delete_bounties returns not_found for non-existent bounties."""
bounty = self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="To delete"
)
results = self.service.delete_bounties(
-1001, [bounty.id, 999, 888], self.admin_user_id
)
assert results == {bounty.id: "deleted", 999: "not_found", 888: "not_found"}
def test_delete_bounties_permission_denied(self):
"""Test delete_bounties returns permission_denied for non-admin users."""
bounty = self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="To delete"
)
results = self.service.delete_bounties(
-1001,
[bounty.id],
999, # non-admin user
)
assert results == {bounty.id: "permission_denied"}
# Verify bounty was NOT deleted
assert self.service.get_bounty(-1001, bounty.id) is not None
class TestTrackingService: class TestTrackingService:
"""Unit tests for TrackingService.""" """Unit tests for TrackingService."""