2 Commits

Author SHA1 Message Date
shoko
27c8cb3597 Add security audit to polymarket-browse review
CRITICAL: Telegram bot token exposed in process command line
HIGH: HTML injection in Telegram messages
MEDIUM: Insufficient --search URL encoding
MEDIUM: No bounds check on --detail
MEDIUM: Potential DoS via large API response
LOW: Bare except: clauses
LOW: No API rate limiting

Includes fix recommendations and immediate actions for users.
2026-03-25 09:27:28 +00:00
shoko
4a33d6924e Add polymarket-browse skill review (2026-03-25)
- Deep analysis of SKILL.md and browse.py
- Line length analysis (worst: 209 chars at print_browse signature)
- Duplicate code patterns (3 time functions, 2 tradeable checkers)
- Bug findings (bare except:, unused variables, 11-param function)
- Recommendations for refactoring and unit testing
- Proposed test structure under tests/
- Summary table categorized by priority/effort
2026-03-25 09:12:05 +00:00
5 changed files with 285 additions and 1620 deletions

1
.gitignore vendored
View File

@@ -2,4 +2,3 @@ __pycache__/
*.pyc *.pyc
*.pyo *.pyo
.DS_Store .DS_Store
.worktrees/

View File

@@ -4,13 +4,11 @@ Polymarket Event Browser
Browse tradeable Polymarket events by game category. Browse tradeable Polymarket events by game category.
""" """
import html import subprocess
import json import json
import time import time
import argparse import argparse
from datetime import datetime, timezone, timedelta from datetime import datetime, timezone, timedelta
from urllib.parse import urlencode
from urllib.request import urlopen, Request
# ============================================================ # ============================================================
# CONFIG # CONFIG
@@ -19,7 +17,6 @@ from urllib.request import urlopen, Request
PAGE_SIZE = 50 PAGE_SIZE = 50
MAX_RETRIES = 5 MAX_RETRIES = 5
INITIAL_RETRY_DELAY = 2 # exponential backoff starts at 2s INITIAL_RETRY_DELAY = 2 # exponential backoff starts at 2s
WIB = timezone(timedelta(hours=7)) # UTC+7 for Indonesian users
GAME_CATEGORIES = { GAME_CATEGORIES = {
"All Esports": "Esports", "All Esports": "Esports",
@@ -42,68 +39,52 @@ def fetch_page(q, page=1, max_retries=MAX_RETRIES, initial_delay=INITIAL_RETRY_D
url = (f"{base}?q={q.replace(' ', '%20')}&limit={PAGE_SIZE}&page={page}" url = (f"{base}?q={q.replace(' ', '%20')}&limit={PAGE_SIZE}&page={page}"
f"&search_profiles=false&search_tags=false" f"&search_profiles=false&search_tags=false"
f"&keep_closed_markets=0&events_status=active&cache=false") f"&keep_closed_markets=0&events_status=active&cache=false")
delay = initial_delay delay = initial_delay
for attempt in range(max_retries): for attempt in range(max_retries):
if attempt > 0: time.sleep(delay)
time.sleep(delay) r = subprocess.run(
try: ["curl", "-s", url, "--max-time", "10", "-H", "User-Agent: curl/7.88.1"],
req = Request(url, headers={"User-Agent": "Mozilla/5.0"}) capture_output=True
with urlopen(req, timeout=10) as r: )
return json.loads(r.read())
except Exception: if r.returncode == 0 and len(r.stdout) > 0:
try:
return json.loads(r.stdout.decode('utf-8'))
except json.JSONDecodeError:
if attempt < max_retries - 1:
delay *= 2 # Exponential backoff
continue
return None
else:
# Rate limit or other error - exponential backoff
if attempt < max_retries - 1: if attempt < max_retries - 1:
delay *= 2 delay *= 2
continue continue
return None return None
return None return None
def fetch_all_pages(q, matches_max=None, non_matches_max=None): def fetch_all_pages(q, max_pages=100):
""" """
Fetch pages until pagination ends, or until quotas are satisfied. Fetch ALL pages until pagination ends.
max_pages is a safety cap to prevent infinite loops.
Args:
q: search query
matches_max: stop early once we have this many match events (None = no limit)
non_matches_max: stop early once we have this many non-match events (None = no limit)
Returns:
{"events": [...], "total_raw": N, "partial": bool}
""" """
all_events = [] all_events = []
total_raw = 0 total_raw = 0
match_count = 0 for page in range(1, max_pages + 1):
non_match_count = 0 time.sleep(0.2) # small delay between pages (API rate limit is generous)
page = 0
while True:
page += 1
time.sleep(0.2)
data = fetch_page(q, page) data = fetch_page(q, page)
if data is None: if data is None:
break break
events = data.get("events", []) events = data.get("events", [])
total_raw = data.get("pagination", {}).get("totalResults", 0) total_raw = data.get("pagination", {}).get("totalResults", 0)
all_events.extend(events) all_events.extend(events)
# Stop when we get 0 events (no more pages),
# Count matches/non-matches in this page # OR when we've fetched >= total results
for e in events:
if is_match_market(e):
match_count += 1
else:
non_match_count += 1
# Stop if we got what we wanted (only when caps are set)
if matches_max is not None and non_matches_max is not None:
if match_count >= matches_max and non_match_count >= non_matches_max:
break
# Stop when we get 0 events (no more pages)
if len(events) == 0: if len(events) == 0:
break break
# Stop when we've fetched all known results
if len(all_events) >= total_raw: if len(all_events) >= total_raw:
break break
partial = (total_raw > 0 and len(all_events) < total_raw) partial = (total_raw > 0 and len(all_events) < total_raw)
return {"events": all_events, "total_raw": total_raw, "partial": partial} return {"events": all_events, "total_raw": total_raw, "partial": partial}
@@ -238,79 +219,94 @@ def format_spread(bid, ask):
spread = ask - bid spread = ask - bid
return f"{prob_to_cents(spread)}c" return f"{prob_to_cents(spread)}c"
def get_match_time_status(e):
def _get_time_data(e, tz=None):
""" """
Unified time data extraction for event timestamps. Return a human-readable match time status.
Returns (status_str, urgency) where urgency is 0-3 (higher = more urgent/live).
Uses startTime (preferred) or startDate as the event start time. Uses startTime for actual match start time.
Datetime parsing and all relative calculations are UTC-based. Displays times in WIB (UTC+7 for Indonesian users).
The tz parameter only affects the abs_time formatting.
Args:
e: Event dict with 'startTime' or 'startDate' key.
tz: datetime.timezone for abs_time formatting.
Defaults to WIB (UTC+7).
Returns:
{
"time_status": str, # e.g. "LIVE", "In 6h", "12h ago"
"time_urgency": int, # 0-3 (higher = more urgent/live)
"abs_time": str, # e.g. "Mar 25, 19:00 WIB" or "TBD"
}
""" """
tz = tz or WIB # Use startTime for actual match start, not startDate (which is market creation time)
start_str = e.get("startTime") or e.get("startDate", "") start_str = e.get("startTime") or e.get("startDate", "")
if not start_str: if not start_str:
return {"time_status": "TBD", "time_urgency": 0, "abs_time": "TBD"} return "TBD", 0
try:
start_dt = datetime.fromisoformat(start_str.replace('Z', '+00:00'))
now_utc = datetime.now(timezone.utc)
utc7 = timezone(timedelta(hours=7))
now = now_utc.astimezone(utc7)
start_utc7 = start_dt.astimezone(utc7)
delta = start_dt - now_utc
if delta.total_seconds() < 0:
# Started already
hours_ago = abs(delta.total_seconds()) / 3600
if hours_ago < 1:
return "LIVE", 3
elif hours_ago < 4:
return f"LIVE {int(hours_ago)}h", 3
elif hours_ago < 24:
return f"Started {int(hours_ago)}h ago", 1
else:
days = int(hours_ago / 24)
return f"{days}d ago", 0
else:
# Starts in future
hours_until = delta.total_seconds() / 3600
if hours_until <= 0:
return "LIVE", 3
elif hours_until < 1:
mins = int(delta.total_seconds() / 60)
return f"In {mins}m", 3
elif hours_until < 24:
return f"In {int(hours_until)}h", 2
else:
days = int(hours_until / 24)
return f"In {days}d", 1
except:
return "", 0
def get_match_time_str(e):
"""
Return just the time status string (e.g. 'LIVE', 'In 6h', 'In 1d').
Uses startTime for actual match start time.
"""
start_str = e.get("startTime") or e.get("startDate", "")
if not start_str:
return "TBD"
try: try:
start_dt = datetime.fromisoformat(start_str.replace('Z', '+00:00')) start_dt = datetime.fromisoformat(start_str.replace('Z', '+00:00'))
now_utc = datetime.now(timezone.utc) now_utc = datetime.now(timezone.utc)
delta = start_dt - now_utc delta = start_dt - now_utc
total_sec = delta.total_seconds()
if delta.total_seconds() < 0:
if total_sec < 0: hours_ago = abs(delta.total_seconds()) / 3600
# Event is in the past
hours_ago = abs(total_sec) / 3600
if hours_ago < 1: if hours_ago < 1:
time_status = "LIVE" return "LIVE"
time_urgency = 3
elif hours_ago < 4: elif hours_ago < 4:
time_status = f"LIVE {int(hours_ago)}h" return f"LIVE {int(hours_ago)}h"
time_urgency = 3
elif hours_ago < 24: elif hours_ago < 24:
time_status = f"{int(hours_ago)}h ago" return f"{int(hours_ago)}h ago"
time_urgency = 1
else: else:
days = int(hours_ago / 24) days = int(hours_ago / 24)
time_status = f"{days}d ago" return f"{days}d ago"
time_urgency = 0
else: else:
# Event is in the future hours_until = delta.total_seconds() / 3600
if total_sec < 3600: if hours_until <= 0:
mins = int(total_sec / 60) return "LIVE"
time_status = f"In {mins}m" elif hours_until < 1:
time_urgency = 3 mins = int(delta.total_seconds() / 60)
elif total_sec < 86400: return f"In {mins}m"
hours_until = int(total_sec / 3600) elif hours_until < 24:
time_status = f"In {hours_until}h" return f"In {int(hours_until)}h"
time_urgency = 2
else: else:
days = int(total_sec / 86400) days = int(hours_until / 24)
time_status = f"In {days}d" return f"In {days}d"
time_urgency = 1 except:
return ""
abs_time = start_dt.astimezone(tz).strftime("%b %d, %H:%M ")
if tz == WIB:
abs_time += "WIB"
else:
abs_time += start_dt.astimezone(tz).strftime("%Z")
return {"time_status": time_status, "time_urgency": time_urgency, "abs_time": abs_time}
except Exception:
return {"time_status": "", "time_urgency": 0, "abs_time": "TBD"}
def filter_events(events, tradeable_only=True): def filter_events(events, tradeable_only=True):
""" """
@@ -319,17 +315,16 @@ def filter_events(events, tradeable_only=True):
""" """
match_events = [] match_events = []
non_match_events = [] non_match_events = []
for e in events: for e in events:
if is_match_market(e): if is_match_market(e):
if not tradeable_only or is_tradeable_event(e): if not tradeable_only or is_tradeable_event(e):
match_events.append(e) match_events.append(e)
else: else:
non_match_events.append(e) non_match_events.append(e)
return match_events, non_match_events return match_events, non_match_events
def sort_events(events): def sort_events(events):
return sorted(events, key=get_ml_volume, reverse=True) return sorted(events, key=get_ml_volume, reverse=True)
@@ -337,214 +332,24 @@ def sort_events(events):
# BROWSE # BROWSE
# ============================================================ # ============================================================
def browse_events(q, matches_max=10, non_matches_max=10, tradeable_only=True, sort_by=None): def browse_events(q, matches_max=10, non_matches_max=10, tradeable_only=True):
""" result = fetch_all_pages(q)
Browse Polymarket events.
Args:
q: search query
matches_max: max number of match markets to return
non_matches_max: max number of non-match markets to return
tradeable_only: filter to tradeable events only
sort_by: None (fast, API order) or "volume" (full fetch, sort by volume desc)
"""
# Pass quotas to fetch_all_pages for early-exit optimization.
# Only use early-exit when sort_by is None (no client-side sort needed).
use_early_exit = (sort_by is None)
fetch_matches_max = matches_max if use_early_exit else None
fetch_non_matches_max = non_matches_max if use_early_exit else None
result = fetch_all_pages(q, matches_max=fetch_matches_max, non_matches_max=fetch_non_matches_max)
events = result["events"] events = result["events"]
match_events, non_match_events = filter_events(events, tradeable_only) match_events, non_match_events = filter_events(events, tradeable_only)
sorted_match = sort_events(match_events)
# Sort if requested; otherwise preserve API order
if sort_by == "volume":
match_events = sort_events(match_events)
non_match_events = sort_events(non_match_events)
return { return {
"query": q, "query": q,
"total_raw": result["total_raw"], "total_raw": result["total_raw"],
"total_fetched": len(events), "total_fetched": len(events),
"total_match": len(match_events), "total_match": len(match_events),
"total_non_match": len(non_match_events), "total_non_match": len(non_match_events),
"match_events": match_events[:matches_max], "match_events": sorted_match[:matches_max],
"non_match_events": non_match_events[:non_matches_max], "non_match_events": non_match_events[:non_matches_max],
"partial": result.get("partial", False), "partial": result.get("partial", False),
} }
# ============================================================ # ============================================================
# FORMAT — EVENT # FORMAT
# ============================================================
def format_match_event(e):
"""
Format a match event into a canonical dict for rendering.
All computing done here; renderers just template.
Returns:
{
"title": str, # raw title
"title_clean": str, # "Team A vs Team B"
"tournament": str, # "Tournament Name" or ""
"url": str,
"time_status": str, # "LIVE", "In 6h", "12h ago"
"time_urgency": int, # 0-3
"abs_time": str, # "Mar 25, 19:00 WIB"
"team_a": str,
"team_b": str,
"odds_a": str, # "55c"
"odds_b": str,
"vol": int,
}
"""
ml = get_ml_market(e)
outcomes = json.loads(ml.get("outcomes", "[]")) if ml else []
prices = json.loads(ml.get("outcomePrices", "[]")) if ml else []
td = _get_time_data(e)
title = e.get("title", "")
team_a = outcomes[0] if len(outcomes) > 0 else "?"
team_b = outcomes[1] if len(outcomes) > 1 else "?"
odds_a = format_odds(float(prices[0])) if len(prices) > 0 else "?"
odds_b = format_odds(float(prices[1])) if len(prices) > 1 else "?"
if " - " in title:
title_clean = title.split(" - ")[0].strip()
else:
title_clean = title
tournament = get_tournament(title)
return {
"title": title,
"title_clean": title_clean,
"tournament": tournament,
"url": get_event_url(e),
"time_status": td["time_status"],
"time_urgency": td["time_urgency"],
"abs_time": td["abs_time"],
"team_a": team_a,
"team_b": team_b,
"odds_a": odds_a,
"odds_b": odds_b,
"vol": get_ml_volume(e),
}
def format_non_match_event(e):
"""
Format a non-match event into a canonical dict for rendering.
Returns:
{
"title": str,
"url": str,
"time_status": str,
"time_urgency": int,
"abs_time": str,
"market_count": int,
"total_vol": int,
}
"""
td = _get_time_data(e)
total_vol = sum(float(m.get("volume", 0)) for m in e.get("markets", []))
market_count = len(e.get("markets", []))
return {
"title": e.get("title", "?"),
"url": get_event_url(e),
"time_status": td["time_status"],
"time_urgency": td["time_urgency"],
"abs_time": td["abs_time"],
"market_count": market_count,
"total_vol": int(total_vol),
}
# ============================================================
# FORMAT — RENDER
# ============================================================
def render_match_lines(event_dict, i, mode):
"""
Render a formatted match event dict into lines of text.
Args:
event_dict: canonical dict from format_match_event()
i: 1-based index for the event number
mode: "text" for plain text/Markdown, "html" for Telegram HTML
Returns:
List[str], one line per element (no trailing blank line).
Caller adds the blank line separator between events.
"""
title_clean = event_dict["title_clean"]
url = event_dict["url"]
abs_time = event_dict["abs_time"]
time_status = event_dict["time_status"]
vol = event_dict["vol"]
tournament = event_dict["tournament"]
team_a = event_dict["team_a"]
team_b = event_dict["team_b"]
odds_a = event_dict["odds_a"]
odds_b = event_dict["odds_b"]
lines = []
if mode == "html":
lines.append(
f"<b>{i}.</b> <a href=\"{url}\">{escape_html(title_clean)}</a>"
)
else:
lines.append(f"{i}. [{title_clean}]({url})")
lines.append(f" {abs_time} | {time_status}")
lines.append(f" Vol: ${vol:,.0f}")
if tournament:
lines.append(f" Tournament: {tournament}")
lines.append(f" Odds: {team_a} {odds_a} | {odds_b} {team_b}")
return lines
def render_non_match_lines(event_dict, i, mode):
"""
Render a formatted non-match event dict into lines of text.
Args:
event_dict: canonical dict from format_non_match_event()
i: 1-based index for the event number
mode: "text" for plain text/Markdown, "html" for Telegram HTML
Returns:
List[str], one line per element (no trailing blank line).
"""
title = event_dict["title"]
url = event_dict["url"]
abs_time = event_dict["abs_time"]
time_status = event_dict["time_status"]
market_count = event_dict["market_count"]
total_vol = event_dict["total_vol"]
lines = []
if mode == "html":
lines.append(f"<b>{i}.</b> <a href=\"{url}\">{escape_html(title)}</a>")
else:
lines.append(f"{i}. [{title}]({url})")
lines.append(f" {abs_time} | {time_status}")
lines.append(f" Markets: {market_count} | Total Vol: ${total_vol:,.0f}")
return lines
# ============================================================
# FORMAT — LEGACY
# ============================================================ # ============================================================
def format_event(e): def format_event(e):
@@ -554,12 +359,12 @@ def format_event(e):
best_bid = float(ml.get("bestBid", 0)) if ml else 0 best_bid = float(ml.get("bestBid", 0)) if ml else 0
best_ask = float(ml.get("bestAsk", 0)) if ml else 0 best_ask = float(ml.get("bestAsk", 0)) if ml else 0
vol = get_ml_volume(e) vol = get_ml_volume(e)
td = _get_time_data(e) time_status, urgency = get_match_time_status(e)
return { return {
"title": e.get("title", ""), "title": e.get("title", ""),
"time_status": td["time_status"], "time_status": time_status,
"time_urgency": td["time_urgency"], "time_urgency": urgency,
"url": get_event_url(e), "url": get_event_url(e),
"livestream": e.get("resolutionSource"), "livestream": e.get("resolutionSource"),
"outcomes": outcomes, "outcomes": outcomes,
@@ -577,13 +382,12 @@ def format_detail_event(e):
if float(m.get("volume", 0)) > 0 and is_tradeable_market(m) if float(m.get("volume", 0)) > 0 and is_tradeable_market(m)
] ]
active_markets = sorted(active_markets, key=lambda m: float(m.get("volume", 0)), reverse=True) active_markets = sorted(active_markets, key=lambda m: float(m.get("volume", 0)), reverse=True)
td = _get_time_data(e) time_status, urgency = get_match_time_status(e)
return { return {
"title": e.get("title", ""), "title": e.get("title", ""),
"time_status": td["time_status"], "time_status": time_status,
"abs_time": td["abs_time"],
"url": get_event_url(e), "url": get_event_url(e),
"livestream": e.get("resolutionSource"), "livestream": e.get("resolutionSource"),
"outcomes": json.loads(ml.get("outcomes", "[]")) if ml else [], "outcomes": json.loads(ml.get("outcomes", "[]")) if ml else [],
@@ -610,6 +414,48 @@ def format_detail_event(e):
# DISPLAY # DISPLAY
# ============================================================ # ============================================================
def get_start_time_wib(e):
"""Return (date_time_str, relative_str) for display."""
start_str = e.get("startTime") or e.get("startDate", "")
if not start_str:
return "TBD", ""
try:
start_dt = datetime.fromisoformat(start_str.replace('Z', '+00:00'))
now_utc = datetime.now(timezone.utc)
utc7 = timezone(timedelta(hours=7))
start_utc7 = start_dt.astimezone(utc7)
# Absolute: "Mar 25, 19:00 WIB"
abs_str = start_utc7.strftime("%b %d, %H:%M WIB")
# Relative: "In 5h", "In 10h", "LIVE", etc.
delta = start_dt - now_utc
if delta.total_seconds() < 0:
hours_ago = abs(delta.total_seconds()) / 3600
if hours_ago < 1:
rel_str = "LIVE"
elif hours_ago < 24:
rel_str = f"{int(hours_ago)}h ago"
else:
days = int(hours_ago / 24)
rel_str = f"{days}d ago"
else:
hours_until = delta.total_seconds() / 3600
if hours_until <= 0:
rel_str = "LIVE"
elif hours_until < 1:
mins_until = int(delta.total_seconds() / 60)
rel_str = f"In {mins_until}m"
elif hours_until < 24:
rel_str = f"In {int(hours_until)}h"
else:
days = int(hours_until / 24)
rel_str = f"In {days}d"
return abs_str, rel_str
except:
return "TBD", ""
def get_header_date(): def get_header_date():
"""Return current date string like 'Mar 25, 2026'""" """Return current date string like 'Mar 25, 2026'"""
now_utc = datetime.now(timezone.utc) now_utc = datetime.now(timezone.utc)
@@ -631,17 +477,18 @@ def print_browse(match_events, non_match_events, category, total_raw, total_fetc
utc7 = timezone(timedelta(hours=7)) utc7 = timezone(timedelta(hours=7))
now_utc7 = now_utc.astimezone(utc7) now_utc7 = now_utc.astimezone(utc7)
header_date = get_header_date() header_date = get_header_date()
print(f"\n=== {category.upper()}{' [RAW]' if raw_mode else ''} ===") print(f"\n=== {category.upper()}{' [RAW]' if raw_mode else ''} ===")
print(f"Current time (WIB): {now_utc7.strftime('%H:%M WIB')} | {header_date}") print(f"Current time (WIB): {now_utc7.strftime('%H:%M WIB')} | {header_date}")
if raw_mode: if raw_mode:
print(f"Fetched: {total_fetched} / Total API: {total_raw} | Match: {total_match} | Non-match: {total_non_match}") print(f"Fetched: {total_fetched} / Total API: {total_raw} | Match: {total_match} | Non-match: {total_non_match}")
if partial: if partial:
print(f"WARNING: Partial fetch (API error or timeout) — data may be incomplete") print(f"WARNING: Partial fetch (API error or timeout) — data may be incomplete")
# Determine sections to show # --- MATCH MARKETS ---
if not matches_only and not non_matches_only: if not matches_only and not non_matches_only:
# Default: show both
show_matches = True show_matches = True
show_non_matches = True show_non_matches = True
elif matches_only: elif matches_only:
@@ -650,32 +497,69 @@ def print_browse(match_events, non_match_events, category, total_raw, total_fetc
else: else:
show_matches = False show_matches = False
show_non_matches = True show_non_matches = True
# Match events
if show_matches: if show_matches:
print("\nMATCH MARKETS") print(f"\nMATCH MARKETS")
if not match_events: if not match_events:
print(" No match markets found.") print(" No match markets found.")
else: else:
for i, e in enumerate(match_events, 1): for i, e in enumerate(match_events, 1):
fd = format_match_event(e) f = format_event(e)
for line in render_match_lines(fd, i, mode="text"): ml = get_ml_market(e)
print(line) outcomes = json.loads(ml.get("outcomes", "[]")) if ml else []
prices = json.loads(ml.get("outcomePrices", "[]")) if ml else []
# Non-match events vol = f["volume"]
title = f["title"]
url = f["url"]
start_time_wib, rel_time = get_start_time_wib(e)
team_a = outcomes[0] if len(outcomes) > 0 else "?"
team_b = outcomes[1] if len(outcomes) > 1 else "?"
odds_a = format_odds(float(prices[0])) if len(prices) > 0 else "?"
odds_b = format_odds(float(prices[1])) if len(prices) > 1 else "?"
if " - " in title:
title_clean = title.split(" - ")[0].strip()
else:
title_clean = title
tournament = get_tournament(title)
print(f"\n {i}. [{title_clean}]({url})")
print(f" {start_time_wib} | {rel_time}")
print(f" Vol: ${vol:,.0f}")
if tournament:
print(f" Tournament: {tournament}")
print(f" Odds: {team_a} {odds_a} | {odds_b} {team_b}")
# --- NON-MATCH MARKETS ---
if show_non_matches and non_match_events: if show_non_matches and non_match_events:
print("\nNON-MATCH MARKETS") print(f"\nNON-MATCH MARKETS")
for i, e in enumerate(non_match_events[:non_matches_max], 1): for i, e in enumerate(non_match_events[:non_matches_max], 1):
fd = format_non_match_event(e) title = e.get("title", "?")
for line in render_non_match_lines(fd, i, mode="text"): url = get_event_url(e)
print(line) start_time_wib, rel_time = get_start_time_wib(e)
total_vol = sum(float(m.get("volume", 0)) for m in e.get("markets", []))
market_count = len(e.get("markets", []))
print(f"\n {i}. [{title}]({url})")
print(f" {start_time_wib} | {rel_time}")
print(f" Markets: {market_count} | Total Vol: ${total_vol:,.0f}")
def print_detail(e, detail): def print_detail(e, detail):
from datetime import datetime, timezone, timedelta
now_utc = datetime.now(timezone.utc)
utc7 = timezone(timedelta(hours=7))
now_utc7 = now_utc.astimezone(utc7)
print(f"\n{detail['title']}") print(f"\n{detail['title']}")
print(f"URL: {detail['url']}") print(f"URL: {detail['url']}")
print(f"Livestream: {detail['livestream']}") print(f"Livestream: {detail['livestream']}")
spread_str = format_spread(detail["best_bid"], detail["best_ask"]) if detail["best_bid"] and detail["best_ask"] else "N/A" spread_str = format_spread(detail["best_bid"], detail["best_ask"]) if detail["best_bid"] and detail["best_ask"] else "N/A"
time_str = get_match_time_str(e)
print(f"\n{detail['time_status']}") print(f"\n{detail['time_status']}")
print(f"ML: {detail['outcomes'][0]} {format_odds(float(detail['prices'][0]))} vs {detail['outcomes'][1]} {format_odds(float(detail['prices'][1]))}") print(f"ML: {detail['outcomes'][0]} {format_odds(float(detail['prices'][0]))} vs {detail['outcomes'][1]} {format_odds(float(detail['prices'][1]))}")
print(f"ML Vol: ${detail['volume']:,.0f} | {spread_str}") print(f"ML Vol: ${detail['volume']:,.0f} | {spread_str}")
@@ -692,133 +576,125 @@ def print_detail(e, detail):
# TELEGRAM # TELEGRAM
# ============================================================ # ============================================================
def escape_html(text):
"""Escape HTML-sensitive characters for Telegram parse_mode=HTML."""
return (text
.replace("&", "&amp;")
.replace("<", "&lt;")
.replace(">", "&gt;")
.replace('"', "&quot;"))
def send_telegram_message(bot_token, chat_id, text, timeout=10):
"""Send a message via Telegram bot API. Returns the message ID on success.
Raises:
RuntimeError: If the Telegram API returns an error (e.g. invalid token, rate limit).
URLError/HTTPError: On network or HTTP-level failures.
"""
url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
data = urlencode({
"chat_id": chat_id,
"text": text,
"parse_mode": "HTML",
"disable_web_page_preview": "true",
}).encode("utf-8")
req = Request(url, data=data, method="POST")
with urlopen(req, timeout=timeout) as resp:
result = json.loads(resp.read())
if not result.get("ok"):
raise RuntimeError(f"Telegram API error: {result.get('description')}")
return result["result"]["message_id"]
def send_to_telegram(match_events, non_match_events, category, matches_only=False, non_matches_only=False): def send_to_telegram(match_events, non_match_events, category, matches_only=False, non_matches_only=False):
"""Send browse results to Telegram. Reads TELEGRAM_BOT_TOKEN and CHAT_ID from environment.""" """Send browse results to Telegram. Reads BOT_TOKEN and CHAT_ID from environment."""
import os import os
bot_token = os.environ.get("TELEGRAM_BOT_TOKEN") bot_token = os.environ.get("BOT_TOKEN")
chat_id = os.environ.get("CHAT_ID") chat_id = os.environ.get("CHAT_ID")
if not bot_token or not chat_id: if not bot_token or not chat_id:
raise RuntimeError("TELEGRAM_BOT_TOKEN or CHAT_ID not set in environment") print("WARNING: BOT_TOKEN or CHAT_ID not set in environment. Skipping Telegram send.")
return
from datetime import datetime, timezone, timedelta from datetime import datetime, timezone, timedelta
now_utc = datetime.now(timezone.utc) now_utc = datetime.now(timezone.utc)
utc7 = timezone(timedelta(hours=7)) utc7 = timezone(timedelta(hours=7))
now_utc7 = now_utc.astimezone(utc7) now_utc7 = now_utc.astimezone(utc7)
header_date = now_utc7.strftime("%b %d, %Y") header_date = now_utc7.strftime("%b %d, %Y")
# Determine sections to show # Determine sections to show
show_matches = (not matches_only and not non_matches_only) or matches_only show_matches = (not matches_only and not non_matches_only) or matches_only
show_non_matches = (not matches_only and not non_matches_only) or non_matches_only show_non_matches = (not matches_only and not non_matches_only) or non_matches_only
def send(text): def send(text):
msg_id = send_telegram_message(bot_token, chat_id, text) result = subprocess.run(
print(f" Sent msg {msg_id}") ["curl", "-s", f"https://api.telegram.org/bot{bot_token}/sendMessage",
"-d", f"chat_id={chat_id}",
# Build lines "-d", f"text={text}",
lines = [f"<b>{category.upper()}</b> | {header_date}", ""] "-d", "parse_mode=HTML",
"-d", "disable_web_page_preview=true"],
capture_output=True
)
resp = json.loads(result.stdout.decode())
if resp.get("ok"):
print(f" Sent msg {resp['result']['message_id']}")
else:
print(f" Error: {resp.get('description')}")
# Build sections
lines = [f"<b>{category.upper()}</b> | {header_date}"]
lines.append("")
if show_matches: if show_matches:
lines += ["MATCH MARKETS", ""] lines.append("MATCH MARKETS")
lines.append("")
if not match_events: if not match_events:
lines.append(" No match markets found.") lines.append(" No match markets found.")
else: else:
for i, e in enumerate(match_events, 1): for i, e in enumerate(match_events, 1):
fd = format_match_event(e) ml = get_ml_market(e)
lines += render_match_lines(fd, i, mode="html") outcomes = json.loads(ml.get("outcomes", "[]")) if ml else []
prices = json.loads(ml.get("outcomePrices", "[]")) if ml else []
vol = get_ml_volume(e)
title = e.get("title", "?")
url = get_event_url(e)
start_time_wib, rel_time = get_start_time_wib(e)
team_a = outcomes[0] if len(outcomes) > 0 else "?"
team_b = outcomes[1] if len(outcomes) > 1 else "?"
odds_a = format_odds(float(prices[0])) if len(prices) > 0 else "?"
odds_b = format_odds(float(prices[1])) if len(prices) > 1 else "?"
tournament = get_tournament(title)
title_clean = title.split(" - ")[0].strip() if " - " in title else title
lines.append(f"<b>{i}.</b> <a href=\"{url}\">{title_clean}</a>")
lines.append(f" {start_time_wib} | {rel_time}")
lines.append(f" Vol: ${vol:,.0f}")
if tournament:
lines.append(f" Tournament: {tournament}")
lines.append(f" Odds: {team_a} {odds_a} | {odds_b} {team_b}")
lines.append("") lines.append("")
lines.append("") lines.append("")
if show_non_matches: if show_non_matches:
lines += ["NON-MATCH MARKETS", ""] lines.append("NON-MATCH MARKETS")
lines.append("")
if not non_match_events: if not non_match_events:
lines.append(" No non-match markets found.") lines.append(" No non-match markets found.")
else: else:
for i, e in enumerate(non_match_events, 1): for i, e in enumerate(non_match_events, 1):
fd = format_non_match_event(e) title = e.get("title", "?")
lines += render_non_match_lines(fd, i, mode="html") url = get_event_url(e)
start_time_wib, rel_time = get_start_time_wib(e)
total_vol = sum(float(m.get("volume", 0)) for m in e.get("markets", []))
market_count = len(e.get("markets", []))
lines.append(f"<b>{i}.</b> <a href=\"{url}\">{title}</a>")
lines.append(f" {start_time_wib} | {rel_time}")
lines.append(f" Markets: {market_count} | Total Vol: ${total_vol:,.0f}")
lines.append("") lines.append("")
lines.append("")
# Chunk by 10 items (events), respecting 4096 char Telegram limit
# Chunk and send text = "\n".join(lines)
send_chunked(lines, send, category, header_date, show_matches, show_non_matches)
def send_chunked(all_lines, send_fn, category, header_date, show_matches, show_non_matches):
"""
Split already-built lines into Telegram-safe chunks and send them.
Telegram messages are capped at 4096 chars. Chunks are grouped by
section header so no event is split across messages.
Args:
all_lines: Full message lines list (built by caller).
send_fn: Closure that sends a single string and prints confirmation.
category: Category name for header.
header_date: Date string for header.
show_matches: Whether MATCH MARKETS section is present.
show_non_matches: Whether NON-MATCH MARKETS section is present.
"""
text = "\n".join(all_lines)
if len(text) <= 4096: if len(text) <= 4096:
send_fn(text) send(text)
return return
# Split into chunks of 10 events, respecting section headers # Split into chunks of 10 events
all_items = [] all_items = []
in_match = True in_match = True
for line in all_lines: for line in lines:
if line == "MATCH MARKETS": if line == "MATCH MARKETS":
in_match = True in_match = True
elif line == "NON-MATCH MARKETS": elif line == "NON-MATCH MARKETS":
in_match = False in_match = False
elif line.startswith("<b>") and "</a>" in line: elif line.startswith("<b>") and ". " in line and "</a>" in line:
# Event title line: <b>1.</b> <a href="...">Title</a>
all_items.append((in_match, line)) all_items.append((in_match, line))
chunk = [] chunk = []
chunk_len = 0
chunk_num = 1
# Header is always first
header = f"<b>{category.upper()}</b> | {header_date}\n" header = f"<b>{category.upper()}</b> | {header_date}\n"
if show_matches: if show_matches:
header += "\nMATCH MARKETS\n\n" header += "\nMATCH MARKETS\n\n"
if show_non_matches: if show_non_matches:
header += "\nNON-MATCH MARKETS\n\n" header += "\nNON-MATCH MARKETS\n\n"
for is_match, item_line in all_items: for is_match, item_line in all_items:
test_chunk = chunk + [item_line, ""] test_chunk = chunk + [item_line, ""]
test_text = header + "\n".join(chunk) + "\n".join(test_chunk) test_text = header + "\n".join(chunk) + "\n".join(test_chunk)
if len(test_text) > 4096 or len(chunk) >= 10: if len(test_text) > 4096 or len(chunk) >= 10:
# Send current chunk
msg = header + "\n".join(chunk) msg = header + "\n".join(chunk)
send_fn(msg) send(msg)
chunk = [item_line, ""] chunk = [item_line, ""]
header = f"<b>{category.upper()}</b> (cont.) | {header_date}\n" header = f"<b>{category.upper()}</b> (cont.) | {header_date}\n"
if show_matches and is_match: if show_matches and is_match:
@@ -827,10 +703,10 @@ def send_chunked(all_lines, send_fn, category, header_date, show_matches, show_n
header += "\nNON-MATCH MARKETS\n\n" header += "\nNON-MATCH MARKETS\n\n"
else: else:
chunk.extend([item_line, ""]) chunk.extend([item_line, ""])
if chunk: if chunk:
msg = header + "\n".join(chunk) msg = header + "\n".join(chunk)
send_fn(msg) send(msg)
# ============================================================ # ============================================================
@@ -861,7 +737,7 @@ def main():
parser.add_argument("--raw", action="store_true", parser.add_argument("--raw", action="store_true",
help="Show all events without tradeable filter (for debugging).") help="Show all events without tradeable filter (for debugging).")
parser.add_argument("--telegram", action="store_true", parser.add_argument("--telegram", action="store_true",
help="Send results to Telegram (TELEGRAM_BOT_TOKEN and CHAT_ID must be set in environment).") help="Send results to Telegram (BOT_TOKEN and CHAT_ID must be set in environment).")
args = parser.parse_args() args = parser.parse_args()
if args.list_categories: if args.list_categories:

View File

@@ -1 +0,0 @@
# Tests package

File diff suppressed because it is too large Load Diff