Files
jigaido/apps/telegram-bot/commands.py
2026-04-04 07:40:59 +00:00

451 lines
14 KiB
Python

"""Telegram command handlers for JIGAIDO - Thin wrappers around core services."""
import time
from datetime import datetime
from functools import wraps
from typing import Optional
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
import dateparser
from telegram import Update
from telegram.ext import ContextTypes
from adapters.storage.json_file import JsonFileRoomStorage, JsonFileTrackingStorage
from core.services import BountyService, TrackingService
ROOM_STORAGE = JsonFileRoomStorage()
TRACKING_STORAGE = JsonFileTrackingStorage()
BOUNTY_SERVICE = BountyService(ROOM_STORAGE)
TRACKING_SERVICE = TrackingService(TRACKING_STORAGE, ROOM_STORAGE)
TELEGRAM_BOT_USERNAME = "your_bot_username"
def format_due_date(due_date_ts: int | None, timezone_str: str) -> str:
"""Format due date as human-readable with timezone.
Examples:
No due date: (none shown)
Date only: 4 April 2026
Date + time: 4 April 2026 14:30
With timezone: 4 April 2026 14:30 (Asia/Jakarta)
"""
if not due_date_ts:
return ""
try:
tz = ZoneInfo(timezone_str)
except (KeyError, ZoneInfoNotFoundError):
tz = ZoneInfo("UTC")
dt = datetime.fromtimestamp(due_date_ts, tz=tz)
date_str = dt.strftime("%-d %B %Y")
if dt.hour != 0 or dt.minute != 0:
date_str += f" {dt.strftime('%H:%M')}"
date_str += f" ({timezone_str})"
return date_str
def extract_args(text: str) -> list[str]:
if not text:
return []
tokens = text.strip().split()
return tokens[1:] if len(tokens) > 1 else []
def parse_args(
args: list[str],
) -> tuple[Optional[str], Optional[str], Optional[int], bool, bool]:
text = None
link = None
due_date_ts = None
clear_link = False
clear_date = False
i = 0
while i < len(args):
arg = args[i]
if arg == "-link":
if i + 1 < len(args) and (
args[i + 1].startswith("http://") or args[i + 1].startswith("https://")
):
link = args[i + 1]
i += 2
else:
clear_link = True
i += 1
elif arg == "-date":
if i + 1 < len(args):
parsed = dateparser.parse(args[i + 1])
if parsed:
due_date_ts = int(parsed.timestamp())
i += 2
else:
clear_date = True
i += 1
else:
clear_date = True
i += 1
elif not link and (arg.startswith("http://") or arg.startswith("https://")):
link = arg
i += 1
elif due_date_ts is None:
parsed = dateparser.parse(arg)
if parsed:
due_date_ts = int(parsed.timestamp())
i += 1
else:
i += 1
if text is None:
text = arg
else:
text = text + " " + arg
else:
i += 1
if text is None:
text = arg
else:
text = text + " " + arg
return text, link, due_date_ts, clear_link, clear_date
def format_bounty(b, show_id: bool = True, room_id: int | None = None) -> str:
parts = []
if show_id:
parts.append(f"[#{b.id}]")
if b.text:
parts.append(b.text)
if b.link:
parts.append(f"🔗 {b.link}")
if b.due_date_ts:
timezone_str = "UTC"
if room_id is not None:
timezone_str = BOUNTY_SERVICE.get_timezone(room_id)
due_str = format_due_date(b.due_date_ts, timezone_str)
days_left = (b.due_date_ts - int(time.time())) // 86400
if days_left < 0:
parts.append(f"{due_str} (OVERDUE)")
elif days_left == 0:
parts.append(f"{due_str} (TODAY)")
else:
parts.append(f"{due_str} ({days_left}d)")
if b.created_by_user_id:
parts.append(f"by {b.created_by_user_id}")
return " | ".join(parts)
def is_group(update: Update) -> bool:
return update.effective_chat.type != "private"
def get_group_id(update: Update) -> int:
return update.effective_chat.id
def get_user_id(update: Update) -> int:
return update.effective_user.id
def get_room_id(update: Update) -> int:
"""Get room_id for the current context.
For groups: negative group_id
For DMs: positive user_id
"""
if is_group(update):
return get_group_id(update)
return get_user_id(update)
async def cmd_bounty(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
room_id = get_room_id(update)
bounties = BOUNTY_SERVICE.list_bounties(room_id)
if not bounties:
await update.message.reply_text("No bounties yet.")
return
lines = [format_bounty(b, show_id=True, room_id=room_id) for b in bounties]
await update.message.reply_text("\n".join(lines), disable_web_page_preview=True)
async def cmd_my(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
user_id = get_user_id(update)
if is_group(update):
group_id = get_group_id(update)
bounties = TRACKING_SERVICE.get_tracked_bounties(group_id, user_id)
room_id = group_id
else:
room_id = get_room_id(update)
bounties = BOUNTY_SERVICE.list_bounties(room_id)
if not bounties:
msg = (
"You are not tracking any bounties."
if is_group(update)
else "No personal bounties."
)
await update.message.reply_text(msg)
return
lines = [format_bounty(b, show_id=True, room_id=room_id) for b in bounties]
await update.message.reply_text("\n".join(lines), disable_web_page_preview=True)
async def cmd_add(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
args = extract_args(update.message.text)
if not args:
await update.message.reply_text(
"Usage: /add <text> [link] [due_date]\n"
"Example: /add Fix the bug https://github.com/foo/bar tomorrow"
)
return
text, link, due_date_ts, _, _ = parse_args(args)
if not text and not link:
await update.message.reply_text("A bounty needs at least text or a link.")
return
user_id = get_user_id(update)
room_id = get_room_id(update)
bounty = BOUNTY_SERVICE.add_bounty(
room_id=room_id,
user_id=user_id,
text=text,
link=link,
due_date_ts=due_date_ts,
)
due_str = ""
if due_date_ts:
timezone_str = BOUNTY_SERVICE.get_timezone(room_id)
due_str = f" | Due: {format_due_date(due_date_ts, timezone_str)}"
await update.message.reply_text(
f"✅ Bounty added (#{bounty.id}){due_str}",
disable_web_page_preview=True,
)
async def cmd_update(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
args = extract_args(update.message.text)
if len(args) < 1:
await update.message.reply_text(
"Usage: /update <bounty_id> [text] [link] [due_date]\n"
" /update <bounty_id> -link [<url>] - clear or set link\n"
" /update <bounty_id> -date [<date>] - clear or set date\n"
"Examples:\n"
" /update 1 new text - update text only\n"
" /update 1 -link - clear link\n"
" /update 1 -link https://... - set link\n"
" /update 1 -link -date - clear both link and date"
)
return
try:
bounty_id = int(args[0])
except ValueError:
await update.message.reply_text("Invalid bounty ID.")
return
text, link, due_date_ts, clear_link, clear_date = parse_args(args[1:])
if (
not text
and not link
and due_date_ts is None
and not clear_link
and not clear_date
):
await update.message.reply_text("Nothing to update.")
return
user_id = get_user_id(update)
room_id = get_room_id(update)
try:
success = BOUNTY_SERVICE.update_bounty(
room_id=room_id,
bounty_id=bounty_id,
user_id=user_id,
text=text,
link=link,
due_date_ts=due_date_ts,
clear_link=clear_link,
clear_due=clear_date,
)
except PermissionError as e:
await update.message.reply_text(f"{e}")
return
except ValueError as e:
await update.message.reply_text(f"{e}")
return
if success:
await update.message.reply_text(f"✅ Bounty #{bounty_id} updated.")
else:
await update.message.reply_text("Bounty not found.")
cmd_edit = cmd_update
async def cmd_delete(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
args = extract_args(update.message.text)
if not args:
await update.message.reply_text("Usage: /delete <bounty_id>")
return
try:
bounty_id = int(args[0])
except ValueError:
await update.message.reply_text("Invalid bounty ID.")
return
user_id = get_user_id(update)
room_id = get_room_id(update)
try:
success = BOUNTY_SERVICE.delete_bounty(
room_id=room_id,
bounty_id=bounty_id,
user_id=user_id,
)
except PermissionError as e:
await update.message.reply_text(f"{e}")
return
if success:
await update.message.reply_text(f"✅ Bounty #{bounty_id} deleted.")
else:
await update.message.reply_text("Bounty not found.")
async def cmd_track(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
if not is_group(update):
await update.message.reply_text("⛔ /track is only available in groups.")
return
args = extract_args(update.message.text)
if not args:
await update.message.reply_text("Usage: /track <bounty_id>")
return
try:
bounty_id = int(args[0])
except ValueError:
await update.message.reply_text("Invalid bounty ID.")
return
user_id = get_user_id(update)
room_id = get_room_id(update)
try:
if TRACKING_SERVICE.track_bounty(room_id, user_id, bounty_id):
await update.message.reply_text(f"✅ Now tracking bounty #{bounty_id}.")
else:
await update.message.reply_text(f"Already tracking bounty #{bounty_id}.")
except ValueError as e:
await update.message.reply_text(str(e))
async def cmd_untrack(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
if not is_group(update):
await update.message.reply_text("⛔ /untrack is only available in groups.")
return
args = extract_args(update.message.text)
if not args:
await update.message.reply_text("Usage: /untrack <bounty_id>")
return
try:
bounty_id = int(args[0])
except ValueError:
await update.message.reply_text("Invalid bounty ID.")
return
user_id = get_user_id(update)
room_id = get_room_id(update)
if TRACKING_SERVICE.untrack_bounty(room_id, user_id, bounty_id):
await update.message.reply_text(f"✅ Untracked bounty #{bounty_id}.")
else:
await update.message.reply_text("Not tracking bounty #{bounty_id}.")
async def cmd_start(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
if is_group(update):
await update.message.reply_text(
"👻 JIGAIDO is watching.\n\n"
"This group's bounty list is now active.\n"
"/bounty — list bounties\n"
"/add — create a bounty\n"
"/track — track a bounty\n"
"/my — your tracked bounties"
)
else:
await update.message.reply_text(
"👻 JIGAIDO activated.\n\n"
"Personal bounty list ready.\n"
"/bounty — list your bounties\n"
"/add — create a bounty\n"
"/my — your tracked bounties"
)
async def cmd_help(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
await update.message.reply_text(
"👻 JIGAIDO Commands:\n\n"
"/bounty — list all bounties\n"
"/my — bounties you're tracking\n"
"/add <text> [link] [due] — add bounty\n"
"/update <id> [text] [link] [due] — update bounty\n"
"/edit <id> [text] [link] [due] — edit bounty (same as update)\n"
" /edit <id> -link [<url>] — clear or set link\n"
" /edit <id> -date [<date>] — clear or set date\n"
"/delete <id> — delete bounty (admin only)\n"
"/track <id> — track a bounty (groups only)\n"
"/untrack <id> — stop tracking (groups only)\n"
"/timezone [tz] — get/set room timezone (admin only)\n"
"/start — re-initialize\n"
"/help — this message",
disable_web_page_preview=True,
)
async def cmd_timezone(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
args = extract_args(update.message.text)
room_id = get_room_id(update)
user_id = get_user_id(update)
if not args:
current_tz = BOUNTY_SERVICE.get_timezone(room_id)
await update.message.reply_text(f"Current timezone: {current_tz}")
return
timezone_str = args[0]
try:
ZoneInfo(timezone_str)
except (KeyError, ZoneInfoNotFoundError):
await update.message.reply_text(
"⛔ Invalid timezone. Use IANA format (e.g., Asia/Jakarta)"
)
return
try:
BOUNTY_SERVICE.set_timezone(room_id, timezone_str, user_id)
except PermissionError as e:
await update.message.reply_text(f"{e}")
return
await update.message.reply_text(f"✅ Timezone set to {timezone_str}.")