Files
jigaido/apps/telegram-bot/commands.py
shokollm 8e589fbc47 Implement new storage design per issue #2
- Replace SQLite db module with file-based JSON storage in ~/.jigaido/
- Group bounties: ~/.jigaido/{group_id}/group.json
- User tracking: ~/.jigaido/{group_id}/user_{id}.json
- Personal bounties: ~/.jigaido/user_{id}/user.json
- Anyone can add bounties to groups; only creator can edit/delete
- Bounty IDs are sequential per group, not global
- Fix test mock compatibility issues in format_bounty function
2026-04-02 13:48:52 +00:00

442 lines
14 KiB
Python

"""Telegram command handlers for JIGAIDO."""
import json
import re
import time
from datetime import date
from functools import wraps

import dateparser
from telegram import Update
from telegram.ext import ContextTypes

import storage
# Bot username; the value looks like a placeholder and nothing in the visible
# part of this module reads it — TODO confirm where it is consumed.
TELEGRAM_BOT_USERNAME = "your_bot_username"
def extract_args(text: str) -> list[str]:
    """Return the whitespace-separated tokens that follow the command word.

    The first token (the ``/command`` itself) is dropped. Empty or missing
    message text yields an empty list.
    """
    if text:
        # split() without a separator already ignores surrounding whitespace,
        # and slicing past the end of a short list simply gives [].
        return text.split()[1:]
    return []
def parse_args(args: list[str]) -> tuple[str | None, str | None, int | None]:
text = None
link = None
due_date_ts = None
remaining = []
for arg in args:
if not link and (arg.startswith("http://") or arg.startswith("https://")):
link = arg
elif due_date_ts is None:
parsed = dateparser.parse(arg)
if parsed:
due_date_ts = int(parsed.timestamp())
else:
remaining.append(arg)
else:
remaining.append(arg)
text = " ".join(remaining) if remaining else None
return text, link, due_date_ts
def format_bounty(b: dict, show_id: bool = True) -> str:
    """Render a bounty as a single ``" | "``-separated line.

    Args:
        b: Bounty record. The keys ``text``, ``link``, ``due_date_ts`` and
            ``created_by_username`` are read, plus ``id`` when *show_id* is
            true. Fields whose value is ``None`` are left out of the line.
        show_id: Prefix the line with ``[#<id>]``.

    Returns:
        The formatted line: optional id, text, link, the due date followed by
        a relative marker (``OVERDUE``, ``TODAY`` or ``<n>d``), and the
        creator (``@unknown`` when no username is stored).
    """
    parts = []
    if show_id:
        parts.append(f"[#{b['id']}]")
    if b["text"] is not None:
        parts.append(b["text"])
    if b["link"] is not None:
        parts.append(f"🔗 {b['link']}")
    if b["due_date_ts"] is not None:
        # Compare local calendar dates rather than elapsed seconds: only the
        # date is displayed, so a timestamp anywhere on today's date must read
        # TODAY, and one less than 24 hours away but on tomorrow's date must
        # read 1d.
        due_date = date.fromtimestamp(b["due_date_ts"])
        days_left = (due_date - date.today()).days
        due_str = due_date.isoformat()
        if days_left < 0:
            parts.append(f"{due_str} (OVERDUE)")
        elif days_left == 0:
            parts.append(f"{due_str} (TODAY)")
        else:
            parts.append(f"{due_str} ({days_left}d)")
    username = b["created_by_username"]
    parts.append(f"by @{username if username is not None else 'unknown'}")
    return " | ".join(parts)
def is_group(update: Update) -> bool:
    """Return True when the update's chat is anything other than a private chat."""
    chat_type = update.effective_chat.type
    return chat_type != "private"
def ensure_user_personal(user_id: int, username: str | None) -> dict:
    """Load the user's personal record, store *username* on it, and save it.

    Returns the record that was saved.
    """
    record = storage.load_user_personal(user_id)
    record.update(username=username)
    storage.save_user_personal(record)
    return record
def ensure_user_tracking(group_id: int, user_id: int) -> dict:
    """Return the tracking record that storage holds for this user in this group."""
    tracking = storage.load_user_tracking(group_id, user_id)
    return tracking
def is_bounty_creator(bounty: dict, user_id: int) -> bool:
    """Return True when *user_id* matches the bounty's ``created_by_user_id``.

    A bounty without that key never matches a real user id.
    """
    creator_id = bounty.get("created_by_user_id")
    return creator_id == user_id
async def cmd_bounty(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply with the bounty list of the current chat.

    In a group the group's bounties are listed; in a private chat the
    caller's personal record is loaded (refreshing the stored username) and
    its bounties are listed.
    """
    if not is_group(update):
        user_id = update.effective_user.id
        username = update.effective_user.username
        source = ensure_user_personal(user_id, username)
    else:
        source = storage.load_group(update.effective_chat.id)
    bounties = source.get("bounties", [])

    if not bounties:
        await update.message.reply_text("No bounties yet.")
        return

    listing = "\n".join(format_bounty(dict(item), show_id=True) for item in bounties)
    await update.message.reply_text(listing, disable_web_page_preview=True)
async def cmd_my(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply with the caller's own bounties.

    In a private chat this lists the personal bounties. In a group it lists
    the group bounties the caller is tracking; tracked ids that no longer
    exist in the group are silently skipped.
    """
    user_id = update.effective_user.id

    if not is_group(update):
        personal = storage.load_user_personal(user_id)
        own = personal.get("bounties", [])
        if own:
            listing = "\n".join(format_bounty(dict(item), show_id=True) for item in own)
            await update.message.reply_text(listing, disable_web_page_preview=True)
        else:
            await update.message.reply_text("No bounties yet.")
        return

    group_id = update.effective_chat.id
    tracked = storage.load_user_tracking(group_id, user_id).get("tracked", [])
    if not tracked:
        await update.message.reply_text("You are not tracking any bounties.")
        return

    # Index the group's bounties by id so each tracked entry is one lookup.
    by_id = {}
    for bounty in storage.load_group(group_id).get("bounties", []):
        by_id[bounty["id"]] = bounty

    rendered = [
        format_bounty(by_id[entry.get("bounty_id")], show_id=True)
        for entry in tracked
        if entry.get("bounty_id") in by_id
    ]
    if not rendered:
        await update.message.reply_text("You are not tracking any bounties.")
        return
    await update.message.reply_text("\n".join(rendered), disable_web_page_preview=True)
async def cmd_add(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
    """Create a bounty from the message arguments and confirm it.

    The bounty is appended to the group's list in a group chat, or to the
    caller's personal list in a private chat. A bounty must have at least
    text or a link; the due date is optional.
    """
    args = extract_args(update.message.text)
    if not args:
        usage = (
            "Usage: /add <text> [link] [due_date]\n"
            "Example: /add Fix the bug https://github.com/foo/bar tomorrow"
        )
        await update.message.reply_text(usage)
        return

    text, link, due_date_ts = parse_args(args)
    if not (text or link):
        await update.message.reply_text("A bounty needs at least text or a link.")
        return

    user_id = update.effective_user.id
    # Fall back to the numeric id when the user has no username.
    username = update.effective_user.username or str(user_id)
    common_fields = {
        "text": text,
        "link": link,
        "due_date_ts": due_date_ts,
        "created_by_user_id": user_id,
        "created_by_username": username,
        "created_at": int(time.time()),
    }

    if is_group(update):
        group_data = storage.load_group(update.effective_chat.id)
        new_bounty = {"id": storage.next_bounty_id(group_data), **common_fields}
        group_data.setdefault("bounties", []).append(new_bounty)
        storage.save_group(group_data)
    else:
        user_data = ensure_user_personal(user_id, username)
        new_bounty = {"id": storage.next_personal_bounty_id(user_data), **common_fields}
        user_data.setdefault("bounties", []).append(new_bounty)
        storage.save_user_personal(user_data)

    due_str = (
        f" | Due: {time.strftime('%Y-%m-%d', time.localtime(due_date_ts))}"
        if due_date_ts
        else ""
    )
    await update.message.reply_text(
        f"✅ Bounty added{due_str}",
        disable_web_page_preview=True,
    )
async def cmd_update(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
    """Update the text, link and/or due date of an existing bounty.

    The first argument is the bounty id; the rest is parsed like ``/add``.
    Only fields that were supplied are overwritten. In a group only the
    bounty's creator may edit it; in a private chat the caller edits their
    own personal bounties.
    """
    args = extract_args(update.message.text)
    if not args:
        await update.message.reply_text(
            "Usage: /update <bounty_id> [text] [link] [due_date]"
        )
        return

    try:
        bounty_id = int(args[0])
    except ValueError:
        await update.message.reply_text("Invalid bounty ID.")
        return

    text, link, due_date_ts = parse_args(args[1:])
    if not (text or link) and due_date_ts is None:
        await update.message.reply_text("Nothing to update.")
        return

    user_id = update.effective_user.id
    in_group = is_group(update)
    if in_group:
        container = storage.load_group(update.effective_chat.id)
    else:
        container = storage.load_user_personal(user_id)

    # First bounty with the requested id, if any.
    target = next(
        (item for item in container.get("bounties", []) if item.get("id") == bounty_id),
        None,
    )
    if target is None:
        await update.message.reply_text("Bounty not found.")
        return

    if in_group and not is_bounty_creator(target, user_id):
        await update.message.reply_text("⛔ Only the creator can edit this bounty.")
        return

    if text:
        target["text"] = text
    if link:
        target["link"] = link
    if due_date_ts is not None:
        target["due_date_ts"] = due_date_ts

    if in_group:
        storage.save_group(container)
    else:
        storage.save_user_personal(container)
    await update.message.reply_text(f"✅ Bounty #{bounty_id} updated.")
async def cmd_delete(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
    """Delete a bounty by id.

    In a group only the bounty's creator may delete it; in a private chat
    the caller deletes from their own personal list.
    """
    args = extract_args(update.message.text)
    if not args:
        await update.message.reply_text("Usage: /delete <bounty_id>")
        return

    try:
        bounty_id = int(args[0])
    except ValueError:
        await update.message.reply_text("Invalid bounty ID.")
        return

    user_id = update.effective_user.id
    in_group = is_group(update)
    if in_group:
        container = storage.load_group(update.effective_chat.id)
    else:
        container = storage.load_user_personal(user_id)

    bounties = container.get("bounties", [])
    # Position of the first bounty with the requested id, if any.
    position = next(
        (idx for idx, item in enumerate(bounties) if item.get("id") == bounty_id),
        None,
    )
    if position is None:
        await update.message.reply_text("Bounty not found.")
        return

    if in_group and not is_bounty_creator(bounties[position], user_id):
        await update.message.reply_text("⛔ Only the creator can delete this bounty.")
        return

    del bounties[position]
    if in_group:
        storage.save_group(container)
    else:
        storage.save_user_personal(container)
    await update.message.reply_text(f"✅ Bounty #{bounty_id} deleted.")
async def cmd_track(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
    """Start tracking a group bounty for the calling user.

    Only valid in groups. The bounty must exist in the group and must not
    already be tracked by the caller.
    """
    args = extract_args(update.message.text)
    if not args:
        await update.message.reply_text("Usage: /track <bounty_id>")
        return

    try:
        bounty_id = int(args[0])
    except ValueError:
        await update.message.reply_text("Invalid bounty ID.")
        return

    if not is_group(update):
        await update.message.reply_text("Tracking is only available in groups.")
        return

    group_id = update.effective_chat.id
    user_id = update.effective_user.id

    for candidate in storage.load_group(group_id).get("bounties", []):
        if candidate.get("id") == bounty_id:
            break
    else:
        # Loop finished without a match.
        await update.message.reply_text("Bounty not found in this group.")
        return

    tracking_data = storage.load_user_tracking(group_id, user_id)
    already_tracked = any(
        entry.get("bounty_id") == bounty_id
        for entry in tracking_data.get("tracked", [])
    )
    if already_tracked:
        await update.message.reply_text(f"Already tracking bounty #{bounty_id}.")
        return

    new_entry = {
        "bounty_id": bounty_id,
        "tracked_at": int(time.time()),
    }
    tracking_data.setdefault("tracked", []).append(new_entry)
    storage.save_user_tracking(tracking_data, group_id)
    await update.message.reply_text(f"✅ Now tracking bounty #{bounty_id}.")
async def cmd_untrack(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
    """Stop tracking a group bounty for the calling user.

    Only valid in groups. Replies with a notice when the bounty was not
    being tracked.
    """
    args = extract_args(update.message.text)
    if not args:
        await update.message.reply_text("Usage: /untrack <bounty_id>")
        return

    try:
        bounty_id = int(args[0])
    except ValueError:
        await update.message.reply_text("Invalid bounty ID.")
        return

    if not is_group(update):
        await update.message.reply_text("Tracking is only available in groups.")
        return

    group_id = update.effective_chat.id
    user_id = update.effective_user.id
    tracking_data = storage.load_user_tracking(group_id, user_id)
    tracked = tracking_data.get("tracked", [])

    # Position of the first tracking entry for this bounty, if any.
    position = next(
        (idx for idx, entry in enumerate(tracked) if entry.get("bounty_id") == bounty_id),
        None,
    )
    if position is None:
        await update.message.reply_text(f"Not tracking bounty #{bounty_id}.")
        return

    del tracked[position]
    tracking_data["tracked"] = tracked
    storage.save_user_tracking(tracking_data, group_id)
    await update.message.reply_text(f"✅ Untracked bounty #{bounty_id}.")
async def cmd_start(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
    """Load the storage records for this chat and reply with a welcome message.

    In a group the group record and the caller's tracking record are loaded;
    in a private chat the caller's personal record is loaded and saved with
    the current username.
    """
    user_id = update.effective_user.id
    username = update.effective_user.username

    if is_group(update):
        group_id = update.effective_chat.id
        # Results are discarded; the calls are made for whatever loading does.
        storage.load_group(group_id)
        storage.load_user_tracking(group_id, user_id)
        welcome_lines = [
            "👻 JIGAIDO is watching.",
            "",
            "This group's bounty list is now active.",
            "/bounty — list bounties",
            "/add — create a bounty",
            "/track — track a bounty",
            "/my — your tracked bounties",
        ]
    else:
        ensure_user_personal(user_id, username)
        welcome_lines = [
            "👻 JIGAIDO activated.",
            "",
            "Personal bounty list ready.",
            "/bounty — list your bounties",
            "/add — create a bounty",
            "/my — your tracked bounties",
        ]

    await update.message.reply_text("\n".join(welcome_lines))
async def cmd_help(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply with the command reference for the current chat type."""
    if is_group(update):
        command_lines = [
            "/bounty — list all bounties",
            "/my — bounties you're tracking",
            "/add <text> [link] [due] — add bounty",
            "/update <id> [text] [link] [due] — update bounty (creator only)",
            "/delete <id> — delete bounty (creator only)",
            "/track <id> — track a bounty",
            "/untrack <id> — stop tracking",
            "/start — re-initialize",
            "/help — this message",
        ]
    else:
        command_lines = [
            "/bounty — list your bounties",
            "/add <text> [link] [due] — add bounty",
            "/update <id> [text] [link] [due] — update bounty",
            "/delete <id> — delete bounty",
            "/my — your tracked bounties",
            "/start — re-initialize",
            "/help — this message",
        ]

    # Header, blank line, then one command per line.
    help_text = "\n".join(["👻 JIGAIDO Commands:", "", *command_lines])
    await update.message.reply_text(help_text, disable_web_page_preview=True)