2 Commits

Author SHA1 Message Date
shoko
27c8cb3597 Add security audit to polymarket-browse review
CRITICAL: Telegram bot token exposed in process command line
HIGH: HTML injection in Telegram messages
MEDIUM: Insufficient --search URL encoding
MEDIUM: No bounds check on --detail
MEDIUM: Potential DoS via large API response
LOW: Bare except: clauses
LOW: No API rate limiting

Includes fix recommendations and immediate actions for users.
2026-03-25 09:27:28 +00:00
shoko
4a33d6924e Add polymarket-browse skill review (2026-03-25)
- Deep analysis of SKILL.md and browse.py
- Line length analysis (worst: 209 chars at print_browse signature)
- Duplicate code patterns (3 time functions, 2 tradeable checkers)
- Bug findings (bare except:, unused variables, 11-param function)
- Recommendations for refactoring and unit testing
- Proposed test structure under tests/
- Summary table categorized by priority/effort
2026-03-25 09:12:05 +00:00
5 changed files with 257 additions and 1122 deletions

1
.gitignore vendored
View File

@@ -2,4 +2,3 @@ __pycache__/
*.pyc *.pyc
*.pyo *.pyo
.DS_Store .DS_Store
.worktrees/

View File

@@ -4,13 +4,11 @@ Polymarket Event Browser
Browse tradeable Polymarket events by game category. Browse tradeable Polymarket events by game category.
""" """
import html import subprocess
import json import json
import time import time
import argparse import argparse
from datetime import datetime, timezone, timedelta from datetime import datetime, timezone, timedelta
from urllib.parse import urlencode
from urllib.request import urlopen, Request
# ============================================================ # ============================================================
# CONFIG # CONFIG
@@ -19,7 +17,6 @@ from urllib.request import urlopen, Request
PAGE_SIZE = 50 PAGE_SIZE = 50
MAX_RETRIES = 5 MAX_RETRIES = 5
INITIAL_RETRY_DELAY = 2 # exponential backoff starts at 2s INITIAL_RETRY_DELAY = 2 # exponential backoff starts at 2s
WIB = timezone(timedelta(hours=7)) # UTC+7 for Indonesian users
GAME_CATEGORIES = { GAME_CATEGORIES = {
"All Esports": "Esports", "All Esports": "Esports",
@@ -222,79 +219,94 @@ def format_spread(bid, ask):
spread = ask - bid spread = ask - bid
return f"{prob_to_cents(spread)}c" return f"{prob_to_cents(spread)}c"
def get_match_time_status(e):
def _get_time_data(e, tz=None):
""" """
Unified time data extraction for event timestamps. Return a human-readable match time status.
Returns (status_str, urgency) where urgency is 0-3 (higher = more urgent/live).
Uses startTime (preferred) or startDate as the event start time. Uses startTime for actual match start time.
Datetime parsing and all relative calculations are UTC-based. Displays times in WIB (UTC+7 for Indonesian users).
The tz parameter only affects the abs_time formatting.
Args:
e: Event dict with 'startTime' or 'startDate' key.
tz: datetime.timezone for abs_time formatting.
Defaults to WIB (UTC+7).
Returns:
{
"time_status": str, # e.g. "LIVE", "In 6h", "12h ago"
"time_urgency": int, # 0-3 (higher = more urgent/live)
"abs_time": str, # e.g. "Mar 25, 19:00 WIB" or "TBD"
}
""" """
tz = tz or WIB # Use startTime for actual match start, not startDate (which is market creation time)
start_str = e.get("startTime") or e.get("startDate", "") start_str = e.get("startTime") or e.get("startDate", "")
if not start_str: if not start_str:
return {"time_status": "TBD", "time_urgency": 0, "abs_time": "TBD"} return "TBD", 0
try: try:
start_dt = datetime.fromisoformat(start_str.replace('Z', '+00:00')) start_dt = datetime.fromisoformat(start_str.replace('Z', '+00:00'))
now_utc = datetime.now(timezone.utc) now_utc = datetime.now(timezone.utc)
delta = start_dt - now_utc utc7 = timezone(timedelta(hours=7))
total_sec = delta.total_seconds() now = now_utc.astimezone(utc7)
start_utc7 = start_dt.astimezone(utc7)
if total_sec < 0: delta = start_dt - now_utc
# Event is in the past
hours_ago = abs(total_sec) / 3600 if delta.total_seconds() < 0:
# Started already
hours_ago = abs(delta.total_seconds()) / 3600
if hours_ago < 1: if hours_ago < 1:
time_status = "LIVE" return "LIVE", 3
time_urgency = 3
elif hours_ago < 4: elif hours_ago < 4:
time_status = f"LIVE {int(hours_ago)}h" return f"LIVE {int(hours_ago)}h", 3
time_urgency = 3
elif hours_ago < 24: elif hours_ago < 24:
time_status = f"{int(hours_ago)}h ago" return f"Started {int(hours_ago)}h ago", 1
time_urgency = 1
else: else:
days = int(hours_ago / 24) days = int(hours_ago / 24)
time_status = f"{days}d ago" return f"{days}d ago", 0
time_urgency = 0
else: else:
# Event is in the future # Starts in future
if total_sec < 3600: hours_until = delta.total_seconds() / 3600
mins = int(total_sec / 60) if hours_until <= 0:
time_status = f"In {mins}m" return "LIVE", 3
time_urgency = 3 elif hours_until < 1:
elif total_sec < 86400: mins = int(delta.total_seconds() / 60)
hours_until = int(total_sec / 3600) return f"In {mins}m", 3
time_status = f"In {hours_until}h" elif hours_until < 24:
time_urgency = 2 return f"In {int(hours_until)}h", 2
else: else:
days = int(total_sec / 86400) days = int(hours_until / 24)
time_status = f"In {days}d" return f"In {days}d", 1
time_urgency = 1 except:
return "", 0
abs_time = start_dt.astimezone(tz).strftime("%b %d, %H:%M ") def get_match_time_str(e):
if tz == WIB: """
abs_time += "WIB" Return just the time status string (e.g. 'LIVE', 'In 6h', 'In 1d').
else: Uses startTime for actual match start time.
abs_time += start_dt.astimezone(tz).strftime("%Z") """
return {"time_status": time_status, "time_urgency": time_urgency, "abs_time": abs_time} start_str = e.get("startTime") or e.get("startDate", "")
except Exception: if not start_str:
return {"time_status": "", "time_urgency": 0, "abs_time": "TBD"} return "TBD"
try:
start_dt = datetime.fromisoformat(start_str.replace('Z', '+00:00'))
now_utc = datetime.now(timezone.utc)
delta = start_dt - now_utc
if delta.total_seconds() < 0:
hours_ago = abs(delta.total_seconds()) / 3600
if hours_ago < 1:
return "LIVE"
elif hours_ago < 4:
return f"LIVE {int(hours_ago)}h"
elif hours_ago < 24:
return f"{int(hours_ago)}h ago"
else:
days = int(hours_ago / 24)
return f"{days}d ago"
else:
hours_until = delta.total_seconds() / 3600
if hours_until <= 0:
return "LIVE"
elif hours_until < 1:
mins = int(delta.total_seconds() / 60)
return f"In {mins}m"
elif hours_until < 24:
return f"In {int(hours_until)}h"
else:
days = int(hours_until / 24)
return f"In {days}d"
except:
return ""
def filter_events(events, tradeable_only=True): def filter_events(events, tradeable_only=True):
""" """
@@ -313,7 +325,6 @@ def filter_events(events, tradeable_only=True):
return match_events, non_match_events return match_events, non_match_events
def sort_events(events): def sort_events(events):
return sorted(events, key=get_ml_volume, reverse=True) return sorted(events, key=get_ml_volume, reverse=True)
@@ -338,176 +349,7 @@ def browse_events(q, matches_max=10, non_matches_max=10, tradeable_only=True):
} }
# ============================================================ # ============================================================
# FORMAT — EVENT # FORMAT
# ============================================================
def format_match_event(e):
"""
Format a match event into a canonical dict for rendering.
All computing done here; renderers just template.
Returns:
{
"title": str, # raw title
"title_clean": str, # "Team A vs Team B"
"tournament": str, # "Tournament Name" or ""
"url": str,
"time_status": str, # "LIVE", "In 6h", "12h ago"
"time_urgency": int, # 0-3
"abs_time": str, # "Mar 25, 19:00 WIB"
"team_a": str,
"team_b": str,
"odds_a": str, # "55c"
"odds_b": str,
"vol": int,
}
"""
ml = get_ml_market(e)
outcomes = json.loads(ml.get("outcomes", "[]")) if ml else []
prices = json.loads(ml.get("outcomePrices", "[]")) if ml else []
td = _get_time_data(e)
title = e.get("title", "")
team_a = outcomes[0] if len(outcomes) > 0 else "?"
team_b = outcomes[1] if len(outcomes) > 1 else "?"
odds_a = format_odds(float(prices[0])) if len(prices) > 0 else "?"
odds_b = format_odds(float(prices[1])) if len(prices) > 1 else "?"
if " - " in title:
title_clean = title.split(" - ")[0].strip()
else:
title_clean = title
tournament = get_tournament(title)
return {
"title": title,
"title_clean": title_clean,
"tournament": tournament,
"url": get_event_url(e),
"time_status": td["time_status"],
"time_urgency": td["time_urgency"],
"abs_time": td["abs_time"],
"team_a": team_a,
"team_b": team_b,
"odds_a": odds_a,
"odds_b": odds_b,
"vol": get_ml_volume(e),
}
def format_non_match_event(e):
"""
Format a non-match event into a canonical dict for rendering.
Returns:
{
"title": str,
"url": str,
"time_status": str,
"time_urgency": int,
"abs_time": str,
"market_count": int,
"total_vol": int,
}
"""
td = _get_time_data(e)
total_vol = sum(float(m.get("volume", 0)) for m in e.get("markets", []))
market_count = len(e.get("markets", []))
return {
"title": e.get("title", "?"),
"url": get_event_url(e),
"time_status": td["time_status"],
"time_urgency": td["time_urgency"],
"abs_time": td["abs_time"],
"market_count": market_count,
"total_vol": int(total_vol),
}
# ============================================================
# FORMAT — RENDER
# ============================================================
def render_match_lines(event_dict, i, mode):
"""
Render a formatted match event dict into lines of text.
Args:
event_dict: canonical dict from format_match_event()
i: 1-based index for the event number
mode: "text" for plain text/Markdown, "html" for Telegram HTML
Returns:
List[str], one line per element (no trailing blank line).
Caller adds the blank line separator between events.
"""
title_clean = event_dict["title_clean"]
url = event_dict["url"]
abs_time = event_dict["abs_time"]
time_status = event_dict["time_status"]
vol = event_dict["vol"]
tournament = event_dict["tournament"]
team_a = event_dict["team_a"]
team_b = event_dict["team_b"]
odds_a = event_dict["odds_a"]
odds_b = event_dict["odds_b"]
lines = []
if mode == "html":
lines.append(
f"<b>{i}.</b> <a href=\"{url}\">{escape_html(title_clean)}</a>"
)
else:
lines.append(f"{i}. [{title_clean}]({url})")
lines.append(f" {abs_time} | {time_status}")
lines.append(f" Vol: ${vol:,.0f}")
if tournament:
lines.append(f" Tournament: {tournament}")
lines.append(f" Odds: {team_a} {odds_a} | {odds_b} {team_b}")
return lines
def render_non_match_lines(event_dict, i, mode):
"""
Render a formatted non-match event dict into lines of text.
Args:
event_dict: canonical dict from format_non_match_event()
i: 1-based index for the event number
mode: "text" for plain text/Markdown, "html" for Telegram HTML
Returns:
List[str], one line per element (no trailing blank line).
"""
title = event_dict["title"]
url = event_dict["url"]
abs_time = event_dict["abs_time"]
time_status = event_dict["time_status"]
market_count = event_dict["market_count"]
total_vol = event_dict["total_vol"]
lines = []
if mode == "html":
lines.append(f"<b>{i}.</b> <a href=\"{url}\">{escape_html(title)}</a>")
else:
lines.append(f"{i}. [{title}]({url})")
lines.append(f" {abs_time} | {time_status}")
lines.append(f" Markets: {market_count} | Total Vol: ${total_vol:,.0f}")
return lines
# ============================================================
# FORMAT — LEGACY
# ============================================================ # ============================================================
def format_event(e): def format_event(e):
@@ -517,12 +359,12 @@ def format_event(e):
best_bid = float(ml.get("bestBid", 0)) if ml else 0 best_bid = float(ml.get("bestBid", 0)) if ml else 0
best_ask = float(ml.get("bestAsk", 0)) if ml else 0 best_ask = float(ml.get("bestAsk", 0)) if ml else 0
vol = get_ml_volume(e) vol = get_ml_volume(e)
td = _get_time_data(e) time_status, urgency = get_match_time_status(e)
return { return {
"title": e.get("title", ""), "title": e.get("title", ""),
"time_status": td["time_status"], "time_status": time_status,
"time_urgency": td["time_urgency"], "time_urgency": urgency,
"url": get_event_url(e), "url": get_event_url(e),
"livestream": e.get("resolutionSource"), "livestream": e.get("resolutionSource"),
"outcomes": outcomes, "outcomes": outcomes,
@@ -541,12 +383,11 @@ def format_detail_event(e):
] ]
active_markets = sorted(active_markets, key=lambda m: float(m.get("volume", 0)), reverse=True) active_markets = sorted(active_markets, key=lambda m: float(m.get("volume", 0)), reverse=True)
td = _get_time_data(e) time_status, urgency = get_match_time_status(e)
return { return {
"title": e.get("title", ""), "title": e.get("title", ""),
"time_status": td["time_status"], "time_status": time_status,
"abs_time": td["abs_time"],
"url": get_event_url(e), "url": get_event_url(e),
"livestream": e.get("resolutionSource"), "livestream": e.get("resolutionSource"),
"outcomes": json.loads(ml.get("outcomes", "[]")) if ml else [], "outcomes": json.loads(ml.get("outcomes", "[]")) if ml else [],
@@ -573,6 +414,48 @@ def format_detail_event(e):
# DISPLAY # DISPLAY
# ============================================================ # ============================================================
def get_start_time_wib(e):
"""Return (date_time_str, relative_str) for display."""
start_str = e.get("startTime") or e.get("startDate", "")
if not start_str:
return "TBD", ""
try:
start_dt = datetime.fromisoformat(start_str.replace('Z', '+00:00'))
now_utc = datetime.now(timezone.utc)
utc7 = timezone(timedelta(hours=7))
start_utc7 = start_dt.astimezone(utc7)
# Absolute: "Mar 25, 19:00 WIB"
abs_str = start_utc7.strftime("%b %d, %H:%M WIB")
# Relative: "In 5h", "In 10h", "LIVE", etc.
delta = start_dt - now_utc
if delta.total_seconds() < 0:
hours_ago = abs(delta.total_seconds()) / 3600
if hours_ago < 1:
rel_str = "LIVE"
elif hours_ago < 24:
rel_str = f"{int(hours_ago)}h ago"
else:
days = int(hours_ago / 24)
rel_str = f"{days}d ago"
else:
hours_until = delta.total_seconds() / 3600
if hours_until <= 0:
rel_str = "LIVE"
elif hours_until < 1:
mins_until = int(delta.total_seconds() / 60)
rel_str = f"In {mins_until}m"
elif hours_until < 24:
rel_str = f"In {int(hours_until)}h"
else:
days = int(hours_until / 24)
rel_str = f"In {days}d"
return abs_str, rel_str
except:
return "TBD", ""
def get_header_date(): def get_header_date():
"""Return current date string like 'Mar 25, 2026'""" """Return current date string like 'Mar 25, 2026'"""
now_utc = datetime.now(timezone.utc) now_utc = datetime.now(timezone.utc)
@@ -603,8 +486,9 @@ def print_browse(match_events, non_match_events, category, total_raw, total_fetc
if partial: if partial:
print(f"WARNING: Partial fetch (API error or timeout) — data may be incomplete") print(f"WARNING: Partial fetch (API error or timeout) — data may be incomplete")
# Determine sections to show # --- MATCH MARKETS ---
if not matches_only and not non_matches_only: if not matches_only and not non_matches_only:
# Default: show both
show_matches = True show_matches = True
show_non_matches = True show_non_matches = True
elif matches_only: elif matches_only:
@@ -614,31 +498,68 @@ def print_browse(match_events, non_match_events, category, total_raw, total_fetc
show_matches = False show_matches = False
show_non_matches = True show_non_matches = True
# Match events
if show_matches: if show_matches:
print("\nMATCH MARKETS") print(f"\nMATCH MARKETS")
if not match_events: if not match_events:
print(" No match markets found.") print(" No match markets found.")
else: else:
for i, e in enumerate(match_events, 1): for i, e in enumerate(match_events, 1):
fd = format_match_event(e) f = format_event(e)
for line in render_match_lines(fd, i, mode="text"): ml = get_ml_market(e)
print(line) outcomes = json.loads(ml.get("outcomes", "[]")) if ml else []
prices = json.loads(ml.get("outcomePrices", "[]")) if ml else []
vol = f["volume"]
title = f["title"]
url = f["url"]
start_time_wib, rel_time = get_start_time_wib(e)
# Non-match events team_a = outcomes[0] if len(outcomes) > 0 else "?"
team_b = outcomes[1] if len(outcomes) > 1 else "?"
odds_a = format_odds(float(prices[0])) if len(prices) > 0 else "?"
odds_b = format_odds(float(prices[1])) if len(prices) > 1 else "?"
if " - " in title:
title_clean = title.split(" - ")[0].strip()
else:
title_clean = title
tournament = get_tournament(title)
print(f"\n {i}. [{title_clean}]({url})")
print(f" {start_time_wib} | {rel_time}")
print(f" Vol: ${vol:,.0f}")
if tournament:
print(f" Tournament: {tournament}")
print(f" Odds: {team_a} {odds_a} | {odds_b} {team_b}")
# --- NON-MATCH MARKETS ---
if show_non_matches and non_match_events: if show_non_matches and non_match_events:
print("\nNON-MATCH MARKETS") print(f"\nNON-MATCH MARKETS")
for i, e in enumerate(non_match_events[:non_matches_max], 1): for i, e in enumerate(non_match_events[:non_matches_max], 1):
fd = format_non_match_event(e) title = e.get("title", "?")
for line in render_non_match_lines(fd, i, mode="text"): url = get_event_url(e)
print(line) start_time_wib, rel_time = get_start_time_wib(e)
total_vol = sum(float(m.get("volume", 0)) for m in e.get("markets", []))
market_count = len(e.get("markets", []))
print(f"\n {i}. [{title}]({url})")
print(f" {start_time_wib} | {rel_time}")
print(f" Markets: {market_count} | Total Vol: ${total_vol:,.0f}")
def print_detail(e, detail): def print_detail(e, detail):
from datetime import datetime, timezone, timedelta
now_utc = datetime.now(timezone.utc)
utc7 = timezone(timedelta(hours=7))
now_utc7 = now_utc.astimezone(utc7)
print(f"\n{detail['title']}") print(f"\n{detail['title']}")
print(f"URL: {detail['url']}") print(f"URL: {detail['url']}")
print(f"Livestream: {detail['livestream']}") print(f"Livestream: {detail['livestream']}")
spread_str = format_spread(detail["best_bid"], detail["best_ask"]) if detail["best_bid"] and detail["best_ask"] else "N/A" spread_str = format_spread(detail["best_bid"], detail["best_ask"]) if detail["best_bid"] and detail["best_ask"] else "N/A"
time_str = get_match_time_str(e)
print(f"\n{detail['time_status']}") print(f"\n{detail['time_status']}")
print(f"ML: {detail['outcomes'][0]} {format_odds(float(detail['prices'][0]))} vs {detail['outcomes'][1]} {format_odds(float(detail['prices'][1]))}") print(f"ML: {detail['outcomes'][0]} {format_odds(float(detail['prices'][0]))} vs {detail['outcomes'][1]} {format_odds(float(detail['prices'][1]))}")
print(f"ML Vol: ${detail['volume']:,.0f} | {spread_str}") print(f"ML Vol: ${detail['volume']:,.0f} | {spread_str}")
@@ -655,44 +576,14 @@ def print_detail(e, detail):
# TELEGRAM # TELEGRAM
# ============================================================ # ============================================================
def escape_html(text):
"""Escape HTML-sensitive characters for Telegram parse_mode=HTML."""
return (text
.replace("&", "&amp;")
.replace("<", "&lt;")
.replace(">", "&gt;")
.replace('"', "&quot;"))
def send_telegram_message(bot_token, chat_id, text, timeout=10):
"""Send a message via Telegram bot API. Returns the message ID on success.
Raises:
RuntimeError: If the Telegram API returns an error (e.g. invalid token, rate limit).
URLError/HTTPError: On network or HTTP-level failures.
"""
url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
data = urlencode({
"chat_id": chat_id,
"text": text,
"parse_mode": "HTML",
"disable_web_page_preview": "true",
}).encode("utf-8")
req = Request(url, data=data, method="POST")
with urlopen(req, timeout=timeout) as resp:
result = json.loads(resp.read())
if not result.get("ok"):
raise RuntimeError(f"Telegram API error: {result.get('description')}")
return result["result"]["message_id"]
def send_to_telegram(match_events, non_match_events, category, matches_only=False, non_matches_only=False): def send_to_telegram(match_events, non_match_events, category, matches_only=False, non_matches_only=False):
"""Send browse results to Telegram. Reads TELEGRAM_BOT_TOKEN and CHAT_ID from environment.""" """Send browse results to Telegram. Reads BOT_TOKEN and CHAT_ID from environment."""
import os import os
bot_token = os.environ.get("TELEGRAM_BOT_TOKEN") bot_token = os.environ.get("BOT_TOKEN")
chat_id = os.environ.get("CHAT_ID") chat_id = os.environ.get("CHAT_ID")
if not bot_token or not chat_id: if not bot_token or not chat_id:
raise RuntimeError("TELEGRAM_BOT_TOKEN or CHAT_ID not set in environment") print("WARNING: BOT_TOKEN or CHAT_ID not set in environment. Skipping Telegram send.")
return
from datetime import datetime, timezone, timedelta from datetime import datetime, timezone, timedelta
now_utc = datetime.now(timezone.utc) now_utc = datetime.now(timezone.utc)
@@ -705,71 +596,92 @@ def send_to_telegram(match_events, non_match_events, category, matches_only=Fals
show_non_matches = (not matches_only and not non_matches_only) or non_matches_only show_non_matches = (not matches_only and not non_matches_only) or non_matches_only
def send(text): def send(text):
msg_id = send_telegram_message(bot_token, chat_id, text) result = subprocess.run(
print(f" Sent msg {msg_id}") ["curl", "-s", f"https://api.telegram.org/bot{bot_token}/sendMessage",
"-d", f"chat_id={chat_id}",
"-d", f"text={text}",
"-d", "parse_mode=HTML",
"-d", "disable_web_page_preview=true"],
capture_output=True
)
resp = json.loads(result.stdout.decode())
if resp.get("ok"):
print(f" Sent msg {resp['result']['message_id']}")
else:
print(f" Error: {resp.get('description')}")
# Build lines # Build sections
lines = [f"<b>{category.upper()}</b> | {header_date}", ""] lines = [f"<b>{category.upper()}</b> | {header_date}"]
lines.append("")
if show_matches: if show_matches:
lines += ["MATCH MARKETS", ""] lines.append("MATCH MARKETS")
lines.append("")
if not match_events: if not match_events:
lines.append(" No match markets found.") lines.append(" No match markets found.")
else: else:
for i, e in enumerate(match_events, 1): for i, e in enumerate(match_events, 1):
fd = format_match_event(e) ml = get_ml_market(e)
lines += render_match_lines(fd, i, mode="html") outcomes = json.loads(ml.get("outcomes", "[]")) if ml else []
prices = json.loads(ml.get("outcomePrices", "[]")) if ml else []
vol = get_ml_volume(e)
title = e.get("title", "?")
url = get_event_url(e)
start_time_wib, rel_time = get_start_time_wib(e)
team_a = outcomes[0] if len(outcomes) > 0 else "?"
team_b = outcomes[1] if len(outcomes) > 1 else "?"
odds_a = format_odds(float(prices[0])) if len(prices) > 0 else "?"
odds_b = format_odds(float(prices[1])) if len(prices) > 1 else "?"
tournament = get_tournament(title)
title_clean = title.split(" - ")[0].strip() if " - " in title else title
lines.append(f"<b>{i}.</b> <a href=\"{url}\">{title_clean}</a>")
lines.append(f" {start_time_wib} | {rel_time}")
lines.append(f" Vol: ${vol:,.0f}")
if tournament:
lines.append(f" Tournament: {tournament}")
lines.append(f" Odds: {team_a} {odds_a} | {odds_b} {team_b}")
lines.append("") lines.append("")
lines.append("") lines.append("")
if show_non_matches: if show_non_matches:
lines += ["NON-MATCH MARKETS", ""] lines.append("NON-MATCH MARKETS")
lines.append("")
if not non_match_events: if not non_match_events:
lines.append(" No non-match markets found.") lines.append(" No non-match markets found.")
else: else:
for i, e in enumerate(non_match_events, 1): for i, e in enumerate(non_match_events, 1):
fd = format_non_match_event(e) title = e.get("title", "?")
lines += render_non_match_lines(fd, i, mode="html") url = get_event_url(e)
lines.append("") start_time_wib, rel_time = get_start_time_wib(e)
total_vol = sum(float(m.get("volume", 0)) for m in e.get("markets", []))
market_count = len(e.get("markets", []))
lines.append(f"<b>{i}.</b> <a href=\"{url}\">{title}</a>")
lines.append(f" {start_time_wib} | {rel_time}")
lines.append(f" Markets: {market_count} | Total Vol: ${total_vol:,.0f}")
lines.append("") lines.append("")
# Chunk and send # Chunk by 10 items (events), respecting 4096 char Telegram limit
send_chunked(lines, send, category, header_date, show_matches, show_non_matches) text = "\n".join(lines)
def send_chunked(all_lines, send_fn, category, header_date, show_matches, show_non_matches):
"""
Split already-built lines into Telegram-safe chunks and send them.
Telegram messages are capped at 4096 chars. Chunks are grouped by
section header so no event is split across messages.
Args:
all_lines: Full message lines list (built by caller).
send_fn: Closure that sends a single string and prints confirmation.
category: Category name for header.
header_date: Date string for header.
show_matches: Whether MATCH MARKETS section is present.
show_non_matches: Whether NON-MATCH MARKETS section is present.
"""
text = "\n".join(all_lines)
if len(text) <= 4096: if len(text) <= 4096:
send_fn(text) send(text)
return return
# Split into chunks of 10 events, respecting section headers # Split into chunks of 10 events
all_items = [] all_items = []
in_match = True in_match = True
for line in all_lines: for line in lines:
if line == "MATCH MARKETS": if line == "MATCH MARKETS":
in_match = True in_match = True
elif line == "NON-MATCH MARKETS": elif line == "NON-MATCH MARKETS":
in_match = False in_match = False
elif line.startswith("<b>") and "</a>" in line: elif line.startswith("<b>") and ". " in line and "</a>" in line:
# Event title line: <b>1.</b> <a href="...">Title</a>
all_items.append((in_match, line)) all_items.append((in_match, line))
chunk = [] chunk = []
chunk_len = 0
chunk_num = 1
# Header is always first
header = f"<b>{category.upper()}</b> | {header_date}\n" header = f"<b>{category.upper()}</b> | {header_date}\n"
if show_matches: if show_matches:
header += "\nMATCH MARKETS\n\n" header += "\nMATCH MARKETS\n\n"
@@ -780,8 +692,9 @@ def send_chunked(all_lines, send_fn, category, header_date, show_matches, show_n
test_chunk = chunk + [item_line, ""] test_chunk = chunk + [item_line, ""]
test_text = header + "\n".join(chunk) + "\n".join(test_chunk) test_text = header + "\n".join(chunk) + "\n".join(test_chunk)
if len(test_text) > 4096 or len(chunk) >= 10: if len(test_text) > 4096 or len(chunk) >= 10:
# Send current chunk
msg = header + "\n".join(chunk) msg = header + "\n".join(chunk)
send_fn(msg) send(msg)
chunk = [item_line, ""] chunk = [item_line, ""]
header = f"<b>{category.upper()}</b> (cont.) | {header_date}\n" header = f"<b>{category.upper()}</b> (cont.) | {header_date}\n"
if show_matches and is_match: if show_matches and is_match:
@@ -793,7 +706,7 @@ def send_chunked(all_lines, send_fn, category, header_date, show_matches, show_n
if chunk: if chunk:
msg = header + "\n".join(chunk) msg = header + "\n".join(chunk)
send_fn(msg) send(msg)
# ============================================================ # ============================================================
@@ -824,7 +737,7 @@ def main():
parser.add_argument("--raw", action="store_true", parser.add_argument("--raw", action="store_true",
help="Show all events without tradeable filter (for debugging).") help="Show all events without tradeable filter (for debugging).")
parser.add_argument("--telegram", action="store_true", parser.add_argument("--telegram", action="store_true",
help="Send results to Telegram (TELEGRAM_BOT_TOKEN and CHAT_ID must be set in environment).") help="Send results to Telegram (BOT_TOKEN and CHAT_ID must be set in environment).")
args = parser.parse_args() args = parser.parse_args()
if args.list_categories: if args.list_categories:

View File

@@ -1 +0,0 @@
# Tests package

View File

@@ -1,776 +0,0 @@
"""
Unit tests for browse.py Telegram functions.
Run with: python -m pytest tests/test_browse.py -v
"""
import unittest
from unittest.mock import patch, MagicMock
import sys
import os
from datetime import datetime, timezone, timedelta
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'scripts'))
from browse import send_telegram_message
class TestSendTelegramMessage(unittest.TestCase):
"""Tests for the module-level send_telegram_message function."""
@patch('browse.urlopen')
def test_send_success(self, mock_urlopen):
"""send_telegram_message returns message_id on success."""
mock_resp = MagicMock()
mock_resp.read.return_value = b'{"ok": true, "result": {"message_id": 123}}'
mock_urlopen.return_value.__enter__.return_value = mock_resp
result = send_telegram_message("test_token", "test_chat", "hello world")
self.assertEqual(result, 123)
mock_urlopen.assert_called_once()
call_args = mock_urlopen.call_args
req = call_args[0][0]
self.assertEqual(req.full_url, "https://api.telegram.org/bottest_token/sendMessage")
self.assertEqual(req.method, "POST")
@patch('browse.urlopen')
def test_send_api_error_raises_runtime_error(self, mock_urlopen):
"""send_telegram_message raises RuntimeError when Telegram API returns ok=false."""
mock_resp = MagicMock()
mock_resp.read.return_value = b'{"ok": false, "description": "Forbidden"}'
mock_urlopen.return_value.__enter__.return_value = mock_resp
with self.assertRaises(RuntimeError) as ctx:
send_telegram_message("test_token", "test_chat", "hello")
self.assertIn("Telegram API error: Forbidden", str(ctx.exception))
@patch('browse.urlopen')
def test_send_invalid_token_raises_http_error(self, mock_urlopen):
"""send_telegram_message raises HTTPError on invalid token (404)."""
from urllib.error import HTTPError
mock_urlopen.side_effect = HTTPError(
url="https://api.telegram.org/botINVALID/sendMessage",
code=404,
msg="Not Found",
hdrs={},
fp=None
)
with self.assertRaises(HTTPError) as ctx:
send_telegram_message("INVALID", "test_chat", "hello")
self.assertEqual(ctx.exception.code, 404)
@patch('browse.urlopen')
def test_send_rate_limit_raises_http_error(self, mock_urlopen):
"""send_telegram_message raises HTTPError on rate limit (429)."""
from urllib.error import HTTPError
mock_urlopen.side_effect = HTTPError(
url="https://api.telegram.org/bottest_token/sendMessage",
code=429,
msg="Too Many Requests",
hdrs={},
fp=None
)
with self.assertRaises(HTTPError) as ctx:
send_telegram_message("test_token", "test_chat", "hello")
self.assertEqual(ctx.exception.code, 429)
@patch('browse.urlopen')
def test_send_network_error_raises_url_error(self, mock_urlopen):
"""send_telegram_message raises URLError on network failure."""
from urllib.error import URLError
mock_urlopen.side_effect = URLError("Connection refused")
with self.assertRaises(URLError) as ctx:
send_telegram_message("test_token", "test_chat", "hello")
self.assertIn("Connection refused", str(ctx.exception))
@patch('browse.urlopen')
def test_send_timeout_raises_url_error(self, mock_urlopen):
"""send_telegram_message raises URLError on timeout."""
from urllib.error import URLError
mock_urlopen.side_effect = URLError("<urlopen error TimeoutError: timed out>")
with self.assertRaises(URLError):
send_telegram_message("test_token", "test_chat", "hello")
@patch('browse.urlopen')
def test_send_custom_timeout_used(self, mock_urlopen):
"""send_telegram_message respects custom timeout parameter."""
mock_resp = MagicMock()
mock_resp.read.return_value = b'{"ok": true, "result": {"message_id": 456}}'
mock_urlopen.return_value.__enter__.return_value = mock_resp
send_telegram_message("test_token", "test_chat", "hello", timeout=30)
call_kwargs = mock_urlopen.call_args[1]
self.assertEqual(call_kwargs['timeout'], 30)
@patch('browse.urlopen')
def test_send_html_parsing_mode(self, mock_urlopen):
"""send_telegram_message sends with parse_mode=HTML."""
mock_resp = MagicMock()
mock_resp.read.return_value = b'{"ok": true, "result": {"message_id": 789}}'
mock_urlopen.return_value.__enter__.return_value = mock_resp
send_telegram_message("test_token", "test_chat", "<b>bold</b>")
call_args = mock_urlopen.call_args
req = call_args[0][0]
# Verify parse_mode=HTML is in the data
self.assertIn(b"parse_mode=HTML", req.data)
class TestHtmlInjection(unittest.TestCase):
"""Tests for HTML injection prevention in Telegram messages."""
@patch.dict('os.environ', {'TELEGRAM_BOT_TOKEN': 'test_token', 'CHAT_ID': 'test_chat'})
@patch('browse.send_telegram_message')
def test_send_to_telegram_html_injection_in_match_title(self, mock_send_msg):
"""
titles in match events are NOT escaped before inserting into HTML.
This test FAILS if HTML chars are unescaped (vulnerable),
and PASSES once escape_html() is implemented.
"""
mock_send_msg.return_value = 123
# Simulate a Polymarket event with HTML injection in the title
malicious_event = {
"title": "<script>alert('XSS')</script> - Team A vs Team B",
"slug": "test-event",
"startTime": "2027-03-26T12:00:00Z",
"markets": [{
"sportsMarketType": "moneyline",
"outcomes": '["Team A", "Team B"]',
"outcomePrices": "[0.55, 0.45]",
"bestBid": "0.54",
"bestAsk": "0.56",
"volume": 50000,
"acceptingOrders": True,
"closed": False,
}],
}
from browse import send_to_telegram
send_to_telegram([malicious_event], [], "Counter Strike")
# Check what was passed to send_telegram_message
self.assertEqual(mock_send_msg.called, True)
sent_text = mock_send_msg.call_args[0][2] # text arg (3rd positional)
# AFTER FIX: <script> should be escaped as &lt;script&gt;
# BEFORE FIX: raw <script> appears in text (vulnerable — test would fail here)
self.assertIn("&lt;script&gt;", sent_text,
"HTML injection still present — title may NOT be escaped")
self.assertIn("&lt;/script&gt;", sent_text)
@patch.dict('os.environ', {'TELEGRAM_BOT_TOKEN': 'test_token', 'CHAT_ID': 'test_chat'})
@patch('browse.send_telegram_message')
def test_send_to_telegram_ampersand_in_title(self, mock_send_msg):
"""
Ampersands in titles should be escaped as &amp; when using HTML parse_mode.
BEFORE fix: "&" appears raw in the HTML (vulnerable).
AFTER fix: "&" appears as "&amp;".
"""
mock_send_msg.return_value = 123
event_with_ampersand = {
"title": "Team A & Team B vs Team C",
"slug": "amp-test",
"startTime": "2027-03-26T12:00:00Z",
"markets": [{
"sportsMarketType": "moneyline",
"outcomes": '["Team A & Team B", "Team C"]',
"outcomePrices": "[0.50, 0.50]",
"bestBid": "0.49",
"bestAsk": "0.51",
"volume": 10000,
"acceptingOrders": True,
"closed": False,
}],
}
from browse import send_to_telegram
send_to_telegram([event_with_ampersand], [], "Dota 2")
sent_text = mock_send_msg.call_args[0][2]
# AFTER FIX: & should be escaped as &amp;
# BEFORE FIX: raw & appears (vulnerable — test would fail here)
self.assertIn("&amp;", sent_text,
"Ampersand not escaped — title may NOT be escaped")
class TestTimeFunctions(unittest.TestCase):
"""Tests for _get_time_data() unified helper.
These tests verify the helper returns correct time_status, time_urgency,
and abs_time for various event scenarios. Callers extract the fields they
need from the returned dict.
"""
def _make_event(self, start_time):
"""Helper to create a minimal event with a startTime."""
return {"startTime": start_time}
def _frozen_dt(self, year, month, day, hour, minute, second=0):
return datetime(year, month, day, hour, minute, second,
tzinfo=timezone.utc)
def _mock_datetime(self, frozen):
"""Return a mock datetime class that freezes now() to the given datetime."""
class MockDatetime:
@staticmethod
def now(tz=None):
if tz is None:
return frozen
return frozen.astimezone(tz)
fromisoformat = staticmethod(datetime.fromisoformat)
def __call__(self, *a, **k):
return datetime(*a, **k)
return MockDatetime
# === _get_time_data core tests ===
def test_get_time_data_tbd(self):
"""No startTime -> TBD/0urgency/abs TBD."""
from browse import _get_time_data
td = _get_time_data({})
self.assertEqual(td["time_status"], "TBD")
self.assertEqual(td["time_urgency"], 0)
self.assertEqual(td["abs_time"], "TBD")
def test_get_time_data_in_30m(self):
"""Starts in 30 minutes -> 'In 30m', urgency 3."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import _get_time_data
td = _get_time_data(self._make_event("2026-03-25T12:30:00Z"))
self.assertEqual(td["time_status"], "In 30m")
self.assertEqual(td["time_urgency"], 3)
self.assertIn("WIB", td["abs_time"])
def test_get_time_data_in_6h(self):
"""Starts in 6 hours -> 'In 6h', urgency 2."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import _get_time_data
td = _get_time_data(self._make_event("2026-03-25T18:00:00Z"))
self.assertEqual(td["time_status"], "In 6h")
self.assertEqual(td["time_urgency"], 2)
self.assertIn("WIB", td["abs_time"])
def test_get_time_data_in_2d(self):
"""Starts in 2 days -> 'In 2d', urgency 1."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import _get_time_data
td = _get_time_data(self._make_event("2026-03-27T12:00:00Z"))
self.assertEqual(td["time_status"], "In 2d")
self.assertEqual(td["time_urgency"], 1)
def test_get_time_data_live(self):
"""Started 30 minutes ago -> 'LIVE', urgency 3."""
frozen = self._frozen_dt(2026, 3, 25, 12, 30, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import _get_time_data
td = _get_time_data(self._make_event("2026-03-25T12:00:00Z"))
self.assertEqual(td["time_status"], "LIVE")
self.assertEqual(td["time_urgency"], 3)
self.assertIn("WIB", td["abs_time"])
def test_get_time_data_started_2h_ago(self):
"""Started 2 hours ago -> 'LIVE 2h', urgency 3."""
frozen = self._frozen_dt(2026, 3, 25, 14, 0, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import _get_time_data
td = _get_time_data(self._make_event("2026-03-25T12:00:00Z"))
self.assertEqual(td["time_status"], "LIVE 2h")
self.assertEqual(td["time_urgency"], 3)
def test_get_time_data_started_12h_ago(self):
"""Started 12 hours ago -> '12h ago', urgency 1."""
frozen = self._frozen_dt(2026, 3, 26, 0, 0, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import _get_time_data
td = _get_time_data(self._make_event("2026-03-25T12:00:00Z"))
self.assertEqual(td["time_status"], "12h ago")
self.assertEqual(td["time_urgency"], 1)
def test_get_time_data_started_2d_ago(self):
"""Started 2 days ago -> '2d ago', urgency 0."""
frozen = self._frozen_dt(2026, 3, 27, 12, 0, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import _get_time_data
td = _get_time_data(self._make_event("2026-03-25T12:00:00Z"))
self.assertEqual(td["time_status"], "2d ago")
self.assertEqual(td["time_urgency"], 0)
def test_get_time_data_abs_time_format(self):
"""abs_time is formatted correctly in WIB."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import _get_time_data
# 19:00 UTC = 02:00 WIB next day
td = _get_time_data(self._make_event("2026-03-26T02:00:00Z"))
self.assertIn("WIB", td["abs_time"])
# UTC 12:00 -> WIB 19:00 same day
td2 = _get_time_data(self._make_event("2026-03-25T12:00:00Z"))
self.assertEqual(td2["abs_time"], "Mar 25, 19:00 WIB")
class TestFormatMatchEvent(unittest.TestCase):
"""Tests for format_match_event() canonical dict."""
def _frozen_dt(self, year, month, day, hour, minute):
return datetime(year, month, day, hour, minute,
tzinfo=timezone.utc)
def _mock_datetime(self, frozen):
class MockDatetime:
@staticmethod
def now(tz=None):
if tz is None:
return frozen
return frozen.astimezone(tz)
fromisoformat = staticmethod(datetime.fromisoformat)
def __call__(self, *a, **k):
return datetime(*a, **k)
return MockDatetime
def _make_event(self, title, ml_market=None, start_time="2026-03-25T18:00:00Z"):
import json as _json
e = {
"title": title,
"slug": "test-slug",
"startTime": start_time,
"markets": [],
}
if ml_market:
e["markets"].append(ml_market)
return e
def _make_ml_market(self, outcomes, prices, vol=50000):
import json
return {
"sportsMarketType": "moneyline",
"outcomes": json.dumps(outcomes),
"outcomePrices": json.dumps(prices),
"bestBid": str(float(prices[0]) - 0.01) if prices else "0.49",
"bestAsk": str(float(prices[0]) + 0.01) if prices else "0.51",
"volume": str(vol),
"acceptingOrders": True,
"closed": False,
}
def test_fields_present(self):
"""All canonical fields are present and non-null."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import format_match_event
e = self._make_event(
"Counter Strike: Team A vs Team B - ESL Pro League",
self._make_ml_market(['"Team A"', '"Team B"'], [0.55, 0.45]),
)
fd = format_match_event(e)
self.assertIn("title", fd)
self.assertIn("title_clean", fd)
self.assertIn("tournament", fd)
self.assertIn("url", fd)
self.assertIn("time_status", fd)
self.assertIn("time_urgency", fd)
self.assertIn("abs_time", fd)
self.assertIn("team_a", fd)
self.assertIn("team_b", fd)
self.assertIn("odds_a", fd)
self.assertIn("odds_b", fd)
self.assertIn("vol", fd)
def test_title_clean_no_tournament(self):
"""title_clean strips tournament suffix after ' - '."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import format_match_event
e = self._make_event(
"Counter Strike: Team A vs Team B - ESL Pro League",
self._make_ml_market(['"Team A"', '"Team B"'], [0.55, 0.45]),
)
fd = format_match_event(e)
self.assertEqual(fd["title_clean"], "Counter Strike: Team A vs Team B")
self.assertEqual(fd["tournament"], "ESL Pro League")
def test_title_clean_no_dash(self):
"""title_clean is unchanged when no ' - ' separator."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import format_match_event
e = self._make_event(
"Counter Strike: Team A vs Team B",
self._make_ml_market(['"Team A"', '"Team B"'], [0.55, 0.45]),
)
fd = format_match_event(e)
self.assertEqual(fd["title_clean"], "Counter Strike: Team A vs Team B")
self.assertEqual(fd["tournament"], "")
def test_missing_ml(self):
"""Returns valid dict with '?' fallbacks when no ML market."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import format_match_event
e = self._make_event("Team A vs Team B")
fd = format_match_event(e)
self.assertEqual(fd["team_a"], "?")
self.assertEqual(fd["team_b"], "?")
self.assertEqual(fd["odds_a"], "?")
self.assertEqual(fd["odds_b"], "?")
self.assertEqual(fd["vol"], 0)
def test_missing_outcomes(self):
"""Handles empty outcomes list gracefully."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import format_match_event
e = self._make_event(
"Team A vs Team B",
self._make_ml_market([], []),
)
fd = format_match_event(e)
self.assertEqual(fd["team_a"], "?")
self.assertEqual(fd["team_b"], "?")
def test_time_data_passed_through(self):
"""Time fields come from _get_time_data."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import format_match_event
e = self._make_event(
"Team A vs Team B",
self._make_ml_market(['"Team A"', '"Team B"'], [0.55, 0.45]),
start_time="2026-03-25T18:00:00Z", # 6h in future
)
fd = format_match_event(e)
self.assertEqual(fd["time_status"], "In 6h")
self.assertEqual(fd["time_urgency"], 2)
self.assertIn("WIB", fd["abs_time"])
class TestFormatNonMatchEvent(unittest.TestCase):
"""Tests for format_non_match_event() canonical dict."""
def _frozen_dt(self, year, month, day, hour, minute):
return datetime(year, month, day, hour, minute,
tzinfo=timezone.utc)
def _mock_datetime(self, frozen):
class MockDatetime:
@staticmethod
def now(tz=None):
if tz is None:
return frozen
return frozen.astimezone(tz)
fromisoformat = staticmethod(datetime.fromisoformat)
def __call__(self, *a, **k):
return datetime(*a, **k)
return MockDatetime
def test_fields_present(self):
"""All canonical fields are present."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import format_non_match_event
e = {
"title": "Will it rain in Jakarta?",
"slug": "rain-jakarta",
"startTime": "2026-03-25T18:00:00Z",
"markets": [
{"volume": "10000"},
{"volume": "5000"},
],
}
fd = format_non_match_event(e)
self.assertIn("title", fd)
self.assertIn("url", fd)
self.assertIn("time_status", fd)
self.assertIn("time_urgency", fd)
self.assertIn("abs_time", fd)
self.assertIn("market_count", fd)
self.assertIn("total_vol", fd)
def test_market_stats(self):
"""market_count and total_vol computed correctly."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import format_non_match_event
e = {
"title": "Test",
"slug": "test",
"startTime": "2026-03-25T18:00:00Z",
"markets": [
{"volume": "10000"},
{"volume": "5000"},
],
}
fd = format_non_match_event(e)
self.assertEqual(fd["market_count"], 2)
self.assertEqual(fd["total_vol"], 15000)
def test_time_passed_through(self):
"""Time fields come from _get_time_data."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import format_non_match_event
e = {
"title": "Test",
"slug": "test",
"startTime": "2026-03-25T18:00:00Z",
"markets": [],
}
fd = format_non_match_event(e)
self.assertEqual(fd["time_status"], "In 6h")
class TestRenderMatchLines(unittest.TestCase):
"""Tests for render_match_lines() text and HTML output."""
def test_text_mode_exact_lines(self):
"""text mode produces expected plain text lines."""
from browse import render_match_lines
fd = {
"title_clean": "Team A vs Team B",
"url": "https://polymarket.com/market/test",
"abs_time": "Mar 25, 19:00 WIB",
"time_status": "In 6h",
"vol": 50000,
"tournament": "ESL Pro League",
"team_a": "Team A",
"team_b": "Team B",
"odds_a": "55c",
"odds_b": "45c",
}
lines = render_match_lines(fd, 1, mode="text")
self.assertEqual(lines[0], "1. [Team A vs Team B](https://polymarket.com/market/test)")
self.assertEqual(lines[1], " Mar 25, 19:00 WIB | In 6h")
self.assertEqual(lines[2], " Vol: $50,000")
self.assertEqual(lines[3], " Tournament: ESL Pro League")
self.assertEqual(lines[4], " Odds: Team A 55c | 45c Team B")
def test_text_mode_no_tournament(self):
"""text mode omits Tournament line when tournament is empty."""
from browse import render_match_lines
fd = {
"title_clean": "Team A vs Team B",
"url": "https://polymarket.com/market/test",
"abs_time": "Mar 25, 19:00 WIB",
"time_status": "In 6h",
"vol": 50000,
"tournament": "",
"team_a": "Team A",
"team_b": "Team B",
"odds_a": "55c",
"odds_b": "45c",
}
lines = render_match_lines(fd, 2, mode="text")
self.assertEqual(len(lines), 4)
self.assertEqual(lines[0], "2. [Team A vs Team B](https://polymarket.com/market/test)")
self.assertNotIn("Tournament", lines[3])
def test_html_mode_exact(self):
"""html mode produces expected HTML lines with escape_html."""
from browse import render_match_lines
fd = {
"title_clean": "Team A & Team B vs Team C",
"url": "https://polymarket.com/market/test",
"abs_time": "Mar 25, 19:00 WIB",
"time_status": "LIVE",
"vol": 50000,
"tournament": "ESL Pro League",
"team_a": "Team A & Team B",
"team_b": "Team C",
"odds_a": "55c",
"odds_b": "45c",
}
lines = render_match_lines(fd, 1, mode="html")
self.assertEqual(lines[0], "<b>1.</b> <a href=\"https://polymarket.com/market/test\">Team A &amp; Team B vs Team C</a>")
self.assertEqual(lines[1], " Mar 25, 19:00 WIB | LIVE")
self.assertEqual(lines[2], " Vol: $50,000")
self.assertEqual(lines[3], " Tournament: ESL Pro League")
self.assertEqual(lines[4], " Odds: Team A & Team B 55c | 45c Team C")
def test_html_mode_xss_prevention(self):
"""html mode escapes < and > to prevent XSS."""
from browse import render_match_lines
fd = {
"title_clean": "<script>alert('xss')</script>",
"url": "https://polymarket.com/market/test",
"abs_time": "Mar 25, 19:00 WIB",
"time_status": "LIVE",
"vol": 1000,
"tournament": "",
"team_a": "Team A",
"team_b": "Team B",
"odds_a": "50c",
"odds_b": "50c",
}
lines = render_match_lines(fd, 1, mode="html")
self.assertIn("&lt;script&gt;", lines[0])
self.assertNotIn("<script>", lines[0])
class TestRenderNonMatchLines(unittest.TestCase):
"""Tests for render_non_match_lines() text and HTML output."""
def test_text_mode_exact_lines(self):
"""text mode produces expected plain text lines."""
from browse import render_non_match_lines
fd = {
"title": "Will it rain in Jakarta?",
"url": "https://polymarket.com/event/rain-jakarta",
"abs_time": "Mar 25, 19:00 WIB",
"time_status": "In 6h",
"market_count": 3,
"total_vol": 25000,
}
lines = render_non_match_lines(fd, 1, mode="text")
self.assertEqual(lines[0], "1. [Will it rain in Jakarta?](https://polymarket.com/event/rain-jakarta)")
self.assertEqual(lines[1], " Mar 25, 19:00 WIB | In 6h")
self.assertEqual(lines[2], " Markets: 3 | Total Vol: $25,000")
def test_html_mode_exact(self):
"""html mode produces expected HTML lines with escape_html."""
from browse import render_non_match_lines
fd = {
"title": "Rain <or> Sun?",
"url": "https://polymarket.com/event/rain-sun",
"abs_time": "Mar 25, 19:00 WIB",
"time_status": "In 6h",
"market_count": 2,
"total_vol": 10000,
}
lines = render_non_match_lines(fd, 1, mode="html")
self.assertEqual(lines[0], "<b>1.</b> <a href=\"https://polymarket.com/event/rain-sun\">Rain &lt;or&gt; Sun?</a>")
self.assertEqual(lines[1], " Mar 25, 19:00 WIB | In 6h")
self.assertEqual(lines[2], " Markets: 2 | Total Vol: $10,000")
class TestPrintBrowseIntegration(unittest.TestCase):
"""Integration tests for print_browse using the new pipeline."""
def _frozen_dt(self, year, month, day, hour, minute):
return datetime(year, month, day, hour, minute,
tzinfo=timezone.utc)
def _mock_datetime(self, frozen):
class MockDatetime:
@staticmethod
def now(tz=None):
if tz is None:
return frozen
return frozen.astimezone(tz)
fromisoformat = staticmethod(datetime.fromisoformat)
def __call__(self, *a, **k):
return datetime(*a, **k)
return MockDatetime
@patch('builtins.print')
def test_print_browse_uses_new_pipeline(self, mock_print):
"""print_browse calls format_match_event and render_match_lines."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import print_browse
match_events = [{
"title": "Counter Strike: Team A vs Team B - ESL Pro League",
"slug": "csa",
"startTime": "2026-03-25T18:00:00Z",
"markets": [{
"sportsMarketType": "moneyline",
"outcomes": '["Team A", "Team B"]',
"outcomePrices": "[0.55, 0.45]",
"bestBid": "0.54",
"bestAsk": "0.56",
"volume": "50000",
"acceptingOrders": True,
"closed": False,
}],
}]
with patch('browse.format_match_event') as mock_fmt, \
patch('browse.render_match_lines') as mock_render:
mock_fmt.return_value = {
"title_clean": "Team A vs Team B",
"url": "https://polymarket.com/market/csa",
"abs_time": "Mar 25, 19:00 WIB",
"time_status": "In 6h",
"vol": 50000,
"tournament": "ESL Pro League",
"team_a": "Team A",
"team_b": "Team B",
"odds_a": "55c",
"odds_b": "45c",
}
mock_render.return_value = [
"1. [Team A vs Team B](https://polymarket.com/market/csa)",
" Mar 25, 19:00 WIB | In 6h",
" Vol: $50,000",
" Tournament: ESL Pro League",
" Odds: Team A 55c | 45c Team B",
]
print_browse(match_events, [], "Counter Strike", 1, 1, 1, 0,
non_matches_max=5)
mock_fmt.assert_called_once_with(match_events[0])
mock_render.assert_called_once_with(mock_fmt.return_value, 1, mode="text")
@patch('builtins.print')
def test_print_browse_matches_only(self, mock_print):
"""matches_only suppresses non-match section."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import print_browse
with patch('browse.format_non_match_event') as mock_non_fmt:
print_browse([], [], "Counter Strike", 0, 0, 0, 0,
non_matches_max=5, matches_only=True)
mock_non_fmt.assert_not_called()
class TestSendChunked(unittest.TestCase):
"""Tests for send_chunked() helper."""
def test_small_message_sent_directly(self):
"""Messages under 4096 chars go through without chunking."""
sent_texts = []
def fake_send(text):
sent_texts.append(text)
lines = ["<b>COUNTER STRIKE</b> | Mar 25, 2026", "", "MATCH MARKETS", "", "1. test"]
# This fits in one message
from browse import send_chunked
send_chunked(lines, fake_send, "Counter Strike", "Mar 25, 2026",
show_matches=True, show_non_matches=False)
self.assertEqual(len(sent_texts), 1)
def test_chunked_message_gets_cont_header(self):
"""Messages over 4096 chars get continuation header."""
sent_texts = []
def fake_send(text):
sent_texts.append(text)
# Build enough content to exceed 4096 chars
# Each event line: ~260 chars. Need ~16 events + headers (~4200 chars)
lines = ["<b>COUNTER STRIKE</b> | Mar 25, 2026", ""]
for i in range(16):
lines += [f"<b>{i+1}.</b> <a href=\"https://polymarket.com/market/{i}\">Team {'X' * 250}</a>", " Mar 25, 19:00 WIB | In 6h", " Vol: $50,000", " Odds: TeamA 55c | 45c TeamB", ""]
lines.append("")
from browse import send_chunked
send_chunked(lines, fake_send, "Counter Strike", "Mar 25, 2026",
show_matches=True, show_non_matches=False)
# Should have sent more than one message (chunked)
self.assertGreater(len(sent_texts), 1)
# At least one continuation message
cont_found = any("(cont.)" in t for t in sent_texts)
self.assertTrue(cont_found, f"Expected at least one '(cont.)' message. Got {len(sent_texts)} messages.")
if __name__ == "__main__":
unittest.main()