2 Commits

Author SHA1 Message Date
shoko
27c8cb3597 Add security audit to polymarket-browse review
CRITICAL: Telegram bot token exposed in process command line
HIGH: HTML injection in Telegram messages
MEDIUM: Insufficient --search URL encoding
MEDIUM: No bounds check on --detail
MEDIUM: Potential DoS via large API response
LOW: Bare except: clauses
LOW: No API rate limiting

Includes fix recommendations and immediate actions for users.
2026-03-25 09:27:28 +00:00
shoko
4a33d6924e Add polymarket-browse skill review (2026-03-25)
- Deep analysis of SKILL.md and browse.py
- Line length analysis (worst: 209 chars at print_browse signature)
- Duplicate code patterns (3 time functions, 2 tradeable checkers)
- Bug findings (bare except:, unused variables, 11-param function)
- Recommendations for refactoring and unit testing
- Proposed test structure under tests/
- Summary table categorized by priority/effort
2026-03-25 09:12:05 +00:00
5 changed files with 158 additions and 453 deletions

1
.gitignore vendored
View File

@@ -2,4 +2,3 @@ __pycache__/
*.pyc *.pyc
*.pyo *.pyo
.DS_Store .DS_Store
.worktrees/

View File

@@ -4,13 +4,11 @@ Polymarket Event Browser
Browse tradeable Polymarket events by game category. Browse tradeable Polymarket events by game category.
""" """
import html import subprocess
import json import json
import time import time
import argparse import argparse
from datetime import datetime, timezone, timedelta from datetime import datetime, timezone, timedelta
from urllib.parse import urlencode
from urllib.request import urlopen, Request
# ============================================================ # ============================================================
# CONFIG # CONFIG
@@ -19,7 +17,6 @@ from urllib.request import urlopen, Request
PAGE_SIZE = 50 PAGE_SIZE = 50
MAX_RETRIES = 5 MAX_RETRIES = 5
INITIAL_RETRY_DELAY = 2 # exponential backoff starts at 2s INITIAL_RETRY_DELAY = 2 # exponential backoff starts at 2s
WIB = timezone(timedelta(hours=7)) # UTC+7 for Indonesian users
GAME_CATEGORIES = { GAME_CATEGORIES = {
"All Esports": "Esports", "All Esports": "Esports",
@@ -222,79 +219,94 @@ def format_spread(bid, ask):
spread = ask - bid spread = ask - bid
return f"{prob_to_cents(spread)}c" return f"{prob_to_cents(spread)}c"
def get_match_time_status(e):
def _get_time_data(e, tz=None):
""" """
Unified time data extraction for event timestamps. Return a human-readable match time status.
Returns (status_str, urgency) where urgency is 0-3 (higher = more urgent/live).
Uses startTime (preferred) or startDate as the event start time. Uses startTime for actual match start time.
Datetime parsing and all relative calculations are UTC-based. Displays times in WIB (UTC+7 for Indonesian users).
The tz parameter only affects the abs_time formatting.
Args:
e: Event dict with 'startTime' or 'startDate' key.
tz: datetime.timezone for abs_time formatting.
Defaults to WIB (UTC+7).
Returns:
{
"time_status": str, # e.g. "LIVE", "In 6h", "12h ago"
"time_urgency": int, # 0-3 (higher = more urgent/live)
"abs_time": str, # e.g. "Mar 25, 19:00 WIB" or "TBD"
}
""" """
tz = tz or WIB # Use startTime for actual match start, not startDate (which is market creation time)
start_str = e.get("startTime") or e.get("startDate", "") start_str = e.get("startTime") or e.get("startDate", "")
if not start_str: if not start_str:
return {"time_status": "TBD", "time_urgency": 0, "abs_time": "TBD"} return "TBD", 0
try: try:
start_dt = datetime.fromisoformat(start_str.replace('Z', '+00:00')) start_dt = datetime.fromisoformat(start_str.replace('Z', '+00:00'))
now_utc = datetime.now(timezone.utc) now_utc = datetime.now(timezone.utc)
delta = start_dt - now_utc utc7 = timezone(timedelta(hours=7))
total_sec = delta.total_seconds() now = now_utc.astimezone(utc7)
start_utc7 = start_dt.astimezone(utc7)
if total_sec < 0: delta = start_dt - now_utc
# Event is in the past
hours_ago = abs(total_sec) / 3600 if delta.total_seconds() < 0:
# Started already
hours_ago = abs(delta.total_seconds()) / 3600
if hours_ago < 1: if hours_ago < 1:
time_status = "LIVE" return "LIVE", 3
time_urgency = 3
elif hours_ago < 4: elif hours_ago < 4:
time_status = f"LIVE {int(hours_ago)}h" return f"LIVE {int(hours_ago)}h", 3
time_urgency = 3
elif hours_ago < 24: elif hours_ago < 24:
time_status = f"{int(hours_ago)}h ago" return f"Started {int(hours_ago)}h ago", 1
time_urgency = 1
else: else:
days = int(hours_ago / 24) days = int(hours_ago / 24)
time_status = f"{days}d ago" return f"{days}d ago", 0
time_urgency = 0
else: else:
# Event is in the future # Starts in future
if total_sec < 3600: hours_until = delta.total_seconds() / 3600
mins = int(total_sec / 60) if hours_until <= 0:
time_status = f"In {mins}m" return "LIVE", 3
time_urgency = 3 elif hours_until < 1:
elif total_sec < 86400: mins = int(delta.total_seconds() / 60)
hours_until = int(total_sec / 3600) return f"In {mins}m", 3
time_status = f"In {hours_until}h" elif hours_until < 24:
time_urgency = 2 return f"In {int(hours_until)}h", 2
else: else:
days = int(total_sec / 86400) days = int(hours_until / 24)
time_status = f"In {days}d" return f"In {days}d", 1
time_urgency = 1 except:
return "", 0
abs_time = start_dt.astimezone(tz).strftime("%b %d, %H:%M ") def get_match_time_str(e):
if tz == WIB: """
abs_time += "WIB" Return just the time status string (e.g. 'LIVE', 'In 6h', 'In 1d').
Uses startTime for actual match start time.
"""
start_str = e.get("startTime") or e.get("startDate", "")
if not start_str:
return "TBD"
try:
start_dt = datetime.fromisoformat(start_str.replace('Z', '+00:00'))
now_utc = datetime.now(timezone.utc)
delta = start_dt - now_utc
if delta.total_seconds() < 0:
hours_ago = abs(delta.total_seconds()) / 3600
if hours_ago < 1:
return "LIVE"
elif hours_ago < 4:
return f"LIVE {int(hours_ago)}h"
elif hours_ago < 24:
return f"{int(hours_ago)}h ago"
else:
days = int(hours_ago / 24)
return f"{days}d ago"
else: else:
abs_time += start_dt.astimezone(tz).strftime("%Z") hours_until = delta.total_seconds() / 3600
return {"time_status": time_status, "time_urgency": time_urgency, "abs_time": abs_time} if hours_until <= 0:
except Exception: return "LIVE"
return {"time_status": "", "time_urgency": 0, "abs_time": "TBD"} elif hours_until < 1:
mins = int(delta.total_seconds() / 60)
return f"In {mins}m"
elif hours_until < 24:
return f"In {int(hours_until)}h"
else:
days = int(hours_until / 24)
return f"In {days}d"
except:
return ""
def filter_events(events, tradeable_only=True): def filter_events(events, tradeable_only=True):
""" """
@@ -313,7 +325,6 @@ def filter_events(events, tradeable_only=True):
return match_events, non_match_events return match_events, non_match_events
def sort_events(events): def sort_events(events):
return sorted(events, key=get_ml_volume, reverse=True) return sorted(events, key=get_ml_volume, reverse=True)
@@ -348,12 +359,12 @@ def format_event(e):
best_bid = float(ml.get("bestBid", 0)) if ml else 0 best_bid = float(ml.get("bestBid", 0)) if ml else 0
best_ask = float(ml.get("bestAsk", 0)) if ml else 0 best_ask = float(ml.get("bestAsk", 0)) if ml else 0
vol = get_ml_volume(e) vol = get_ml_volume(e)
td = _get_time_data(e) time_status, urgency = get_match_time_status(e)
return { return {
"title": e.get("title", ""), "title": e.get("title", ""),
"time_status": td["time_status"], "time_status": time_status,
"time_urgency": td["time_urgency"], "time_urgency": urgency,
"url": get_event_url(e), "url": get_event_url(e),
"livestream": e.get("resolutionSource"), "livestream": e.get("resolutionSource"),
"outcomes": outcomes, "outcomes": outcomes,
@@ -372,12 +383,11 @@ def format_detail_event(e):
] ]
active_markets = sorted(active_markets, key=lambda m: float(m.get("volume", 0)), reverse=True) active_markets = sorted(active_markets, key=lambda m: float(m.get("volume", 0)), reverse=True)
td = _get_time_data(e) time_status, urgency = get_match_time_status(e)
return { return {
"title": e.get("title", ""), "title": e.get("title", ""),
"time_status": td["time_status"], "time_status": time_status,
"abs_time": td["abs_time"],
"url": get_event_url(e), "url": get_event_url(e),
"livestream": e.get("resolutionSource"), "livestream": e.get("resolutionSource"),
"outcomes": json.loads(ml.get("outcomes", "[]")) if ml else [], "outcomes": json.loads(ml.get("outcomes", "[]")) if ml else [],
@@ -404,6 +414,48 @@ def format_detail_event(e):
# DISPLAY # DISPLAY
# ============================================================ # ============================================================
def get_start_time_wib(e):
"""Return (date_time_str, relative_str) for display."""
start_str = e.get("startTime") or e.get("startDate", "")
if not start_str:
return "TBD", ""
try:
start_dt = datetime.fromisoformat(start_str.replace('Z', '+00:00'))
now_utc = datetime.now(timezone.utc)
utc7 = timezone(timedelta(hours=7))
start_utc7 = start_dt.astimezone(utc7)
# Absolute: "Mar 25, 19:00 WIB"
abs_str = start_utc7.strftime("%b %d, %H:%M WIB")
# Relative: "In 5h", "In 10h", "LIVE", etc.
delta = start_dt - now_utc
if delta.total_seconds() < 0:
hours_ago = abs(delta.total_seconds()) / 3600
if hours_ago < 1:
rel_str = "LIVE"
elif hours_ago < 24:
rel_str = f"{int(hours_ago)}h ago"
else:
days = int(hours_ago / 24)
rel_str = f"{days}d ago"
else:
hours_until = delta.total_seconds() / 3600
if hours_until <= 0:
rel_str = "LIVE"
elif hours_until < 1:
mins_until = int(delta.total_seconds() / 60)
rel_str = f"In {mins_until}m"
elif hours_until < 24:
rel_str = f"In {int(hours_until)}h"
else:
days = int(hours_until / 24)
rel_str = f"In {days}d"
return abs_str, rel_str
except:
return "TBD", ""
def get_header_date(): def get_header_date():
"""Return current date string like 'Mar 25, 2026'""" """Return current date string like 'Mar 25, 2026'"""
now_utc = datetime.now(timezone.utc) now_utc = datetime.now(timezone.utc)
@@ -459,9 +511,7 @@ def print_browse(match_events, non_match_events, category, total_raw, total_fetc
vol = f["volume"] vol = f["volume"]
title = f["title"] title = f["title"]
url = f["url"] url = f["url"]
td = _get_time_data(e) start_time_wib, rel_time = get_start_time_wib(e)
start_time_wib = td["abs_time"]
rel_time = td["time_status"]
team_a = outcomes[0] if len(outcomes) > 0 else "?" team_a = outcomes[0] if len(outcomes) > 0 else "?"
team_b = outcomes[1] if len(outcomes) > 1 else "?" team_b = outcomes[1] if len(outcomes) > 1 else "?"
@@ -489,9 +539,7 @@ def print_browse(match_events, non_match_events, category, total_raw, total_fetc
for i, e in enumerate(non_match_events[:non_matches_max], 1): for i, e in enumerate(non_match_events[:non_matches_max], 1):
title = e.get("title", "?") title = e.get("title", "?")
url = get_event_url(e) url = get_event_url(e)
td = _get_time_data(e) start_time_wib, rel_time = get_start_time_wib(e)
start_time_wib = td["abs_time"]
rel_time = td["time_status"]
total_vol = sum(float(m.get("volume", 0)) for m in e.get("markets", [])) total_vol = sum(float(m.get("volume", 0)) for m in e.get("markets", []))
market_count = len(e.get("markets", [])) market_count = len(e.get("markets", []))
@@ -501,11 +549,17 @@ def print_browse(match_events, non_match_events, category, total_raw, total_fetc
print(f" Markets: {market_count} | Total Vol: ${total_vol:,.0f}") print(f" Markets: {market_count} | Total Vol: ${total_vol:,.0f}")
def print_detail(e, detail): def print_detail(e, detail):
from datetime import datetime, timezone, timedelta
now_utc = datetime.now(timezone.utc)
utc7 = timezone(timedelta(hours=7))
now_utc7 = now_utc.astimezone(utc7)
print(f"\n{detail['title']}") print(f"\n{detail['title']}")
print(f"URL: {detail['url']}") print(f"URL: {detail['url']}")
print(f"Livestream: {detail['livestream']}") print(f"Livestream: {detail['livestream']}")
spread_str = format_spread(detail["best_bid"], detail["best_ask"]) if detail["best_bid"] and detail["best_ask"] else "N/A" spread_str = format_spread(detail["best_bid"], detail["best_ask"]) if detail["best_bid"] and detail["best_ask"] else "N/A"
time_str = get_match_time_str(e)
print(f"\n{detail['time_status']}") print(f"\n{detail['time_status']}")
print(f"ML: {detail['outcomes'][0]} {format_odds(float(detail['prices'][0]))} vs {detail['outcomes'][1]} {format_odds(float(detail['prices'][1]))}") print(f"ML: {detail['outcomes'][0]} {format_odds(float(detail['prices'][0]))} vs {detail['outcomes'][1]} {format_odds(float(detail['prices'][1]))}")
print(f"ML Vol: ${detail['volume']:,.0f} | {spread_str}") print(f"ML Vol: ${detail['volume']:,.0f} | {spread_str}")
@@ -522,44 +576,14 @@ def print_detail(e, detail):
# TELEGRAM # TELEGRAM
# ============================================================ # ============================================================
def escape_html(text):
"""Escape HTML-sensitive characters for Telegram parse_mode=HTML."""
return (text
.replace("&", "&amp;")
.replace("<", "&lt;")
.replace(">", "&gt;")
.replace('"', "&quot;"))
def send_telegram_message(bot_token, chat_id, text, timeout=10):
"""Send a message via Telegram bot API. Returns the message ID on success.
Raises:
RuntimeError: If the Telegram API returns an error (e.g. invalid token, rate limit).
URLError/HTTPError: On network or HTTP-level failures.
"""
url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
data = urlencode({
"chat_id": chat_id,
"text": text,
"parse_mode": "HTML",
"disable_web_page_preview": "true",
}).encode("utf-8")
req = Request(url, data=data, method="POST")
with urlopen(req, timeout=timeout) as resp:
result = json.loads(resp.read())
if not result.get("ok"):
raise RuntimeError(f"Telegram API error: {result.get('description')}")
return result["result"]["message_id"]
def send_to_telegram(match_events, non_match_events, category, matches_only=False, non_matches_only=False): def send_to_telegram(match_events, non_match_events, category, matches_only=False, non_matches_only=False):
"""Send browse results to Telegram. Reads TELEGRAM_BOT_TOKEN and CHAT_ID from environment.""" """Send browse results to Telegram. Reads BOT_TOKEN and CHAT_ID from environment."""
import os import os
bot_token = os.environ.get("TELEGRAM_BOT_TOKEN") bot_token = os.environ.get("BOT_TOKEN")
chat_id = os.environ.get("CHAT_ID") chat_id = os.environ.get("CHAT_ID")
if not bot_token or not chat_id: if not bot_token or not chat_id:
raise RuntimeError("TELEGRAM_BOT_TOKEN or CHAT_ID not set in environment") print("WARNING: BOT_TOKEN or CHAT_ID not set in environment. Skipping Telegram send.")
return
from datetime import datetime, timezone, timedelta from datetime import datetime, timezone, timedelta
now_utc = datetime.now(timezone.utc) now_utc = datetime.now(timezone.utc)
@@ -572,8 +596,19 @@ def send_to_telegram(match_events, non_match_events, category, matches_only=Fals
show_non_matches = (not matches_only and not non_matches_only) or non_matches_only show_non_matches = (not matches_only and not non_matches_only) or non_matches_only
def send(text): def send(text):
msg_id = send_telegram_message(bot_token, chat_id, text) result = subprocess.run(
print(f" Sent msg {msg_id}") ["curl", "-s", f"https://api.telegram.org/bot{bot_token}/sendMessage",
"-d", f"chat_id={chat_id}",
"-d", f"text={text}",
"-d", "parse_mode=HTML",
"-d", "disable_web_page_preview=true"],
capture_output=True
)
resp = json.loads(result.stdout.decode())
if resp.get("ok"):
print(f" Sent msg {resp['result']['message_id']}")
else:
print(f" Error: {resp.get('description')}")
# Build sections # Build sections
lines = [f"<b>{category.upper()}</b> | {header_date}"] lines = [f"<b>{category.upper()}</b> | {header_date}"]
@@ -592,16 +627,14 @@ def send_to_telegram(match_events, non_match_events, category, matches_only=Fals
vol = get_ml_volume(e) vol = get_ml_volume(e)
title = e.get("title", "?") title = e.get("title", "?")
url = get_event_url(e) url = get_event_url(e)
td = _get_time_data(e) start_time_wib, rel_time = get_start_time_wib(e)
start_time_wib = td["abs_time"]
rel_time = td["time_status"]
team_a = outcomes[0] if len(outcomes) > 0 else "?" team_a = outcomes[0] if len(outcomes) > 0 else "?"
team_b = outcomes[1] if len(outcomes) > 1 else "?" team_b = outcomes[1] if len(outcomes) > 1 else "?"
odds_a = format_odds(float(prices[0])) if len(prices) > 0 else "?" odds_a = format_odds(float(prices[0])) if len(prices) > 0 else "?"
odds_b = format_odds(float(prices[1])) if len(prices) > 1 else "?" odds_b = format_odds(float(prices[1])) if len(prices) > 1 else "?"
tournament = get_tournament(title) tournament = get_tournament(title)
title_clean = title.split(" - ")[0].strip() if " - " in title else title title_clean = title.split(" - ")[0].strip() if " - " in title else title
lines.append(f"<b>{i}.</b> <a href=\"{url}\">{escape_html(title_clean)}</a>") lines.append(f"<b>{i}.</b> <a href=\"{url}\">{title_clean}</a>")
lines.append(f" {start_time_wib} | {rel_time}") lines.append(f" {start_time_wib} | {rel_time}")
lines.append(f" Vol: ${vol:,.0f}") lines.append(f" Vol: ${vol:,.0f}")
if tournament: if tournament:
@@ -619,12 +652,10 @@ def send_to_telegram(match_events, non_match_events, category, matches_only=Fals
for i, e in enumerate(non_match_events, 1): for i, e in enumerate(non_match_events, 1):
title = e.get("title", "?") title = e.get("title", "?")
url = get_event_url(e) url = get_event_url(e)
td = _get_time_data(e) start_time_wib, rel_time = get_start_time_wib(e)
start_time_wib = td["abs_time"]
rel_time = td["time_status"]
total_vol = sum(float(m.get("volume", 0)) for m in e.get("markets", [])) total_vol = sum(float(m.get("volume", 0)) for m in e.get("markets", []))
market_count = len(e.get("markets", [])) market_count = len(e.get("markets", []))
lines.append(f"<b>{i}.</b> <a href=\"{url}\">{escape_html(title)}</a>") lines.append(f"<b>{i}.</b> <a href=\"{url}\">{title}</a>")
lines.append(f" {start_time_wib} | {rel_time}") lines.append(f" {start_time_wib} | {rel_time}")
lines.append(f" Markets: {market_count} | Total Vol: ${total_vol:,.0f}") lines.append(f" Markets: {market_count} | Total Vol: ${total_vol:,.0f}")
lines.append("") lines.append("")
@@ -706,7 +737,7 @@ def main():
parser.add_argument("--raw", action="store_true", parser.add_argument("--raw", action="store_true",
help="Show all events without tradeable filter (for debugging).") help="Show all events without tradeable filter (for debugging).")
parser.add_argument("--telegram", action="store_true", parser.add_argument("--telegram", action="store_true",
help="Send results to Telegram (TELEGRAM_BOT_TOKEN and CHAT_ID must be set in environment).") help="Send results to Telegram (BOT_TOKEN and CHAT_ID must be set in environment).")
args = parser.parse_args() args = parser.parse_args()
if args.list_categories: if args.list_categories:

View File

@@ -1 +0,0 @@
# Tests package

View File

@@ -1,324 +0,0 @@
"""
Unit tests for browse.py Telegram functions.
Run with: python -m pytest tests/test_browse.py -v
"""
import unittest
from unittest.mock import patch, MagicMock
import sys
import os
from datetime import datetime, timezone, timedelta
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'scripts'))
from browse import send_telegram_message
class TestSendTelegramMessage(unittest.TestCase):
"""Tests for the module-level send_telegram_message function."""
@patch('browse.urlopen')
def test_send_success(self, mock_urlopen):
"""send_telegram_message returns message_id on success."""
mock_resp = MagicMock()
mock_resp.read.return_value = b'{"ok": true, "result": {"message_id": 123}}'
mock_urlopen.return_value.__enter__.return_value = mock_resp
result = send_telegram_message("test_token", "test_chat", "hello world")
self.assertEqual(result, 123)
mock_urlopen.assert_called_once()
call_args = mock_urlopen.call_args
req = call_args[0][0]
self.assertEqual(req.full_url, "https://api.telegram.org/bottest_token/sendMessage")
self.assertEqual(req.method, "POST")
@patch('browse.urlopen')
def test_send_api_error_raises_runtime_error(self, mock_urlopen):
"""send_telegram_message raises RuntimeError when Telegram API returns ok=false."""
mock_resp = MagicMock()
mock_resp.read.return_value = b'{"ok": false, "description": "Forbidden"}'
mock_urlopen.return_value.__enter__.return_value = mock_resp
with self.assertRaises(RuntimeError) as ctx:
send_telegram_message("test_token", "test_chat", "hello")
self.assertIn("Telegram API error: Forbidden", str(ctx.exception))
@patch('browse.urlopen')
def test_send_invalid_token_raises_http_error(self, mock_urlopen):
"""send_telegram_message raises HTTPError on invalid token (404)."""
from urllib.error import HTTPError
mock_urlopen.side_effect = HTTPError(
url="https://api.telegram.org/botINVALID/sendMessage",
code=404,
msg="Not Found",
hdrs={},
fp=None
)
with self.assertRaises(HTTPError) as ctx:
send_telegram_message("INVALID", "test_chat", "hello")
self.assertEqual(ctx.exception.code, 404)
@patch('browse.urlopen')
def test_send_rate_limit_raises_http_error(self, mock_urlopen):
"""send_telegram_message raises HTTPError on rate limit (429)."""
from urllib.error import HTTPError
mock_urlopen.side_effect = HTTPError(
url="https://api.telegram.org/bottest_token/sendMessage",
code=429,
msg="Too Many Requests",
hdrs={},
fp=None
)
with self.assertRaises(HTTPError) as ctx:
send_telegram_message("test_token", "test_chat", "hello")
self.assertEqual(ctx.exception.code, 429)
@patch('browse.urlopen')
def test_send_network_error_raises_url_error(self, mock_urlopen):
"""send_telegram_message raises URLError on network failure."""
from urllib.error import URLError
mock_urlopen.side_effect = URLError("Connection refused")
with self.assertRaises(URLError) as ctx:
send_telegram_message("test_token", "test_chat", "hello")
self.assertIn("Connection refused", str(ctx.exception))
@patch('browse.urlopen')
def test_send_timeout_raises_url_error(self, mock_urlopen):
"""send_telegram_message raises URLError on timeout."""
from urllib.error import URLError
mock_urlopen.side_effect = URLError("<urlopen error TimeoutError: timed out>")
with self.assertRaises(URLError):
send_telegram_message("test_token", "test_chat", "hello")
@patch('browse.urlopen')
def test_send_custom_timeout_used(self, mock_urlopen):
"""send_telegram_message respects custom timeout parameter."""
mock_resp = MagicMock()
mock_resp.read.return_value = b'{"ok": true, "result": {"message_id": 456}}'
mock_urlopen.return_value.__enter__.return_value = mock_resp
send_telegram_message("test_token", "test_chat", "hello", timeout=30)
call_kwargs = mock_urlopen.call_args[1]
self.assertEqual(call_kwargs['timeout'], 30)
@patch('browse.urlopen')
def test_send_html_parsing_mode(self, mock_urlopen):
"""send_telegram_message sends with parse_mode=HTML."""
mock_resp = MagicMock()
mock_resp.read.return_value = b'{"ok": true, "result": {"message_id": 789}}'
mock_urlopen.return_value.__enter__.return_value = mock_resp
send_telegram_message("test_token", "test_chat", "<b>bold</b>")
call_args = mock_urlopen.call_args
req = call_args[0][0]
# Verify parse_mode=HTML is in the data
self.assertIn(b"parse_mode=HTML", req.data)
class TestHtmlInjection(unittest.TestCase):
"""Tests for HTML injection prevention in Telegram messages."""
@patch.dict('os.environ', {'TELEGRAM_BOT_TOKEN': 'test_token', 'CHAT_ID': 'test_chat'})
@patch('browse.send_telegram_message')
def test_send_to_telegram_html_injection_in_match_title(self, mock_send_msg):
"""
titles in match events are NOT escaped before inserting into HTML.
This test FAILS if HTML chars are unescaped (vulnerable),
and PASSES once escape_html() is implemented.
"""
mock_send_msg.return_value = 123
# Simulate a Polymarket event with HTML injection in the title
malicious_event = {
"title": "<script>alert('XSS')</script> - Team A vs Team B",
"slug": "test-event",
"startTime": "2027-03-26T12:00:00Z",
"markets": [{
"sportsMarketType": "moneyline",
"outcomes": '["Team A", "Team B"]',
"outcomePrices": "[0.55, 0.45]",
"bestBid": "0.54",
"bestAsk": "0.56",
"volume": 50000,
"acceptingOrders": True,
"closed": False,
}],
}
from browse import send_to_telegram
send_to_telegram([malicious_event], [], "Counter Strike")
# Check what was passed to send_telegram_message
self.assertEqual(mock_send_msg.called, True)
sent_text = mock_send_msg.call_args[0][2] # text arg (3rd positional)
# AFTER FIX: <script> should be escaped as &lt;script&gt;
# BEFORE FIX: raw <script> appears in text (vulnerable — test would fail here)
self.assertIn("&lt;script&gt;", sent_text,
"HTML injection still present — title may NOT be escaped")
self.assertIn("&lt;/script&gt;", sent_text)
@patch.dict('os.environ', {'TELEGRAM_BOT_TOKEN': 'test_token', 'CHAT_ID': 'test_chat'})
@patch('browse.send_telegram_message')
def test_send_to_telegram_ampersand_in_title(self, mock_send_msg):
"""
Ampersands in titles should be escaped as &amp; when using HTML parse_mode.
BEFORE fix: "&" appears raw in the HTML (vulnerable).
AFTER fix: "&" appears as "&amp;".
"""
mock_send_msg.return_value = 123
event_with_ampersand = {
"title": "Team A & Team B vs Team C",
"slug": "amp-test",
"startTime": "2027-03-26T12:00:00Z",
"markets": [{
"sportsMarketType": "moneyline",
"outcomes": '["Team A & Team B", "Team C"]',
"outcomePrices": "[0.50, 0.50]",
"bestBid": "0.49",
"bestAsk": "0.51",
"volume": 10000,
"acceptingOrders": True,
"closed": False,
}],
}
from browse import send_to_telegram
send_to_telegram([event_with_ampersand], [], "Dota 2")
sent_text = mock_send_msg.call_args[0][2]
# AFTER FIX: & should be escaped as &amp;
# BEFORE FIX: raw & appears (vulnerable — test would fail here)
self.assertIn("&amp;", sent_text,
"Ampersand not escaped — title may NOT be escaped")
class TestTimeFunctions(unittest.TestCase):
"""Tests for _get_time_data() unified helper.
These tests verify the helper returns correct time_status, time_urgency,
and abs_time for various event scenarios. Callers extract the fields they
need from the returned dict.
"""
def _make_event(self, start_time):
"""Helper to create a minimal event with a startTime."""
return {"startTime": start_time}
def _frozen_dt(self, year, month, day, hour, minute, second=0):
return datetime(year, month, day, hour, minute, second,
tzinfo=timezone.utc)
def _mock_datetime(self, frozen):
"""Return a mock datetime class that freezes now() to the given datetime."""
class MockDatetime:
@staticmethod
def now(tz=None):
if tz is None:
return frozen
return frozen.astimezone(tz)
fromisoformat = staticmethod(datetime.fromisoformat)
def __call__(self, *a, **k):
return datetime(*a, **k)
return MockDatetime
# === _get_time_data core tests ===
def test_get_time_data_tbd(self):
"""No startTime -> TBD/0urgency/abs TBD."""
from browse import _get_time_data
td = _get_time_data({})
self.assertEqual(td["time_status"], "TBD")
self.assertEqual(td["time_urgency"], 0)
self.assertEqual(td["abs_time"], "TBD")
def test_get_time_data_in_30m(self):
"""Starts in 30 minutes -> 'In 30m', urgency 3."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import _get_time_data
td = _get_time_data(self._make_event("2026-03-25T12:30:00Z"))
self.assertEqual(td["time_status"], "In 30m")
self.assertEqual(td["time_urgency"], 3)
self.assertIn("WIB", td["abs_time"])
def test_get_time_data_in_6h(self):
"""Starts in 6 hours -> 'In 6h', urgency 2."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import _get_time_data
td = _get_time_data(self._make_event("2026-03-25T18:00:00Z"))
self.assertEqual(td["time_status"], "In 6h")
self.assertEqual(td["time_urgency"], 2)
self.assertIn("WIB", td["abs_time"])
def test_get_time_data_in_2d(self):
"""Starts in 2 days -> 'In 2d', urgency 1."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import _get_time_data
td = _get_time_data(self._make_event("2026-03-27T12:00:00Z"))
self.assertEqual(td["time_status"], "In 2d")
self.assertEqual(td["time_urgency"], 1)
def test_get_time_data_live(self):
"""Started 30 minutes ago -> 'LIVE', urgency 3."""
frozen = self._frozen_dt(2026, 3, 25, 12, 30, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import _get_time_data
td = _get_time_data(self._make_event("2026-03-25T12:00:00Z"))
self.assertEqual(td["time_status"], "LIVE")
self.assertEqual(td["time_urgency"], 3)
self.assertIn("WIB", td["abs_time"])
def test_get_time_data_started_2h_ago(self):
"""Started 2 hours ago -> 'LIVE 2h', urgency 3."""
frozen = self._frozen_dt(2026, 3, 25, 14, 0, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import _get_time_data
td = _get_time_data(self._make_event("2026-03-25T12:00:00Z"))
self.assertEqual(td["time_status"], "LIVE 2h")
self.assertEqual(td["time_urgency"], 3)
def test_get_time_data_started_12h_ago(self):
"""Started 12 hours ago -> '12h ago', urgency 1."""
frozen = self._frozen_dt(2026, 3, 26, 0, 0, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import _get_time_data
td = _get_time_data(self._make_event("2026-03-25T12:00:00Z"))
self.assertEqual(td["time_status"], "12h ago")
self.assertEqual(td["time_urgency"], 1)
def test_get_time_data_started_2d_ago(self):
"""Started 2 days ago -> '2d ago', urgency 0."""
frozen = self._frozen_dt(2026, 3, 27, 12, 0, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import _get_time_data
td = _get_time_data(self._make_event("2026-03-25T12:00:00Z"))
self.assertEqual(td["time_status"], "2d ago")
self.assertEqual(td["time_urgency"], 0)
def test_get_time_data_abs_time_format(self):
"""abs_time is formatted correctly in WIB."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import _get_time_data
# 19:00 UTC = 02:00 WIB next day
td = _get_time_data(self._make_event("2026-03-26T02:00:00Z"))
self.assertIn("WIB", td["abs_time"])
# UTC 12:00 -> WIB 19:00 same day
td2 = _get_time_data(self._make_event("2026-03-25T12:00:00Z"))
self.assertEqual(td2["abs_time"], "Mar 25, 19:00 WIB")
if __name__ == "__main__":
unittest.main()