"""SQLite database wrapper for JIGAIDO.""" import sqlite3 import time from pathlib import Path from typing import Optional DB_PATH = Path(__file__).parent / "jigaido.db" def get_conn() -> sqlite3.Connection: # isolation_level=None enables autocommit mode. # row_factory disables SQLite Python's implicit transaction management, # so we need explicit autocommit to make writes work correctly. conn = sqlite3.connect(DB_PATH, detect_types=sqlite3.PARSE_DECLTYPES) conn.isolation_level = None conn.execute("PRAGMA foreign_keys = ON") conn.row_factory = sqlite3.Row return conn def _row_to_dict(row: sqlite3.Row) -> dict: return dict(row) def init_db() -> None: schema = (Path(__file__).parent / "schema.sql").read_text() with get_conn() as conn: conn.executescript(schema) # ── Users ────────────────────────────────────────────────────────────────── def upsert_user(telegram_user_id: int, username: str | None) -> int: with get_conn() as conn: cur = conn.execute( """INSERT INTO users (telegram_user_id, username) VALUES (?, ?) ON CONFLICT (telegram_user_id) DO UPDATE SET username = excluded.username RETURNING id""", (telegram_user_id, username), ) return cur.fetchone()["id"] def get_user_by_telegram_id(telegram_user_id: int) -> Optional[dict]: with get_conn() as conn: row = conn.execute( "SELECT * FROM users WHERE telegram_user_id = ?", (telegram_user_id,), ).fetchone() return _row_to_dict(row) if row else None # ── Groups ───────────────────────────────────────────────────────────────── def upsert_group(telegram_chat_id: int, creator_user_id: int) -> int: """Insert group if not exists. Returns group id.""" with get_conn() as conn: cur = conn.execute( """INSERT INTO groups (telegram_chat_id, creator_user_id) VALUES (?, ?) ON CONFLICT (telegram_chat_id) DO UPDATE SET creator_user_id = excluded.creator_user_id WHERE groups.creator_user_id IS NULL OR groups.creator_user_id = excluded.creator_user_id RETURNING id""", (telegram_chat_id, creator_user_id), ) return cur.fetchone()["id"] def get_group(telegram_chat_id: int) -> Optional[dict]: with get_conn() as conn: row = conn.execute( "SELECT * FROM groups WHERE telegram_chat_id = ?", (telegram_chat_id,), ).fetchone() return _row_to_dict(row) if row else None def get_group_creator_user_id(group_id: int) -> Optional[int]: with get_conn() as conn: row = conn.execute( "SELECT creator_user_id FROM groups WHERE id = ?", (group_id,), ).fetchone() return row["creator_user_id"] if row else None # ── Group Admins ──────────────────────────────────────────────────────────── def add_group_admin(group_id: int, user_id: int) -> bool: """Add user as admin. Returns True if newly added, False if already admin.""" with get_conn() as conn: try: conn.execute( "INSERT INTO group_admins (group_id, user_id) VALUES (?, ?)", (group_id, user_id), ) return True except sqlite3.IntegrityError: return False def remove_group_admin(group_id: int, user_id: int) -> bool: """Remove user from admins. Returns True if removed, False if not an admin.""" with get_conn() as conn: cur = conn.execute( "DELETE FROM group_admins WHERE group_id = ? AND user_id = ?", (group_id, user_id), ) return cur.rowcount > 0 def is_group_admin(group_id: int, user_id: int) -> bool: with get_conn() as conn: row = conn.execute( "SELECT 1 FROM group_admins WHERE group_id = ? AND user_id = ?", (group_id, user_id), ).fetchone() return row is not None def is_group_creator(group_id: int, user_id: int) -> bool: return get_group_creator_user_id(group_id) == user_id def get_user_by_username(username: str) -> Optional[dict]: """Look up user by username (without @).""" with get_conn() as conn: row = conn.execute( "SELECT * FROM users WHERE username = ?", (username,), ).fetchone() return _row_to_dict(row) if row else None # ── Bounties ──────────────────────────────────────────────────────────────── def add_bounty( group_id: int | None, created_by_user_id: int, informed_by_username: str, text: str | None, link: str | None, due_date_ts: int | None, ) -> int: """Add a bounty. Returns bounty id. Raises ValueError on duplicate link.""" with get_conn() as conn: try: cur = conn.execute( """INSERT INTO bounties (group_id, created_by_user_id, informed_by_username, text, link, due_date_ts) VALUES (?, ?, ?, ?, ?, ?) RETURNING id""", (group_id, created_by_user_id, informed_by_username, text, link, due_date_ts), ) return cur.fetchone()["id"] except sqlite3.IntegrityError as e: if "UNIQUE" in str(e) and "link" in str(e): raise ValueError(f"Link already exists in this group: {link}") raise def get_bounty(bounty_id: int) -> Optional[dict]: with get_conn() as conn: row = conn.execute("SELECT * FROM bounties WHERE id = ?", (bounty_id,)).fetchone() return _row_to_dict(row) if row else None def get_group_bounties(group_id: int) -> list[dict]: with get_conn() as conn: return [_row_to_dict(r) for r in conn.execute( "SELECT * FROM bounties WHERE group_id = ? ORDER BY created_at DESC", (group_id,), )] def get_user_personal_bounties(user_id: int) -> list[dict]: """Bounties created by user in DM (group_id IS NULL).""" with get_conn() as conn: return [_row_to_dict(r) for r in conn.execute( "SELECT * FROM bounties WHERE group_id IS NULL AND created_by_user_id = ? ORDER BY created_at DESC", (user_id,), )] def update_bounty( bounty_id: int, text: str | None, link: str | None, due_date_ts: int | None, ) -> bool: """Update bounty fields. Returns True if updated. Raises ValueError on duplicate link.""" with get_conn() as conn: try: cur = conn.execute( """UPDATE bounties SET text = COALESCE(?, text), link = COALESCE(?, link), due_date_ts = COALESCE(?, due_date_ts) WHERE id = ?""", (text, link, due_date_ts, bounty_id), ) return cur.rowcount > 0 except sqlite3.IntegrityError as e: if "UNIQUE" in str(e) and "link" in str(e): raise ValueError(f"Link already exists in this group: {link}") raise def delete_bounty(bounty_id: int) -> bool: with get_conn() as conn: cur = conn.execute("DELETE FROM bounties WHERE id = ?", (bounty_id,)) return cur.rowcount > 0 # ── Tracking ──────────────────────────────────────────────────────────────── def track_bounty(user_id: int, bounty_id: int) -> bool: """Add bounty to user's tracking. Returns True if newly tracked, False if already tracking.""" with get_conn() as conn: try: conn.execute( "INSERT INTO user_bounty_tracking (user_id, bounty_id) VALUES (?, ?)", (user_id, bounty_id), ) return True except sqlite3.IntegrityError: return False def untrack_bounty(user_id: int, bounty_id: int) -> bool: with get_conn() as conn: cur = conn.execute( "DELETE FROM user_bounty_tracking WHERE user_id = ? AND bounty_id = ?", (user_id, bounty_id), ) return cur.rowcount > 0 def is_tracking(user_id: int, bounty_id: int) -> bool: with get_conn() as conn: row = conn.execute( "SELECT 1 FROM user_bounty_tracking WHERE user_id = ? AND bounty_id = ?", (user_id, bounty_id), ).fetchone() return row is not None def get_user_tracked_bounties_in_group(user_id: int, group_id: int) -> list[dict]: with get_conn() as conn: return [_row_to_dict(r) for r in conn.execute( """SELECT b.* FROM bounties b JOIN user_bounty_tracking t ON t.bounty_id = b.id WHERE t.user_id = ? AND b.group_id = ? ORDER BY b.created_at DESC""", (user_id, group_id), )] def get_user_tracked_bounties_personal(user_id: int) -> list[dict]: """Tracked bounties where group_id IS NULL (personal).""" with get_conn() as conn: return [_row_to_dict(r) for r in conn.execute( """SELECT b.* FROM bounties b JOIN user_bounty_tracking t ON t.bounty_id = b.id WHERE t.user_id = ? AND b.group_id IS NULL ORDER BY b.created_at DESC""", (user_id,), )] # ── Reminders ─────────────────────────────────────────────────────────────── def get_bounties_due_soon(user_id: int, days: int = 7) -> list[dict]: """Get tracked bounties with due_date within `days` that haven't been reminded yet.""" now = int(time.time()) deadline = now + days * 86400 with get_conn() as conn: return [_row_to_dict(r) for r in conn.execute( """SELECT b.*, u.username, u.telegram_user_id FROM bounties b JOIN user_bounty_tracking t ON t.bounty_id = b.id JOIN users u ON u.id = b.created_by_user_id WHERE t.user_id = ? AND b.due_date_ts IS NOT NULL AND b.due_date_ts <= ? AND b.due_date_ts >= ? AND b.id NOT IN ( SELECT bounty_id FROM reminder_log WHERE user_id = ? ) ORDER BY b.due_date_ts ASC""", (user_id, deadline, now, user_id), )] def log_reminder(user_id: int, bounty_id: int) -> None: with get_conn() as conn: conn.execute( "INSERT OR IGNORE INTO reminder_log (user_id, bounty_id) VALUES (?, ?)", (user_id, bounty_id), ) def get_all_user_ids() -> list[int]: with get_conn() as conn: return [row["telegram_user_id"] for row in conn.execute("SELECT telegram_user_id FROM users")]