feat: switch admin identification from user_id to username

- Replace admin_user_ids (list[int]) with admin_usernames (list[str])
- Update all service methods to use username for permission checks
- Add delete button to bot responses for message cleanup
- Update tests to match new implementation

Note: Breaking change - existing data files need fresh start
This commit is contained in:
shokollm
2026-04-09 08:02:36 +00:00
parent 7822e65d6c
commit 8494b4621c
6 changed files with 473 additions and 208 deletions

View File

@@ -67,7 +67,7 @@ class JsonFileRoomStorage:
bounties=bounties,
next_id=data["next_id"],
timezone=data.get("timezone"),
admin_user_ids=data.get("admin_user_ids", []),
admin_usernames=data.get("admin_usernames", []),
)
def save(self, room_data: RoomData) -> None:
@@ -76,7 +76,7 @@ class JsonFileRoomStorage:
"room_id": room_data.room_id,
"next_id": room_data.next_id,
"timezone": room_data.timezone,
"admin_user_ids": room_data.admin_user_ids or [],
"admin_usernames": room_data.admin_usernames or [],
"bounties": [
{
"id": b.id,

View File

@@ -285,11 +285,19 @@ async def cmd_bounty(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
displayed_bounties = filtered_bounties[:limit]
if not displayed_bounties:
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
if show_all:
await update.message.reply_text("No bounties yet.")
await update.message.reply_text(
"No bounties yet.",
reply_markup=reply_markup,
)
else:
await update.message.reply_text(
"No active bounties. Use /bounty all to show expired."
"No active bounties. Use /bounty all to show expired.",
reply_markup=reply_markup,
)
return
@@ -371,29 +379,57 @@ async def cmd_add(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
bounty = BOUNTY_SERVICE.add_bounty(
room_id=room_id,
user_id=user_id,
username=username,
text=text,
link=link,
due_date_ts=due_date_ts,
created_by_username=username,
)
except PermissionError as e:
await update.message.reply_text(f"{e}")
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(f"{e}", reply_markup=reply_markup)
return
except ValueError as e:
await update.message.reply_text(f"{e}")
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(f"{e}", reply_markup=reply_markup)
return
due_str = ""
if due_date_ts:
timezone_str = BOUNTY_SERVICE.get_timezone(room_id)
due_str = f" | Due: {format_due_date(due_date_ts, timezone_str)}"
keyboard = [[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"✅ Bounty added (#{bounty.id}){due_str}",
disable_web_page_preview=True,
reply_markup=reply_markup,
)
async def cmd_update(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
user_id = get_user_id(update)
room_id = get_room_id(update)
effective_user = update.effective_user
username = effective_user.username or effective_user.first_name or None
if not BOUNTY_SERVICE.is_admin(room_id, username):
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"⛔ Only admins can edit bounties.",
reply_markup=reply_markup,
)
return
args = extract_args(update.message.text)
if len(args) < 1:
await update.message.reply_text(
@@ -411,11 +447,16 @@ async def cmd_update(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
try:
bounty_id = int(args[0])
except ValueError:
await update.message.reply_text("Invalid bounty ID.")
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"Invalid bounty ID.",
reply_markup=reply_markup,
)
return
user_id = get_user_id(update)
room_id = get_room_id(update)
timezone_str = BOUNTY_SERVICE.get_timezone(room_id)
try:
@@ -431,19 +472,33 @@ async def cmd_update(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
and not clear_link
and not clear_date
):
await update.message.reply_text("Nothing to update.")
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"Nothing to update.",
reply_markup=reply_markup,
)
return
old_bounty = BOUNTY_SERVICE.get_bounty(room_id, bounty_id)
if not old_bounty:
await update.message.reply_text("Bounty not found.")
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"Bounty not found.",
reply_markup=reply_markup,
)
return
try:
success = BOUNTY_SERVICE.update_bounty(
room_id=room_id,
bounty_id=bounty_id,
user_id=user_id,
username=username,
text=text,
link=link,
due_date_ts=due_date_ts,
@@ -451,10 +506,18 @@ async def cmd_update(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
clear_due=clear_date,
)
except PermissionError as e:
await update.message.reply_text(f"{e}")
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(f"{e}", reply_markup=reply_markup)
return
except ValueError as e:
await update.message.reply_text(f"{e}")
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(f"{e}", reply_markup=reply_markup)
return
if success:
@@ -485,11 +548,23 @@ async def cmd_update(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
changes.append(f"Date: {old_date} → (cleared)")
if changes:
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"✅ Bounty #{bounty_id} updated:\n" + "\n".join(changes)
f"✅ Bounty #{bounty_id} updated:\n" + "\n".join(changes),
reply_markup=reply_markup,
)
else:
await update.message.reply_text(f"✅ Bounty #{bounty_id} updated.")
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"✅ Bounty #{bounty_id} updated.",
reply_markup=reply_markup,
)
else:
await update.message.reply_text("Bounty not found.")
@@ -498,6 +573,22 @@ cmd_edit = cmd_update
async def cmd_delete(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
user_id = get_user_id(update)
room_id = get_room_id(update)
effective_user = update.effective_user
username = effective_user.username or effective_user.first_name or None
if not BOUNTY_SERVICE.is_admin(room_id, username):
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"⛔ Only admins can delete bounties.",
reply_markup=reply_markup,
)
return
args = extract_args(update.message.text)
if not args:
await update.message.reply_text("Usage: /delete <bounty_id> [bounty_id ...]")
@@ -506,20 +597,28 @@ async def cmd_delete(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
try:
bounty_ids = [int(arg) for arg in args]
except ValueError:
await update.message.reply_text("Invalid bounty ID.")
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"Invalid bounty ID.",
reply_markup=reply_markup,
)
return
user_id = get_user_id(update)
room_id = get_room_id(update)
try:
results = BOUNTY_SERVICE.delete_bounties(
room_id=room_id,
bounty_ids=bounty_ids,
user_id=user_id,
username=username,
)
except PermissionError as e:
await update.message.reply_text(f"{e}")
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(f"{e}", reply_markup=reply_markup)
return
response_lines = []
@@ -531,12 +630,24 @@ async def cmd_delete(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
elif result == "permission_denied":
response_lines.append(f"⛔ Bounty #{bounty_id} permission denied.")
await update.message.reply_text("\n".join(response_lines))
keyboard = [[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"\n".join(response_lines), reply_markup=reply_markup
)
async def cmd_track(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
user_id = get_user_id(update)
if not is_group(update):
await update.message.reply_text("⛔ /track is only available in groups.")
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"⛔ /track is only available in groups.",
reply_markup=reply_markup,
)
return
args = extract_args(update.message.text)
@@ -555,16 +666,38 @@ async def cmd_track(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
try:
if TRACKING_SERVICE.track_bounty(room_id, user_id, bounty_id):
await update.message.reply_text(f"✅ Now tracking bounty #{bounty_id}.")
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"✅ Now tracking bounty #{bounty_id}.",
reply_markup=reply_markup,
)
else:
await update.message.reply_text(f"Already tracking bounty #{bounty_id}.")
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"Already tracking bounty #{bounty_id}.",
reply_markup=reply_markup,
)
except ValueError as e:
await update.message.reply_text(str(e))
async def cmd_untrack(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
user_id = get_user_id(update)
if not is_group(update):
await update.message.reply_text("⛔ /untrack is only available in groups.")
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"⛔ /untrack is only available in groups.",
reply_markup=reply_markup,
)
return
args = extract_args(update.message.text)
@@ -582,22 +715,46 @@ async def cmd_untrack(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
room_id = get_room_id(update)
if TRACKING_SERVICE.untrack_bounty(room_id, user_id, bounty_id):
await update.message.reply_text(f"✅ Untracked bounty #{bounty_id}.")
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"✅ Untracked bounty #{bounty_id}.",
reply_markup=reply_markup,
)
else:
await update.message.reply_text("Not tracking bounty #{bounty_id}.")
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"Not tracking bounty #{bounty_id}.",
reply_markup=reply_markup,
)
async def cmd_start(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
user_id = get_user_id(update)
room_id = get_room_id(update)
effective_user = update.effective_user
username = effective_user.username or effective_user.first_name or None
if is_group(update):
try:
chat_member = await ctx.bot.get_chat_member(room_id, user_id)
if chat_member.status == "creator" and not BOUNTY_SERVICE.is_admin(
room_id, user_id
room_id, username
):
BOUNTY_SERVICE.add_admin(room_id, user_id, user_id)
BOUNTY_SERVICE.add_admin(room_id, username, username)
keyboard = [
[
InlineKeyboardButton(
"🗑️ Delete", callback_data=f"del_msg:{user_id}"
)
]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"👻 JIGAIDO is watching.\n\n"
"This group's bounty list is now active.\n"
@@ -605,34 +762,46 @@ async def cmd_start(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
"/bounty — list bounties\n"
"/add — create a bounty\n"
"/track — track a bounty\n"
"/my — your tracked bounties"
"/my — your tracked bounties",
reply_markup=reply_markup,
)
return
except Exception:
pass
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"👻 JIGAIDO is watching.\n\n"
"This group's bounty list is now active.\n"
"/bounty — list bounties\n"
"/add — create a bounty\n"
"/track — track a bounty\n"
"/my — your tracked bounties"
"/my — your tracked bounties",
reply_markup=reply_markup,
)
else:
if not BOUNTY_SERVICE.is_admin(room_id, user_id):
BOUNTY_SERVICE.add_admin(room_id, user_id, user_id)
if not BOUNTY_SERVICE.is_admin(room_id, username):
BOUNTY_SERVICE.add_admin(room_id, username, username)
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"👻 JIGAIDO activated.\n\n"
"Personal bounty list ready.\n"
"/bounty — list your bounties\n"
"/add — create a bounty\n"
"/my — your tracked bounties"
"/my — your tracked bounties",
reply_markup=reply_markup,
)
async def cmd_show(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
user_id = get_user_id(update)
args = extract_args(update.message.text)
if not args:
await update.message.reply_text("Usage: /show <bounty_id>")
@@ -680,19 +849,44 @@ async def cmd_show(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
created_str = dt_created.strftime("%Y-%m-%d %H:%M")
lines.append(f"📌 Created: {created_str}")
keyboard = [[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"\n".join(lines), disable_web_page_preview=True, parse_mode=ParseMode.HTML
"\n".join(lines),
disable_web_page_preview=True,
parse_mode=ParseMode.HTML,
reply_markup=reply_markup,
)
async def cmd_timezone(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
user_id = get_user_id(update)
args = extract_args(update.message.text)
room_id = get_room_id(update)
user_id = get_user_id(update)
effective_user = update.effective_user
username = effective_user.username or effective_user.first_name or None
if not BOUNTY_SERVICE.is_admin(room_id, username):
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"⛔ Only admins can change timezone.",
reply_markup=reply_markup,
)
return
if not args:
current_tz = BOUNTY_SERVICE.get_timezone(room_id)
await update.message.reply_text(f"Current timezone: {current_tz}")
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"Current timezone: {current_tz}",
reply_markup=reply_markup,
)
return
timezone_str = args[0]
@@ -706,21 +900,39 @@ async def cmd_timezone(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
return
try:
BOUNTY_SERVICE.set_timezone(room_id, timezone_str, user_id)
BOUNTY_SERVICE.set_timezone(room_id, timezone_str, username)
except PermissionError as e:
await update.message.reply_text(f"{e}")
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(f"{e}", reply_markup=reply_markup)
return
await update.message.reply_text(f"✅ Timezone set to {timezone_str}.")
keyboard = [[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"✅ Timezone set to {timezone_str}.",
reply_markup=reply_markup,
)
async def cmd_recover(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
user_id = get_user_id(update)
args = extract_args(update.message.text)
room_id = get_room_id(update)
user_id = get_user_id(update)
effective_user = update.effective_user
username = effective_user.username or effective_user.first_name or None
if not BOUNTY_SERVICE.is_admin(room_id, user_id):
await update.message.reply_text("⛔ Only admins can perform this action.")
if not BOUNTY_SERVICE.is_admin(room_id, username):
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"⛔ Only admins can perform this action.",
reply_markup=reply_markup,
)
return
if not args:
@@ -746,7 +958,16 @@ async def cmd_recover(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
)
lines.append(f"[#{b.id}] {text}{link_str} | 🗑️ Deleted {deleted_str}")
await update.message.reply_text("\n".join(lines), disable_web_page_preview=True)
user_id = get_user_id(update)
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"\n".join(lines),
disable_web_page_preview=True,
reply_markup=reply_markup,
)
return
try:
@@ -755,7 +976,7 @@ async def cmd_recover(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
await update.message.reply_text("Invalid bounty ID.")
return
results = BOUNTY_SERVICE.recover_bounties(room_id, bounty_ids, user_id)
results = BOUNTY_SERVICE.recover_bounties(room_id, bounty_ids, username)
response_lines = []
for bounty_id, result in results.items():
@@ -768,55 +989,54 @@ async def cmd_recover(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
elif result == "permission_denied":
response_lines.append(f"⛔ Bounty #{bounty_id} permission denied.")
await update.message.reply_text("\n".join(response_lines))
async def _find_user_id_by_username(
ctx: ContextTypes.DEFAULT_TYPE, username: str
) -> int | None:
"""Find user_id by username using Telegram API."""
import logging
log = logging.getLogger(__name__)
try:
chat = await ctx.bot.get_chat(f"@{username}")
log.info(f"Found user {username}: {chat.id}")
return chat.id
except Exception as e:
log.error(f"Failed to find user @{username}: {e}")
return None
def _find_username_by_user_id(room_id: int, user_id: int) -> str | None:
"""Find username by user_id from bounty creators in the room."""
bounties = BOUNTY_SERVICE.list_bounties(room_id)
for bounty in bounties:
if bounty.created_by_user_id == user_id:
return bounty.created_by_username
return None
user_id = get_user_id(update)
keyboard = [[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"\n".join(response_lines), reply_markup=reply_markup
)
async def cmd_admin(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
args = extract_args(update.message.text)
room_id = get_room_id(update)
effective_user = update.effective_user
requesting_username = effective_user.username or effective_user.first_name or None
if not args:
admins = BOUNTY_SERVICE.list_admins(get_room_id(update))
if not admins:
await update.message.reply_text("No admins in this room.")
if not BOUNTY_SERVICE.is_admin(room_id, requesting_username):
user_id = get_user_id(update)
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"⛔ Only admins can perform this action.",
reply_markup=reply_markup,
)
return
admin_mentions = []
for admin_id in admins:
username = _find_username_by_user_id(get_room_id(update), admin_id)
if username:
admin_mentions.append(
f'<a href="tg://user?id={admin_id}">@{username}</a>'
)
else:
admin_mentions.append(
f'<a href="tg://user?id={admin_id}">{admin_id}</a>'
)
admins = BOUNTY_SERVICE.list_admins(room_id)
if not admins:
user_id = get_user_id(update)
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"No admins in this room.",
reply_markup=reply_markup,
)
return
user_id = get_user_id(update)
admin_mentions = [f"@{admin}" for admin in admins]
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"Room Admins:\n" + "\n".join(f"- {m}" for m in admin_mentions),
parse_mode=ParseMode.HTML,
reply_markup=reply_markup,
)
return
@@ -840,36 +1060,51 @@ async def cmd_admin(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
await update.message.reply_text(f"Usage: /admin {subcommand} @username")
return
username = raw_username[1:]
target_username = raw_username[1:]
user_id = get_user_id(update)
room_id = get_room_id(update)
if not BOUNTY_SERVICE.is_admin(room_id, user_id):
await update.message.reply_text("⛔ Only admins can perform this action.")
return
target_user_id = await _find_user_id_by_username(ctx, username)
if target_user_id is None:
await update.message.reply_text(f"⛔ User @{username} not found.")
if not BOUNTY_SERVICE.is_admin(room_id, requesting_username):
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"⛔ Only admins can perform this action.",
reply_markup=reply_markup,
)
return
try:
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
if subcommand == "add":
BOUNTY_SERVICE.add_admin(room_id, target_user_id, user_id)
await update.message.reply_text(f"✅ @{username} is now an admin.")
BOUNTY_SERVICE.add_admin(room_id, target_username, requesting_username)
await update.message.reply_text(
f"✅ @{target_username} is now an admin.",
reply_markup=reply_markup,
)
elif subcommand == "remove":
BOUNTY_SERVICE.remove_admin(room_id, target_user_id, user_id)
await update.message.reply_text(f"✅ @{username} is no longer an admin.")
except PermissionError as e:
await update.message.reply_text(f"{e}")
BOUNTY_SERVICE.remove_admin(room_id, target_username, requesting_username)
await update.message.reply_text(
f"✅ @{target_username} is no longer an admin.",
reply_markup=reply_markup,
)
except (PermissionError, ValueError) as e:
keyboard = [
[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(f"{e}", reply_markup=reply_markup)
async def cmd_help(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
user_id = get_user_id(update)
room_id = get_room_id(update)
is_admin = BOUNTY_SERVICE.is_admin(room_id, user_id)
effective_user = update.effective_user
username = effective_user.username or effective_user.first_name or None
is_admin = BOUNTY_SERVICE.is_admin(room_id, username)
if is_admin:
lines = [
@@ -906,4 +1141,10 @@ async def cmd_help(update: Update, ctx: ContextTypes.DEFAULT_TYPE) -> None:
"/help — show this message",
]
await update.message.reply_text("\n".join(lines), disable_web_page_preview=True)
keyboard = [[InlineKeyboardButton("🗑️ Delete", callback_data=f"del_msg:{user_id}")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"\n".join(lines),
disable_web_page_preview=True,
reply_markup=reply_markup,
)

View File

@@ -44,18 +44,18 @@ class RoomData:
The next_id field is used to generate unique bounty IDs within this room.
The timezone field stores the room's timezone (e.g., "Asia/Jakarta"), default UTC+0.
The admin_user_ids field lists users who have admin privileges in this room.
The admin_usernames field lists usernames who have admin privileges in this room.
"""
room_id: int
bounties: list[Bounty]
next_id: int
timezone: str | None = None
admin_user_ids: list[int] | None = None
admin_usernames: list[str] | None = None
def __post_init__(self):
if self.admin_user_ids is None:
self.admin_user_ids = []
if self.admin_usernames is None:
self.admin_usernames = []
@dataclass

View File

@@ -25,68 +25,79 @@ class BountyService:
def __init__(self, storage: RoomStorage):
self._storage = storage
def is_admin(self, room_id: int, user_id: int) -> bool:
"""Check if user is admin in a room."""
def is_admin(self, room_id: int, username: str | None) -> bool:
"""Check if user is admin in a room by username."""
if not username:
return False
room_data = self._storage.load(room_id)
if room_data is None:
return False
return user_id in (room_data.admin_user_ids or [])
return username in (room_data.admin_usernames or [])
def add_admin(
self, room_id: int, admin_user_id: int, requesting_user_id: int
self, room_id: int, username: str, requesting_username: str | None
) -> None:
"""Add an admin to a room. Requires admin permission, or self-promotion if first admin."""
room_data = self._storage.load(room_id)
has_no_admins = room_data is None or not room_data.admin_user_ids
is_self_promotion = requesting_user_id == admin_user_id
has_no_admins = room_data is None or not room_data.admin_usernames
is_self_promotion = requesting_username == username
if not self.is_admin(room_id, requesting_user_id):
if not self.is_admin(room_id, requesting_username):
if not (has_no_admins and is_self_promotion):
raise PermissionError("Only admins can add admins.")
if room_data is None or room_data.admin_user_ids is None:
if room_data is None:
room_data = RoomData(
room_id=room_id, bounties=[], next_id=1, admin_user_ids=[]
room_id=room_id, bounties=[], next_id=1, admin_usernames=[]
)
if admin_user_id not in (room_data.admin_user_ids or []):
room_data.admin_user_ids.append(admin_user_id)
self._storage.save(room_data)
admin_usernames = room_data.admin_usernames
if admin_usernames is None:
admin_usernames = []
room_data.admin_usernames = []
if username in admin_usernames:
raise ValueError(f"@{username} is already an admin.")
admin_usernames.append(username)
self._storage.save(room_data)
def remove_admin(
self, room_id: int, admin_user_id: int, requesting_user_id: int
self, room_id: int, username: str, requesting_username: str | None
) -> None:
"""Remove an admin from a room. Requires admin permission."""
if not self.is_admin(room_id, requesting_user_id):
if not self.is_admin(room_id, requesting_username):
raise PermissionError("Only admins can remove admins.")
room_data = self._storage.load(room_id)
if room_data is None:
return
if room_data is None or not (room_data.admin_usernames or []):
raise ValueError(f"@{username} is not an admin.")
if admin_user_id in (room_data.admin_user_ids or []):
(room_data.admin_user_ids or []).remove(admin_user_id)
self._storage.save(room_data)
if username not in (room_data.admin_usernames or []):
raise ValueError(f"@{username} is not an admin.")
def list_admins(self, room_id: int) -> list[int]:
"""List all admin user IDs in a room."""
(room_data.admin_usernames or []).remove(username)
self._storage.save(room_data)
def list_admins(self, room_id: int) -> list[str]:
"""List all admin usernames in a room."""
room_data = self._storage.load(room_id)
if room_data is None:
return []
return list(room_data.admin_user_ids or [])
return list(room_data.admin_usernames or [])
def set_timezone(
self, room_id: int, timezone: str, requesting_user_id: int
self, room_id: int, timezone: str, requesting_username: str | None
) -> None:
"""Set the timezone for a room. Requires admin permission."""
if not self.is_admin(room_id, requesting_user_id):
if not self.is_admin(room_id, requesting_username):
raise PermissionError("Only admins can set timezone.")
room_data = self._storage.load(room_id)
if room_data is None:
room_data = RoomData(
room_id=room_id, bounties=[], next_id=1, admin_user_ids=[]
room_id=room_id, bounties=[], next_id=1, admin_usernames=[]
)
room_data.timezone = timezone
@@ -121,13 +132,14 @@ class BountyService:
self,
room_id: int,
user_id: int,
username: str | None,
text: Optional[str] = None,
link: Optional[str] = None,
due_date_ts: Optional[int] = None,
created_by_username: Optional[str] = None,
) -> Bounty:
"""Add a new bounty to the room. Requires admin permission."""
if not self.is_admin(room_id, user_id):
if not self.is_admin(room_id, username):
raise PermissionError("Only admins can add bounties.")
if not self.check_link_unique(room_id, link):
@@ -168,12 +180,12 @@ class BountyService:
return b
return None
def recover_bounty(self, room_id: int, bounty_id: int, user_id: int) -> str:
def recover_bounty(self, room_id: int, bounty_id: int, username: str | None) -> str:
"""Recover a soft-deleted bounty. Admin only.
Returns: 'recovered', 'not_found', 'not_deleted', 'permission_denied'
"""
if not self.is_admin(room_id, user_id):
if not self.is_admin(room_id, username):
return "permission_denied"
bounty = self.get_deleted_bounty(room_id, bounty_id)
@@ -187,7 +199,7 @@ class BountyService:
return "recovered"
def recover_bounties(
self, room_id: int, bounty_ids: list[int], user_id: int
self, room_id: int, bounty_ids: list[int], username: str | None
) -> dict[int, str]:
"""Recover multiple soft-deleted bounties. Admin only.
@@ -195,7 +207,7 @@ class BountyService:
"""
results = {}
for bounty_id in bounty_ids:
results[bounty_id] = self.recover_bounty(room_id, bounty_id, user_id)
results[bounty_id] = self.recover_bounty(room_id, bounty_id, username)
return results
def get_bounty(self, room_id: int, bounty_id: int) -> Bounty | None:
@@ -209,7 +221,7 @@ class BountyService:
self,
room_id: int,
bounty_id: int,
user_id: int,
username: str | None,
text: Optional[str] = None,
link: Optional[str] = None,
due_date_ts: Optional[int] = None,
@@ -220,7 +232,7 @@ class BountyService:
bounty = self._storage.get_bounty(room_id, bounty_id)
if not bounty:
return False
if not self.is_admin(room_id, user_id):
if not self.is_admin(room_id, username):
raise PermissionError("Only admins can edit bounties.")
if link and not self.check_link_unique(
@@ -243,12 +255,12 @@ class BountyService:
self._storage.update_bounty(room_id, updated)
return True
def delete_bounty(self, room_id: int, bounty_id: int, user_id: int) -> bool:
def delete_bounty(self, room_id: int, bounty_id: int, username: str | None) -> bool:
"""Soft delete a bounty. Only admins can delete."""
bounty = self._storage.get_bounty(room_id, bounty_id)
if not bounty:
return False
if not self.is_admin(room_id, user_id):
if not self.is_admin(room_id, username):
raise PermissionError("Only admins can delete bounties.")
bounty.deleted_at = int(time.time())
@@ -256,7 +268,7 @@ class BountyService:
return True
def delete_bounties(
self, room_id: int, bounty_ids: list[int], user_id: int
self, room_id: int, bounty_ids: list[int], username: str | None
) -> dict[int, str]:
"""Soft delete multiple bounties. Returns dict of bounty_id -> result.
@@ -268,7 +280,7 @@ class BountyService:
if not bounty:
results[bounty_id] = "not_found"
continue
if not self.is_admin(room_id, user_id):
if not self.is_admin(room_id, username):
results[bounty_id] = "permission_denied"
continue

View File

@@ -122,7 +122,7 @@ class TestRoomData:
assert rd.bounties == []
assert rd.next_id == 1
assert rd.timezone is None
assert rd.admin_user_ids == []
assert rd.admin_usernames == []
def test_create_dm_room_data(self):
rd = RoomData(
@@ -134,7 +134,7 @@ class TestRoomData:
assert rd.bounties == []
assert rd.next_id == 1
assert rd.timezone is None
assert rd.admin_user_ids == []
assert rd.admin_usernames == []
def test_room_data_with_bounties(self):
b = Bounty(
@@ -156,18 +156,18 @@ class TestRoomData:
bounties=[],
next_id=1,
timezone="Asia/Jakarta",
admin_user_ids=[123, 456],
admin_usernames=["alice", "bob"],
)
assert rd.timezone == "Asia/Jakarta"
assert rd.admin_user_ids == [123, 456]
assert rd.admin_usernames == ["alice", "bob"]
def test_room_data_admin_user_ids_defaults_to_empty_list(self):
def test_room_data_admin_usernames_defaults_to_empty_list(self):
rd = RoomData(
room_id=-1001,
bounties=[],
next_id=1,
)
assert rd.admin_user_ids == []
assert rd.admin_usernames == []
class TestTrackingData:

View File

@@ -15,32 +15,34 @@ class TestBountyService:
"""Set up fresh storage and service for each test."""
self.storage = MockRoomStorage()
self.service = BountyService(self.storage)
self.admin_user_id = 123
self._make_admin(-1001, self.admin_user_id)
self.admin_username = "admin"
self._make_admin(-1001, self.admin_username)
def _make_admin(self, room_id: int, user_id: int):
def _make_admin(self, room_id: int, username: str):
"""Helper to set up a room with an admin user."""
room_data = self.storage.load(room_id)
if room_data is None:
room_data = RoomData(
room_id=room_id, bounties=[], next_id=0, admin_user_ids=[]
room_id=room_id, bounties=[], next_id=0, admin_usernames=[]
)
if user_id not in (room_data.admin_user_ids or []):
room_data.admin_user_ids = room_data.admin_user_ids or []
room_data.admin_user_ids.append(user_id)
if username not in (room_data.admin_usernames or []):
room_data.admin_usernames = room_data.admin_usernames or []
room_data.admin_usernames.append(username)
self.storage.save(room_data)
def test_add_bounty_creates_room_if_not_exists(self):
"""Test that add_bounty creates a new room if it doesn't exist."""
bounty = self.service.add_bounty(
room_id=-1001,
user_id=self.admin_user_id,
user_id=123,
username=self.admin_username,
text="Fix bug",
link="https://github.com/issue/1",
created_by_username=self.admin_username,
)
assert bounty.id == 1
assert bounty.text == "Fix bug"
assert bounty.created_by_user_id == self.admin_user_id
assert bounty.created_by_user_id == 123
room = self.storage.load(-1001)
assert room is not None
@@ -49,13 +51,13 @@ class TestBountyService:
def test_add_bounty_increments_id(self):
"""Test that add_bounty increments bounty ID for each new bounty."""
b1 = self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="First"
room_id=-1001, user_id=123, username=self.admin_username, text="First"
)
b2 = self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="Second"
room_id=-1001, user_id=123, username=self.admin_username, text="Second"
)
b3 = self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="Third"
room_id=-1001, user_id=123, username=self.admin_username, text="Third"
)
assert b1.id == 1
@@ -65,7 +67,9 @@ class TestBountyService:
def test_add_bounty_requires_admin(self):
"""Test that add_bounty raises PermissionError when non-admin tries to add."""
with pytest.raises(PermissionError, match="Only admins can add bounties"):
self.service.add_bounty(room_id=-1001, user_id=999, text="Not admin")
self.service.add_bounty(
room_id=-1001, user_id=999, username="nonadmin", text="Not admin"
)
def test_list_bounties_empty_room(self):
"""Test list_bounties returns empty list for non-existent room."""
@@ -74,14 +78,16 @@ class TestBountyService:
def test_list_bounties_returns_all_bounties(self):
"""Test list_bounties returns all bounties in a room."""
self.service.add_bounty(room_id=-1001, user_id=self.admin_user_id, text="First")
self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="Second"
room_id=-1001, user_id=123, username=self.admin_username, text="First"
)
self.service.add_bounty(
room_id=-1001, user_id=123, username=self.admin_username, text="Second"
)
# Add bounty to different room to verify isolation
self._make_admin(-999, self.admin_user_id)
self._make_admin(-999, "otheradmin")
self.service.add_bounty(
room_id=-999, user_id=self.admin_user_id, text="Other room"
room_id=-999, user_id=456, username="otheradmin", text="Other room"
)
bounties = self.service.list_bounties(-1001)
@@ -91,7 +97,7 @@ class TestBountyService:
def test_get_bounty_found(self):
"""Test get_bounty returns bounty when it exists."""
created = self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="Test"
room_id=-1001, user_id=123, username=self.admin_username, text="Test"
)
found = self.service.get_bounty(-1001, created.id)
assert found is not None
@@ -104,19 +110,21 @@ class TestBountyService:
def test_get_bounty_wrong_room(self):
"""Test get_bounty returns None when bounty is in different room."""
self.service.add_bounty(room_id=-1001, user_id=self.admin_user_id, text="Test")
self.service.add_bounty(
room_id=-1001, user_id=123, username=self.admin_username, text="Test"
)
found = self.service.get_bounty(-999, 1) # room -999 doesn't have bounty 1
assert found is None
def test_update_bounty_success(self):
"""Test update_bounty succeeds when admin updates their bounty."""
bounty = self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="Original"
room_id=-1001, user_id=123, username=self.admin_username, text="Original"
)
result = self.service.update_bounty(
room_id=-1001,
bounty_id=bounty.id,
user_id=self.admin_user_id,
username=self.admin_username,
text="Updated",
)
assert result is True
@@ -126,13 +134,13 @@ class TestBountyService:
def test_update_bounty_not_admin_raises_permission_error(self):
"""Test update_bounty raises PermissionError when non-admin tries to update."""
bounty = self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="Original"
room_id=-1001, user_id=123, username=self.admin_username, text="Original"
)
with pytest.raises(PermissionError, match="Only admins can edit bounties"):
self.service.update_bounty(
room_id=-1001,
bounty_id=bounty.id,
user_id=999, # different user, not admin
username="nonadmin", # different user, not admin
text="Hacked",
)
@@ -141,7 +149,7 @@ class TestBountyService:
result = self.service.update_bounty(
room_id=-1001,
bounty_id=999,
user_id=self.admin_user_id,
username=self.admin_username,
text="Updated",
)
assert result is False
@@ -150,14 +158,15 @@ class TestBountyService:
"""Test update_bounty only updates provided fields."""
bounty = self.service.add_bounty(
room_id=-1001,
user_id=self.admin_user_id,
user_id=123,
username=self.admin_username,
text="Original",
link="https://original.link",
)
self.service.update_bounty(
room_id=-1001,
bounty_id=bounty.id,
user_id=self.admin_user_id,
username=self.admin_username,
text="Updated only text",
)
updated = self.service.get_bounty(-1001, bounty.id)
@@ -168,14 +177,15 @@ class TestBountyService:
"""Test update_bounty can clear link."""
bounty = self.service.add_bounty(
room_id=-1001,
user_id=self.admin_user_id,
user_id=123,
username=self.admin_username,
text="Test",
link="https://original.link",
)
self.service.update_bounty(
room_id=-1001,
bounty_id=bounty.id,
user_id=self.admin_user_id,
username=self.admin_username,
clear_link=True,
)
updated = self.service.get_bounty(-1001, bounty.id)
@@ -184,9 +194,9 @@ class TestBountyService:
def test_delete_bounty_success(self):
"""Test delete_bounty soft deletes when admin deletes their bounty."""
bounty = self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="To delete"
room_id=-1001, user_id=123, username=self.admin_username, text="To delete"
)
result = self.service.delete_bounty(-1001, bounty.id, self.admin_user_id)
result = self.service.delete_bounty(-1001, bounty.id, self.admin_username)
assert result is True
# Soft delete - bounty should not be found via get_bounty
assert self.service.get_bounty(-1001, bounty.id) is None
@@ -198,28 +208,26 @@ class TestBountyService:
def test_delete_bounty_not_admin_raises_permission_error(self):
"""Test delete_bounty raises PermissionError when non-admin tries to delete."""
bounty = self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="To delete"
room_id=-1001, user_id=123, username=self.admin_username, text="To delete"
)
with pytest.raises(PermissionError, match="Only admins can delete bounties"):
self.service.delete_bounty(
-1001, bounty.id, 999
) # different user, not admin
self.service.delete_bounty(-1001, bounty.id, "nonadmin")
def test_delete_bounty_not_found(self):
"""Test delete_bounty returns False when bounty doesn't exist."""
result = self.service.delete_bounty(-1001, 999, self.admin_user_id)
result = self.service.delete_bounty(-1001, 999, self.admin_username)
assert result is False
def test_delete_bounties_multi_id_success(self):
"""Test delete_bounties returns individual results for multiple bounties."""
bounty1 = self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="To delete 1"
room_id=-1001, user_id=123, username=self.admin_username, text="To delete 1"
)
bounty2 = self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="To delete 2"
room_id=-1001, user_id=123, username=self.admin_username, text="To delete 2"
)
results = self.service.delete_bounties(
-1001, [bounty1.id, bounty2.id], self.admin_user_id
-1001, [bounty1.id, bounty2.id], self.admin_username
)
assert results == {bounty1.id: "deleted", bounty2.id: "deleted"}
# Verify both are soft deleted
@@ -229,22 +237,22 @@ class TestBountyService:
def test_delete_bounties_mixed_results(self):
"""Test delete_bounties returns not_found for non-existent bounties."""
bounty = self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="To delete"
room_id=-1001, user_id=123, username=self.admin_username, text="To delete"
)
results = self.service.delete_bounties(
-1001, [bounty.id, 999, 888], self.admin_user_id
-1001, [bounty.id, 999, 888], self.admin_username
)
assert results == {bounty.id: "deleted", 999: "not_found", 888: "not_found"}
def test_delete_bounties_permission_denied(self):
"""Test delete_bounties returns permission_denied for non-admin users."""
bounty = self.service.add_bounty(
room_id=-1001, user_id=self.admin_user_id, text="To delete"
room_id=-1001, user_id=123, username=self.admin_username, text="To delete"
)
results = self.service.delete_bounties(
-1001,
[bounty.id],
999, # non-admin user
"nonadmin", # non-admin user
)
assert results == {bounty.id: "permission_denied"}
# Verify bounty was NOT deleted
@@ -259,29 +267,31 @@ class TestTrackingService:
self.room_storage = MockRoomStorage()
self.tracking_storage = MockTrackingStorage()
self.service = TrackingService(self.tracking_storage, self.room_storage)
self.admin_user_id = 123
self._make_admin(-1001, self.admin_user_id)
self.admin_username = "admin"
self._make_admin(-1001, self.admin_username)
def _make_admin(self, room_id: int, user_id: int):
def _make_admin(self, room_id: int, username: str):
"""Helper to set up a room with an admin user."""
room_data = self.room_storage.load(room_id)
if room_data is None:
room_data = RoomData(
room_id=room_id, bounties=[], next_id=0, admin_user_ids=[]
room_id=room_id, bounties=[], next_id=0, admin_usernames=[]
)
if user_id not in (room_data.admin_user_ids or []):
room_data.admin_user_ids = room_data.admin_user_ids or []
room_data.admin_user_ids.append(user_id)
if username not in (room_data.admin_usernames or []):
room_data.admin_usernames = room_data.admin_usernames or []
room_data.admin_usernames.append(username)
self.room_storage.save(room_data)
def _add_bounty(self, room_id=-1001, user_id=123, text="Test bounty"):
def _add_bounty(self, room_id=-1001, username="admin", text="Test bounty"):
"""Helper to add a bounty for tracking tests."""
if self.room_storage.load(room_id) is None or user_id not in (
self.room_storage.load(room_id).admin_user_ids or []
if self.room_storage.load(room_id) is None or username not in (
self.room_storage.load(room_id).admin_usernames or []
):
self._make_admin(room_id, user_id)
self._make_admin(room_id, username)
bounty_service = BountyService(self.room_storage)
return bounty_service.add_bounty(room_id=room_id, user_id=user_id, text=text)
return bounty_service.add_bounty(
room_id=room_id, user_id=123, username=username, text=text
)
def test_track_bounty_success(self):
"""Test track_bounty successfully tracks a bounty."""
@@ -369,11 +379,13 @@ class TestTrackingService:
def test_get_tracked_bounties_ignores_deleted_bounties(self):
"""Test get_tracked_bounties ignores bounties that were deleted."""
bounty_service = BountyService(self.room_storage)
bounty = bounty_service.add_bounty(room_id=-1001, user_id=123, text="To delete")
bounty = bounty_service.add_bounty(
room_id=-1001, user_id=123, username="admin", text="To delete"
)
self.service.track_bounty(-1001, 123456, bounty.id)
# Delete the bounty
bounty_service.delete_bounty(-1001, bounty.id, 123)
bounty_service.delete_bounty(-1001, bounty.id, "admin")
tracked = self.service.get_tracked_bounties(-1001, 123456)
assert len(tracked) == 0 # deleted bounty not returned