Files
jigaido/apps/telegram-bot/commands.py
shokollm 8bb964fdd0 feat: Replace SQLite with per-user JSON storage (fixes #2)
- Add storage.py with load_user(), save_user(), next_bounty_id()
- Rewrite commands.py to use JSON storage (simplified)
- Remove db.py, schema.sql, cron.py, test_db.py
- Update SPEC.md to reflect new architecture
- Admin model removed (anyone can add, creator only can edit/delete)
- No reminders in v1
2026-04-01 10:02:51 +00:00

401 lines
12 KiB
Python

"""Telegram command handlers for JIGAIDO."""
import json
import re
import time
from functools import wraps
import dateparser
from telegram import Update
from telegram.ext import ContextTypes
import storage
# Placeholder bot handle; the literal value is not a real username.
# NOTE(review): no handler in the reviewed portion of this file reads it — confirm usage.
TELEGRAM_BOT_USERNAME = "your_bot_username"
# Presumably the look-ahead window, in days, for due-date reminders.
# NOTE(review): nothing in the reviewed portion of this file reads it — confirm it is still needed.
REMINDER_WINDOW_DAYS = 7
def extract_args(text: str) -> list[str]:
    """Return the whitespace-separated tokens that follow the command word.

    The first token (the ``/command`` itself) is discarded. Empty or missing
    text, or text holding only the command, yields an empty list.
    """
    if not text:
        return []
    # Slicing past the end of a list is safe, so a bare command gives [].
    return text.split()[1:]
def parse_args(args: list[str]) -> tuple[str | None, str | None, int | None]:
text = None
link = None
due_date_ts = None
remaining = []
for arg in args:
if not link and (arg.startswith("http://") or arg.startswith("https://")):
link = arg
elif due_date_ts is None:
parsed = dateparser.parse(arg)
if parsed:
due_date_ts = int(parsed.timestamp())
else:
remaining.append(arg)
else:
remaining.append(arg)
text = " ".join(remaining) if remaining else None
return text, link, due_date_ts
def format_bounty(b: dict, show_id: bool = True) -> str:
    """Render a bounty dict as one line of ``" | "``-separated segments.

    Segments, in order: ``[#id]`` (when ``show_id`` is true), the text, the
    link, the due date with a status suffix, and the ``informed_by_username``
    (``unknown`` when the key is absent). Empty or missing optional fields
    are skipped.

    The due date is formatted in local time. The suffix is ``OVERDUE`` when
    the due timestamp is in the past, ``TODAY`` when less than 24 hours
    remain, and ``<n>d`` otherwise, where ``n`` is whole days remaining.
    """
    segments = [f"[#{b['id']}]"] if show_id else []
    if b["text"]:
        segments.append(b["text"])

    link = b.get("link")
    if link:
        segments.append(f"🔗 {link}")

    due_ts = b.get("due_date_ts")
    if due_ts:
        due_str = time.strftime("%Y-%m-%d", time.localtime(due_ts))
        # Floor division keeps any past timestamp negative.
        days_left = (due_ts - int(time.time())) // 86400
        if days_left < 0:
            status = "OVERDUE"
        elif days_left == 0:
            status = "TODAY"
        else:
            status = f"{days_left}d"
        segments.append(f"{due_str} ({status})")

    segments.append(f"by @{b.get('informed_by_username', 'unknown')}")
    return " | ".join(segments)
def is_group(update: Update) -> bool:
    """Report whether the update came from a non-private chat.

    Any chat type other than ``"private"`` counts as a group here.
    """
    chat_type = update.effective_chat.type
    return chat_type != "private"
def ensure_user(update: Update) -> dict:
    """Load the sender's stored record, refresh its username, and save it.

    Returns the record as loaded by ``storage.load_user`` with the
    ``username`` key set to the sender's current Telegram username.
    """
    sender = update.effective_user
    record = storage.load_user(sender.id)
    record["username"] = sender.username
    storage.save_user(record)
    return record
def get_user_by_username(username: str) -> dict | None:
    """Return the first stored user record whose ``username`` equals *username*.

    Scans every ``*.json`` file under ``storage.USERS_DIR``. The comparison is
    exact. Returns ``None`` when no record matches.
    """
    for user_file in storage.USERS_DIR.glob("*.json"):
        with open(user_file) as handle:
            record = json.load(handle)
        if record.get("username") != username:
            continue
        return record
    return None
def is_group_admin_or_creator(update: Update, group_id: int, user_data: dict) -> bool:
    """Permission check that currently allows everyone.

    All arguments are ignored and the result is always ``True``.
    """
    return True
def is_group_creator(update: Update, group_id: int, user_data: dict) -> bool:
    """Creator check that currently allows everyone.

    All arguments are ignored and the result is always ``True``.
    """
    return True
def get_all_group_bounties(group_id: int) -> list[dict]:
    """Collect every stored bounty that belongs to *group_id*, newest first.

    Reads each ``*.json`` file under ``storage.USERS_DIR`` and keeps the
    entries of ``personal_bounties`` whose ``group_id`` matches. Each kept
    bounty is annotated with ``creator_username`` taken from the owning
    record. Results are ordered by ``created_at`` descending, with a missing
    value treated as 0.
    """
    collected = []
    for user_file in storage.USERS_DIR.glob("*.json"):
        with open(user_file) as handle:
            record = json.load(handle)
        for entry in record.get("personal_bounties", []):
            if entry.get("group_id") != group_id:
                continue
            entry["creator_username"] = record.get("username")
            collected.append(entry)
    collected.sort(key=lambda entry: entry.get("created_at", 0), reverse=True)
    return collected
def admin_only(func):
    """Decorate a handler so it only runs in private chats.

    In any non-private chat the wrapper replies "⛔ Admin only." and returns
    without calling the handler.

    NOTE(review): despite the name, no admin status is checked; the only test
    is whether the chat is a group. Confirm this is intended.
    """
    @wraps(func)
    async def wrapper(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
        if not is_group(update):
            return await func(update, ctx)
        await update.message.reply_text("⛔ Admin only.")
        return None

    return wrapper
def creator_only(func):
    """Decorate a handler so it only runs in private chats.

    In any non-private chat the wrapper replies "⛔ Group creator only." and
    returns without calling the handler.

    NOTE(review): despite the name, no creator status is checked; the only
    test is whether the chat is a group. Confirm this is intended.
    """
    @wraps(func)
    async def wrapper(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
        if not is_group(update):
            return await func(update, ctx)
        await update.message.reply_text("⛔ Group creator only.")
        return None

    return wrapper
async def cmd_bounty(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /bounty: list the bounties for the current chat.

    In a group chat this lists every stored bounty tagged with the chat id.
    In a private chat it lists the sender's own ``personal_bounties``.
    Replies "No bounties yet." when the list is empty.
    """
    if is_group(update):
        bounties = get_all_group_bounties(update.effective_chat.id)
    else:
        bounties = ensure_user(update).get("personal_bounties", [])

    if not bounties:
        await update.message.reply_text("No bounties yet.")
        return

    # Format a shallow copy of each bounty, one per line.
    listing = "\n".join(format_bounty(dict(item), show_id=True) for item in bounties)
    await update.message.reply_text(listing, disable_web_page_preview=True)
async def cmd_my(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /my: reply with the bounties the sender is tracking.

    Each tracked entry is a ``bounty_id`` / ``group_id`` pair. Group entries
    are resolved against every stored user's ``personal_bounties``. Private
    entries (``group_id`` of ``None``) are resolved only against the sender's
    own list, so another user's private bounty with the same id is never
    shown.

    Tracked entries whose bounty cannot be found are skipped. If nothing is
    tracked, or nothing resolves, the reply is
    "You are not tracking any bounties."
    """
    user_data = ensure_user(update)
    tracked = user_data.get("tracked_bounties", [])
    if not tracked:
        await update.message.reply_text("You are not tracking any bounties.")
        return

    # Parse the user files once, and only when a group entry needs them,
    # instead of re-reading every file for each tracked entry.
    all_users: list[dict] = []
    if any(entry.get("group_id") is not None for entry in tracked):
        for path in storage.USERS_DIR.glob("*.json"):
            with open(path) as f:
                all_users.append(json.load(f))

    bounty_lines = []
    for entry in tracked:
        bounty_id = entry.get("bounty_id")
        group_id = entry.get("group_id")
        # Private bounties belong to one user; never search other users for them.
        candidates = [user_data] if group_id is None else all_users
        for creator_data in candidates:
            # NOTE(review): bounty ids appear to be allocated per user, so several
            # creators in one group may share an id; every match is listed.
            for bounty in creator_data.get("personal_bounties", []):
                if bounty.get("id") == bounty_id and bounty.get("group_id") == group_id:
                    shown = dict(bounty, creator_username=creator_data.get("username"))
                    bounty_lines.append(format_bounty(shown, show_id=True))
                    break

    if not bounty_lines:
        await update.message.reply_text("You are not tracking any bounties.")
        return
    await update.message.reply_text(
        "\n".join(bounty_lines), disable_web_page_preview=True
    )
async def cmd_add(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /add: create a bounty owned by the sender.

    Arguments are split into text, link and due date by ``parse_args``; at
    least text or a link is required. The bounty is appended to the sender's
    ``personal_bounties`` and tagged with the chat id when sent from a group
    (``None`` in a private chat). ``informed_by_username`` falls back to the
    numeric user id when the sender has no username.
    """
    message = update.message
    args = extract_args(message.text)
    if not args:
        await message.reply_text(
            "Usage: /add <text> [link] [due_date]\n"
            "Example: /add Fix the bug https://github.com/foo/bar tomorrow"
        )
        return

    text, link, due_date_ts = parse_args(args)
    if not (text or link):
        await message.reply_text("A bounty needs at least text or a link.")
        return

    user_data = ensure_user(update)
    group_id = update.effective_chat.id if is_group(update) else None
    sender = update.effective_user
    informed_by = sender.username or str(sender.id)
    created_at = int(time.time())

    bounty = {
        "id": storage.next_bounty_id(user_data),
        "text": text,
        "link": link,
        "due_date_ts": due_date_ts,
        "group_id": group_id,
        "informed_by_username": informed_by,
        "created_at": created_at,
    }
    user_data.setdefault("personal_bounties", []).append(bounty)
    storage.save_user(user_data)

    due_str = (
        f" | Due: {time.strftime('%Y-%m-%d', time.localtime(due_date_ts))}"
        if due_date_ts
        else ""
    )
    await message.reply_text(
        f"✅ Bounty added (#{bounty['id']}){due_str}",
        disable_web_page_preview=True,
    )
async def cmd_update(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /update: edit one of the sender's own bounties.

    The first argument is the bounty id; the rest are parsed by
    ``parse_args``. Only the fields that were supplied are overwritten. The
    bounty must be in the sender's ``personal_bounties`` and carry the current
    chat's group id (``None`` in a private chat).
    """
    message = update.message
    args = extract_args(message.text)
    if not args:
        await message.reply_text(
            "Usage: /update <bounty_id> [text] [link] [due_date]"
        )
        return

    try:
        bounty_id = int(args[0])
    except ValueError:
        await message.reply_text("Invalid bounty ID.")
        return

    text, link, due_date_ts = parse_args(args[1:])
    if not (text or link) and due_date_ts is None:
        await message.reply_text("Nothing to update.")
        return

    user_data = ensure_user(update)
    group_id = update.effective_chat.id if is_group(update) else None

    target = next(
        (
            candidate
            for candidate in user_data.get("personal_bounties", [])
            if candidate.get("id") == bounty_id
            and candidate.get("group_id") == group_id
        ),
        None,
    )
    if target is None:
        await message.reply_text("Bounty not found.")
        return

    if text:
        target["text"] = text
    if link:
        target["link"] = link
    if due_date_ts is not None:
        target["due_date_ts"] = due_date_ts
    storage.save_user(user_data)
    await message.reply_text(f"✅ Bounty #{bounty_id} updated.")
async def cmd_delete(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /delete: remove one of the sender's own bounties.

    The bounty must be in the sender's ``personal_bounties`` and carry the
    current chat's group id (``None`` in a private chat). Only the first
    matching entry is removed.
    """
    message = update.message
    args = extract_args(message.text)
    if not args:
        await message.reply_text("Usage: /delete <bounty_id>")
        return

    try:
        bounty_id = int(args[0])
    except ValueError:
        await message.reply_text("Invalid bounty ID.")
        return

    user_data = ensure_user(update)
    group_id = update.effective_chat.id if is_group(update) else None

    own_bounties = user_data.get("personal_bounties", [])
    position = next(
        (
            index
            for index, candidate in enumerate(own_bounties)
            if candidate.get("id") == bounty_id
            and candidate.get("group_id") == group_id
        ),
        None,
    )
    if position is None:
        await message.reply_text("Bounty not found.")
        return

    del own_bounties[position]
    storage.save_user(user_data)
    await message.reply_text(f"✅ Bounty #{bounty_id} deleted.")
async def cmd_track(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /track: add a bounty to the sender's tracked list.

    The tracked entry stores the bounty id, the current chat's group id
    (``None`` in a private chat) and the time it was added. Tracking the same
    id in the same chat twice is rejected. This handler does not check that
    the bounty actually exists.
    """
    message = update.message
    args = extract_args(message.text)
    if not args:
        await message.reply_text("Usage: /track <bounty_id>")
        return

    try:
        bounty_id = int(args[0])
    except ValueError:
        await message.reply_text("Invalid bounty ID.")
        return

    group_id = update.effective_chat.id if is_group(update) else None
    user_data = ensure_user(update)

    already_tracked = any(
        entry.get("bounty_id") == bounty_id and entry.get("group_id") == group_id
        for entry in user_data.get("tracked_bounties", [])
    )
    if already_tracked:
        await message.reply_text(f"Already tracking bounty #{bounty_id}.")
        return

    new_entry = {
        "bounty_id": bounty_id,
        "group_id": group_id,
        "created_at": int(time.time()),
    }
    user_data.setdefault("tracked_bounties", []).append(new_entry)
    storage.save_user(user_data)
    await message.reply_text(f"✅ Now tracking bounty #{bounty_id}.")
async def cmd_untrack(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /untrack: remove a bounty from the sender's tracked list.

    The entry to remove is identified by the bounty id plus the current
    chat's group id (``None`` in a private chat), the same key that /track
    stores. Only the first matching entry is removed.
    """
    args = extract_args(update.message.text)
    if not args:
        await update.message.reply_text("Usage: /untrack <bounty_id>")
        return
    try:
        bounty_id = int(args[0])
    except ValueError:
        await update.message.reply_text("Invalid bounty ID.")
        return

    user_data = ensure_user(update)
    # Must mirror /track: group chats key the entry by chat id. A value that
    # is always None would make group entries impossible to untrack.
    group_id = update.effective_chat.id if is_group(update) else None

    tracked = user_data.get("tracked_bounties", [])
    for i, t in enumerate(tracked):
        if t.get("bounty_id") == bounty_id and t.get("group_id") == group_id:
            tracked.pop(i)
            storage.save_user(user_data)
            await update.message.reply_text(f"✅ Untracked bounty #{bounty_id}.")
            return
    await update.message.reply_text(f"Not tracking bounty #{bounty_id}.")
async def cmd_admin_add(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle the admin-add command: reply that admin management was removed."""
    notice = "Admin management has been removed in this version."
    await update.message.reply_text(notice)
async def cmd_admin_remove(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle the admin-remove command: reply that admin management was removed."""
    notice = "Admin management has been removed in this version."
    await update.message.reply_text(notice)
async def cmd_start(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /start: register the sender and send a welcome message.

    The sender's record is created or refreshed through ``ensure_user``.
    Group chats and private chats get different welcome texts.
    """
    ensure_user(update)
    if is_group(update):
        welcome = (
            "👻 JIGAIDO is watching.\n\n"
            "This group's bounty list is now active.\n"
            "/bounty — list bounties\n"
            "/add — create a bounty\n"
            "/track — track a bounty\n"
            "/my — your tracked bounties"
        )
    else:
        welcome = (
            "👻 JIGAIDO activated.\n\n"
            "Personal bounty list ready.\n"
            "/bounty — list your bounties\n"
            "/add — create a bounty\n"
            "/my — your tracked bounties"
        )
    await update.message.reply_text(welcome)
async def cmd_help(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /help: reply with the command summary, link previews disabled."""
    help_text = (
        "👻 JIGAIDO Commands:\n\n"
        "/bounty — list all bounties\n"
        "/my — bounties you're tracking\n"
        "/add <text> [link] [due] — add bounty\n"
        "/update <id> [text] [link] [due] — update bounty\n"
        "/delete <id> — delete bounty\n"
        "/track <id> — track a bounty\n"
        "/untrack <id> — stop tracking\n"
        "/start — re-initialize\n"
        "/help — this message"
    )
    await update.message.reply_text(help_text, disable_web_page_preview=True)