- Add recover_bounty method to BountyService for recovering soft-deleted bounties
- Add cmd_recover function to CLI with list and recover modes
  - List mode: jigaido-cli recover --group-id <id>
  - Recover mode: jigaido-cli recover <id>... --group-id <id>
- Admin-only for recover, everyone for list
- Fixes #49
291 lines
9.2 KiB
Python
291 lines
9.2 KiB
Python
"""JIGAIDO CLI - Command line interface for bounty tracking."""
|
|
|
|
import argparse
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
import dateparser
|
|
|
|
from adapters.storage.json_file import JsonFileRoomStorage, JsonFileTrackingStorage
|
|
from config import Config
|
|
from core.services import BountyService, TrackingService
|
|
|
|
|
|
def parse_due_date(due_str: str | None) -> int | None:
|
|
"""Parse due date string to Unix timestamp."""
|
|
if not due_str:
|
|
return None
|
|
parsed = dateparser.parse(due_str)
|
|
if parsed:
|
|
return int(parsed.timestamp())
|
|
return None
|
|
|
|
|
|
def create_services():
    """Build the bounty and tracking services on top of JSON-file storage.

    The data directory is ensured via ``Config.ensure_data_dir()`` before the
    ``room`` and ``tracking`` stores are created beneath it.

    Returns:
        A ``(bounty_service, tracking_service)`` tuple.
    """
    cfg = Config()
    cfg.ensure_data_dir()

    rooms = JsonFileRoomStorage(cfg.data_dir / "room")
    tracking = JsonFileTrackingStorage(cfg.data_dir / "tracking")

    # The tracking service is given the room storage as well as its own.
    return BountyService(rooms), TrackingService(tracking, rooms)
|
|
|
|
|
|
def cmd_add(args):
    """Add a new bounty to the room selected by the CLI arguments.

    The room is ``args.group_id`` when given, otherwise ``args.user_id``.
    The acting user defaults to ``0`` when ``--user-id`` is absent.

    Args:
        args: Parsed ``argparse`` namespace with ``text``, ``link``, ``due``,
            ``group_id`` and ``user_id``.

    Exits with status 1 when ``--due`` was supplied but cannot be parsed,
    instead of silently creating the bounty without a due date.
    """
    due_ts = parse_due_date(args.due)
    # parse_due_date returns None both for "no date given" and "unparseable";
    # distinguish the two so a typo in --due is not silently discarded.
    if args.due and due_ts is None:
        print(f"Error: could not parse due date: {args.due!r}", file=sys.stderr)
        sys.exit(1)

    bounty_service, _ = create_services()
    bounty = bounty_service.add_bounty(
        room_id=args.group_id or args.user_id,
        user_id=args.user_id or 0,
        text=args.text,
        link=args.link,
        due_date_ts=due_ts,
    )
    print(f"Added bounty #{bounty.id}")
|
|
|
|
|
|
def cmd_list(args):
    """Print every bounty in the room selected by the CLI arguments.

    The room is ``args.group_id`` when given, otherwise ``args.user_id``.
    Each bounty is printed as ``#<id>: <text>[, link: ...][, due: YYYY-MM-DD]``;
    ``No bounties`` is printed when the room has none.

    Args:
        args: Parsed ``argparse`` namespace with ``group_id`` and ``user_id``.
    """
    # Local import, matching how cmd_recover brings datetime into scope.
    from datetime import datetime

    bounty_service, _ = create_services()
    bounties = bounty_service.list_bounties(room_id=args.group_id or args.user_id)
    if not bounties:
        print("No bounties")
        return

    for b in bounties:
        due_str = ""
        if b.due_date_ts:
            # due_date_ts is a Unix timestamp (see parse_due_date); convert it
            # directly instead of round-tripping through dateparser, which
            # returns None (and then crashes on .strftime) for digit strings
            # it does not recognise as a timestamp.
            due_day = datetime.fromtimestamp(b.due_date_ts).strftime("%Y-%m-%d")
            due_str = f", due: {due_day}"
        link_str = f", link: {b.link}" if b.link else ""
        print(f"#{b.id}: {b.text or '(no text)'}{link_str}{due_str}")
|
|
|
|
|
|
def cmd_my(args):
    """Print the bounties the user is tracking in the selected room.

    The room is ``args.group_id`` when given, otherwise ``args.user_id``.
    Each bounty is printed as ``#<id>: <text>[, link: ...][, due: YYYY-MM-DD]``;
    ``Not tracking any bounties`` is printed when nothing is tracked.

    Args:
        args: Parsed ``argparse`` namespace with ``group_id`` and ``user_id``.
    """
    # Local import, matching how cmd_recover brings datetime into scope.
    from datetime import datetime

    _, tracking_service = create_services()
    room_id = args.group_id or args.user_id
    tracked = tracking_service.get_tracked_bounties(
        room_id=room_id, user_id=args.user_id
    )
    if not tracked:
        print("Not tracking any bounties")
        return

    for b in tracked:
        due_str = ""
        if b.due_date_ts:
            # due_date_ts is a Unix timestamp (see parse_due_date); convert it
            # directly instead of round-tripping through dateparser, which
            # returns None (and then crashes on .strftime) for digit strings
            # it does not recognise as a timestamp.
            due_day = datetime.fromtimestamp(b.due_date_ts).strftime("%Y-%m-%d")
            due_str = f", due: {due_day}"
        link_str = f", link: {b.link}" if b.link else ""
        print(f"#{b.id}: {b.text or '(no text)'}{link_str}{due_str}")
|
|
|
|
|
|
def cmd_update(args):
    """Update the text, link and/or due date of an existing bounty.

    The room is ``args.group_id`` when given, otherwise ``args.user_id``.
    The acting user defaults to ``0`` when ``--user-id`` is absent.

    Args:
        args: Parsed ``argparse`` namespace with ``bounty_id``, ``text``,
            ``link``, ``due``, ``clear_link``, ``clear_due``, ``group_id`` and
            ``user_id``.

    Exits with status 1 when ``--due`` cannot be parsed, when the service
    reports the bounty as not found, or when it raises ``PermissionError``.
    """
    due_ts = parse_due_date(args.due)
    # parse_due_date returns None both for "no date given" and "unparseable";
    # distinguish the two so a typo in --due does not report a bogus success.
    if args.due and due_ts is None:
        print(f"Error: could not parse due date: {args.due!r}", file=sys.stderr)
        sys.exit(1)

    bounty_service, _ = create_services()
    try:
        success = bounty_service.update_bounty(
            room_id=args.group_id or args.user_id,
            bounty_id=args.bounty_id,
            user_id=args.user_id or 0,
            text=args.text,
            link=args.link,
            due_date_ts=due_ts,
            clear_link=args.clear_link,
            clear_due=args.clear_due,
        )
    except PermissionError as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)

    if not success:
        print(f"Bounty #{args.bounty_id} not found", file=sys.stderr)
        sys.exit(1)
    print(f"Updated bounty #{args.bounty_id}")
|
|
|
|
|
|
def cmd_delete(args):
    """Delete one bounty from the room selected by the CLI arguments.

    The room is ``args.group_id`` when given, otherwise ``args.user_id``.
    The acting user defaults to ``0`` when ``--user-id`` is absent.

    Args:
        args: Parsed ``argparse`` namespace with ``bounty_id``, ``group_id``
            and ``user_id``.

    Exits with status 1 when the service reports the bounty as not found or
    raises ``PermissionError``.
    """
    service, _ = create_services()
    target = args.bounty_id

    try:
        removed = service.delete_bounty(
            room_id=args.group_id or args.user_id,
            bounty_id=target,
            user_id=args.user_id or 0,
        )
    except PermissionError as err:
        print(f"Error: {err}", file=sys.stderr)
        sys.exit(1)

    if not removed:
        print(f"Bounty #{target} not found", file=sys.stderr)
        sys.exit(1)

    print(f"Deleted bounty #{target}")
|
|
|
|
|
|
def cmd_recover(args):
    """List or recover soft-deleted bounties.

    With no bounty IDs, prints the room's recoverable bounties together with
    the date each was deleted. With one or more IDs, checks that the acting
    user is an admin of the room and then recovers each ID in turn.

    The room is ``args.group_id`` when given, otherwise ``args.user_id``.
    The acting user defaults to ``0`` when ``--user-id`` is absent.

    Args:
        args: Parsed ``argparse`` namespace with ``bounty_ids``, ``group_id``
            and ``user_id``.

    Exits with status 1 when the user is not an admin, when the service
    raises ``PermissionError``, or when any requested recovery reports
    failure.
    """
    # Imported once here rather than on every iteration of the listing loop.
    from datetime import datetime

    bounty_service, _ = create_services()
    room_id = args.group_id or args.user_id
    user_id = args.user_id or 0

    # --- List mode: no IDs given -------------------------------------------
    if not args.bounty_ids:
        deleted = bounty_service.list_deleted_bounties(room_id)
        if not deleted:
            print("No recoverable bounties")
            return
        print("Recoverable bounties:")
        for b in deleted:
            deleted_str = datetime.fromtimestamp(b.deleted_at).strftime("%d %b %Y")
            print(f"  [#{b.id}] {b.text or '(no text)'} | Deleted {deleted_str}")
        return

    # --- Recover mode: admin only ------------------------------------------
    if not bounty_service.is_admin(room_id, user_id):
        print("Error: Only admins can recover bounties.", file=sys.stderr)
        sys.exit(1)

    any_failed = False
    for bounty_id in args.bounty_ids:
        try:
            success, msg = bounty_service.recover_bounty(
                room_id=room_id, bounty_id=bounty_id, user_id=user_id
            )
        except PermissionError as e:
            print(f"Error: {e}", file=sys.stderr)
            sys.exit(1)

        # NOTE(review): assumes `success` is falsy when `msg` describes a
        # failure -- confirm against BountyService.recover_bounty.
        if success:
            print(msg)
        else:
            # Keep going so the remaining IDs are still attempted, but route
            # the failure to stderr and remember it for the exit status.
            print(msg, file=sys.stderr)
            any_failed = True

    if any_failed:
        sys.exit(1)
|
|
|
|
|
|
def cmd_track(args):
    """Start tracking a bounty for the given user.

    The room is ``args.group_id`` when given, otherwise ``args.user_id``.
    This is the same resolution ``cmd_my`` uses when reading tracked bounties,
    so invoking with only ``--user-id`` no longer passes ``room_id=None``.

    Args:
        args: Parsed ``argparse`` namespace with ``bounty_id``, ``group_id``
            and ``user_id``.

    Exits with status 1 when the tracking service raises ``ValueError``.
    """
    _, tracking_service = create_services()
    try:
        success = tracking_service.track_bounty(
            room_id=args.group_id or args.user_id,
            user_id=args.user_id,
            bounty_id=args.bounty_id,
        )
    except ValueError as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)

    if success:
        print(f"Tracking bounty #{args.bounty_id}")
    else:
        print(f"Already tracking bounty #{args.bounty_id}")
|
|
|
|
|
|
def cmd_untrack(args):
    """Stop tracking a bounty for the given user.

    The room is ``args.group_id`` when given, otherwise ``args.user_id``.
    This is the same resolution ``cmd_my`` uses when reading tracked bounties,
    so invoking with only ``--user-id`` no longer passes ``room_id=None``.

    Args:
        args: Parsed ``argparse`` namespace with ``bounty_id``, ``group_id``
            and ``user_id``.

    Exits with status 1 when the service reports the bounty was not tracked.
    """
    _, tracking_service = create_services()
    success = tracking_service.untrack_bounty(
        room_id=args.group_id or args.user_id,
        user_id=args.user_id,
        bounty_id=args.bounty_id,
    )
    if not success:
        print(f"Not tracking bounty #{args.bounty_id}", file=sys.stderr)
        sys.exit(1)
    print(f"Untracked bounty #{args.bounty_id}")
|
|
|
|
|
|
def main(argv=None):
    """Parse command-line arguments and dispatch to the matching command.

    Args:
        argv: Optional list of argument strings. ``None`` (the default) makes
            ``argparse`` read ``sys.argv[1:]``, so existing callers of
            ``main()`` are unaffected.

    Exits with status 1 when no sub-command is given (after printing help),
    when neither ``--group-id`` nor ``--user-id`` is supplied, or when ``add``
    receives empty text. ``argparse`` itself exits on malformed arguments.
    """
    parser = argparse.ArgumentParser(
        prog="jigaido-cli", description="JIGAIDO bounty tracker CLI"
    )
    subparsers = parser.add_subparsers(dest="command", help="Commands")

    parser_add = subparsers.add_parser("add", help="Add a new bounty")
    parser_add.add_argument("text", help="Bounty description")
    parser_add.add_argument("--link", help="Optional link")
    parser_add.add_argument(
        "--due", help="Optional due date (e.g., 'tomorrow', 'in 3 days')"
    )

    parser_list = subparsers.add_parser("list", help="List all bounties")

    parser_my = subparsers.add_parser("my", help="List tracked bounties")

    parser_update = subparsers.add_parser("update", help="Update a bounty")
    parser_update.add_argument("bounty_id", type=int, help="Bounty ID to update")
    parser_update.add_argument("text", nargs="?", help="New description")
    parser_update.add_argument("--link", help="New link")
    parser_update.add_argument("--due", help="New due date")
    parser_update.add_argument("--clear-link", action="store_true", help="Clear link")
    parser_update.add_argument(
        "--clear-due", action="store_true", help="Clear due date"
    )

    parser_delete = subparsers.add_parser("delete", help="Delete a bounty")
    parser_delete.add_argument("bounty_id", type=int, help="Bounty ID to delete")

    parser_track = subparsers.add_parser("track", help="Track a bounty")
    parser_track.add_argument("bounty_id", type=int, help="Bounty ID to track")

    parser_untrack = subparsers.add_parser("untrack", help="Untrack a bounty")
    parser_untrack.add_argument("bounty_id", type=int, help="Bounty ID to untrack")

    parser_recover = subparsers.add_parser(
        "recover", help="List or recover soft-deleted bounties"
    )
    parser_recover.add_argument(
        "bounty_ids",
        nargs="*",
        type=int,
        help="Bounty ID(s) to recover (optional)",
    )

    # Every sub-command accepts the same room/user context options.
    for sp in (
        parser_add,
        parser_list,
        parser_my,
        parser_update,
        parser_delete,
        parser_track,
        parser_untrack,
        parser_recover,
    ):
        sp.add_argument(
            "--group-id", type=int, help="Group context (use group room ID)"
        )
        sp.add_argument(
            "--user-id", type=int, help="User context (use Telegram user ID)"
        )

    args = parser.parse_args(argv)

    if not args.command:
        parser.print_help()
        sys.exit(1)

    # One context check covers all commands; the former per-command repeat of
    # this test could never fire because this one had already exited.
    if not args.group_id and not args.user_id:
        print("Error: either --group-id or --user-id is required", file=sys.stderr)
        sys.exit(1)

    # argparse guarantees `text` is present for `add`, but not that it is
    # non-empty (e.g. `add ""`).
    if args.command == "add" and not args.text:
        print("Error: text is required for add", file=sys.stderr)
        sys.exit(1)

    # Built only after validation so early exits never touch the handlers.
    command_map = {
        "add": cmd_add,
        "list": cmd_list,
        "my": cmd_my,
        "update": cmd_update,
        "delete": cmd_delete,
        "track": cmd_track,
        "untrack": cmd_untrack,
        "recover": cmd_recover,
    }

    handler = command_map.get(args.command)
    if handler is None:
        # Defensive: argparse already rejects unknown sub-commands.
        print(f"Unknown command: {args.command}", file=sys.stderr)
        sys.exit(1)
    handler(args)
|
|
|
|
|
|
# Script entry point: run the CLI only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|