Files
jigaido/apps/telegram-bot/commands.py
shokollm 3743dc6a45 fix: add sort and limit to /my command for consistency with /bounty
- Add same sort_key logic as /bounty (due date first, then created_at)
- Add default limit of 5 with "Showing X of Y" message
- Add "all" flag to show expired bounties
- Add delete button for consistency with /bounty

Fixes #94
2026-04-09 14:08:57 +00:00

1471 lines
50 KiB
Python

"""Telegram command handlers for JIGAIDO - Thin wrappers around core services."""
import time
from datetime import datetime
from typing import Optional
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
import dateparser
from telegram import Update
from telegram.ext import ContextTypes
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
from telegram.constants import ParseMode
from adapters.storage.json_file import JsonFileRoomStorage, JsonFileTrackingStorage
from core.services import BountyService, TrackingService
ROOM_STORAGE = JsonFileRoomStorage()
TRACKING_STORAGE = JsonFileTrackingStorage()
BOUNTY_SERVICE = BountyService(ROOM_STORAGE)
TRACKING_SERVICE = TrackingService(TRACKING_STORAGE, ROOM_STORAGE)
TELEGRAM_BOT_USERNAME = "your_bot_username"
def format_due_date(due_date_ts: int | None, timezone_str: str) -> str:
"""Format due date as human-readable with timezone.
Examples:
No due date: (none shown)
Date only: 4 April 2026
Date + time: 4 April 2026 14:30
With timezone: 4 April 2026 14:30 (Asia/Jakarta)
"""
if not due_date_ts:
return ""
try:
tz = ZoneInfo(timezone_str)
except (KeyError, ZoneInfoNotFoundError):
tz = ZoneInfo("UTC")
dt = datetime.fromtimestamp(due_date_ts, tz=tz)
date_str = dt.strftime("%-d %B %Y")
if dt.hour != 0 or dt.minute != 0:
date_str += f" {dt.strftime('%H:%M')}"
date_str += f" ({timezone_str})"
return date_str
def extract_args(text: str) -> list[str]:
if not text:
return []
tokens = text.strip().split()
return tokens[1:] if len(tokens) > 1 else []
def parse_args(
args: list[str],
timezone_str: str = "UTC",
) -> tuple[Optional[str], Optional[str], Optional[int], bool, bool, list[str], bool]:
"""Parse command arguments.
Returns:
(text, link, due_date_ts, clear_link, clear_date, category_slugs, clear_categories)
"""
text = None
link = None
due_date_ts = None
clear_link = False
clear_date = False
category_slugs: list[str] = []
clear_categories = False
try:
tz = ZoneInfo(timezone_str)
except (KeyError, ZoneInfoNotFoundError):
tz = ZoneInfo("UTC")
def is_url(s: str) -> bool:
if not s:
return False
if s.startswith("http://") or s.startswith("https://"):
return True
if "." in s and " " not in s:
return True
return False
def normalize_url(url: str) -> str:
"""Normalize URL by adding https:// prefix if missing."""
if not url:
return url
if url.startswith("http://") or url.startswith("https://"):
return url
# Add https:// for URLs without scheme
return f"https://{url}"
def is_time(s: str) -> bool:
if not s or ":" not in s:
return False
parts = s.split(":")
if len(parts) != 2:
return False
try:
h, m = int(parts[0]), int(parts[1])
return 0 <= h <= 23 and 0 <= m <= 59
except ValueError:
return False
def parse_date_with_tz(date_str: str) -> int | None:
parsed = dateparser.parse(date_str)
if parsed:
localized = parsed.replace(tzinfo=tz)
return int(localized.timestamp())
return None
def parse_category_slugs(cat_str: str) -> list[str]:
"""Parse comma-separated category slugs."""
return [s.strip().lower() for s in cat_str.split(",") if s.strip()]
i = 0
while i < len(args):
arg = args[i]
if arg == "-link":
if i + 1 < len(args) and not args[i + 1].startswith("-"):
link = normalize_url(args[i + 1])
i += 2
else:
clear_link = True
i += 1
elif arg == "-date":
if i + 1 < len(args) and not args[i + 1].startswith("-"):
due_date_ts = parse_date_with_tz(args[i + 1])
if due_date_ts is not None:
i += 2
if i < len(args) and is_time(args[i]):
time_parts = args[i].split(":")
due_date_ts += (
int(time_parts[0]) * 3600 + int(time_parts[1]) * 60
)
i += 1
else:
clear_date = True
i += 1
else:
clear_date = True
i += 1
elif arg == "-cat":
if i + 1 < len(args) and args[i + 1] == "-":
clear_categories = True
i += 2
elif i + 1 < len(args) and not args[i + 1].startswith("-"):
category_slugs = parse_category_slugs(args[i + 1])
i += 2
else:
clear_categories = True
i += 1
elif not link and is_url(arg):
link = normalize_url(arg)
i += 1
elif due_date_ts is None:
due_date_ts = parse_date_with_tz(arg)
if due_date_ts is not None:
i += 1
if i < len(args) and is_time(args[i]):
time_parts = args[i].split(":")
due_date_ts += int(time_parts[0]) * 3600 + int(time_parts[1]) * 60
i += 1
else:
i += 1
if text is None:
text = arg
else:
text = text + " " + arg
else:
i += 1
if text is None:
text = arg
else:
text = text + " " + arg
return text, link, due_date_ts, clear_link, clear_date, category_slugs, clear_categories
def format_bounty(
b, show_id: bool = True, slice_length: int = 0, timezone_str: str = "UTC"
) -> str:
parts = []
if show_id:
parts.append(f"[#{b.id}]")
if b.text:
text = b.text
if slice_length > 0 and len(text) > slice_length:
text = text[:slice_length] + "..."
parts.append(text)
if b.link:
parts.append(f"🔗 {b.link}")
if b.due_date_ts:
try:
tz = ZoneInfo(timezone_str)
except (KeyError, ZoneInfoNotFoundError):
tz = ZoneInfo("UTC")
dt_now = datetime.now(tz)
dt_due = datetime.fromtimestamp(b.due_date_ts, tz=tz)
seconds_left = int((dt_due - dt_now).total_seconds())
hours_left = seconds_left // 3600
days_left = seconds_left // 86400
due_str = dt_due.strftime("%d %b %Y %H:%M")
if seconds_left < 0:
parts.append(f"{due_str} (OVERDUE)")
elif hours_left < 48:
if hours_left < 1:
minutes_left = seconds_left // 60
parts.append(f"{due_str} ({minutes_left}m)")
else:
parts.append(f"{due_str} ({hours_left}h)")
else:
parts.append(f"{due_str} ({days_left}d)")
return " | ".join(parts)
async def cmd_delete_message(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
query = update.callback_query
if not query:
return
data = query.data
if not data.startswith("del_msg:"):
return
parts = data.split(":")
if len(parts) != 2:
return
try:
expected_user_id = int(parts[1])
except ValueError:
return
user_id = get_user_id(update)
if user_id != expected_user_id:
await query.answer("You can't delete this message", show_alert=True)
return
try:
await query.message.delete()
await query.answer("Deleted")
except Exception as e:
await query.answer(f"Could not delete: {e}", show_alert=True)
def is_group(update: Update) -> bool:
return update.effective_chat.type != "private"
def get_group_id(update: Update) -> int:
return update.effective_chat.id
def get_user_id(update: Update) -> int:
return update.effective_user.id
def get_room_id(update: Update) -> int:
"""Get room_id for the current context.
For groups: negative group_id
For DMs: positive user_id
"""
if is_group(update):
return get_group_id(update)
return get_user_id(update)
async def cmd_bounty(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
room_id = get_room_id(update)
user_id = get_user_id(update)
timezone_str = BOUNTY_SERVICE.get_timezone(room_id)
args = extract_args(update.message.text)
show_all = "all" in args
args = [a for a in args if a != "all"]
# Parse -c flag for category filter
category_slugs: list[str] = []
filtered_args = []
i = 0
while i < len(args):
if args[i] == "-c":
if i + 1 < len(args) and not args[i + 1].startswith("-"):
category_slugs = [s.strip().lower() for s in args[i + 1].split(",")]
i += 2
else:
i += 1
else:
filtered_args.append(args[i])
i += 1
try:
limit = int(filtered_args[0]) if filtered_args else 5
except (ValueError, IndexError):
limit = 5
now = int(time.time())
cutoff_24h = now - 86400
all_bounties = BOUNTY_SERVICE.list_bounties(room_id)
def is_expired(b) -> bool:
return b.due_date_ts is not None and b.due_date_ts < cutoff_24h
def sort_key(b):
if b.due_date_ts is not None:
return (0, b.due_date_ts)
return (1, b.created_at)
filtered_bounties = [b for b in all_bounties if not is_expired(b) or show_all]
# Filter by category if specified
if category_slugs:
filtered_bounties = [
b for b in filtered_bounties if any(cat in b.category_ids for cat in category_slugs)
]
filtered_bounties.sort(key=sort_key)
total_count = len(filtered_bounties)
displayed_bounties = filtered_bounties[:limit]
if not displayed_bounties:
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
if category_slugs:
await update.message.reply_text(
f"No bounties with category: {', '.join(category_slugs)}",
reply_markup=reply_markup,
)
elif show_all:
await update.message.reply_text(
"No bounties yet.",
reply_markup=reply_markup,
)
else:
await update.message.reply_text(
"No active bounties. Use /bounty all to show expired.",
reply_markup=reply_markup,
)
return
lines = []
if category_slugs:
lines.append(f"📂 Filtering with {', '.join(category_slugs)} categories:")
if limit < total_count:
lines.append(f"Showing {limit} of {total_count} bounties:")
slice_length = 40
elif show_all and total_count > limit:
lines.append(f"Showing {limit} of {total_count} bounties (including expired):")
slice_length = 40
else:
lines.append(f"Showing {total_count} bounties:")
slice_length = 0
for b in displayed_bounties:
lines.append(
format_bounty(
b, show_id=True, slice_length=slice_length, timezone_str=timezone_str
)
)
keyboard = [[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"\n".join(lines), disable_web_page_preview=True, reply_markup=reply_markup
)
async def cmd_my(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
user_id = get_user_id(update)
args = extract_args(update.message.text)
# Parse show_all flag
show_all = "all" in args
args = [a for a in args if a != "all"]
# Parse optional limit
try:
limit = int(args[0]) if args else 5
except (ValueError, IndexError):
limit = 5
if is_group(update):
group_id = get_group_id(update)
bounties = TRACKING_SERVICE.get_tracked_bounties(group_id, user_id)
room_id = group_id
else:
room_id = get_room_id(update)
bounties = BOUNTY_SERVICE.list_bounties(room_id)
timezone_str = BOUNTY_SERVICE.get_timezone(room_id)
now = int(time.time())
cutoff_24h = now - 86400
def is_expired(b) -> bool:
return b.due_date_ts is not None and b.due_date_ts < cutoff_24h
def sort_key(b):
if b.due_date_ts is not None:
return (0, b.due_date_ts)
return (1, b.created_at)
# Filter expired and sort
filtered_bounties = [b for b in bounties if not is_expired(b) or show_all]
filtered_bounties.sort(key=sort_key)
total_count = len(filtered_bounties)
displayed_bounties = filtered_bounties[:limit]
if not displayed_bounties:
msg = (
"You are not tracking any bounties."
if is_group(update)
else "No personal bounties."
)
await update.message.reply_text(msg)
return
lines = []
if limit < total_count:
lines.append(f"Showing {limit} of {total_count} bounties:")
slice_length = 40
elif show_all and total_count > limit:
lines.append(f"Showing {limit} of {total_count} bounties (including expired):")
slice_length = 40
else:
lines.append(f"Showing {total_count} bounties:")
slice_length = 0
for b in displayed_bounties:
lines.append(
format_bounty(b, show_id=True, slice_length=slice_length, timezone_str=timezone_str)
)
keyboard = [[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"\n".join(lines), disable_web_page_preview=True, reply_markup=reply_markup
)
async def cmd_add(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
args = extract_args(update.message.text)
if not args:
await update.message.reply_text(
"Usage: /add <text> [link] [due_date] [-cat <slug>[,<slug>]]\n"
"Example: /add Fix the bug https://github.com/foo/bar tomorrow -cat bug,urgent"
)
return
user_id = get_user_id(update)
room_id = get_room_id(update)
timezone_str = BOUNTY_SERVICE.get_timezone(room_id)
effective_user = update.effective_user
username = effective_user.username or effective_user.first_name or None
text, link, due_date_ts, _, _, category_slugs, _ = parse_args(args, timezone_str)
if not text and not link:
await update.message.reply_text("A bounty needs at least text or a link.")
return
try:
bounty = BOUNTY_SERVICE.add_bounty(
room_id=room_id,
user_id=user_id,
username=username,
text=text,
link=link,
due_date_ts=due_date_ts,
created_by_username=username,
)
# Add categories if specified
for slug in category_slugs:
try:
BOUNTY_SERVICE.add_category_to_bounty(
room_id, bounty.id, slug, username
)
except ValueError:
pass # Category doesn't exist, skip it
except PermissionError:
pass
except PermissionError as e:
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(f"{e}", reply_markup=reply_markup)
return
except ValueError as e:
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(f"{e}", reply_markup=reply_markup)
return
due_str = ""
if due_date_ts:
timezone_str = BOUNTY_SERVICE.get_timezone(room_id)
due_str = f" | Due: {format_due_date(due_date_ts, timezone_str)}"
cat_str = ""
if category_slugs:
cat_str = f" | 📂 {', '.join(category_slugs)}"
keyboard = [[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"✅ Bounty added (#{bounty.id}){due_str}{cat_str}",
disable_web_page_preview=True,
reply_markup=reply_markup,
)
async def cmd_update(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
user_id = get_user_id(update)
room_id = get_room_id(update)
effective_user = update.effective_user
username = effective_user.username or effective_user.first_name or None
if not BOUNTY_SERVICE.is_admin(room_id, username):
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"⛔ Only admins can edit bounties.",
reply_markup=reply_markup,
)
return
args = extract_args(update.message.text)
if len(args) < 1:
await update.message.reply_text(
"Usage: /update <bounty_id> [text] [link] [due_date]\n"
" /update <bounty_id> -link [<url>] - clear or set link\n"
" /update <bounty_id> -date [<date>] - clear or set date\n"
" /update <bounty_id> -cat <slug>[,<slug>] - add/replace categories\n"
" /update <bounty_id> -cat - - clear categories\n"
" /update <bounty_id> -remove-cat <slug> - remove category\n"
"Examples:\n"
" /update 1 new text - update text only\n"
" /update 1 -link - clear link\n"
" /update 1 -link https://... - set link\n"
" /update 1 -cat bug,feature - set categories\n"
" /update 1 -remove-cat bug - remove category"
)
return
try:
bounty_id = int(args[0])
except ValueError:
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"Invalid bounty ID.",
reply_markup=reply_markup,
)
return
timezone_str = BOUNTY_SERVICE.get_timezone(room_id)
# Parse flags separately to handle -cat and -remove-cat specially
remaining_args = args[1:]
category_slugs: list[str] = []
clear_categories = False
remove_category_slug: str | None = None
# Process remaining args for special flags
filtered_args = []
i = 0
while i < len(remaining_args):
arg = remaining_args[i]
if arg == "-cat":
if i + 1 < len(remaining_args) and remaining_args[i + 1] == "-":
clear_categories = True
i += 2
elif i + 1 < len(remaining_args) and not remaining_args[i + 1].startswith("-"):
category_slugs = [s.strip().lower() for s in remaining_args[i + 1].split(",")]
i += 2
else:
clear_categories = True
i += 1
elif arg == "-remove-cat":
if i + 1 < len(remaining_args) and not remaining_args[i + 1].startswith("-"):
remove_category_slug = remaining_args[i + 1].lower()
i += 2
else:
i += 1
else:
filtered_args.append(arg)
i += 1
text, link, due_date_ts, clear_link, clear_date, _, _ = parse_args(filtered_args, timezone_str)
old_bounty = BOUNTY_SERVICE.get_bounty(room_id, bounty_id)
if not old_bounty:
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"Bounty not found.",
reply_markup=reply_markup,
)
return
changes = []
# Handle category operations
if clear_categories:
try:
BOUNTY_SERVICE.update_bounty_categories(room_id, bounty_id, [], username)
changes.append("Categories cleared")
except (PermissionError, ValueError):
pass
if category_slugs:
try:
BOUNTY_SERVICE.update_bounty_categories(room_id, bounty_id, category_slugs, username)
changes.append(f"Categories set: {', '.join(category_slugs)}")
except (PermissionError, ValueError):
pass
if remove_category_slug:
try:
BOUNTY_SERVICE.remove_category_from_bounty(room_id, bounty_id, remove_category_slug, username)
changes.append(f"Removed category: {remove_category_slug}")
except (PermissionError, ValueError):
pass
# Only update other fields if something was provided
if text is not None or link is not None or due_date_ts is not None or clear_link or clear_date:
try:
BOUNTY_SERVICE.update_bounty(
room_id=room_id,
bounty_id=bounty_id,
username=username,
text=text,
link=link,
due_date_ts=due_date_ts,
clear_link=clear_link,
clear_due=clear_date,
)
if text is not None:
old_text = old_bounty.text or "(none)"
changes.append(f"Text: {old_text}{text}")
if link is not None:
old_link = old_bounty.link or "(none)"
changes.append(f"Link: {old_link}{link}")
if due_date_ts is not None:
old_date = (
format_due_date(old_bounty.due_date_ts, timezone_str)
if old_bounty.due_date_ts
else "(none)"
)
new_date = format_due_date(due_date_ts, timezone_str)
changes.append(f"Date: {old_date}{new_date}")
if clear_link:
old_link = old_bounty.link or "(none)"
changes.append(f"Link: {old_link} → (cleared)")
if clear_date:
old_date = (
format_due_date(old_bounty.due_date_ts, timezone_str)
if old_bounty.due_date_ts
else "(none)"
)
changes.append(f"Date: {old_date} → (cleared)")
except PermissionError as e:
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(f"{e}", reply_markup=reply_markup)
return
except ValueError as e:
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(f"{e}", reply_markup=reply_markup)
return
if changes:
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"✅ Bounty #{bounty_id} updated:\n" + "\n".join(changes),
reply_markup=reply_markup,
)
else:
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"✅ Bounty #{bounty_id} updated.",
reply_markup=reply_markup,
)
cmd_edit = cmd_update
async def cmd_delete(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
user_id = get_user_id(update)
room_id = get_room_id(update)
effective_user = update.effective_user
username = effective_user.username or effective_user.first_name or None
if not BOUNTY_SERVICE.is_admin(room_id, username):
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"⛔ Only admins can delete bounties.",
reply_markup=reply_markup,
)
return
args = extract_args(update.message.text)
if not args:
await update.message.reply_text("Usage: /delete <bounty_id> [bounty_id ...]")
return
try:
bounty_ids = [int(arg) for arg in args]
except ValueError:
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"Invalid bounty ID.",
reply_markup=reply_markup,
)
return
try:
results = BOUNTY_SERVICE.delete_bounties(
room_id=room_id,
bounty_ids=bounty_ids,
username=username,
)
except PermissionError as e:
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(f"{e}", reply_markup=reply_markup)
return
response_lines = []
for bounty_id, result in results.items():
if result == "deleted":
response_lines.append(f"✅ Bounty #{bounty_id} deleted.")
elif result == "not_found":
response_lines.append(f"⛔ Bounty #{bounty_id} not found.")
elif result == "permission_denied":
response_lines.append(f"⛔ Bounty #{bounty_id} permission denied.")
keyboard = [[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"\n".join(response_lines), reply_markup=reply_markup
)
async def cmd_track(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
user_id = get_user_id(update)
if not is_group(update):
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"⛔ /track is only available in groups.",
reply_markup=reply_markup,
)
return
args = extract_args(update.message.text)
if not args:
await update.message.reply_text("Usage: /track <bounty_id>")
return
try:
bounty_id = int(args[0])
except ValueError:
await update.message.reply_text("Invalid bounty ID.")
return
user_id = get_user_id(update)
room_id = get_room_id(update)
try:
if TRACKING_SERVICE.track_bounty(room_id, user_id, bounty_id):
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"✅ Now tracking bounty #{bounty_id}.",
reply_markup=reply_markup,
)
else:
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"Already tracking bounty #{bounty_id}.",
reply_markup=reply_markup,
)
except ValueError as e:
await update.message.reply_text(str(e))
async def cmd_untrack(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
user_id = get_user_id(update)
if not is_group(update):
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"⛔ /untrack is only available in groups.",
reply_markup=reply_markup,
)
return
args = extract_args(update.message.text)
if not args:
await update.message.reply_text("Usage: /untrack <bounty_id>")
return
try:
bounty_id = int(args[0])
except ValueError:
await update.message.reply_text("Invalid bounty ID.")
return
user_id = get_user_id(update)
room_id = get_room_id(update)
if TRACKING_SERVICE.untrack_bounty(room_id, user_id, bounty_id):
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"✅ Untracked bounty #{bounty_id}.",
reply_markup=reply_markup,
)
else:
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"Not tracking bounty #{bounty_id}.",
reply_markup=reply_markup,
)
async def cmd_start(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
user_id = get_user_id(update)
room_id = get_room_id(update)
effective_user = update.effective_user
username = effective_user.username or effective_user.first_name or None
if is_group(update):
try:
chat_member = await ctx.bot.get_chat_member(room_id, user_id)
if chat_member.status == "creator" and not BOUNTY_SERVICE.is_admin(
room_id, username
):
BOUNTY_SERVICE.add_admin(room_id, username, username)
keyboard = [
[
InlineKeyboardButton(
"🗑️ Delete", callback_data=f"del_msg:{user_id}"
)
]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"👻 JIGAIDO is watching.\n\n"
"This group's bounty list is now active.\n"
"You are now an admin (group creator).\n"
"/bounty — list bounties\n"
"/add — create a bounty\n"
"/track — track a bounty\n"
"/my — your tracked bounties",
reply_markup=reply_markup,
)
return
except Exception:
pass
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"👻 JIGAIDO is watching.\n\n"
"This group's bounty list is now active.\n"
"/bounty — list bounties\n"
"/add — create a bounty\n"
"/track — track a bounty\n"
"/my — your tracked bounties",
reply_markup=reply_markup,
)
else:
if not BOUNTY_SERVICE.is_admin(room_id, username):
BOUNTY_SERVICE.add_admin(room_id, username, username)
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"👻 JIGAIDO activated.\n\n"
"Personal bounty list ready.\n"
"/bounty — list your bounties\n"
"/add — create a bounty\n"
"/my — your tracked bounties",
reply_markup=reply_markup,
)
async def cmd_show(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
user_id = get_user_id(update)
args = extract_args(update.message.text)
if not args:
await update.message.reply_text("Usage: /show <bounty_id>")
return
try:
bounty_id = int(args[0])
except ValueError:
await update.message.reply_text("Invalid bounty ID.")
return
room_id = get_room_id(update)
bounty = BOUNTY_SERVICE.get_bounty(room_id, bounty_id)
if not bounty:
await update.message.reply_text("Bounty not found.")
return
timezone_str = BOUNTY_SERVICE.get_timezone(room_id)
try:
tz = ZoneInfo(timezone_str)
except (KeyError, ZoneInfoNotFoundError):
tz = ZoneInfo("UTC")
lines = []
title = bounty.text or "(no text)"
lines.append(f"[#{bounty.id}] {title}")
if bounty.link:
lines.append(f"🔗 {bounty.link}")
if bounty.due_date_ts:
dt_due = datetime.fromtimestamp(bounty.due_date_ts, tz=tz)
due_str = dt_due.strftime("%d %B %Y %H:%M")
lines.append(f"📅 {due_str} ({timezone_str})")
if bounty.category_ids:
lines.append(f"📂 Categories: {' | '.join(bounty.category_ids)}")
if bounty.created_by_username:
lines.append(
f'👤 <a href="tg://user?id={bounty.created_by_user_id}">{bounty.created_by_username}</a>'
)
dt_created = datetime.fromtimestamp(bounty.created_at, tz=tz)
created_str = dt_created.strftime("%Y-%m-%d %H:%M")
lines.append(f"📌 Created: {created_str}")
keyboard = [[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"\n".join(lines),
disable_web_page_preview=True,
parse_mode=ParseMode.HTML,
reply_markup=reply_markup,
)
async def cmd_timezone(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
user_id = get_user_id(update)
args = extract_args(update.message.text)
room_id = get_room_id(update)
effective_user = update.effective_user
username = effective_user.username or effective_user.first_name or None
if not BOUNTY_SERVICE.is_admin(room_id, username):
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"⛔ Only admins can change timezone.",
reply_markup=reply_markup,
)
return
if not args:
current_tz = BOUNTY_SERVICE.get_timezone(room_id)
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"Current timezone: {current_tz}",
reply_markup=reply_markup,
)
return
timezone_str = args[0]
try:
ZoneInfo(timezone_str)
except (KeyError, ZoneInfoNotFoundError):
await update.message.reply_text(
"⛔ Invalid timezone. Use IANA format (e.g., Asia/Jakarta)"
)
return
try:
BOUNTY_SERVICE.set_timezone(room_id, timezone_str, username)
except PermissionError as e:
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(f"{e}", reply_markup=reply_markup)
return
keyboard = [[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"✅ Timezone set to {timezone_str}.",
reply_markup=reply_markup,
)
async def cmd_recover(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
user_id = get_user_id(update)
args = extract_args(update.message.text)
room_id = get_room_id(update)
effective_user = update.effective_user
username = effective_user.username or effective_user.first_name or None
if not BOUNTY_SERVICE.is_admin(room_id, username):
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"⛔ Only admins can perform this action.",
reply_markup=reply_markup,
)
return
if not args:
deleted_bounties = BOUNTY_SERVICE.list_deleted_bounties(room_id)
if not deleted_bounties:
await update.message.reply_text("No recoverable bounties.")
return
deleted_bounties.sort(key=lambda b: b.deleted_at or 0, reverse=True)
lines = ["Recoverable bounties:"]
for b in deleted_bounties[:10]:
text = (
b.text[:40] + "..."
if b.text and len(b.text) > 40
else (b.text or "(no text)")
)
link_str = f" | 🔗 {b.link}" if b.link else ""
deleted_str = (
time.strftime("%d %b %Y", time.localtime(b.deleted_at))
if b.deleted_at
else "unknown"
)
lines.append(f"[#{b.id}] {text}{link_str} | 🗑️ Deleted {deleted_str}")
user_id = get_user_id(update)
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"\n".join(lines),
disable_web_page_preview=True,
reply_markup=reply_markup,
)
return
try:
bounty_ids = [int(arg) for arg in args]
except ValueError:
await update.message.reply_text("Invalid bounty ID.")
return
results = BOUNTY_SERVICE.recover_bounties(room_id, bounty_ids, username)
response_lines = []
for bounty_id, result in results.items():
if result == "recovered":
response_lines.append(f"✅ Recovered bounty #{bounty_id}.")
elif result == "not_found":
response_lines.append(f"⛔ Bounty #{bounty_id} not found.")
elif result == "not_deleted":
response_lines.append(f"⛔ Bounty #{bounty_id} is not deleted.")
elif result == "permission_denied":
response_lines.append(f"⛔ Bounty #{bounty_id} permission denied.")
user_id = get_user_id(update)
keyboard = [[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"\n".join(response_lines), reply_markup=reply_markup
)
async def cmd_admin(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
args = extract_args(update.message.text)
room_id = get_room_id(update)
effective_user = update.effective_user
requesting_username = effective_user.username or effective_user.first_name or None
if not args:
if not BOUNTY_SERVICE.is_admin(room_id, requesting_username):
user_id = get_user_id(update)
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"⛔ Only admins can perform this action.",
reply_markup=reply_markup,
)
return
admins = BOUNTY_SERVICE.list_admins(room_id)
if not admins:
user_id = get_user_id(update)
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"No admins in this room.",
reply_markup=reply_markup,
)
return
user_id = get_user_id(update)
admin_mentions = [f"@{admin}" for admin in admins]
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"Room Admins:\n" + "\n".join(f"- {m}" for m in admin_mentions),
parse_mode=ParseMode.HTML,
reply_markup=reply_markup,
)
return
if args[0] not in ("add", "remove"):
await update.message.reply_text(
"Usage:\n"
"/admin — list admins\n"
"/admin add @username — add admin\n"
"/admin remove @username — remove admin"
)
return
subcommand = args[0]
if len(args) < 2:
await update.message.reply_text(f"Usage: /admin {subcommand} @username")
return
raw_username = args[1]
if not raw_username.startswith("@"):
await update.message.reply_text(f"Usage: /admin {subcommand} @username")
return
target_username = raw_username[1:]
user_id = get_user_id(update)
if not BOUNTY_SERVICE.is_admin(room_id, requesting_username):
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"⛔ Only admins can perform this action.",
reply_markup=reply_markup,
)
return
try:
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
if subcommand == "add":
BOUNTY_SERVICE.add_admin(room_id, target_username, requesting_username)
await update.message.reply_text(
f"✅ @{target_username} is now an admin.",
reply_markup=reply_markup,
)
elif subcommand == "remove":
BOUNTY_SERVICE.remove_admin(room_id, target_username, requesting_username)
await update.message.reply_text(
f"✅ @{target_username} is no longer an admin.",
reply_markup=reply_markup,
)
except (PermissionError, ValueError) as e:
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(f"{e}", reply_markup=reply_markup)
async def cmd_category(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle /category command for listing, adding, and deleting categories."""
user_id = get_user_id(update)
room_id = get_room_id(update)
effective_user = update.effective_user
username = effective_user.username or effective_user.first_name or None
args = extract_args(update.message.text)
if not args:
# List categories
categories = BOUNTY_SERVICE.list_categories(room_id)
if not categories:
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"No categories yet.\n"
"Usage: /category add <slug> <name>",
reply_markup=reply_markup,
)
return
lines = ["Categories:"]
for cat in categories:
lines.append(f"- {cat.id}{cat.name}")
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"\n".join(lines),
reply_markup=reply_markup,
)
return
subcommand = args[0]
if subcommand == "add":
# Add category
if not BOUNTY_SERVICE.is_admin(room_id, username):
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"⛔ Only admins can add categories.",
reply_markup=reply_markup,
)
return
if len(args) < 3:
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"Usage: /category add <slug> <name>\n"
"Example: /category add bug 'Bug Report'",
reply_markup=reply_markup,
)
return
slug = args[1].lower()
name = " ".join(args[2:])
try:
category = BOUNTY_SERVICE.add_category(room_id, slug, name, username)
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"✅ Category added: {category.id}{category.name}",
reply_markup=reply_markup,
)
except PermissionError as e:
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(f"{e}", reply_markup=reply_markup)
except ValueError as e:
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(f"{e}", reply_markup=reply_markup)
return
elif subcommand == "delete":
# Delete category
if not BOUNTY_SERVICE.is_admin(room_id, username):
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"⛔ Only admins can delete categories.",
reply_markup=reply_markup,
)
return
if len(args) < 2:
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"Usage: /category delete <slug>",
reply_markup=reply_markup,
)
return
slug = args[1].lower()
try:
success = BOUNTY_SERVICE.delete_category(room_id, slug, username)
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
if success:
await update.message.reply_text(
f"✅ Category '{slug}' deleted.",
reply_markup=reply_markup,
)
else:
await update.message.reply_text(
f"⛔ Category '{slug}' not found.",
reply_markup=reply_markup,
)
except PermissionError as e:
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(f"{e}", reply_markup=reply_markup)
return
else:
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"Usage:\n"
"/category - list categories\n"
"/category add <slug> <name> - add category (admin)\n"
"/category delete <slug> - delete category (admin)",
reply_markup=reply_markup,
)
async def cmd_help(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
user_id = get_user_id(update)
room_id = get_room_id(update)
effective_user = update.effective_user
username = effective_user.username or effective_user.first_name or None
is_admin = BOUNTY_SERVICE.is_admin(room_id, username)
if is_admin:
lines = [
"👻 JIGAIDO Commands (Admin):\n",
"📋 Bounty Management:",
"/bounty — list bounties",
"/add — add bounty",
"/edit — edit bounty",
"/delete — delete bounty",
"/recover — recover deleted bounties",
"",
"📂 Categories:",
"/category — list categories",
"/category add — add category",
"/category delete — delete category",
"",
"🔗 Tracking:",
"/track — track bounty",
"/untrack — stop tracking",
"/my — your tracked bounties",
"/show — show bounty details",
"",
"⚙️ Room Management:",
"/admin — manage admins",
"/timezone — get/set timezone",
"",
"/start — re-initialize",
"/help — show this message",
]
else:
lines = [
"👻 JIGAIDO Commands:\n",
"/bounty — list bounties",
"/track — track bounty",
"/untrack — stop tracking",
"/my — your tracked bounties",
"/show — show bounty details",
"/start — re-initialize",
"/help — show this message",
]
keyboard = [[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"\n".join(lines),
disable_web_page_preview=True,
reply_markup=reply_markup,
)