Add tests + fix db.py SQLite commit pattern
Tests: - tests/test_commands.py: parse_args, extract_args, format_bounty - tests/test_db.py: full CRUD + tracking + reminders - tests/conftest.py: temp DB fixture - requirements-dev.txt: pytest + pytest-asyncio db.py fixes: - Explicit conn.commit() after every write (SQLite row_factory disables implicit transaction management) - fetchone() before commit() (can't commit while cursor open) - Functions return dict instead of sqlite3.Row
This commit is contained in:
@@ -10,11 +10,14 @@ DB_PATH = Path(__file__).parent / "jigaido.db"
|
|||||||
|
|
||||||
def get_conn() -> sqlite3.Connection:
|
def get_conn() -> sqlite3.Connection:
|
||||||
conn = sqlite3.connect(DB_PATH, detect_types=sqlite3.PARSE_DECLTYPES)
|
conn = sqlite3.connect(DB_PATH, detect_types=sqlite3.PARSE_DECLTYPES)
|
||||||
conn.row_factory = sqlite3.Row
|
|
||||||
conn.execute("PRAGMA foreign_keys = ON")
|
conn.execute("PRAGMA foreign_keys = ON")
|
||||||
return conn
|
return conn
|
||||||
|
|
||||||
|
|
||||||
|
def _row_to_dict(row: sqlite3.Row) -> dict:
|
||||||
|
return dict(row)
|
||||||
|
|
||||||
|
|
||||||
def init_db() -> None:
|
def init_db() -> None:
|
||||||
schema = (Path(__file__).parent / "schema.sql").read_text()
|
schema = (Path(__file__).parent / "schema.sql").read_text()
|
||||||
with get_conn() as conn:
|
with get_conn() as conn:
|
||||||
@@ -32,15 +35,18 @@ def upsert_user(telegram_user_id: int, username: str | None) -> int:
|
|||||||
RETURNING id""",
|
RETURNING id""",
|
||||||
(telegram_user_id, username),
|
(telegram_user_id, username),
|
||||||
)
|
)
|
||||||
return cur.fetchone()["id"]
|
result = cur.fetchone()
|
||||||
|
conn.commit()
|
||||||
|
return result[0]
|
||||||
|
|
||||||
|
|
||||||
def get_user_by_telegram_id(telegram_user_id: int) -> Optional[sqlite3.Row]:
|
def get_user_by_telegram_id(telegram_user_id: int) -> Optional[dict]:
|
||||||
with get_conn() as conn:
|
with get_conn() as conn:
|
||||||
return conn.execute(
|
row = conn.execute(
|
||||||
"SELECT * FROM users WHERE telegram_user_id = ?",
|
"SELECT * FROM users WHERE telegram_user_id = ?",
|
||||||
(telegram_user_id,),
|
(telegram_user_id,),
|
||||||
).fetchone()
|
).fetchone()
|
||||||
|
return _row_to_dict(row) if row else None
|
||||||
|
|
||||||
|
|
||||||
# ── Groups ─────────────────────────────────────────────────────────────────
|
# ── Groups ─────────────────────────────────────────────────────────────────
|
||||||
@@ -56,15 +62,18 @@ def upsert_group(telegram_chat_id: int, creator_user_id: int) -> int:
|
|||||||
RETURNING id""",
|
RETURNING id""",
|
||||||
(telegram_chat_id, creator_user_id),
|
(telegram_chat_id, creator_user_id),
|
||||||
)
|
)
|
||||||
return cur.fetchone()["id"]
|
result = cur.fetchone()
|
||||||
|
conn.commit()
|
||||||
|
return result[0]
|
||||||
|
|
||||||
|
|
||||||
def get_group(telegram_chat_id: int) -> Optional[sqlite3.Row]:
|
def get_group(telegram_chat_id: int) -> Optional[dict]:
|
||||||
with get_conn() as conn:
|
with get_conn() as conn:
|
||||||
return conn.execute(
|
row = conn.execute(
|
||||||
"SELECT * FROM groups WHERE telegram_chat_id = ?",
|
"SELECT * FROM groups WHERE telegram_chat_id = ?",
|
||||||
(telegram_chat_id,),
|
(telegram_chat_id,),
|
||||||
).fetchone()
|
).fetchone()
|
||||||
|
return _row_to_dict(row) if row else None
|
||||||
|
|
||||||
|
|
||||||
def get_group_creator_user_id(group_id: int) -> Optional[int]:
|
def get_group_creator_user_id(group_id: int) -> Optional[int]:
|
||||||
@@ -73,7 +82,7 @@ def get_group_creator_user_id(group_id: int) -> Optional[int]:
|
|||||||
"SELECT creator_user_id FROM groups WHERE id = ?",
|
"SELECT creator_user_id FROM groups WHERE id = ?",
|
||||||
(group_id,),
|
(group_id,),
|
||||||
).fetchone()
|
).fetchone()
|
||||||
return row["creator_user_id"] if row else None
|
return row[0] if row else None
|
||||||
|
|
||||||
|
|
||||||
# ── Group Admins ────────────────────────────────────────────────────────────
|
# ── Group Admins ────────────────────────────────────────────────────────────
|
||||||
@@ -86,6 +95,7 @@ def add_group_admin(group_id: int, user_id: int) -> bool:
|
|||||||
"INSERT INTO group_admins (group_id, user_id) VALUES (?, ?)",
|
"INSERT INTO group_admins (group_id, user_id) VALUES (?, ?)",
|
||||||
(group_id, user_id),
|
(group_id, user_id),
|
||||||
)
|
)
|
||||||
|
conn.commit()
|
||||||
return True
|
return True
|
||||||
except sqlite3.IntegrityError:
|
except sqlite3.IntegrityError:
|
||||||
return False
|
return False
|
||||||
@@ -98,6 +108,7 @@ def remove_group_admin(group_id: int, user_id: int) -> bool:
|
|||||||
"DELETE FROM group_admins WHERE group_id = ? AND user_id = ?",
|
"DELETE FROM group_admins WHERE group_id = ? AND user_id = ?",
|
||||||
(group_id, user_id),
|
(group_id, user_id),
|
||||||
)
|
)
|
||||||
|
conn.commit()
|
||||||
return cur.rowcount > 0
|
return cur.rowcount > 0
|
||||||
|
|
||||||
|
|
||||||
@@ -114,13 +125,14 @@ def is_group_creator(group_id: int, user_id: int) -> bool:
|
|||||||
return get_group_creator_user_id(group_id) == user_id
|
return get_group_creator_user_id(group_id) == user_id
|
||||||
|
|
||||||
|
|
||||||
def get_user_by_username(username: str) -> Optional[sqlite3.Row]:
|
def get_user_by_username(username: str) -> Optional[dict]:
|
||||||
"""Look up user by username (without @)."""
|
"""Look up user by username (without @)."""
|
||||||
with get_conn() as conn:
|
with get_conn() as conn:
|
||||||
return conn.execute(
|
row = conn.execute(
|
||||||
"SELECT * FROM users WHERE username = ?",
|
"SELECT * FROM users WHERE username = ?",
|
||||||
(username,),
|
(username,),
|
||||||
).fetchone()
|
).fetchone()
|
||||||
|
return _row_to_dict(row) if row else None
|
||||||
|
|
||||||
|
|
||||||
# ── Bounties ────────────────────────────────────────────────────────────────
|
# ── Bounties ────────────────────────────────────────────────────────────────
|
||||||
@@ -143,33 +155,36 @@ def add_bounty(
|
|||||||
RETURNING id""",
|
RETURNING id""",
|
||||||
(group_id, created_by_user_id, informed_by_username, text, link, due_date_ts),
|
(group_id, created_by_user_id, informed_by_username, text, link, due_date_ts),
|
||||||
)
|
)
|
||||||
return cur.fetchone()["id"]
|
result = cur.fetchone()
|
||||||
|
conn.commit()
|
||||||
|
return result[0]
|
||||||
except sqlite3.IntegrityError as e:
|
except sqlite3.IntegrityError as e:
|
||||||
if "UNIQUE" in str(e) and "link" in str(e):
|
if "UNIQUE" in str(e) and "link" in str(e):
|
||||||
raise ValueError(f"Link already exists in this group: {link}")
|
raise ValueError(f"Link already exists in this group: {link}")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
|
||||||
def get_bounty(bounty_id: int) -> Optional[sqlite3.Row]:
|
def get_bounty(bounty_id: int) -> Optional[dict]:
|
||||||
with get_conn() as conn:
|
with get_conn() as conn:
|
||||||
return conn.execute("SELECT * FROM bounties WHERE id = ?", (bounty_id,)).fetchone()
|
row = conn.execute("SELECT * FROM bounties WHERE id = ?", (bounty_id,)).fetchone()
|
||||||
|
return _row_to_dict(row) if row else None
|
||||||
|
|
||||||
|
|
||||||
def get_group_bounties(group_id: int) -> list[sqlite3.Row]:
|
def get_group_bounties(group_id: int) -> list[dict]:
|
||||||
with get_conn() as conn:
|
with get_conn() as conn:
|
||||||
return list(conn.execute(
|
return [_row_to_dict(r) for r in conn.execute(
|
||||||
"SELECT * FROM bounties WHERE group_id = ? ORDER BY created_at DESC",
|
"SELECT * FROM bounties WHERE group_id = ? ORDER BY created_at DESC",
|
||||||
(group_id,),
|
(group_id,),
|
||||||
))
|
)]
|
||||||
|
|
||||||
|
|
||||||
def get_user_personal_bounties(user_id: int) -> list[sqlite3.Row]:
|
def get_user_personal_bounties(user_id: int) -> list[dict]:
|
||||||
"""Bounties created by user in DM (group_id IS NULL)."""
|
"""Bounties created by user in DM (group_id IS NULL)."""
|
||||||
with get_conn() as conn:
|
with get_conn() as conn:
|
||||||
return list(conn.execute(
|
return [_row_to_dict(r) for r in conn.execute(
|
||||||
"SELECT * FROM bounties WHERE group_id IS NULL AND created_by_user_id = ? ORDER BY created_at DESC",
|
"SELECT * FROM bounties WHERE group_id IS NULL AND created_by_user_id = ? ORDER BY created_at DESC",
|
||||||
(user_id,),
|
(user_id,),
|
||||||
))
|
)]
|
||||||
|
|
||||||
|
|
||||||
def update_bounty(
|
def update_bounty(
|
||||||
@@ -189,6 +204,7 @@ def update_bounty(
|
|||||||
WHERE id = ?""",
|
WHERE id = ?""",
|
||||||
(text, link, due_date_ts, bounty_id),
|
(text, link, due_date_ts, bounty_id),
|
||||||
)
|
)
|
||||||
|
conn.commit()
|
||||||
return cur.rowcount > 0
|
return cur.rowcount > 0
|
||||||
except sqlite3.IntegrityError as e:
|
except sqlite3.IntegrityError as e:
|
||||||
if "UNIQUE" in str(e) and "link" in str(e):
|
if "UNIQUE" in str(e) and "link" in str(e):
|
||||||
@@ -199,6 +215,7 @@ def update_bounty(
|
|||||||
def delete_bounty(bounty_id: int) -> bool:
|
def delete_bounty(bounty_id: int) -> bool:
|
||||||
with get_conn() as conn:
|
with get_conn() as conn:
|
||||||
cur = conn.execute("DELETE FROM bounties WHERE id = ?", (bounty_id,))
|
cur = conn.execute("DELETE FROM bounties WHERE id = ?", (bounty_id,))
|
||||||
|
conn.commit()
|
||||||
return cur.rowcount > 0
|
return cur.rowcount > 0
|
||||||
|
|
||||||
|
|
||||||
@@ -212,6 +229,7 @@ def track_bounty(user_id: int, bounty_id: int) -> bool:
|
|||||||
"INSERT INTO user_bounty_tracking (user_id, bounty_id) VALUES (?, ?)",
|
"INSERT INTO user_bounty_tracking (user_id, bounty_id) VALUES (?, ?)",
|
||||||
(user_id, bounty_id),
|
(user_id, bounty_id),
|
||||||
)
|
)
|
||||||
|
conn.commit()
|
||||||
return True
|
return True
|
||||||
except sqlite3.IntegrityError:
|
except sqlite3.IntegrityError:
|
||||||
return False
|
return False
|
||||||
@@ -223,6 +241,7 @@ def untrack_bounty(user_id: int, bounty_id: int) -> bool:
|
|||||||
"DELETE FROM user_bounty_tracking WHERE user_id = ? AND bounty_id = ?",
|
"DELETE FROM user_bounty_tracking WHERE user_id = ? AND bounty_id = ?",
|
||||||
(user_id, bounty_id),
|
(user_id, bounty_id),
|
||||||
)
|
)
|
||||||
|
conn.commit()
|
||||||
return cur.rowcount > 0
|
return cur.rowcount > 0
|
||||||
|
|
||||||
|
|
||||||
@@ -235,37 +254,37 @@ def is_tracking(user_id: int, bounty_id: int) -> bool:
|
|||||||
return row is not None
|
return row is not None
|
||||||
|
|
||||||
|
|
||||||
def get_user_tracked_bounties_in_group(user_id: int, group_id: int) -> list[sqlite3.Row]:
|
def get_user_tracked_bounties_in_group(user_id: int, group_id: int) -> list[dict]:
|
||||||
with get_conn() as conn:
|
with get_conn() as conn:
|
||||||
return list(conn.execute(
|
return [_row_to_dict(r) for r in conn.execute(
|
||||||
"""SELECT b.* FROM bounties b
|
"""SELECT b.* FROM bounties b
|
||||||
JOIN user_bounty_tracking t ON t.bounty_id = b.id
|
JOIN user_bounty_tracking t ON t.bounty_id = b.id
|
||||||
WHERE t.user_id = ? AND b.group_id = ?
|
WHERE t.user_id = ? AND b.group_id = ?
|
||||||
ORDER BY b.created_at DESC""",
|
ORDER BY b.created_at DESC""",
|
||||||
(user_id, group_id),
|
(user_id, group_id),
|
||||||
))
|
)]
|
||||||
|
|
||||||
|
|
||||||
def get_user_tracked_bounties_personal(user_id: int) -> list[sqlite3.Row]:
|
def get_user_tracked_bounties_personal(user_id: int) -> list[dict]:
|
||||||
"""Tracked bounties where group_id IS NULL (personal)."""
|
"""Tracked bounties where group_id IS NULL (personal)."""
|
||||||
with get_conn() as conn:
|
with get_conn() as conn:
|
||||||
return list(conn.execute(
|
return [_row_to_dict(r) for r in conn.execute(
|
||||||
"""SELECT b.* FROM bounties b
|
"""SELECT b.* FROM bounties b
|
||||||
JOIN user_bounty_tracking t ON t.bounty_id = b.id
|
JOIN user_bounty_tracking t ON t.bounty_id = b.id
|
||||||
WHERE t.user_id = ? AND b.group_id IS NULL
|
WHERE t.user_id = ? AND b.group_id IS NULL
|
||||||
ORDER BY b.created_at DESC""",
|
ORDER BY b.created_at DESC""",
|
||||||
(user_id,),
|
(user_id,),
|
||||||
))
|
)]
|
||||||
|
|
||||||
|
|
||||||
# ── Reminders ───────────────────────────────────────────────────────────────
|
# ── Reminders ───────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
def get_bounties_due_soon(user_id: int, days: int = 7) -> list[sqlite3.Row]:
|
def get_bounties_due_soon(user_id: int, days: int = 7) -> list[dict]:
|
||||||
"""Get tracked bounties with due_date within `days` that haven't been reminded yet."""
|
"""Get tracked bounties with due_date within `days` that haven't been reminded yet."""
|
||||||
now = int(time.time())
|
now = int(time.time())
|
||||||
deadline = now + days * 86400
|
deadline = now + days * 86400
|
||||||
with get_conn() as conn:
|
with get_conn() as conn:
|
||||||
return list(conn.execute(
|
return [_row_to_dict(r) for r in conn.execute(
|
||||||
"""SELECT b.*, u.username, u.telegram_user_id FROM bounties b
|
"""SELECT b.*, u.username, u.telegram_user_id FROM bounties b
|
||||||
JOIN user_bounty_tracking t ON t.bounty_id = b.id
|
JOIN user_bounty_tracking t ON t.bounty_id = b.id
|
||||||
JOIN users u ON u.id = b.created_by_user_id
|
JOIN users u ON u.id = b.created_by_user_id
|
||||||
@@ -278,7 +297,7 @@ def get_bounties_due_soon(user_id: int, days: int = 7) -> list[sqlite3.Row]:
|
|||||||
)
|
)
|
||||||
ORDER BY b.due_date_ts ASC""",
|
ORDER BY b.due_date_ts ASC""",
|
||||||
(user_id, deadline, now, user_id),
|
(user_id, deadline, now, user_id),
|
||||||
))
|
)]
|
||||||
|
|
||||||
|
|
||||||
def log_reminder(user_id: int, bounty_id: int) -> None:
|
def log_reminder(user_id: int, bounty_id: int) -> None:
|
||||||
@@ -287,8 +306,9 @@ def log_reminder(user_id: int, bounty_id: int) -> None:
|
|||||||
"INSERT OR IGNORE INTO reminder_log (user_id, bounty_id) VALUES (?, ?)",
|
"INSERT OR IGNORE INTO reminder_log (user_id, bounty_id) VALUES (?, ?)",
|
||||||
(user_id, bounty_id),
|
(user_id, bounty_id),
|
||||||
)
|
)
|
||||||
|
conn.commit()
|
||||||
|
|
||||||
|
|
||||||
def get_all_user_ids() -> list[int]:
|
def get_all_user_ids() -> list[int]:
|
||||||
with get_conn() as conn:
|
with get_conn() as conn:
|
||||||
return [row["telegram_user_id"] for row in conn.execute("SELECT telegram_user_id FROM users")]
|
return [row[0] for row in conn.execute("SELECT telegram_user_id FROM users")]
|
||||||
|
|||||||
4
apps/telegram-bot/requirements-dev.txt
Normal file
4
apps/telegram-bot/requirements-dev.txt
Normal file
@@ -0,0 +1,4 @@
|
|||||||
|
python-telegram-bot==21.6
|
||||||
|
dateparser==1.2.0
|
||||||
|
pytest==8.3.5
|
||||||
|
pytest-asyncio==0.25.2
|
||||||
1
apps/telegram-bot/tests/__init__.py
Normal file
1
apps/telegram-bot/tests/__init__.py
Normal file
@@ -0,0 +1 @@
|
|||||||
|
# tests package
|
||||||
27
apps/telegram-bot/tests/conftest.py
Normal file
27
apps/telegram-bot/tests/conftest.py
Normal file
@@ -0,0 +1,27 @@
|
|||||||
|
"""Pytest fixtures for telegram-bot tests."""
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import tempfile
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
# Add the app directory to path so `import db` works when running pytest
|
||||||
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def fresh_db(monkeypatch):
|
||||||
|
"""Replace DB_PATH with a temp file before any test runs."""
|
||||||
|
import db as _db
|
||||||
|
|
||||||
|
tmp = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
|
||||||
|
tmp_path = Path(tmp.name)
|
||||||
|
tmp.close()
|
||||||
|
|
||||||
|
monkeypatch.setattr(_db, "DB_PATH", tmp_path)
|
||||||
|
_db.init_db()
|
||||||
|
|
||||||
|
yield tmp_path
|
||||||
|
|
||||||
|
tmp_path.unlink(missing_ok=True)
|
||||||
166
apps/telegram-bot/tests/test_commands.py
Normal file
166
apps/telegram-bot/tests/test_commands.py
Normal file
@@ -0,0 +1,166 @@
|
|||||||
|
"""Tests for commands.py — parsing and formatting functions only."""
|
||||||
|
|
||||||
|
import time
|
||||||
|
from unittest.mock import MagicMock
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from commands import extract_args, parse_args, format_bounty
|
||||||
|
|
||||||
|
|
||||||
|
class TestExtractArgs:
|
||||||
|
def test_basic_split(self):
|
||||||
|
assert extract_args("/add hello world") == ["hello", "world"]
|
||||||
|
assert extract_args("/track 42") == ["42"]
|
||||||
|
|
||||||
|
def test_no_args(self):
|
||||||
|
assert extract_args("/bounty") == []
|
||||||
|
assert extract_args("/add") == []
|
||||||
|
|
||||||
|
def test_preserves_urls(self):
|
||||||
|
result = extract_args("/add check https://example.com stuff")
|
||||||
|
assert result == ["check", "https://example.com", "stuff"]
|
||||||
|
|
||||||
|
def test_none_input(self):
|
||||||
|
assert extract_args(None) == []
|
||||||
|
|
||||||
|
def test_strips_whitespace(self):
|
||||||
|
assert extract_args(" /add hello world ") == ["hello", "world"]
|
||||||
|
|
||||||
|
|
||||||
|
class TestParseArgs:
|
||||||
|
def test_text_only(self):
|
||||||
|
text, link, due = parse_args(["hello", "world"])
|
||||||
|
assert text == "hello world"
|
||||||
|
assert link is None
|
||||||
|
assert due is None
|
||||||
|
|
||||||
|
def test_link_extracted(self):
|
||||||
|
text, link, due = parse_args(["hello", "https://example.com"])
|
||||||
|
# "hello" is non-link non-date → becomes text; only the URL becomes link
|
||||||
|
assert text == "hello"
|
||||||
|
assert link == "https://example.com"
|
||||||
|
assert due is None
|
||||||
|
|
||||||
|
def test_text_and_link(self):
|
||||||
|
text, link, due = parse_args(["hello", "world", "https://example.com"])
|
||||||
|
assert text == "hello world"
|
||||||
|
assert link == "https://example.com"
|
||||||
|
|
||||||
|
def test_due_date_parsed(self):
|
||||||
|
text, link, due = parse_args(["hello", "tomorrow"])
|
||||||
|
assert text == "hello"
|
||||||
|
assert due is not None
|
||||||
|
# Should be some time in the future
|
||||||
|
assert due > int(time.time())
|
||||||
|
|
||||||
|
def test_all_three(self):
|
||||||
|
text, link, due = parse_args(["hello", "https://example.com", "tomorrow"])
|
||||||
|
assert text == "hello"
|
||||||
|
assert link == "https://example.com"
|
||||||
|
assert due is not None
|
||||||
|
|
||||||
|
def test_http_and_https_both_detected(self):
|
||||||
|
_, link1, _ = parse_args(["http://example.com"])
|
||||||
|
_, link2, _ = parse_args(["https://example.com"])
|
||||||
|
assert link1 == "http://example.com"
|
||||||
|
assert link2 == "https://example.com"
|
||||||
|
|
||||||
|
def test_non_url_non_date_becomes_text(self):
|
||||||
|
text, link, due = parse_args(["fix", "the", "bug"])
|
||||||
|
assert text == "fix the bug"
|
||||||
|
assert link is None
|
||||||
|
assert due is None
|
||||||
|
|
||||||
|
def test_multiple_links_first_only(self):
|
||||||
|
_, link, _ = parse_args(["text", "https://first.com", "https://second.com"])
|
||||||
|
assert link == "https://first.com"
|
||||||
|
|
||||||
|
def test_due_date_after_link(self):
|
||||||
|
text, link, due = parse_args(["task", "https://example.com", "in 5 days"])
|
||||||
|
assert text == "task"
|
||||||
|
assert link == "https://example.com"
|
||||||
|
assert due is not None
|
||||||
|
|
||||||
|
def test_empty_args(self):
|
||||||
|
text, link, due = parse_args([])
|
||||||
|
assert text is None
|
||||||
|
assert link is None
|
||||||
|
assert due is None
|
||||||
|
|
||||||
|
def test_date_parser_failure_returns_none(self):
|
||||||
|
# "asdfjkl" is not parseable → goes to text
|
||||||
|
text, link, due = parse_args(["hello", "asdfjkl"])
|
||||||
|
assert text == "hello asdfjkl"
|
||||||
|
assert due is None
|
||||||
|
|
||||||
|
def test_link_takes_first_match(self):
|
||||||
|
# Even if it's not a valid URL, starts with https://
|
||||||
|
_, link, _ = parse_args(["skip", "https://not-real.but-still-a-link"])
|
||||||
|
assert link == "https://not-real.but-still-a-link"
|
||||||
|
|
||||||
|
|
||||||
|
class TestFormatBounty:
|
||||||
|
def _row(self, id=1, text="Test bounty", link="https://example.com",
|
||||||
|
due_date_ts=None, informed_by_username="alice"):
|
||||||
|
row = MagicMock()
|
||||||
|
row.__getitem__ = lambda s, k: {
|
||||||
|
"id": id, "text": text, "link": link,
|
||||||
|
"due_date_ts": due_date_ts, "informed_by_username": informed_by_username
|
||||||
|
}[k]
|
||||||
|
return row
|
||||||
|
|
||||||
|
def test_shows_id(self):
|
||||||
|
b = self._row(id=42)
|
||||||
|
out = format_bounty(b, show_id=True)
|
||||||
|
assert "[#42]" in out
|
||||||
|
|
||||||
|
def test_hides_id_when_requested(self):
|
||||||
|
b = self._row(id=42)
|
||||||
|
out = format_bounty(b, show_id=False)
|
||||||
|
assert "[#42]" not in out
|
||||||
|
|
||||||
|
def test_text_included(self):
|
||||||
|
b = self._row(text="Fix the login bug")
|
||||||
|
out = format_bounty(b)
|
||||||
|
assert "Fix the login bug" in out
|
||||||
|
|
||||||
|
def test_link_shown(self):
|
||||||
|
b = self._row(link="https://github.com/bob/repo")
|
||||||
|
out = format_bounty(b)
|
||||||
|
assert "https://github.com/bob/repo" in out
|
||||||
|
assert "🔗" in out
|
||||||
|
|
||||||
|
def test_no_link(self):
|
||||||
|
b = self._row(link=None)
|
||||||
|
out = format_bounty(b)
|
||||||
|
assert "🔗" not in out
|
||||||
|
|
||||||
|
def test_due_date_future(self):
|
||||||
|
future = int(time.time()) + 3 * 86400
|
||||||
|
b = self._row(due_date_ts=future)
|
||||||
|
out = format_bounty(b)
|
||||||
|
assert "⏰" in out
|
||||||
|
assert "3d" in out
|
||||||
|
|
||||||
|
def test_due_date_today(self):
|
||||||
|
today = int(time.time()) + 3600 # within today
|
||||||
|
b = self._row(due_date_ts=today)
|
||||||
|
out = format_bounty(b)
|
||||||
|
assert "TODAY" in out
|
||||||
|
|
||||||
|
def test_due_date_overdue(self):
|
||||||
|
past = int(time.time()) - 86400 # yesterday
|
||||||
|
b = self._row(due_date_ts=past)
|
||||||
|
out = format_bounty(b)
|
||||||
|
assert "OVERDUE" in out
|
||||||
|
|
||||||
|
def test_informed_by_shown(self):
|
||||||
|
b = self._row(informed_by_username="bob")
|
||||||
|
out = format_bounty(b)
|
||||||
|
assert "@bob" in out
|
||||||
|
|
||||||
|
def test_informed_by_unknown_fallback(self):
|
||||||
|
b = self._row(informed_by_username=None)
|
||||||
|
out = format_bounty(b)
|
||||||
|
assert "@unknown" in out
|
||||||
296
apps/telegram-bot/tests/test_db.py
Normal file
296
apps/telegram-bot/tests/test_db.py
Normal file
@@ -0,0 +1,296 @@
|
|||||||
|
"""Tests for db.py"""
|
||||||
|
|
||||||
|
import time
|
||||||
|
import pytest
|
||||||
|
import db as _db
|
||||||
|
|
||||||
|
|
||||||
|
class TestUsers:
|
||||||
|
def test_upsert_user_creates_new(self):
|
||||||
|
uid = _db.upsert_user(123, "alice")
|
||||||
|
assert uid > 0
|
||||||
|
row = _db.get_user_by_telegram_id(123)
|
||||||
|
assert row is not None
|
||||||
|
assert row["telegram_user_id"] == 123
|
||||||
|
assert row["username"] == "alice"
|
||||||
|
|
||||||
|
def test_upsert_user_updates_username(self):
|
||||||
|
_db.upsert_user(123, "alice")
|
||||||
|
uid2 = _db.upsert_user(123, "alice_updated")
|
||||||
|
assert uid2 == _db.upsert_user(123, "alice") # same id
|
||||||
|
row = _db.get_user_by_telegram_id(123)
|
||||||
|
assert row["username"] == "alice_updated"
|
||||||
|
|
||||||
|
def test_get_user_by_telegram_id_not_found(self):
|
||||||
|
row = _db.get_user_by_telegram_id(999999)
|
||||||
|
assert row is None
|
||||||
|
|
||||||
|
|
||||||
|
class TestGroups:
|
||||||
|
def test_upsert_group_creates_new(self, fresh_db):
|
||||||
|
uid = _db.upsert_user(1, "creator")
|
||||||
|
gid = _db.upsert_group(-100123, uid)
|
||||||
|
assert gid > 0
|
||||||
|
row = _db.get_group(-100123)
|
||||||
|
assert row is not None
|
||||||
|
assert row["telegram_chat_id"] == -100123
|
||||||
|
assert row["creator_user_id"] == uid
|
||||||
|
|
||||||
|
def test_upsert_group_idempotent(self, fresh_db):
|
||||||
|
uid = _db.upsert_user(1, "creator")
|
||||||
|
gid1 = _db.upsert_group(-100123, uid)
|
||||||
|
gid2 = _db.upsert_group(-100123, uid)
|
||||||
|
assert gid1 == gid2
|
||||||
|
|
||||||
|
def test_get_group_creator_user_id(self, fresh_db):
|
||||||
|
uid = _db.upsert_user(1, "creator")
|
||||||
|
_db.upsert_group(-100123, uid)
|
||||||
|
assert _db.get_group_creator_user_id(_db.get_group(-100123)["id"]) == uid
|
||||||
|
|
||||||
|
def test_get_group_not_found(self, fresh_db):
|
||||||
|
row = _db.get_group(-999999)
|
||||||
|
assert row is None
|
||||||
|
|
||||||
|
|
||||||
|
class TestGroupAdmins:
|
||||||
|
def test_add_remove_is_admin(self, fresh_db):
|
||||||
|
uid = _db.upsert_user(1, "creator")
|
||||||
|
gid = _db.upsert_group(-100123, uid)
|
||||||
|
|
||||||
|
assert not _db.is_group_admin(gid, uid)
|
||||||
|
added = _db.add_group_admin(gid, uid)
|
||||||
|
assert added is True
|
||||||
|
assert _db.is_group_admin(gid, uid) is True
|
||||||
|
|
||||||
|
# Adding again returns False (already admin)
|
||||||
|
assert _db.add_group_admin(gid, uid) is False
|
||||||
|
|
||||||
|
removed = _db.remove_group_admin(gid, uid)
|
||||||
|
assert removed is True
|
||||||
|
assert _db.is_group_admin(gid, uid) is False
|
||||||
|
|
||||||
|
def test_remove_nonexistent_admin_returns_false(self, fresh_db):
|
||||||
|
uid = _db.upsert_user(1, "creator")
|
||||||
|
gid = _db.upsert_group(-100123, uid)
|
||||||
|
assert _db.remove_group_admin(gid, uid) is False
|
||||||
|
|
||||||
|
def test_is_group_creator(self, fresh_db):
|
||||||
|
uid = _db.upsert_user(1, "creator")
|
||||||
|
other = _db.upsert_user(2, "other")
|
||||||
|
gid = _db.upsert_group(-100123, uid)
|
||||||
|
_db.add_group_admin(gid, uid)
|
||||||
|
|
||||||
|
assert _db.is_group_creator(gid, uid) is True
|
||||||
|
assert _db.is_group_creator(gid, other) is False
|
||||||
|
|
||||||
|
def test_get_user_by_username(self, fresh_db):
|
||||||
|
uid = _db.upsert_user(1, "alice")
|
||||||
|
row = _db.get_user_by_username("alice")
|
||||||
|
assert row is not None
|
||||||
|
assert row["id"] == uid
|
||||||
|
assert _db.get_user_by_username("nobody") is None
|
||||||
|
|
||||||
|
|
||||||
|
class TestBounties:
|
||||||
|
def test_add_bounty_group(self, fresh_db):
|
||||||
|
uid = _db.upsert_user(1, "creator")
|
||||||
|
gid = _db.upsert_group(-100123, uid)
|
||||||
|
_db.add_group_admin(gid, uid)
|
||||||
|
|
||||||
|
bid = _db.add_bounty(
|
||||||
|
group_id=gid,
|
||||||
|
created_by_user_id=uid,
|
||||||
|
informed_by_username="bob",
|
||||||
|
text="Fix bug",
|
||||||
|
link="https://github.com/bob/repo",
|
||||||
|
due_date_ts=int(time.time()) + 86400,
|
||||||
|
)
|
||||||
|
assert bid > 0
|
||||||
|
bounty = _db.get_bounty(bid)
|
||||||
|
assert bounty["text"] == "Fix bug"
|
||||||
|
assert bounty["link"] == "https://github.com/bob/repo"
|
||||||
|
assert bounty["informed_by_username"] == "bob"
|
||||||
|
assert bounty["group_id"] == gid
|
||||||
|
|
||||||
|
def test_add_bounty_personal(self, fresh_db):
|
||||||
|
uid = _db.upsert_user(1, "alice")
|
||||||
|
bid = _db.add_bounty(
|
||||||
|
group_id=None,
|
||||||
|
created_by_user_id=uid,
|
||||||
|
informed_by_username="alice",
|
||||||
|
text="Personal reminder",
|
||||||
|
link=None,
|
||||||
|
due_date_ts=None,
|
||||||
|
)
|
||||||
|
assert bid > 0
|
||||||
|
bounty = _db.get_bounty(bid)
|
||||||
|
assert bounty["group_id"] is None
|
||||||
|
assert bounty["text"] == "Personal reminder"
|
||||||
|
|
||||||
|
def test_add_bounty_duplicate_link_rejected(self, fresh_db):
|
||||||
|
uid = _db.upsert_user(1, "creator")
|
||||||
|
gid = _db.upsert_group(-100123, uid)
|
||||||
|
|
||||||
|
_db.add_bounty(gid, uid, "user1", "text1", "https://example.com", None)
|
||||||
|
with pytest.raises(ValueError, match="Link already exists"):
|
||||||
|
_db.add_bounty(gid, uid, "user2", "text2", "https://example.com", None)
|
||||||
|
|
||||||
|
def test_add_bounty_null_link_allows_multiples(self, fresh_db):
|
||||||
|
uid = _db.upsert_user(1, "creator")
|
||||||
|
gid = _db.upsert_group(-100123, uid)
|
||||||
|
|
||||||
|
bid1 = _db.add_bounty(gid, uid, "user1", "text only 1", None, None)
|
||||||
|
bid2 = _db.add_bounty(gid, uid, "user2", "text only 2", None, None)
|
||||||
|
assert bid1 != bid2
|
||||||
|
|
||||||
|
def test_get_group_bounties(self, fresh_db):
|
||||||
|
uid = _db.upsert_user(1, "creator")
|
||||||
|
gid = _db.upsert_group(-100123, uid)
|
||||||
|
_db.add_group_admin(gid, uid)
|
||||||
|
|
||||||
|
_db.add_bounty(gid, uid, "user", "bounty1", None, None)
|
||||||
|
_db.add_bounty(gid, uid, "user", "bounty2", None, None)
|
||||||
|
|
||||||
|
bounties = _db.get_group_bounties(gid)
|
||||||
|
assert len(bounties) == 2
|
||||||
|
|
||||||
|
def test_get_user_personal_bounties(self, fresh_db):
|
||||||
|
uid = _db.upsert_user(1, "alice")
|
||||||
|
_db.add_bounty(None, uid, "alice", "personal1", None, None)
|
||||||
|
_db.add_bounty(None, uid, "alice", "personal2", None, None)
|
||||||
|
|
||||||
|
# Group bounty should not appear
|
||||||
|
other = _db.upsert_user(2, "bob")
|
||||||
|
gid = _db.upsert_group(-100, other)
|
||||||
|
_db.add_bounty(gid, other, "bob", "group bounty", None, None)
|
||||||
|
|
||||||
|
personal = _db.get_user_personal_bounties(uid)
|
||||||
|
assert len(personal) == 2
|
||||||
|
|
||||||
|
def test_update_bounty(self, fresh_db):
|
||||||
|
uid = _db.upsert_user(1, "creator")
|
||||||
|
gid = _db.upsert_group(-100123, uid)
|
||||||
|
bid = _db.add_bounty(gid, uid, "user", "old text", None, None)
|
||||||
|
|
||||||
|
_db.update_bounty(bid, "new text", None, None)
|
||||||
|
updated = _db.get_bounty(bid)
|
||||||
|
assert updated["text"] == "new text"
|
||||||
|
|
||||||
|
def test_update_bounty_duplicate_link_rejected(self, fresh_db):
|
||||||
|
uid = _db.upsert_user(1, "creator")
|
||||||
|
gid = _db.upsert_group(-100123, uid)
|
||||||
|
_db.add_bounty(gid, uid, "user1", "bounty1", "https://a.com", None)
|
||||||
|
bid2 = _db.add_bounty(gid, uid, "user2", "bounty2", None, None)
|
||||||
|
|
||||||
|
with pytest.raises(ValueError, match="Link already exists"):
|
||||||
|
_db.update_bounty(bid2, None, "https://a.com", None)
|
||||||
|
|
||||||
|
def test_delete_bounty(self, fresh_db):
|
||||||
|
uid = _db.upsert_user(1, "creator")
|
||||||
|
gid = _db.upsert_group(-100123, uid)
|
||||||
|
bid = _db.add_bounty(gid, uid, "user", "to delete", None, None)
|
||||||
|
|
||||||
|
assert _db.delete_bounty(bid) is True
|
||||||
|
assert _db.get_bounty(bid) is None
|
||||||
|
# Deleting again returns False
|
||||||
|
assert _db.delete_bounty(bid) is False
|
||||||
|
|
||||||
|
|
||||||
|
class TestTracking:
|
||||||
|
def test_track_untrack_is_tracking(self, fresh_db):
|
||||||
|
uid = _db.upsert_user(1, "alice")
|
||||||
|
uid2 = _db.upsert_user(2, "bob")
|
||||||
|
gid = _db.upsert_group(-100123, uid)
|
||||||
|
bid = _db.add_bounty(gid, uid, "alice", "task", None, None)
|
||||||
|
|
||||||
|
assert _db.track_bounty(uid, bid) is True
|
||||||
|
assert _db.is_tracking(uid, bid) is True
|
||||||
|
# Track again → False (already tracking)
|
||||||
|
assert _db.track_bounty(uid, bid) is False
|
||||||
|
|
||||||
|
# Other user tracking same bounty
|
||||||
|
assert _db.track_bounty(uid2, bid) is True
|
||||||
|
assert _db.is_tracking(uid2, bid) is True
|
||||||
|
|
||||||
|
# Untrack
|
||||||
|
assert _db.untrack_bounty(uid, bid) is True
|
||||||
|
assert _db.is_tracking(uid, bid) is False
|
||||||
|
assert _db.is_tracking(uid2, bid) is True # other user still tracking
|
||||||
|
# Untrack again → False
|
||||||
|
assert _db.untrack_bounty(uid, bid) is False
|
||||||
|
|
||||||
|
def test_get_user_tracked_bounties_in_group(self, fresh_db):
|
||||||
|
uid = _db.upsert_user(1, "alice")
|
||||||
|
gid = _db.upsert_group(-100123, uid)
|
||||||
|
bid1 = _db.add_bounty(gid, uid, "alice", "task1", None, None)
|
||||||
|
bid2 = _db.add_bounty(gid, uid, "alice", "task2", None, None)
|
||||||
|
|
||||||
|
# Different group bounty
|
||||||
|
other_gid = _db.upsert_group(-100124, uid)
|
||||||
|
bid3 = _db.add_bounty(other_gid, uid, "alice", "other group task", None, None)
|
||||||
|
|
||||||
|
_db.track_bounty(uid, bid1)
|
||||||
|
_db.track_bounty(uid, bid3)
|
||||||
|
|
||||||
|
tracked = _db.get_user_tracked_bounties_in_group(uid, gid)
|
||||||
|
assert len(tracked) == 1
|
||||||
|
assert tracked[0]["id"] == bid1
|
||||||
|
|
||||||
|
def test_get_user_tracked_bounties_personal(self, fresh_db):
|
||||||
|
uid = _db.upsert_user(1, "alice")
|
||||||
|
bid1 = _db.add_bounty(None, uid, "alice", "personal1", None, None)
|
||||||
|
bid2 = _db.add_bounty(None, uid, "alice", "personal2", None, None)
|
||||||
|
|
||||||
|
gid = _db.upsert_group(-100123, uid)
|
||||||
|
bid3 = _db.add_bounty(gid, uid, "alice", "group task", None, None)
|
||||||
|
|
||||||
|
_db.track_bounty(uid, bid1)
|
||||||
|
_db.track_bounty(uid, bid3)
|
||||||
|
|
||||||
|
tracked = _db.get_user_tracked_bounties_personal(uid)
|
||||||
|
assert len(tracked) == 1
|
||||||
|
assert tracked[0]["id"] == bid1
|
||||||
|
|
||||||
|
|
||||||
|
class TestReminders:
|
||||||
|
def test_get_bounties_due_soon(self, fresh_db):
|
||||||
|
uid = _db.upsert_user(1, "alice")
|
||||||
|
gid = _db.upsert_group(-100123, uid)
|
||||||
|
|
||||||
|
now = int(time.time())
|
||||||
|
# Due in 3 days (< 7 days)
|
||||||
|
bid_soon = _db.add_bounty(gid, uid, "alice", "soon", None, now + 3 * 86400)
|
||||||
|
# Due in 10 days (> 7 days)
|
||||||
|
_db.add_bounty(gid, uid, "alice", "later", None, now + 10 * 86400)
|
||||||
|
# No due date
|
||||||
|
bid_no_date = _db.add_bounty(gid, uid, "alice", "no date", None, None)
|
||||||
|
|
||||||
|
_db.track_bounty(uid, bid_soon)
|
||||||
|
_db.track_bounty(uid, bid_no_date)
|
||||||
|
|
||||||
|
due = _db.get_bounties_due_soon(uid, days=7)
|
||||||
|
assert len(due) == 1
|
||||||
|
assert due[0]["id"] == bid_soon
|
||||||
|
|
||||||
|
def test_reminder_log_prevents_duplicate_reminders(self, fresh_db):
|
||||||
|
uid = _db.upsert_user(1, "alice")
|
||||||
|
gid = _db.upsert_group(-100123, uid)
|
||||||
|
now = int(time.time())
|
||||||
|
bid = _db.add_bounty(gid, uid, "alice", "task", None, now + 2 * 86400)
|
||||||
|
_db.track_bounty(uid, bid)
|
||||||
|
|
||||||
|
due1 = _db.get_bounties_due_soon(uid, days=7)
|
||||||
|
assert len(due1) == 1
|
||||||
|
|
||||||
|
# Log that we reminded
|
||||||
|
_db.log_reminder(uid, bid)
|
||||||
|
|
||||||
|
# Should not appear again
|
||||||
|
due2 = _db.get_bounties_due_soon(uid, days=7)
|
||||||
|
assert len(due2) == 0
|
||||||
|
|
||||||
|
def test_get_all_user_ids(self, fresh_db):
|
||||||
|
_db.upsert_user(1, "alice")
|
||||||
|
_db.upsert_user(2, "bob")
|
||||||
|
ids = _db.get_all_user_ids()
|
||||||
|
assert sorted(ids) == [1, 2]
|
||||||
Reference in New Issue
Block a user