When /start is called in a group, check if user is the group creator and automatically add them as admin. DMs don't need admin concept.
719 lines
23 KiB
Python
719 lines
23 KiB
Python
"""Telegram command handlers for JIGAIDO - Thin wrappers around core services."""
|
|
|
|
import time
|
|
from datetime import datetime, timezone
|
|
from functools import wraps
|
|
from typing import Optional
|
|
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
|
|
|
|
import dateparser
|
|
from telegram import Update
|
|
from telegram.ext import ContextTypes
|
|
|
|
from adapters.storage.json_file import JsonFileRoomStorage, JsonFileTrackingStorage
|
|
from core.services import BountyService, TrackingService
|
|
|
|
ROOM_STORAGE = JsonFileRoomStorage()
|
|
TRACKING_STORAGE = JsonFileTrackingStorage()
|
|
BOUNTY_SERVICE = BountyService(ROOM_STORAGE)
|
|
TRACKING_SERVICE = TrackingService(TRACKING_STORAGE, ROOM_STORAGE)
|
|
|
|
TELEGRAM_BOT_USERNAME = "your_bot_username"
|
|
|
|
|
|
def format_due_date(due_date_ts: int | None, timezone_str: str) -> str:
|
|
"""Format due date as human-readable with timezone.
|
|
|
|
Examples:
|
|
No due date: (none shown)
|
|
Date only: 4 April 2026
|
|
Date + time: 4 April 2026 14:30
|
|
With timezone: 4 April 2026 14:30 (Asia/Jakarta)
|
|
"""
|
|
if not due_date_ts:
|
|
return ""
|
|
|
|
try:
|
|
tz = ZoneInfo(timezone_str)
|
|
except (KeyError, ZoneInfoNotFoundError):
|
|
tz = ZoneInfo("UTC")
|
|
|
|
dt = datetime.fromtimestamp(due_date_ts, tz=tz)
|
|
|
|
date_str = dt.strftime("%-d %B %Y")
|
|
|
|
if dt.hour != 0 or dt.minute != 0:
|
|
date_str += f" {dt.strftime('%H:%M')}"
|
|
date_str += f" ({timezone_str})"
|
|
|
|
return date_str
|
|
|
|
|
|
def extract_args(text: str) -> list[str]:
|
|
if not text:
|
|
return []
|
|
tokens = text.strip().split()
|
|
return tokens[1:] if len(tokens) > 1 else []
|
|
|
|
|
|
def parse_args(
|
|
args: list[str],
|
|
) -> tuple[Optional[str], Optional[str], Optional[int], bool, bool]:
|
|
text = None
|
|
link = None
|
|
due_date_ts = None
|
|
clear_link = False
|
|
clear_date = False
|
|
|
|
i = 0
|
|
while i < len(args):
|
|
arg = args[i]
|
|
|
|
if arg == "-link":
|
|
if i + 1 < len(args) and (
|
|
args[i + 1].startswith("http://") or args[i + 1].startswith("https://")
|
|
):
|
|
link = args[i + 1]
|
|
i += 2
|
|
else:
|
|
clear_link = True
|
|
i += 1
|
|
elif arg == "-date":
|
|
if i + 1 < len(args):
|
|
parsed = dateparser.parse(args[i + 1])
|
|
if parsed:
|
|
due_date_ts = int(parsed.timestamp())
|
|
i += 2
|
|
else:
|
|
clear_date = True
|
|
i += 1
|
|
else:
|
|
clear_date = True
|
|
i += 1
|
|
elif not link and (arg.startswith("http://") or arg.startswith("https://")):
|
|
link = arg
|
|
i += 1
|
|
elif due_date_ts is None:
|
|
parsed = dateparser.parse(arg)
|
|
if parsed:
|
|
due_date_ts = int(parsed.timestamp())
|
|
i += 1
|
|
else:
|
|
i += 1
|
|
if text is None:
|
|
text = arg
|
|
else:
|
|
text = text + " " + arg
|
|
else:
|
|
i += 1
|
|
if text is None:
|
|
text = arg
|
|
else:
|
|
text = text + " " + arg
|
|
|
|
return text, link, due_date_ts, clear_link, clear_date
|
|
|
|
|
|
def format_bounty(b, show_id: bool = True, slice_length: int = 0) -> str:
|
|
parts = []
|
|
if show_id:
|
|
parts.append(f"[#{b.id}]")
|
|
if b.text:
|
|
text = b.text
|
|
if slice_length > 0 and len(text) > slice_length:
|
|
text = text[:slice_length] + "..."
|
|
parts.append(text)
|
|
if b.link:
|
|
parts.append(f"🔗 {b.link}")
|
|
if b.due_date_ts:
|
|
due_str = time.strftime("%d %b %Y", time.localtime(b.due_date_ts))
|
|
days_left = (b.due_date_ts - int(time.time())) // 86400
|
|
if days_left < 0:
|
|
parts.append(f"⏰ {due_str} (OVERDUE)")
|
|
elif days_left == 0:
|
|
parts.append(f"⏰ Today (OVERDUE)")
|
|
else:
|
|
parts.append(f"⏰ {due_str} ({days_left}d)")
|
|
return " | ".join(parts)
|
|
|
|
|
|
def is_group(update: Update) -> bool:
|
|
return update.effective_chat.type != "private"
|
|
|
|
|
|
def get_group_id(update: Update) -> int:
|
|
return update.effective_chat.id
|
|
|
|
|
|
def get_user_id(update: Update) -> int:
|
|
return update.effective_user.id
|
|
|
|
|
|
def get_room_id(update: Update) -> int:
|
|
"""Get room_id for the current context.
|
|
|
|
For groups: negative group_id
|
|
For DMs: positive user_id
|
|
"""
|
|
if is_group(update):
|
|
return get_group_id(update)
|
|
return get_user_id(update)
|
|
|
|
|
|
async def cmd_bounty(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
|
|
room_id = get_room_id(update)
|
|
args = extract_args(update.message.text)
|
|
|
|
show_all = "all" in args
|
|
args = [a for a in args if a != "all"]
|
|
|
|
try:
|
|
limit = int(args[0]) if args else 5
|
|
except (ValueError, IndexError):
|
|
limit = 5
|
|
|
|
now = int(time.time())
|
|
cutoff_24h = now - 86400
|
|
|
|
all_bounties = BOUNTY_SERVICE.list_bounties(room_id)
|
|
|
|
def is_expired(b) -> bool:
|
|
return b.due_date_ts is not None and b.due_date_ts < cutoff_24h
|
|
|
|
def sort_key(b):
|
|
if b.due_date_ts is not None:
|
|
return (0, b.due_date_ts)
|
|
return (1, b.created_at)
|
|
|
|
filtered_bounties = [b for b in all_bounties if not is_expired(b) or show_all]
|
|
filtered_bounties.sort(key=sort_key)
|
|
|
|
total_count = len(filtered_bounties)
|
|
displayed_bounties = filtered_bounties[:limit]
|
|
|
|
if not displayed_bounties:
|
|
if show_all:
|
|
await update.message.reply_text("No bounties yet.")
|
|
else:
|
|
await update.message.reply_text(
|
|
"No active bounties. Use /bounty all to show expired."
|
|
)
|
|
return
|
|
|
|
lines = []
|
|
if limit < total_count:
|
|
lines.append(f"Showing {limit} of {total_count} bounties:")
|
|
slice_length = 40
|
|
elif show_all and total_count > limit:
|
|
lines.append(f"Showing {limit} of {total_count} bounties (including expired):")
|
|
slice_length = 40
|
|
else:
|
|
lines.append(f"Showing {total_count} bounties:")
|
|
slice_length = 0
|
|
|
|
for b in displayed_bounties:
|
|
lines.append(format_bounty(b, show_id=True, slice_length=slice_length))
|
|
|
|
await update.message.reply_text("\n".join(lines), disable_web_page_preview=True)
|
|
|
|
|
|
async def cmd_my(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
|
|
user_id = get_user_id(update)
|
|
|
|
if is_group(update):
|
|
group_id = get_group_id(update)
|
|
bounties = TRACKING_SERVICE.get_tracked_bounties(group_id, user_id)
|
|
room_id = group_id
|
|
else:
|
|
room_id = get_room_id(update)
|
|
bounties = BOUNTY_SERVICE.list_bounties(room_id)
|
|
|
|
if not bounties:
|
|
msg = (
|
|
"You are not tracking any bounties."
|
|
if is_group(update)
|
|
else "No personal bounties."
|
|
)
|
|
await update.message.reply_text(msg)
|
|
return
|
|
|
|
lines = [format_bounty(b, show_id=True) for b in bounties]
|
|
await update.message.reply_text("\n".join(lines), disable_web_page_preview=True)
|
|
|
|
|
|
async def cmd_add(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
|
|
args = extract_args(update.message.text)
|
|
if not args:
|
|
await update.message.reply_text(
|
|
"Usage: /add <text> [link] [due_date]\n"
|
|
"Example: /add Fix the bug https://github.com/foo/bar tomorrow"
|
|
)
|
|
return
|
|
|
|
text, link, due_date_ts, _, _ = parse_args(args)
|
|
if not text and not link:
|
|
await update.message.reply_text("A bounty needs at least text or a link.")
|
|
return
|
|
|
|
user_id = get_user_id(update)
|
|
room_id = get_room_id(update)
|
|
|
|
try:
|
|
bounty = BOUNTY_SERVICE.add_bounty(
|
|
room_id=room_id,
|
|
user_id=user_id,
|
|
text=text,
|
|
link=link,
|
|
due_date_ts=due_date_ts,
|
|
)
|
|
except PermissionError as e:
|
|
await update.message.reply_text(f"⛔ {e}")
|
|
return
|
|
except ValueError as e:
|
|
await update.message.reply_text(f"⛔ {e}")
|
|
return
|
|
|
|
due_str = ""
|
|
if due_date_ts:
|
|
timezone_str = BOUNTY_SERVICE.get_timezone(room_id)
|
|
due_str = f" | Due: {format_due_date(due_date_ts, timezone_str)}"
|
|
await update.message.reply_text(
|
|
f"✅ Bounty added (#{bounty.id}){due_str}",
|
|
disable_web_page_preview=True,
|
|
)
|
|
|
|
|
|
async def cmd_update(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
|
|
args = extract_args(update.message.text)
|
|
if len(args) < 1:
|
|
await update.message.reply_text(
|
|
"Usage: /update <bounty_id> [text] [link] [due_date]\n"
|
|
" /update <bounty_id> -link [<url>] - clear or set link\n"
|
|
" /update <bounty_id> -date [<date>] - clear or set date\n"
|
|
"Examples:\n"
|
|
" /update 1 new text - update text only\n"
|
|
" /update 1 -link - clear link\n"
|
|
" /update 1 -link https://... - set link\n"
|
|
" /update 1 -link -date - clear both link and date"
|
|
)
|
|
return
|
|
|
|
try:
|
|
bounty_id = int(args[0])
|
|
except ValueError:
|
|
await update.message.reply_text("Invalid bounty ID.")
|
|
return
|
|
|
|
text, link, due_date_ts, clear_link, clear_date = parse_args(args[1:])
|
|
if (
|
|
not text
|
|
and not link
|
|
and due_date_ts is None
|
|
and not clear_link
|
|
and not clear_date
|
|
):
|
|
await update.message.reply_text("Nothing to update.")
|
|
return
|
|
|
|
user_id = get_user_id(update)
|
|
room_id = get_room_id(update)
|
|
|
|
try:
|
|
success = BOUNTY_SERVICE.update_bounty(
|
|
room_id=room_id,
|
|
bounty_id=bounty_id,
|
|
user_id=user_id,
|
|
text=text,
|
|
link=link,
|
|
due_date_ts=due_date_ts,
|
|
clear_link=clear_link,
|
|
clear_due=clear_date,
|
|
)
|
|
except PermissionError as e:
|
|
await update.message.reply_text(f"⛔ {e}")
|
|
return
|
|
except ValueError as e:
|
|
await update.message.reply_text(f"⛔ {e}")
|
|
return
|
|
|
|
if success:
|
|
await update.message.reply_text(f"✅ Bounty #{bounty_id} updated.")
|
|
else:
|
|
await update.message.reply_text("Bounty not found.")
|
|
|
|
|
|
cmd_edit = cmd_update
|
|
|
|
|
|
async def cmd_delete(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
|
|
args = extract_args(update.message.text)
|
|
if not args:
|
|
await update.message.reply_text("Usage: /delete <bounty_id> [bounty_id ...]")
|
|
return
|
|
|
|
try:
|
|
bounty_ids = [int(arg) for arg in args]
|
|
except ValueError:
|
|
await update.message.reply_text("Invalid bounty ID.")
|
|
return
|
|
|
|
user_id = get_user_id(update)
|
|
room_id = get_room_id(update)
|
|
|
|
try:
|
|
results = BOUNTY_SERVICE.delete_bounties(
|
|
room_id=room_id,
|
|
bounty_ids=bounty_ids,
|
|
user_id=user_id,
|
|
)
|
|
except PermissionError as e:
|
|
await update.message.reply_text(f"⛔ {e}")
|
|
return
|
|
|
|
response_lines = []
|
|
for bounty_id, result in results.items():
|
|
if result == "deleted":
|
|
response_lines.append(f"✅ Bounty #{bounty_id} deleted.")
|
|
elif result == "not_found":
|
|
response_lines.append(f"⛔ Bounty #{bounty_id} not found.")
|
|
elif result == "permission_denied":
|
|
response_lines.append(f"⛔ Bounty #{bounty_id} permission denied.")
|
|
|
|
await update.message.reply_text("\n".join(response_lines))
|
|
|
|
|
|
async def cmd_track(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
|
|
if not is_group(update):
|
|
await update.message.reply_text("⛔ /track is only available in groups.")
|
|
return
|
|
|
|
args = extract_args(update.message.text)
|
|
if not args:
|
|
await update.message.reply_text("Usage: /track <bounty_id>")
|
|
return
|
|
|
|
try:
|
|
bounty_id = int(args[0])
|
|
except ValueError:
|
|
await update.message.reply_text("Invalid bounty ID.")
|
|
return
|
|
|
|
user_id = get_user_id(update)
|
|
room_id = get_room_id(update)
|
|
|
|
try:
|
|
if TRACKING_SERVICE.track_bounty(room_id, user_id, bounty_id):
|
|
await update.message.reply_text(f"✅ Now tracking bounty #{bounty_id}.")
|
|
else:
|
|
await update.message.reply_text(f"Already tracking bounty #{bounty_id}.")
|
|
except ValueError as e:
|
|
await update.message.reply_text(str(e))
|
|
|
|
|
|
async def cmd_untrack(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
|
|
if not is_group(update):
|
|
await update.message.reply_text("⛔ /untrack is only available in groups.")
|
|
return
|
|
|
|
args = extract_args(update.message.text)
|
|
if not args:
|
|
await update.message.reply_text("Usage: /untrack <bounty_id>")
|
|
return
|
|
|
|
try:
|
|
bounty_id = int(args[0])
|
|
except ValueError:
|
|
await update.message.reply_text("Invalid bounty ID.")
|
|
return
|
|
|
|
user_id = get_user_id(update)
|
|
room_id = get_room_id(update)
|
|
|
|
if TRACKING_SERVICE.untrack_bounty(room_id, user_id, bounty_id):
|
|
await update.message.reply_text(f"✅ Untracked bounty #{bounty_id}.")
|
|
else:
|
|
await update.message.reply_text("Not tracking bounty #{bounty_id}.")
|
|
|
|
|
|
async def cmd_start(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
|
|
user_id = get_user_id(update)
|
|
room_id = get_room_id(update)
|
|
|
|
if is_group(update):
|
|
try:
|
|
chat_member = await ctx.bot.get_chat_member(room_id, user_id)
|
|
if chat_member.status == "creator" and not BOUNTY_SERVICE.is_admin(
|
|
room_id, user_id
|
|
):
|
|
BOUNTY_SERVICE.add_admin(room_id, user_id, user_id)
|
|
await update.message.reply_text(
|
|
"👻 JIGAIDO is watching.\n\n"
|
|
"This group's bounty list is now active.\n"
|
|
"You are now an admin (group creator).\n"
|
|
"/bounty — list bounties\n"
|
|
"/add — create a bounty\n"
|
|
"/track — track a bounty\n"
|
|
"/my — your tracked bounties"
|
|
)
|
|
return
|
|
except Exception:
|
|
pass
|
|
|
|
await update.message.reply_text(
|
|
"👻 JIGAIDO is watching.\n\n"
|
|
"This group's bounty list is now active.\n"
|
|
"/bounty — list bounties\n"
|
|
"/add — create a bounty\n"
|
|
"/track — track a bounty\n"
|
|
"/my — your tracked bounties"
|
|
)
|
|
else:
|
|
await update.message.reply_text(
|
|
"👻 JIGAIDO activated.\n\n"
|
|
"Personal bounty list ready.\n"
|
|
"/bounty — list your bounties\n"
|
|
"/add — create a bounty\n"
|
|
"/my — your tracked bounties"
|
|
)
|
|
|
|
|
|
async def cmd_show(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
|
|
args = extract_args(update.message.text)
|
|
if not args:
|
|
await update.message.reply_text("Usage: /show <bounty_id>")
|
|
return
|
|
|
|
try:
|
|
bounty_id = int(args[0])
|
|
except ValueError:
|
|
await update.message.reply_text("Invalid bounty ID.")
|
|
return
|
|
|
|
room_id = get_room_id(update)
|
|
bounty = BOUNTY_SERVICE.get_bounty(room_id, bounty_id)
|
|
|
|
if not bounty:
|
|
await update.message.reply_text("Bounty not found.")
|
|
return
|
|
|
|
timezone = BOUNTY_SERVICE.get_timezone(room_id)
|
|
|
|
lines = []
|
|
|
|
title = bounty.text or "(no text)"
|
|
lines.append(f"[#{bounty.id}] {title}")
|
|
|
|
if bounty.link:
|
|
lines.append(f"🔗 {bounty.link}")
|
|
|
|
if bounty.due_date_ts:
|
|
due_str = time.strftime("%d %B %Y %H:%M", time.localtime(bounty.due_date_ts))
|
|
lines.append(f"📅 {due_str} ({timezone})")
|
|
|
|
username = bounty.created_by_username or f"user#{bounty.created_by_user_id}"
|
|
lines.append(f"👤 @{username}")
|
|
|
|
created_str = time.strftime("%Y-%m-%d %H:%M", time.localtime(bounty.created_at))
|
|
lines.append(f"📌 Created: {created_str}")
|
|
|
|
await update.message.reply_text("\n".join(lines), disable_web_page_preview=True)
|
|
|
|
|
|
async def cmd_timezone(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
|
|
args = extract_args(update.message.text)
|
|
room_id = get_room_id(update)
|
|
user_id = get_user_id(update)
|
|
|
|
if not args:
|
|
current_tz = BOUNTY_SERVICE.get_timezone(room_id)
|
|
await update.message.reply_text(f"Current timezone: {current_tz}")
|
|
return
|
|
|
|
timezone_str = args[0]
|
|
|
|
try:
|
|
ZoneInfo(timezone_str)
|
|
except (KeyError, ZoneInfoNotFoundError):
|
|
await update.message.reply_text(
|
|
"⛔ Invalid timezone. Use IANA format (e.g., Asia/Jakarta)"
|
|
)
|
|
return
|
|
|
|
try:
|
|
BOUNTY_SERVICE.set_timezone(room_id, timezone_str, user_id)
|
|
except PermissionError as e:
|
|
await update.message.reply_text(f"⛔ {e}")
|
|
return
|
|
|
|
await update.message.reply_text(f"✅ Timezone set to {timezone_str}.")
|
|
|
|
|
|
async def cmd_recover(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
|
|
args = extract_args(update.message.text)
|
|
room_id = get_room_id(update)
|
|
user_id = get_user_id(update)
|
|
|
|
if not BOUNTY_SERVICE.is_admin(room_id, user_id):
|
|
await update.message.reply_text("⛔ Only admins can perform this action.")
|
|
return
|
|
|
|
if not args:
|
|
deleted_bounties = BOUNTY_SERVICE.list_deleted_bounties(room_id)
|
|
if not deleted_bounties:
|
|
await update.message.reply_text("No recoverable bounties.")
|
|
return
|
|
|
|
deleted_bounties.sort(key=lambda b: b.deleted_at or 0, reverse=True)
|
|
|
|
lines = ["Recoverable bounties:"]
|
|
for b in deleted_bounties[:10]:
|
|
text = (
|
|
b.text[:40] + "..."
|
|
if b.text and len(b.text) > 40
|
|
else (b.text or "(no text)")
|
|
)
|
|
link_str = f" | 🔗 {b.link}" if b.link else ""
|
|
deleted_str = (
|
|
time.strftime("%d %b %Y", time.localtime(b.deleted_at))
|
|
if b.deleted_at
|
|
else "unknown"
|
|
)
|
|
lines.append(f"[#{b.id}] {text}{link_str} | 🗑️ Deleted {deleted_str}")
|
|
|
|
await update.message.reply_text("\n".join(lines), disable_web_page_preview=True)
|
|
return
|
|
|
|
try:
|
|
bounty_ids = [int(arg) for arg in args]
|
|
except ValueError:
|
|
await update.message.reply_text("Invalid bounty ID.")
|
|
return
|
|
|
|
results = BOUNTY_SERVICE.recover_bounties(room_id, bounty_ids, user_id)
|
|
|
|
response_lines = []
|
|
for bounty_id, result in results.items():
|
|
if result == "recovered":
|
|
response_lines.append(f"✅ Recovered bounty #{bounty_id}.")
|
|
elif result == "not_found":
|
|
response_lines.append(f"⛔ Bounty #{bounty_id} not found.")
|
|
elif result == "not_deleted":
|
|
response_lines.append(f"⛔ Bounty #{bounty_id} is not deleted.")
|
|
elif result == "permission_denied":
|
|
response_lines.append(f"⛔ Bounty #{bounty_id} permission denied.")
|
|
|
|
await update.message.reply_text("\n".join(response_lines))
|
|
|
|
|
|
def _find_user_id_by_username(room_id: int, username: str) -> int | None:
|
|
"""Find user_id by username from bounty creators in the room."""
|
|
bounties = BOUNTY_SERVICE.list_bounties(room_id)
|
|
for bounty in bounties:
|
|
if (
|
|
bounty.created_by_username
|
|
and bounty.created_by_username.lower() == username.lower()
|
|
):
|
|
return bounty.created_by_user_id
|
|
return None
|
|
|
|
|
|
def _find_username_by_user_id(room_id: int, user_id: int) -> str | None:
|
|
"""Find username by user_id from bounty creators in the room."""
|
|
bounties = BOUNTY_SERVICE.list_bounties(room_id)
|
|
for bounty in bounties:
|
|
if bounty.created_by_user_id == user_id:
|
|
return bounty.created_by_username
|
|
return None
|
|
|
|
|
|
async def cmd_admin(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
|
|
args = extract_args(update.message.text)
|
|
if not args:
|
|
admins = BOUNTY_SERVICE.list_admins(get_room_id(update))
|
|
if not admins:
|
|
await update.message.reply_text("No admins in this room.")
|
|
return
|
|
admin_mentions = []
|
|
for admin_id in admins:
|
|
username = _find_username_by_user_id(get_room_id(update), admin_id)
|
|
if username:
|
|
admin_mentions.append(f"@{username}")
|
|
else:
|
|
admin_mentions.append(f"user#{admin_id}")
|
|
await update.message.reply_text(
|
|
f"Room Admins:\n" + "\n".join(f"- {m}" for m in admin_mentions)
|
|
)
|
|
return
|
|
|
|
if args[0] not in ("add", "remove"):
|
|
await update.message.reply_text(
|
|
"Usage:\n"
|
|
"/admin — list admins\n"
|
|
"/admin add @username — add admin\n"
|
|
"/admin remove @username — remove admin"
|
|
)
|
|
return
|
|
|
|
subcommand = args[0]
|
|
|
|
if len(args) < 2:
|
|
await update.message.reply_text(f"Usage: /admin {subcommand} @username")
|
|
return
|
|
|
|
raw_username = args[1]
|
|
if not raw_username.startswith("@"):
|
|
await update.message.reply_text(f"Usage: /admin {subcommand} @username")
|
|
return
|
|
|
|
username = raw_username[1:]
|
|
|
|
user_id = get_user_id(update)
|
|
room_id = get_room_id(update)
|
|
|
|
if not BOUNTY_SERVICE.is_admin(room_id, user_id):
|
|
await update.message.reply_text("⛔ Only admins can perform this action.")
|
|
return
|
|
|
|
target_user_id = _find_user_id_by_username(room_id, username)
|
|
|
|
if target_user_id is None:
|
|
await update.message.reply_text(f"⛔ User @{username} not found.")
|
|
return
|
|
|
|
try:
|
|
if subcommand == "add":
|
|
BOUNTY_SERVICE.add_admin(room_id, target_user_id, user_id)
|
|
await update.message.reply_text(f"✅ @{username} is now an admin.")
|
|
elif subcommand == "remove":
|
|
BOUNTY_SERVICE.remove_admin(room_id, target_user_id, user_id)
|
|
await update.message.reply_text(f"✅ @{username} is no longer an admin.")
|
|
except PermissionError as e:
|
|
await update.message.reply_text(f"⛔ {e}")
|
|
|
|
|
|
async def cmd_help(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
|
|
await update.message.reply_text(
|
|
"👻 JIGAIDO Commands:\n\n"
|
|
"/bounty — list all bounties\n"
|
|
"/my — bounties you're tracking\n"
|
|
"/add <text> [link] [due] — add bounty (admin only)\n"
|
|
"/update <id> [text> [link] [due] — update bounty (admin only)\n"
|
|
"/edit <id> [text> [link] [due] — edit bounty (same as update)\n"
|
|
" /edit <id> -link [<url>] — clear or set link\n"
|
|
" /edit <id> -date [<date>] — clear or set date\n"
|
|
"/delete <id> [<id>...] — delete bounty (admin only)\n"
|
|
"/track <id> — track a bounty (groups only)\n"
|
|
"/untrack <id> — stop tracking (groups only)\n"
|
|
"/show <id> — show bounty details\n"
|
|
"/admin — list admins\n"
|
|
"/admin add @username — add admin (admin only)\n"
|
|
"/admin remove @username — remove admin (admin only)\n"
|
|
"/timezone — get room timezone\n"
|
|
"/timezone <tz> — set room timezone (admin only)\n"
|
|
"/recover — list recoverable bounties (admin only)\n"
|
|
"/recover <id> [<id>...] — recover bounty (admin only)\n"
|
|
"/start — re-initialize\n"
|
|
"/help — this message",
|
|
disable_web_page_preview=True,
|
|
)
|