"""Telegram command handlers for JIGAIDO - Thin wrappers around core services.""" import time from datetime import datetime from typing import Optional from zoneinfo import ZoneInfo, ZoneInfoNotFoundError import dateparser from telegram import Update from telegram.ext import ContextTypes from telegram import InlineKeyboardButton, InlineKeyboardMarkup from telegram.constants import ParseMode from adapters.storage.json_file import JsonFileRoomStorage, JsonFileTrackingStorage from core.services import BountyService, TrackingService ROOM_STORAGE = JsonFileRoomStorage() TRACKING_STORAGE = JsonFileTrackingStorage() BOUNTY_SERVICE = BountyService(ROOM_STORAGE) TRACKING_SERVICE = TrackingService(TRACKING_STORAGE, ROOM_STORAGE) TELEGRAM_BOT_USERNAME = "your_bot_username" def format_due_date(due_date_ts: int | None, timezone_str: str) -> str: """Format due date as human-readable with timezone. Examples: No due date: (none shown) Date only: 4 April 2026 Date + time: 4 April 2026 14:30 With timezone: 4 April 2026 14:30 (Asia/Jakarta) """ if not due_date_ts: return "" try: tz = ZoneInfo(timezone_str) except (KeyError, ZoneInfoNotFoundError): tz = ZoneInfo("UTC") dt = datetime.fromtimestamp(due_date_ts, tz=tz) date_str = dt.strftime("%-d %B %Y") if dt.hour != 0 or dt.minute != 0: date_str += f" {dt.strftime('%H:%M')}" date_str += f" ({timezone_str})" return date_str def extract_args(text: str) -> list[str]: if not text: return [] tokens = text.strip().split() return tokens[1:] if len(tokens) > 1 else [] def parse_args( args: list[str], timezone_str: str = "UTC", ) -> tuple[Optional[str], Optional[str], Optional[int], bool, bool, list[str], bool]: """Parse command arguments. Returns: (text, link, due_date_ts, clear_link, clear_date, category_slugs, clear_categories) """ text = None link = None due_date_ts = None clear_link = False clear_date = False category_slugs: list[str] = [] clear_categories = False try: tz = ZoneInfo(timezone_str) except (KeyError, ZoneInfoNotFoundError): tz = ZoneInfo("UTC") def is_url(s: str) -> bool: if not s: return False if s.startswith("http://") or s.startswith("https://"): return True if "." in s and " " not in s: return True return False def normalize_url(url: str) -> str: """Normalize URL by adding https:// prefix if missing.""" if not url: return url if url.startswith("http://") or url.startswith("https://"): return url # Add https:// for URLs without scheme return f"https://{url}" def is_time(s: str) -> bool: if not s or ":" not in s: return False parts = s.split(":") if len(parts) != 2: return False try: h, m = int(parts[0]), int(parts[1]) return 0 <= h <= 23 and 0 <= m <= 59 except ValueError: return False def parse_date_with_tz(date_str: str) -> int | None: parsed = dateparser.parse(date_str) if parsed: localized = parsed.replace(tzinfo=tz) return int(localized.timestamp()) return None def parse_category_slugs(cat_str: str) -> list[str]: """Parse comma-separated category slugs.""" return [s.strip().lower() for s in cat_str.split(",") if s.strip()] i = 0 while i < len(args): arg = args[i] if arg == "-link": if i + 1 < len(args) and not args[i + 1].startswith("-"): link = normalize_url(args[i + 1]) i += 2 else: clear_link = True i += 1 elif arg == "-date": if i + 1 < len(args) and not args[i + 1].startswith("-"): due_date_ts = parse_date_with_tz(args[i + 1]) if due_date_ts is not None: i += 2 if i < len(args) and is_time(args[i]): time_parts = args[i].split(":") due_date_ts += ( int(time_parts[0]) * 3600 + int(time_parts[1]) * 60 ) i += 1 else: clear_date = True i += 1 else: clear_date = True i += 1 elif arg == "-cat": if i + 1 < len(args) and args[i + 1] == "-": clear_categories = True i += 2 elif i + 1 < len(args) and not args[i + 1].startswith("-"): category_slugs = parse_category_slugs(args[i + 1]) i += 2 else: clear_categories = True i += 1 elif not link and is_url(arg): link = normalize_url(arg) i += 1 elif due_date_ts is None: due_date_ts = parse_date_with_tz(arg) if due_date_ts is not None: i += 1 if i < len(args) and is_time(args[i]): time_parts = args[i].split(":") due_date_ts += int(time_parts[0]) * 3600 + int(time_parts[1]) * 60 i += 1 else: i += 1 if text is None: text = arg else: text = text + " " + arg else: i += 1 if text is None: text = arg else: text = text + " " + arg return text, link, due_date_ts, clear_link, clear_date, category_slugs, clear_categories def format_bounty( b, show_id: bool = True, slice_length: int = 0, timezone_str: str = "UTC" ) -> str: parts = [] if show_id: parts.append(f"[#{b.id}]") if b.text: text = b.text if slice_length > 0 and len(text) > slice_length: text = text[:slice_length] + "..." parts.append(text) if b.link: parts.append(f"🔗 {b.link}") if b.due_date_ts: try: tz = ZoneInfo(timezone_str) except (KeyError, ZoneInfoNotFoundError): tz = ZoneInfo("UTC") dt_now = datetime.now(tz) dt_due = datetime.fromtimestamp(b.due_date_ts, tz=tz) seconds_left = int((dt_due - dt_now).total_seconds()) hours_left = seconds_left // 3600 days_left = seconds_left // 86400 due_str = dt_due.strftime("%d %b %Y %H:%M") if seconds_left < 0: parts.append(f"⏰ {due_str} (OVERDUE)") elif hours_left < 48: if hours_left < 1: minutes_left = seconds_left // 60 parts.append(f"⏰ {due_str} ({minutes_left}m)") else: parts.append(f"⏰ {due_str} ({hours_left}h)") else: parts.append(f"⏰ {due_str} ({days_left}d)") return " | ".join(parts) async def cmd_delete_message(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None: query = update.callback_query if not query: return data = query.data if not data.startswith("del_msg:"): return parts = data.split(":") if len(parts) != 2: return try: expected_user_id = int(parts[1]) except ValueError: return user_id = get_user_id(update) if user_id != expected_user_id: await query.answer("You can't delete this message", show_alert=True) return try: await query.message.delete() await query.answer("Deleted") except Exception as e: await query.answer(f"Could not delete: {e}", show_alert=True) def is_group(update: Update) -> bool: return update.effective_chat.type != "private" def get_group_id(update: Update) -> int: return update.effective_chat.id def get_user_id(update: Update) -> int: return update.effective_user.id def get_room_id(update: Update) -> int: """Get room_id for the current context. For groups: negative group_id For DMs: positive user_id """ if is_group(update): return get_group_id(update) return get_user_id(update) async def cmd_bounty(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None: room_id = get_room_id(update) user_id = get_user_id(update) timezone_str = BOUNTY_SERVICE.get_timezone(room_id) args = extract_args(update.message.text) show_all = "all" in args args = [a for a in args if a != "all"] # Parse -c flag for category filter category_slugs: list[str] = [] filtered_args = [] i = 0 while i < len(args): if args[i] == "-c": if i + 1 < len(args) and not args[i + 1].startswith("-"): category_slugs = [s.strip().lower() for s in args[i + 1].split(",")] i += 2 else: i += 1 else: filtered_args.append(args[i]) i += 1 try: limit = int(filtered_args[0]) if filtered_args else 5 except (ValueError, IndexError): limit = 5 now = int(time.time()) cutoff_24h = now - 86400 all_bounties = BOUNTY_SERVICE.list_bounties(room_id) def is_expired(b) -> bool: return b.due_date_ts is not None and b.due_date_ts < cutoff_24h def sort_key(b): if b.due_date_ts is not None: return (0, b.due_date_ts) return (1, b.created_at) filtered_bounties = [b for b in all_bounties if not is_expired(b) or show_all] # Filter by category if specified if category_slugs: filtered_bounties = [ b for b in filtered_bounties if any(cat in b.category_ids for cat in category_slugs) ] filtered_bounties.sort(key=sort_key) total_count = len(filtered_bounties) displayed_bounties = filtered_bounties[:limit] if not displayed_bounties: keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) if category_slugs: await update.message.reply_text( f"No bounties with category: {', '.join(category_slugs)}", reply_markup=reply_markup, ) elif show_all: await update.message.reply_text( "No bounties yet.", reply_markup=reply_markup, ) else: await update.message.reply_text( "No active bounties. Use /bounty all to show expired.", reply_markup=reply_markup, ) return lines = [] if category_slugs: lines.append(f"📂 Filtering with {', '.join(category_slugs)} categories:") if limit < total_count: lines.append(f"Showing {limit} of {total_count} bounties:") slice_length = 40 elif show_all and total_count > limit: lines.append(f"Showing {limit} of {total_count} bounties (including expired):") slice_length = 40 else: lines.append(f"Showing {total_count} bounties:") slice_length = 0 for b in displayed_bounties: lines.append( format_bounty( b, show_id=True, slice_length=slice_length, timezone_str=timezone_str ) ) keyboard = [[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( "\n".join(lines), disable_web_page_preview=True, reply_markup=reply_markup ) async def cmd_my(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None: user_id = get_user_id(update) args = extract_args(update.message.text) # Parse show_all flag show_all = "all" in args args = [a for a in args if a != "all"] # Parse optional limit try: limit = int(args[0]) if args else 5 except (ValueError, IndexError): limit = 5 if is_group(update): group_id = get_group_id(update) bounties = TRACKING_SERVICE.get_tracked_bounties(group_id, user_id) room_id = group_id else: room_id = get_room_id(update) bounties = BOUNTY_SERVICE.list_bounties(room_id) timezone_str = BOUNTY_SERVICE.get_timezone(room_id) now = int(time.time()) cutoff_24h = now - 86400 def is_expired(b) -> bool: return b.due_date_ts is not None and b.due_date_ts < cutoff_24h def sort_key(b): if b.due_date_ts is not None: return (0, b.due_date_ts) return (1, b.created_at) # Filter expired and sort filtered_bounties = [b for b in bounties if not is_expired(b) or show_all] filtered_bounties.sort(key=sort_key) total_count = len(filtered_bounties) displayed_bounties = filtered_bounties[:limit] if not displayed_bounties: msg = ( "You are not tracking any bounties." if is_group(update) else "No personal bounties." ) await update.message.reply_text(msg) return lines = [] if limit < total_count: lines.append(f"Showing {limit} of {total_count} bounties:") slice_length = 40 elif show_all and total_count > limit: lines.append(f"Showing {limit} of {total_count} bounties (including expired):") slice_length = 40 else: lines.append(f"Showing {total_count} bounties:") slice_length = 0 for b in displayed_bounties: lines.append( format_bounty(b, show_id=True, slice_length=slice_length, timezone_str=timezone_str) ) keyboard = [[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( "\n".join(lines), disable_web_page_preview=True, reply_markup=reply_markup ) async def cmd_add(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None: args = extract_args(update.message.text) if not args: await update.message.reply_text( "Usage: /add [link] [due_date] [-cat [,]]\n" "Example: /add Fix the bug https://github.com/foo/bar tomorrow -cat bug,urgent" ) return user_id = get_user_id(update) room_id = get_room_id(update) timezone_str = BOUNTY_SERVICE.get_timezone(room_id) effective_user = update.effective_user username = effective_user.username or effective_user.first_name or None text, link, due_date_ts, _, _, category_slugs, _ = parse_args(args, timezone_str) if not text and not link: await update.message.reply_text("A bounty needs at least text or a link.") return try: bounty = BOUNTY_SERVICE.add_bounty( room_id=room_id, user_id=user_id, username=username, text=text, link=link, due_date_ts=due_date_ts, created_by_username=username, ) # Add categories if specified for slug in category_slugs: try: BOUNTY_SERVICE.add_category_to_bounty( room_id, bounty.id, slug, username ) except ValueError: pass # Category doesn't exist, skip it except PermissionError: pass except PermissionError as e: keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text(f"⛔ {e}", reply_markup=reply_markup) return except ValueError as e: keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text(f"⛔ {e}", reply_markup=reply_markup) return due_str = "" if due_date_ts: timezone_str = BOUNTY_SERVICE.get_timezone(room_id) due_str = f" | Due: {format_due_date(due_date_ts, timezone_str)}" cat_str = "" if category_slugs: cat_str = f" | 📂 {', '.join(category_slugs)}" keyboard = [[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( f"✅ Bounty added (#{bounty.id}){due_str}{cat_str}", disable_web_page_preview=True, reply_markup=reply_markup, ) async def cmd_update(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None: user_id = get_user_id(update) room_id = get_room_id(update) effective_user = update.effective_user username = effective_user.username or effective_user.first_name or None if not BOUNTY_SERVICE.is_admin(room_id, username): keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( "⛔ Only admins can edit bounties.", reply_markup=reply_markup, ) return args = extract_args(update.message.text) if len(args) < 1: await update.message.reply_text( "Usage: /update [text] [link] [due_date]\n" " /update -link [] - clear or set link\n" " /update -date [] - clear or set date\n" " /update -cat [,] - add/replace categories\n" " /update -cat - - clear categories\n" " /update -remove-cat - remove category\n" "Examples:\n" " /update 1 new text - update text only\n" " /update 1 -link - clear link\n" " /update 1 -link https://... - set link\n" " /update 1 -cat bug,feature - set categories\n" " /update 1 -remove-cat bug - remove category" ) return try: bounty_id = int(args[0]) except ValueError: keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( "Invalid bounty ID.", reply_markup=reply_markup, ) return timezone_str = BOUNTY_SERVICE.get_timezone(room_id) # Parse flags separately to handle -cat and -remove-cat specially remaining_args = args[1:] category_slugs: list[str] = [] clear_categories = False remove_category_slug: str | None = None # Process remaining args for special flags filtered_args = [] i = 0 while i < len(remaining_args): arg = remaining_args[i] if arg == "-cat": if i + 1 < len(remaining_args) and remaining_args[i + 1] == "-": clear_categories = True i += 2 elif i + 1 < len(remaining_args) and not remaining_args[i + 1].startswith("-"): category_slugs = [s.strip().lower() for s in remaining_args[i + 1].split(",")] i += 2 else: clear_categories = True i += 1 elif arg == "-remove-cat": if i + 1 < len(remaining_args) and not remaining_args[i + 1].startswith("-"): remove_category_slug = remaining_args[i + 1].lower() i += 2 else: i += 1 else: filtered_args.append(arg) i += 1 text, link, due_date_ts, clear_link, clear_date, _, _ = parse_args(filtered_args, timezone_str) old_bounty = BOUNTY_SERVICE.get_bounty(room_id, bounty_id) if not old_bounty: keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( "Bounty not found.", reply_markup=reply_markup, ) return changes = [] # Handle category operations if clear_categories: try: BOUNTY_SERVICE.update_bounty_categories(room_id, bounty_id, [], username) changes.append("Categories cleared") except (PermissionError, ValueError): pass if category_slugs: try: BOUNTY_SERVICE.update_bounty_categories(room_id, bounty_id, category_slugs, username) changes.append(f"Categories set: {', '.join(category_slugs)}") except (PermissionError, ValueError): pass if remove_category_slug: try: BOUNTY_SERVICE.remove_category_from_bounty(room_id, bounty_id, remove_category_slug, username) changes.append(f"Removed category: {remove_category_slug}") except (PermissionError, ValueError): pass # Only update other fields if something was provided if text is not None or link is not None or due_date_ts is not None or clear_link or clear_date: try: BOUNTY_SERVICE.update_bounty( room_id=room_id, bounty_id=bounty_id, username=username, text=text, link=link, due_date_ts=due_date_ts, clear_link=clear_link, clear_due=clear_date, ) if text is not None: old_text = old_bounty.text or "(none)" changes.append(f"Text: {old_text} → {text}") if link is not None: old_link = old_bounty.link or "(none)" changes.append(f"Link: {old_link} → {link}") if due_date_ts is not None: old_date = ( format_due_date(old_bounty.due_date_ts, timezone_str) if old_bounty.due_date_ts else "(none)" ) new_date = format_due_date(due_date_ts, timezone_str) changes.append(f"Date: {old_date} → {new_date}") if clear_link: old_link = old_bounty.link or "(none)" changes.append(f"Link: {old_link} → (cleared)") if clear_date: old_date = ( format_due_date(old_bounty.due_date_ts, timezone_str) if old_bounty.due_date_ts else "(none)" ) changes.append(f"Date: {old_date} → (cleared)") except PermissionError as e: keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text(f"⛔ {e}", reply_markup=reply_markup) return except ValueError as e: keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text(f"⛔ {e}", reply_markup=reply_markup) return if changes: keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( f"✅ Bounty #{bounty_id} updated:\n" + "\n".join(changes), reply_markup=reply_markup, ) else: keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( f"✅ Bounty #{bounty_id} updated.", reply_markup=reply_markup, ) cmd_edit = cmd_update async def cmd_delete(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None: user_id = get_user_id(update) room_id = get_room_id(update) effective_user = update.effective_user username = effective_user.username or effective_user.first_name or None if not BOUNTY_SERVICE.is_admin(room_id, username): keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( "⛔ Only admins can delete bounties.", reply_markup=reply_markup, ) return args = extract_args(update.message.text) if not args: await update.message.reply_text("Usage: /delete [bounty_id ...]") return try: bounty_ids = [int(arg) for arg in args] except ValueError: keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( "Invalid bounty ID.", reply_markup=reply_markup, ) return try: results = BOUNTY_SERVICE.delete_bounties( room_id=room_id, bounty_ids=bounty_ids, username=username, ) except PermissionError as e: keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text(f"⛔ {e}", reply_markup=reply_markup) return response_lines = [] for bounty_id, result in results.items(): if result == "deleted": response_lines.append(f"✅ Bounty #{bounty_id} deleted.") elif result == "not_found": response_lines.append(f"⛔ Bounty #{bounty_id} not found.") elif result == "permission_denied": response_lines.append(f"⛔ Bounty #{bounty_id} permission denied.") keyboard = [[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( "\n".join(response_lines), reply_markup=reply_markup ) async def cmd_track(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None: user_id = get_user_id(update) if not is_group(update): keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( "⛔ /track is only available in groups.", reply_markup=reply_markup, ) return args = extract_args(update.message.text) if not args: await update.message.reply_text("Usage: /track ") return try: bounty_id = int(args[0]) except ValueError: await update.message.reply_text("Invalid bounty ID.") return user_id = get_user_id(update) room_id = get_room_id(update) try: if TRACKING_SERVICE.track_bounty(room_id, user_id, bounty_id): keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( f"✅ Now tracking bounty #{bounty_id}.", reply_markup=reply_markup, ) else: keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( f"Already tracking bounty #{bounty_id}.", reply_markup=reply_markup, ) except ValueError as e: await update.message.reply_text(str(e)) async def cmd_untrack(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None: user_id = get_user_id(update) if not is_group(update): keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( "⛔ /untrack is only available in groups.", reply_markup=reply_markup, ) return args = extract_args(update.message.text) if not args: await update.message.reply_text("Usage: /untrack ") return try: bounty_id = int(args[0]) except ValueError: await update.message.reply_text("Invalid bounty ID.") return user_id = get_user_id(update) room_id = get_room_id(update) if TRACKING_SERVICE.untrack_bounty(room_id, user_id, bounty_id): keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( f"✅ Untracked bounty #{bounty_id}.", reply_markup=reply_markup, ) else: keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( f"Not tracking bounty #{bounty_id}.", reply_markup=reply_markup, ) async def cmd_start(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None: user_id = get_user_id(update) room_id = get_room_id(update) effective_user = update.effective_user username = effective_user.username or effective_user.first_name or None if is_group(update): try: chat_member = await ctx.bot.get_chat_member(room_id, user_id) if chat_member.status == "creator" and not BOUNTY_SERVICE.is_admin( room_id, username ): BOUNTY_SERVICE.add_admin(room_id, username, username) keyboard = [ [ InlineKeyboardButton( "🗑️ Delete", callback_data=f"del_msg:{user_id}" ) ] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( "👻 JIGAIDO is watching.\n\n" "This group's bounty list is now active.\n" "You are now an admin (group creator).\n" "/bounty — list bounties\n" "/add — create a bounty\n" "/track — track a bounty\n" "/my — your tracked bounties", reply_markup=reply_markup, ) return except Exception: pass keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( "👻 JIGAIDO is watching.\n\n" "This group's bounty list is now active.\n" "/bounty — list bounties\n" "/add — create a bounty\n" "/track — track a bounty\n" "/my — your tracked bounties", reply_markup=reply_markup, ) else: if not BOUNTY_SERVICE.is_admin(room_id, username): BOUNTY_SERVICE.add_admin(room_id, username, username) keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( "👻 JIGAIDO activated.\n\n" "Personal bounty list ready.\n" "/bounty — list your bounties\n" "/add — create a bounty\n" "/my — your tracked bounties", reply_markup=reply_markup, ) async def cmd_show(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None: user_id = get_user_id(update) args = extract_args(update.message.text) if not args: await update.message.reply_text("Usage: /show ") return try: bounty_id = int(args[0]) except ValueError: await update.message.reply_text("Invalid bounty ID.") return room_id = get_room_id(update) bounty = BOUNTY_SERVICE.get_bounty(room_id, bounty_id) if not bounty: await update.message.reply_text("Bounty not found.") return timezone_str = BOUNTY_SERVICE.get_timezone(room_id) try: tz = ZoneInfo(timezone_str) except (KeyError, ZoneInfoNotFoundError): tz = ZoneInfo("UTC") lines = [] title = bounty.text or "(no text)" lines.append(f"[#{bounty.id}] {title}") if bounty.link: lines.append(f"🔗 {bounty.link}") if bounty.due_date_ts: dt_due = datetime.fromtimestamp(bounty.due_date_ts, tz=tz) due_str = dt_due.strftime("%d %B %Y %H:%M") lines.append(f"📅 {due_str} ({timezone_str})") if bounty.category_ids: lines.append(f"📂 Categories: {' | '.join(bounty.category_ids)}") if bounty.created_by_username: lines.append( f'👤 {bounty.created_by_username}' ) dt_created = datetime.fromtimestamp(bounty.created_at, tz=tz) created_str = dt_created.strftime("%Y-%m-%d %H:%M") lines.append(f"📌 Created: {created_str}") keyboard = [[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( "\n".join(lines), disable_web_page_preview=True, parse_mode=ParseMode.HTML, reply_markup=reply_markup, ) async def cmd_timezone(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None: user_id = get_user_id(update) args = extract_args(update.message.text) room_id = get_room_id(update) effective_user = update.effective_user username = effective_user.username or effective_user.first_name or None if not BOUNTY_SERVICE.is_admin(room_id, username): keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( "⛔ Only admins can change timezone.", reply_markup=reply_markup, ) return if not args: current_tz = BOUNTY_SERVICE.get_timezone(room_id) keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( f"Current timezone: {current_tz}", reply_markup=reply_markup, ) return timezone_str = args[0] try: ZoneInfo(timezone_str) except (KeyError, ZoneInfoNotFoundError): await update.message.reply_text( "⛔ Invalid timezone. Use IANA format (e.g., Asia/Jakarta)" ) return try: BOUNTY_SERVICE.set_timezone(room_id, timezone_str, username) except PermissionError as e: keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text(f"⛔ {e}", reply_markup=reply_markup) return keyboard = [[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( f"✅ Timezone set to {timezone_str}.", reply_markup=reply_markup, ) async def cmd_recover(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None: user_id = get_user_id(update) args = extract_args(update.message.text) room_id = get_room_id(update) effective_user = update.effective_user username = effective_user.username or effective_user.first_name or None if not BOUNTY_SERVICE.is_admin(room_id, username): keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( "⛔ Only admins can perform this action.", reply_markup=reply_markup, ) return if not args: deleted_bounties = BOUNTY_SERVICE.list_deleted_bounties(room_id) if not deleted_bounties: await update.message.reply_text("No recoverable bounties.") return deleted_bounties.sort(key=lambda b: b.deleted_at or 0, reverse=True) lines = ["Recoverable bounties:"] for b in deleted_bounties[:10]: text = ( b.text[:40] + "..." if b.text and len(b.text) > 40 else (b.text or "(no text)") ) link_str = f" | 🔗 {b.link}" if b.link else "" deleted_str = ( time.strftime("%d %b %Y", time.localtime(b.deleted_at)) if b.deleted_at else "unknown" ) lines.append(f"[#{b.id}] {text}{link_str} | 🗑️ Deleted {deleted_str}") user_id = get_user_id(update) keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( "\n".join(lines), disable_web_page_preview=True, reply_markup=reply_markup, ) return try: bounty_ids = [int(arg) for arg in args] except ValueError: await update.message.reply_text("Invalid bounty ID.") return results = BOUNTY_SERVICE.recover_bounties(room_id, bounty_ids, username) response_lines = [] for bounty_id, result in results.items(): if result == "recovered": response_lines.append(f"✅ Recovered bounty #{bounty_id}.") elif result == "not_found": response_lines.append(f"⛔ Bounty #{bounty_id} not found.") elif result == "not_deleted": response_lines.append(f"⛔ Bounty #{bounty_id} is not deleted.") elif result == "permission_denied": response_lines.append(f"⛔ Bounty #{bounty_id} permission denied.") user_id = get_user_id(update) keyboard = [[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( "\n".join(response_lines), reply_markup=reply_markup ) async def cmd_admin(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None: args = extract_args(update.message.text) room_id = get_room_id(update) effective_user = update.effective_user requesting_username = effective_user.username or effective_user.first_name or None if not args: if not BOUNTY_SERVICE.is_admin(room_id, requesting_username): user_id = get_user_id(update) keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( "⛔ Only admins can perform this action.", reply_markup=reply_markup, ) return admins = BOUNTY_SERVICE.list_admins(room_id) if not admins: user_id = get_user_id(update) keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( "No admins in this room.", reply_markup=reply_markup, ) return user_id = get_user_id(update) admin_mentions = [f"@{admin}" for admin in admins] keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( "Room Admins:\n" + "\n".join(f"- {m}" for m in admin_mentions), parse_mode=ParseMode.HTML, reply_markup=reply_markup, ) return if args[0] not in ("add", "remove"): await update.message.reply_text( "Usage:\n" "/admin — list admins\n" "/admin add @username — add admin\n" "/admin remove @username — remove admin" ) return subcommand = args[0] if len(args) < 2: await update.message.reply_text(f"Usage: /admin {subcommand} @username") return raw_username = args[1] if not raw_username.startswith("@"): await update.message.reply_text(f"Usage: /admin {subcommand} @username") return target_username = raw_username[1:] user_id = get_user_id(update) if not BOUNTY_SERVICE.is_admin(room_id, requesting_username): keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( "⛔ Only admins can perform this action.", reply_markup=reply_markup, ) return try: keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) if subcommand == "add": BOUNTY_SERVICE.add_admin(room_id, target_username, requesting_username) await update.message.reply_text( f"✅ @{target_username} is now an admin.", reply_markup=reply_markup, ) elif subcommand == "remove": BOUNTY_SERVICE.remove_admin(room_id, target_username, requesting_username) await update.message.reply_text( f"✅ @{target_username} is no longer an admin.", reply_markup=reply_markup, ) except (PermissionError, ValueError) as e: keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text(f"⛔ {e}", reply_markup=reply_markup) async def cmd_category(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None: """Handle /category command for listing, adding, and deleting categories.""" user_id = get_user_id(update) room_id = get_room_id(update) effective_user = update.effective_user username = effective_user.username or effective_user.first_name or None args = extract_args(update.message.text) if not args: # List categories categories = BOUNTY_SERVICE.list_categories(room_id) if not categories: keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( "No categories yet.\n" "Usage: /category add ", reply_markup=reply_markup, ) return lines = ["Categories:"] for cat in categories: lines.append(f"- {cat.id} → {cat.name}") keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( "\n".join(lines), reply_markup=reply_markup, ) return subcommand = args[0] if subcommand == "add": # Add category if not BOUNTY_SERVICE.is_admin(room_id, username): keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( "⛔ Only admins can add categories.", reply_markup=reply_markup, ) return if len(args) < 3: keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( "Usage: /category add \n" "Example: /category add bug 'Bug Report'", reply_markup=reply_markup, ) return slug = args[1].lower() name = " ".join(args[2:]) try: category = BOUNTY_SERVICE.add_category(room_id, slug, name, username) keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( f"✅ Category added: {category.id} → {category.name}", reply_markup=reply_markup, ) except PermissionError as e: keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text(f"⛔ {e}", reply_markup=reply_markup) except ValueError as e: keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text(f"⛔ {e}", reply_markup=reply_markup) return elif subcommand == "delete": # Delete category if not BOUNTY_SERVICE.is_admin(room_id, username): keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( "⛔ Only admins can delete categories.", reply_markup=reply_markup, ) return if len(args) < 2: keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( "Usage: /category delete ", reply_markup=reply_markup, ) return slug = args[1].lower() try: success = BOUNTY_SERVICE.delete_category(room_id, slug, username) keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) if success: await update.message.reply_text( f"✅ Category '{slug}' deleted.", reply_markup=reply_markup, ) else: await update.message.reply_text( f"⛔ Category '{slug}' not found.", reply_markup=reply_markup, ) except PermissionError as e: keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text(f"⛔ {e}", reply_markup=reply_markup) return else: keyboard = [ [InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( "Usage:\n" "/category - list categories\n" "/category add - add category (admin)\n" "/category delete - delete category (admin)", reply_markup=reply_markup, ) async def cmd_help(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None: user_id = get_user_id(update) room_id = get_room_id(update) effective_user = update.effective_user username = effective_user.username or effective_user.first_name or None is_admin = BOUNTY_SERVICE.is_admin(room_id, username) if is_admin: lines = [ "👻 JIGAIDO Commands (Admin):\n", "📋 Bounty Management:", "/bounty — list bounties", "/add — add bounty", "/edit — edit bounty", "/delete — delete bounty", "/recover — recover deleted bounties", "", "📂 Categories:", "/category — list categories", "/category add — add category", "/category delete — delete category", "", "🔗 Tracking:", "/track — track bounty", "/untrack — stop tracking", "/my — your tracked bounties", "/show — show bounty details", "", "⚙️ Room Management:", "/admin — manage admins", "/timezone — get/set timezone", "", "/start — re-initialize", "/help — show this message", ] else: lines = [ "👻 JIGAIDO Commands:\n", "/bounty — list bounties", "/track — track bounty", "/untrack — stop tracking", "/my — your tracked bounties", "/show — show bounty details", "/start — re-initialize", "/help — show this message", ] keyboard = [[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( "\n".join(lines), disable_web_page_preview=True, reply_markup=reply_markup, )