- Add storage.py with load_user(), save_user(), next_bounty_id()
- Rewrite commands.py to use JSON storage (simplified)
- Remove db.py, schema.sql, cron.py, test_db.py
- Update SPEC.md to reflect the new architecture
- Admin model removed (anyone can add; only the creator can edit/delete)
- No reminders in v1
50 lines
1.4 KiB
Python
50 lines
1.4 KiB
Python
"""Per-user JSON file storage for JIGAIDO."""
|
|
|
|
import json
|
|
import tempfile
|
|
import os
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
# Root directory for persisted data; a "data" folder next to this module.
DATA_DIR = Path(__file__).parent.joinpath("data")

# Directory holding the per-user JSON files.
USERS_DIR = DATA_DIR.joinpath("users")
|
|
|
|
|
|
def _ensure_dirs() -> None:
    """Create the users directory, including missing parents, if absent."""
    os.makedirs(USERS_DIR, exist_ok=True)
|
|
|
|
|
|
def _user_file_path(telegram_user_id: int) -> Path:
    """Return the path of the JSON file that stores the given user's data."""
    filename = f"{telegram_user_id}.json"
    return USERS_DIR.joinpath(filename)
|
|
|
|
|
|
def load_user(telegram_user_id: int) -> dict:
    """Load a user's data from their JSON file.

    Args:
        telegram_user_id: ID used to locate the user's file.

    Returns:
        The parsed content of the user's JSON file, or a fresh empty user
        structure when no file exists for this user.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON.
    """
    _ensure_dirs()
    path = _user_file_path(telegram_user_id)
    try:
        # Open directly instead of checking exists() first: the file could
        # disappear between the check and the open.
        with open(path, encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        return {
            "user_id": telegram_user_id,
            "username": None,
            "personal_bounties": [],
            "tracked_bounties": [],
        }
|
|
|
|
|
|
def save_user(user_data: dict) -> None:
    """Atomically save user data to the user's JSON file.

    The data is written to a temporary file inside the users directory and
    then moved over the destination, so the destination is never left
    half-written.

    Args:
        user_data: JSON-serializable user structure. Its "user_id" entry
            selects the destination file.

    Raises:
        KeyError: If ``user_data`` has no "user_id" key.
        TypeError: If ``user_data`` contains a value json cannot serialize.
    """
    _ensure_dirs()
    path = _user_file_path(user_data["user_id"])
    tmp_path = None
    try:
        # The temp file is created in USERS_DIR so the final move stays
        # within one directory (and therefore one filesystem).
        with tempfile.NamedTemporaryFile(
            mode="w", encoding="utf-8", dir=USERS_DIR, delete=False
        ) as tmp:
            tmp_path = tmp.name
            json.dump(user_data, tmp, indent=2)
        # os.replace overwrites an existing destination on every platform;
        # os.rename raises on Windows when the target already exists.
        os.replace(tmp_path, path)
        tmp_path = None  # moved into place; nothing left to clean up
    finally:
        if tmp_path is not None:
            # Serialization or the move failed: do not leave a stray temp
            # file behind. Cleanup is best-effort; the original error still
            # propagates.
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
|
|
|
|
|
|
def next_bounty_id(user_data: dict) -> int:
    """Return the next sequential ID for the user's personal bounties.

    The result is one more than the largest "id" found in
    ``user_data["personal_bounties"]``, or 1 when that list is missing or
    empty.
    """
    bounties = user_data.get("personal_bounties", [])
    highest_id = max((bounty["id"] for bounty in bounties), default=0)
    return highest_id + 1
|