Compare commits

..

1 Commits

Author SHA1 Message Date
shokollm
6312d94c0b feat: implement /recover command for listing and recovering soft-deleted bounties
- Add recover_bounty method to BountyService for recovering soft-deleted bounties
- Add cmd_recover function to CLI with list and recover modes
- List mode: jigaido-cli recover --group-id <id>
- Recover mode: jigaido-cli recover <id>... --group-id <id>
- Admin-only for recover, everyone for list
- Fixes #49
2026-04-04 07:00:17 +00:00
4 changed files with 93 additions and 306 deletions

View File

@@ -8,15 +8,12 @@ from telegram.ext import Application, CommandHandler, MessageHandler, filters
from commands import (
cmd_add,
cmd_admin,
cmd_bounty,
cmd_delete,
cmd_edit,
cmd_help,
cmd_my,
cmd_show,
cmd_start,
cmd_timezone,
cmd_track,
cmd_untrack,
cmd_update,
@@ -44,9 +41,6 @@ def build_app() -> Application:
app.add_handler(CommandHandler("delete", cmd_delete))
app.add_handler(CommandHandler("track", cmd_track))
app.add_handler(CommandHandler("untrack", cmd_untrack))
app.add_handler(CommandHandler("show", cmd_show))
app.add_handler(CommandHandler("timezone", cmd_timezone))
app.add_handler(CommandHandler("admin", cmd_admin))
app.add_handler(MessageHandler(filters.COMMAND, cmd_help))
@@ -62,9 +56,6 @@ async def post_init(app: Application) -> None:
("edit", "Edit a bounty"),
("track", "Track a bounty"),
("untrack", "Stop tracking"),
("show", "Show bounty details"),
("timezone", "Get/set room timezone"),
("admin", "Manage admins"),
("help", "Show help"),
]
)

View File

@@ -1,11 +1,8 @@
"""Telegram command handlers for JIGAIDO - Thin wrappers around core services."""
import time
from datetime import datetime, timezone
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
from functools import wraps
from typing import Optional
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
import dateparser
from telegram import Update
@@ -22,36 +19,6 @@ TRACKING_SERVICE = TrackingService(TRACKING_STORAGE, ROOM_STORAGE)
TELEGRAM_BOT_USERNAME = "your_bot_username"
def format_due_date(due_date_ts: int | None, timezone_str: str) -> str:
"""Format due date as human-readable with timezone.
Examples:
No due date: (none shown)
Date only: 4 April 2026
Date + time: 4 April 2026 14:30
With timezone: 4 April 2026 14:30 (Asia/Jakarta)
"""
if not due_date_ts:
return ""
try:
tz = ZoneInfo(timezone_str)
except (KeyError, ZoneInfoNotFoundError):
tz = ZoneInfo("UTC")
dt = datetime.fromtimestamp(due_date_ts, tz=tz)
date_str = dt.strftime("%-d %B %Y")
if dt.hour != 0 or dt.minute != 0:
date_str += f" {dt.strftime('%H:%M')}"
date_str += f" ({timezone_str})"
return date_str
def extract_args(text: str) -> list[str]:
if not text:
return []
@@ -59,84 +26,47 @@ def extract_args(text: str) -> list[str]:
return tokens[1:] if len(tokens) > 1 else []
def parse_args(
args: list[str],
) -> tuple[Optional[str], Optional[str], Optional[int], bool, bool]:
def parse_args(args: list[str]) -> tuple[Optional[str], Optional[str], Optional[int]]:
text = None
link = None
due_date_ts = None
clear_link = False
clear_date = False
i = 0
while i < len(args):
arg = args[i]
if arg == "-link":
if i + 1 < len(args) and (
args[i + 1].startswith("http://") or args[i + 1].startswith("https://")
):
link = args[i + 1]
i += 2
else:
clear_link = True
i += 1
elif arg == "-date":
if i + 1 < len(args):
parsed = dateparser.parse(args[i + 1])
if parsed:
due_date_ts = int(parsed.timestamp())
i += 2
else:
clear_date = True
i += 1
else:
clear_date = True
i += 1
elif not link and (arg.startswith("http://") or arg.startswith("https://")):
remaining = []
for arg in args:
if not link and (arg.startswith("http://") or arg.startswith("https://")):
link = arg
i += 1
elif due_date_ts is None:
parsed = dateparser.parse(arg)
if parsed:
due_date_ts = int(parsed.timestamp())
i += 1
else:
i += 1
if text is None:
text = arg
remaining.append(arg)
else:
text = text + " " + arg
else:
i += 1
if text is None:
text = arg
else:
text = text + " " + arg
remaining.append(arg)
return text, link, due_date_ts, clear_link, clear_date
text = " ".join(remaining) if remaining else None
return text, link, due_date_ts
def format_bounty(b, show_id: bool = True, slice_length: int = 0) -> str:
def format_bounty(b, show_id: bool = True) -> str:
parts = []
if show_id:
parts.append(f"[#{b.id}]")
if b.text:
text = b.text
if slice_length > 0 and len(text) > slice_length:
text = text[:slice_length] + "..."
parts.append(text)
parts.append(b.text)
if b.link:
parts.append(f"🔗 {b.link}")
if b.due_date_ts:
due_str = time.strftime("%d %b %Y", time.localtime(b.due_date_ts))
due_str = time.strftime("%Y-%m-%d", time.localtime(b.due_date_ts))
days_left = (b.due_date_ts - int(time.time())) // 86400
if days_left < 0:
parts.append(f"{due_str} (OVERDUE)")
elif days_left == 0:
parts.append(f"Today (OVERDUE)")
parts.append(f"{due_str} (TODAY)")
else:
parts.append(f"{due_str} ({days_left}d)")
if b.created_by_user_id:
parts.append(f"by {b.created_by_user_id}")
return " | ".join(parts)
@@ -165,58 +95,13 @@ def get_room_id(update: Update) -> int:
async def cmd_bounty(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
room_id = get_room_id(update)
args = extract_args(update.message.text)
bounties = BOUNTY_SERVICE.list_bounties(room_id)
show_all = "all" in args
args = [a for a in args if a != "all"]
try:
limit = int(args[0]) if args else 5
except (ValueError, IndexError):
limit = 5
now = int(time.time())
cutoff_24h = now - 86400
all_bounties = BOUNTY_SERVICE.list_bounties(room_id)
def is_expired(b) -> bool:
return b.due_date_ts is not None and b.due_date_ts < cutoff_24h
def sort_key(b):
if b.due_date_ts is not None:
return (0, b.due_date_ts)
return (1, b.created_at)
filtered_bounties = [b for b in all_bounties if not is_expired(b) or show_all]
filtered_bounties.sort(key=sort_key)
total_count = len(filtered_bounties)
displayed_bounties = filtered_bounties[:limit]
if not displayed_bounties:
if show_all:
if not bounties:
await update.message.reply_text("No bounties yet.")
else:
await update.message.reply_text(
"No active bounties. Use /bounty all to show expired."
)
return
lines = []
if limit < total_count:
lines.append(f"Showing {limit} of {total_count} bounties:")
slice_length = 40
elif show_all and total_count > limit:
lines.append(f"Showing {limit} of {total_count} bounties (including expired):")
slice_length = 40
else:
lines.append(f"Showing {total_count} bounties:")
slice_length = 0
for b in displayed_bounties:
lines.append(format_bounty(b, show_id=True, slice_length=slice_length))
lines = [format_bounty(b, show_id=True) for b in bounties]
await update.message.reply_text("\n".join(lines), disable_web_page_preview=True)
@@ -226,7 +111,6 @@ async def cmd_my(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
if is_group(update):
group_id = get_group_id(update)
bounties = TRACKING_SERVICE.get_tracked_bounties(group_id, user_id)
room_id = group_id
else:
room_id = get_room_id(update)
bounties = BOUNTY_SERVICE.list_bounties(room_id)
@@ -253,7 +137,7 @@ async def cmd_add(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
)
return
text, link, due_date_ts, _, _ = parse_args(args)
text, link, due_date_ts = parse_args(args)
if not text and not link:
await update.message.reply_text("A bounty needs at least text or a link.")
return
@@ -271,8 +155,8 @@ async def cmd_add(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
due_str = ""
if due_date_ts:
timezone_str = BOUNTY_SERVICE.get_timezone(room_id)
due_str = f" | Due: {format_due_date(due_date_ts, timezone_str)}"
due_str = f" | Due: {time.strftime('%Y-%m-%d', time.localtime(due_date_ts))}"
await update.message.reply_text(
f"✅ Bounty added (#{bounty.id}){due_str}",
disable_web_page_preview=True,
@@ -283,14 +167,7 @@ async def cmd_update(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
args = extract_args(update.message.text)
if len(args) < 1:
await update.message.reply_text(
"Usage: /update <bounty_id> [text] [link] [due_date]\n"
" /update <bounty_id> -link [<url>] - clear or set link\n"
" /update <bounty_id> -date [<date>] - clear or set date\n"
"Examples:\n"
" /update 1 new text - update text only\n"
" /update 1 -link - clear link\n"
" /update 1 -link https://... - set link\n"
" /update 1 -link -date - clear both link and date"
"Usage: /update <bounty_id> [text] [link] [due_date]"
)
return
@@ -300,14 +177,8 @@ async def cmd_update(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
await update.message.reply_text("Invalid bounty ID.")
return
text, link, due_date_ts, clear_link, clear_date = parse_args(args[1:])
if (
not text
and not link
and due_date_ts is None
and not clear_link
and not clear_date
):
text, link, due_date_ts = parse_args(args[1:])
if not text and not link and due_date_ts is None:
await update.message.reply_text("Nothing to update.")
return
@@ -322,15 +193,10 @@ async def cmd_update(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
text=text,
link=link,
due_date_ts=due_date_ts,
clear_link=clear_link,
clear_due=clear_date,
)
except PermissionError as e:
await update.message.reply_text(f"{e}")
return
except ValueError as e:
await update.message.reply_text(f"{e}")
return
if success:
await update.message.reply_text(f"✅ Bounty #{bounty_id} updated.")
@@ -445,152 +311,16 @@ async def cmd_start(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
)
async def cmd_show(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
args = extract_args(update.message.text)
if not args:
await update.message.reply_text("Usage: /show <bounty_id>")
return
try:
bounty_id = int(args[0])
except ValueError:
await update.message.reply_text("Invalid bounty ID.")
return
room_id = get_room_id(update)
bounty = BOUNTY_SERVICE.get_bounty(room_id, bounty_id)
if not bounty:
await update.message.reply_text("Bounty not found.")
return
timezone = BOUNTY_SERVICE.get_timezone(room_id)
lines = []
title = bounty.text or "(no text)"
lines.append(f"[#{bounty.id}] {title}")
if bounty.link:
lines.append(f"🔗 {bounty.link}")
if bounty.due_date_ts:
due_str = time.strftime("%d %B %Y %H:%M", time.localtime(bounty.due_date_ts))
lines.append(f"📅 {due_str} ({timezone})")
username = bounty.created_by_username or f"user#{bounty.created_by_user_id}"
lines.append(f"👤 @{username}")
created_str = time.strftime("%Y-%m-%d %H:%M", time.localtime(bounty.created_at))
lines.append(f"📌 Created: {created_str}")
await update.message.reply_text("\n".join(lines), disable_web_page_preview=True)
async def cmd_timezone(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
args = extract_args(update.message.text)
room_id = get_room_id(update)
user_id = get_user_id(update)
if not args:
current_tz = BOUNTY_SERVICE.get_timezone(room_id)
await update.message.reply_text(f"Current timezone: {current_tz}")
return
timezone_str = args[0]
try:
ZoneInfo(timezone_str)
except (KeyError, ZoneInfoNotFoundError):
await update.message.reply_text(
"⛔ Invalid timezone. Use IANA format (e.g., Asia/Jakarta)"
)
return
try:
BOUNTY_SERVICE.set_timezone(room_id, timezone_str, user_id)
except PermissionError as e:
await update.message.reply_text(f"{e}")
return
await update.message.reply_text(f"✅ Timezone set to {timezone_str}.")
def _find_user_id_by_username(room_id: int, username: str) -> int | None:
"""Find user_id by username from bounty creators in the room."""
bounties = BOUNTY_SERVICE.list_bounties(room_id)
for bounty in bounties:
if (
bounty.created_by_username
and bounty.created_by_username.lower() == username.lower()
):
return bounty.created_by_user_id
return None
async def cmd_admin(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
args = extract_args(update.message.text)
if not args or args[0] not in ("add", "remove"):
await update.message.reply_text(
"Usage:\n"
"/admin add @username — add admin\n"
"/admin remove @username — remove admin"
)
return
subcommand = args[0]
if len(args) < 2:
await update.message.reply_text(f"Usage: /admin {subcommand} @username")
return
raw_username = args[1]
if not raw_username.startswith("@"):
await update.message.reply_text(f"Usage: /admin {subcommand} @username")
return
username = raw_username[1:]
user_id = get_user_id(update)
room_id = get_room_id(update)
if not BOUNTY_SERVICE.is_admin(room_id, user_id):
await update.message.reply_text("⛔ Only admins can perform this action.")
return
target_user_id = _find_user_id_by_username(room_id, username)
if target_user_id is None:
await update.message.reply_text(f"⛔ User @{username} not found.")
return
try:
if subcommand == "add":
BOUNTY_SERVICE.add_admin(room_id, target_user_id, user_id)
await update.message.reply_text(f"✅ @{username} is now an admin.")
elif subcommand == "remove":
BOUNTY_SERVICE.remove_admin(room_id, target_user_id, user_id)
await update.message.reply_text(f"✅ @{username} is no longer an admin.")
except PermissionError as e:
await update.message.reply_text(f"{e}")
async def cmd_help(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
await update.message.reply_text(
"👻 JIGAIDO Commands:\n\n"
"/bounty — list all bounties\n"
"/my — bounties you're tracking\n"
"/add <text> [link] [due] — add bounty\n"
"/update <id> [text] [link] [due] — update bounty\n"
"/edit <id> [text] [link] [due] — edit bounty (same as update)\n"
" /edit <id> -link [<url>] — clear or set link\n"
" /edit <id> -date [<date>] — clear or set date\n"
"/delete <id> — delete bounty (admin only)\n"
"/update <id> [text> [link] [due] — update bounty\n"
"/delete <id> — delete bounty\n"
"/track <id> — track a bounty (groups only)\n"
"/untrack <id> — stop tracking (groups only)\n"
"/show <id> — show bounty details\n"
"/timezone — get room timezone\n"
"/timezone <tz> — set room timezone (admin only)\n"
"/start — re-initialize\n"
"/help — this message",
disable_web_page_preview=True,

View File

@@ -127,6 +127,40 @@ def cmd_delete(args):
sys.exit(1)
def cmd_recover(args):
"""List or recover soft-deleted bounties."""
bounty_service, _ = create_services()
room_id = args.group_id or args.user_id
user_id = args.user_id or 0
if not args.bounty_ids:
deleted = bounty_service.list_deleted_bounties(room_id)
if not deleted:
print("No recoverable bounties")
return
print("Recoverable bounties:")
for b in deleted:
from datetime import datetime
deleted_str = datetime.fromtimestamp(b.deleted_at).strftime("%d %b %Y")
print(f" [#{b.id}] {b.text or '(no text)'} | Deleted {deleted_str}")
return
if not bounty_service.is_admin(room_id, user_id):
print("Error: Only admins can recover bounties.", file=sys.stderr)
sys.exit(1)
for bounty_id in args.bounty_ids:
try:
success, msg = bounty_service.recover_bounty(
room_id=room_id, bounty_id=bounty_id, user_id=user_id
)
print(msg)
except PermissionError as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def cmd_track(args):
"""Track a bounty."""
_, tracking_service = create_services()
@@ -196,6 +230,9 @@ def main():
parser_untrack = subparsers.add_parser("untrack", help="Untrack a bounty")
parser_untrack.add_argument("bounty_id", type=int, help="Bounty ID to untrack")
parser_recover = subparsers.add_parser("recover", help="List or recover soft-deleted bounties")
parser_recover.add_argument("bounty_ids", nargs="*", type=int, help="Bounty ID(s) to recover (optional)")
for sp in [
parser_add,
parser_list,
@@ -204,6 +241,7 @@ def main():
parser_delete,
parser_track,
parser_untrack,
parser_recover,
]:
sp.add_argument(
"--group-id", type=int, help="Group context (use group room ID)"
@@ -222,7 +260,7 @@ def main():
print("Error: either --group-id or --user-id is required", file=sys.stderr)
sys.exit(1)
if args.command in ("add", "list", "my", "update", "delete"):
if args.command in ("add", "list", "my", "update", "delete", "recover"):
if not (args.group_id or args.user_id):
print("Error: --group-id or --user-id required", file=sys.stderr)
sys.exit(1)
@@ -238,6 +276,7 @@ def main():
"delete": cmd_delete,
"track": cmd_track,
"untrack": cmd_untrack,
"recover": cmd_recover,
}
if args.command in command_map:

View File

@@ -210,6 +210,33 @@ class BountyService:
self._storage.update_bounty(room_id, bounty)
return True
def recover_bounty(
self, room_id: int, bounty_id: int, user_id: int
) -> tuple[bool, str]:
"""Recover a soft-deleted bounty. Only admins can recover.
Returns (success, message) tuple.
"""
all_bounties = self._storage.list_all_bounties(room_id, include_deleted=True)
bounty = None
for b in all_bounties:
if b.id == bounty_id:
bounty = b
break
if not bounty:
return False, f"Bounty #{bounty_id} not found."
if bounty.deleted_at is None:
return False, f"Bounty #{bounty_id} is not deleted."
if not self.is_admin(room_id, user_id):
raise PermissionError("Only admins can recover bounties.")
bounty.deleted_at = None
self._storage.update_bounty(room_id, bounty)
return True, f"Recovered bounty #{bounty_id}."
class TrackingService:
"""Service for tracking bounty operations."""