1 Commit

Author SHA1 Message Date
shoko
d018e87b35 Fix #14: Refactor print_browse/send_to_telegram into single pipeline
Replace duplicate inline formatting with unified format+render pipeline.

New functions:
- format_match_event(e) — canonical dict for match events
- format_non_match_event(e) — canonical dict for non-match events
- render_match_lines(event_dict, i, mode) — text/HTML renderer
- render_non_match_lines(event_dict, i, mode) — text/HTML renderer
- send_chunked(...) — extracted Telegram chunking logic

Also fixed a chunking bug in send_chunked(): the original `'. ' in line`
check never matched event lines (the period is followed by '</b>', not a space).

Tests: 38 total, all passing.

Fixes: #14
2026-03-25 17:47:03 +00:00
3 changed files with 275 additions and 1725 deletions

View File

@@ -34,7 +34,7 @@ hermes mcp add polymarket https://docs.polymarket.com/mcp
## Usage ## Usage
``` ```
polymarket-browse [--category "Counter Strike"] [--limit 5] [--matches N] [--non-matches N] [--search "TeamName"] [--matches-only] [--non-matches-only] [--detail N] [--raw] [--telegram] [--no-cache] [--max-total N] polymarket-browse [--category "Counter Strike"] [--limit 5] [--matches N] [--non-matches N] [--search "TeamName"] [--matches-only] [--non-matches-only] [--detail N] [--raw] [--telegram]
``` ```
## Arguments ## Arguments
@@ -49,8 +49,6 @@ polymarket-browse [--category "Counter Strike"] [--limit 5] [--matches N] [--non
- `--detail` : Index of match event (1-indexed) to show detailed markets. Default: 1. Set to 0 to disable. - `--detail` : Index of match event (1-indexed) to show detailed markets. Default: 1. Set to 0 to disable.
- `--list-categories` : List available game categories and exit - `--list-categories` : List available game categories and exit
- `--raw` : Show all events without tradeable filter (for debugging). Includes fetch stats. - `--raw` : Show all events without tradeable filter (for debugging). Includes fetch stats.
- `--no-cache` : Disable caching and fetch fresh data from the API.
- `--max-total` : Maximum total events to fetch before early exit. Default: no limit. Useful for quick snapshots.
- `--telegram` : Send results to Telegram. Requires `TELEGRAM_BOT_TOKEN` and `CHAT_ID` in environment variables. - `--telegram` : Send results to Telegram. Requires `TELEGRAM_BOT_TOKEN` and `CHAT_ID` in environment variables.
## Output Format ## Output Format
@@ -120,32 +118,13 @@ Use `--raw` to disable the tradeable filter and see all match markets regardless
## Pagination ## Pagination
The script fetches **ALL pages** until the API runs out of results. The script fetches **ALL pages** until the API runs out of results (up to 100 pages as a safety cap).
### Parallel Fetching
Pages are fetched in **parallel batches of 5** using ThreadPoolExecutor. This significantly reduces fetch time:
| Scenario | Without Parallelization | With Parallelization |
|----------|------------------------|---------------------|
| 10 pages (50 events) | ~20s (2s per page × 10) | ~4s (2s per batch × 2 batches) |
| 20 pages (100 events) | ~40s | ~8s |
The script first fetches page 1 to determine total pages, then fetches remaining pages in parallel batches of 5.
## Rate Limiting ## Rate Limiting
- Exponential backoff: 2s → 4s → 8s → 16s → 32s - Exponential backoff: 2s → 4s → 8s → 16s → 32s
- Max 5 retries before aborting - Max 5 retries before aborting
## Caching
Results are cached in `~/.cache/polymarket-browse/` with a **5-minute TTL** to reduce redundant API calls.
- Use `--no-cache` to bypass the cache and fetch fresh data
- Cached data is automatically used when available and not expired
- Useful when running the script repeatedly (e.g., for monitoring)
## Odds Format ## Odds Format
All odds are shown in **cents** format: All odds are shown in **cents** format:

View File

@@ -8,88 +8,10 @@ import html
import json import json
import time import time
import argparse import argparse
import hashlib
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone, timedelta from datetime import datetime, timezone, timedelta
from typing import Any, Callable, TypedDict
from urllib.parse import urlencode from urllib.parse import urlencode
from urllib.request import urlopen, Request from urllib.request import urlopen, Request
class TimeData(TypedDict):
time_status: str
time_urgency: int
abs_time: str
class MatchEvent(TypedDict):
title: str
title_clean: str
tournament: str
url: str
time_status: str
time_urgency: int
abs_time: str
team_a: str
team_b: str
odds_a: str
odds_b: str
vol: float
class NonMatchEvent(TypedDict):
title: str
url: str
time_status: str
time_urgency: int
abs_time: str
market_count: int
total_vol: int
class Market(TypedDict):
type: str
question: str
outcomes: list[str]
prices: list[str]
best_bid: float
best_ask: float
volume: float
url: str
class DetailEvent(TypedDict):
title: str
time_status: str
abs_time: str
url: str
livestream: str | None
outcomes: list[str]
prices: list[str]
best_bid: float
best_ask: float
volume: float
markets: list[Market]
class BrowseResult(TypedDict):
query: str
total_raw: int
total_fetched: int
total_match: int
total_non_match: int
match_events: list[Any]
non_match_events: list[Any]
partial: bool
class FetchResult(TypedDict):
events: list[Any]
total_raw: int
partial: bool
# ============================================================ # ============================================================
# CONFIG # CONFIG
# ============================================================ # ============================================================
@@ -111,200 +33,72 @@ GAME_CATEGORIES = {
"Tennis": "Tennis", "Tennis": "Tennis",
} }
CACHE_DIR = os.path.join(os.path.expanduser("~"), ".cache", "polymarket-browse")
CACHE_TTL = 300 # 5 minutes default
MAX_PARALLEL_FETCHES = 5
# ============================================================
# CACHE
# ============================================================
def _get_cache_key(q: str) -> str:
return hashlib.md5(q.encode()).hexdigest()
def _get_cache_path(q: str) -> str:
os.makedirs(CACHE_DIR, exist_ok=True)
return os.path.join(CACHE_DIR, f"{_get_cache_key(q)}.json")
def _read_cache(q: str) -> dict[str, Any] | None:
cache_path = _get_cache_path(q)
if not os.path.exists(cache_path):
return None
try:
mtime = os.path.getmtime(cache_path)
age = time.time() - mtime
if age > CACHE_TTL:
return None
with open(cache_path) as f:
return json.load(f)
except Exception:
return None
def _write_cache(q: str, data: dict[str, Any]) -> None:
try:
cache_path = _get_cache_path(q)
with open(cache_path, "w") as f:
json.dump(data, f)
except Exception:
pass
# ============================================================ # ============================================================
# FETCH # FETCH
# ============================================================ # ============================================================
def fetch_page(q, page=1, max_retries=MAX_RETRIES, initial_delay=INITIAL_RETRY_DELAY):
def fetch_page(
q: str,
page: int = 1,
max_retries: int = MAX_RETRIES,
initial_delay: float = INITIAL_RETRY_DELAY,
) -> dict[str, Any] | None:
base = "https://gamma-api.polymarket.com/public-search" base = "https://gamma-api.polymarket.com/public-search"
url = ( url = (f"{base}?q={q.replace(' ', '%20')}&limit={PAGE_SIZE}&page={page}"
f"{base}?q={q.replace(' ', '%20')}&limit={PAGE_SIZE}&page={page}" f"&search_profiles=false&search_tags=false"
f"&search_profiles=false&search_tags=false" f"&keep_closed_markets=0&events_status=active&cache=false")
f"&keep_closed_markets=0&events_status=active&cache=false"
)
delay = initial_delay delay = initial_delay
for attempt in range(max_retries): for attempt in range(max_retries):
if attempt > 0: time.sleep(delay)
time.sleep(delay) r = subprocess.run(
try: ["curl", "-s", url, "--max-time", "10", "-H", "User-Agent: curl/7.88.1"],
req = Request(url, headers={"User-Agent": "Mozilla/5.0"}) capture_output=True
with urlopen(req, timeout=10) as r: )
return json.loads(r.read())
except Exception: if r.returncode == 0 and len(r.stdout) > 0:
try:
return json.loads(r.stdout.decode('utf-8'))
except json.JSONDecodeError:
if attempt < max_retries - 1:
delay *= 2 # Exponential backoff
continue
return None
else:
# Rate limit or other error - exponential backoff
if attempt < max_retries - 1: if attempt < max_retries - 1:
delay *= 2 delay *= 2
continue continue
return None return None
return None return None
def fetch_all_pages(q, max_pages=100):
def _fetch_page_with_index(q: str, page: int) -> tuple[int, dict[str, Any] | None]:
return page, fetch_page(q, page)
def fetch_all_pages(
q: str,
matches_max: int | None = None,
non_matches_max: int | None = None,
max_total: int | None = None,
use_cache: bool = True,
) -> FetchResult:
""" """
Fetch pages until pagination ends, or until quotas are satisfied. Fetch ALL pages until pagination ends.
max_pages is a safety cap to prevent infinite loops.
Args:
q: search query
matches_max: stop early once we have this many match events (None = no limit)
non_matches_max: stop early once we have this many non-match events (None = no limit)
max_total: stop early once we have this many total events (None = no limit)
use_cache: whether to use cache (default True)
Returns:
FetchResult with events, total_raw, and partial flag
""" """
cached = _read_cache(q) if use_cache else None all_events = []
if cached is not None:
events = cached.get("events", [])
total_raw = cached.get("total_raw", 0)
if events:
return {"events": events, "total_raw": total_raw, "partial": False}
total_raw = 0 total_raw = 0
page_count = 0 for page in range(1, max_pages + 1):
page1_data = None time.sleep(0.2) # small delay between pages (API rate limit is generous)
data = fetch_page(q, page)
while True:
page_count += 1
data = fetch_page(q, page_count)
if data is None: if data is None:
break break
events = data.get("events", [])
total_raw = data.get("pagination", {}).get("totalResults", 0) total_raw = data.get("pagination", {}).get("totalResults", 0)
if page_count == 1: all_events.extend(events)
page1_data = data # Stop when we get 0 events (no more pages),
if total_raw > 0: # OR when we've fetched >= total results
if len(events) == 0:
break break
if not data.get("events"): if len(all_events) >= total_raw:
break break
partial = (total_raw > 0 and len(all_events) < total_raw)
if total_raw == 0 or page1_data is None: return {"events": all_events, "total_raw": total_raw, "partial": partial}
return {"events": [], "total_raw": 0, "partial": False}
page1_events = page1_data.get("events", [])
actual_page_size = len(page1_events)
# Use actual events per page from API for ceiling division
# ceil(total_raw / actual_page_size) = (total_raw + actual_page_size - 1) // actual_page_size
total_pages = (total_raw + actual_page_size - 1) // actual_page_size
concurrency = min(MAX_PARALLEL_FETCHES, total_pages)
all_page_data: dict[int, list[Any]] = {1: page1_events}
if total_pages > 1:
with ThreadPoolExecutor(max_workers=concurrency) as executor:
futures = {
executor.submit(_fetch_page_with_index, q, page): page
for page in range(2, total_pages + 1)
}
for future in as_completed(futures):
try:
page_num, data = future.result()
if data is not None:
all_page_data[page_num] = data.get("events", [])
except Exception:
pass
all_events = []
for page_num in sorted(all_page_data.keys()):
all_events.extend(all_page_data[page_num])
_write_cache(q, {"events": all_events, "total_raw": total_raw})
match_count = 0
non_match_count = 0
filtered_events = []
total_seen = 0
for e in all_events:
is_match = is_match_market(e)
if is_match:
match_count += 1
else:
non_match_count += 1
filtered_events.append(e)
if matches_max is not None and non_matches_max is not None:
if match_count >= matches_max and non_match_count >= non_matches_max:
break
if max_total is not None:
total_seen += 1
if total_seen >= max_total:
break
partial = len(all_events) < total_raw
return {"events": filtered_events, "total_raw": total_raw, "partial": partial}
# ============================================================ # ============================================================
# FILTERS # FILTERS
# ============================================================ # ============================================================
def is_match_market(e):
def is_match_market(e: dict[str, Any]) -> bool:
return (e.get("seriesSlug") and e.get("gameId")) or " vs " in e.get("title", "") return (e.get("seriesSlug") and e.get("gameId")) or " vs " in e.get("title", "")
def get_event_url(e):
def get_event_url(e: dict[str, Any]) -> str:
"""Return the correct Polymarket URL for an event. """Return the correct Polymarket URL for an event.
Match markets use /market/, non-match events use /event/. Match markets use /market/, non-match events use /event/.
""" """
@@ -314,20 +108,17 @@ def get_event_url(e: dict[str, Any]) -> str:
else: else:
return f"https://polymarket.com/event/{slug}" return f"https://polymarket.com/event/{slug}"
def get_ml_market(e):
def get_ml_market(e: dict[str, Any]) -> dict[str, Any] | None:
for m in e.get("markets", []): for m in e.get("markets", []):
if m.get("sportsMarketType") == "moneyline": if m.get("sportsMarketType") == "moneyline":
return m return m
return None return None
def get_ml_volume(e):
def get_ml_volume(e: dict[str, Any]) -> float:
ml = get_ml_market(e) ml = get_ml_market(e)
return float(ml.get("volume", 0)) if ml else 0.0 return float(ml.get("volume", 0)) if ml else 0.0
def is_bo2_tie(e):
def is_bo2_tie(e: dict[str, Any]) -> bool:
""" """
Detect if this is a BO2 that ended in a tie (1-1). Detect if this is a BO2 that ended in a tie (1-1).
Returns True if all child_moneyline markets are closed (match is over but tied). Returns True if all child_moneyline markets are closed (match is over but tied).
@@ -336,11 +127,7 @@ def is_bo2_tie(e: dict[str, Any]) -> bool:
if "BO2" not in title: if "BO2" not in title:
return False return False
child_markets = [ child_markets = [m for m in e.get("markets", []) if m.get("sportsMarketType") == "child_moneyline"]
m
for m in e.get("markets", [])
if m.get("sportsMarketType") == "child_moneyline"
]
if len(child_markets) != 2: if len(child_markets) != 2:
return False return False
@@ -348,8 +135,7 @@ def is_bo2_tie(e: dict[str, Any]) -> bool:
all_closed = all(m.get("closed", False) for m in child_markets) all_closed = all(m.get("closed", False) for m in child_markets)
return all_closed return all_closed
def is_tradeable_event(e):
def is_tradeable_event(e: dict[str, Any]) -> bool:
ml = get_ml_market(e) ml = get_ml_market(e)
if not ml: if not ml:
return False return False
@@ -379,7 +165,7 @@ def is_tradeable_event(e: dict[str, Any]) -> bool:
end_str = e.get("endDate", "") end_str = e.get("endDate", "")
if end_str: if end_str:
try: try:
end_dt = datetime.fromisoformat(end_str.replace("Z", "+00:00")) end_dt = datetime.fromisoformat(end_str.replace('Z', '+00:00'))
now = datetime.now(timezone.utc) now = datetime.now(timezone.utc)
if end_dt < now: if end_dt < now:
return False return False
@@ -390,7 +176,7 @@ def is_tradeable_event(e: dict[str, Any]) -> bool:
start_str = e.get("startTime") or e.get("startDate", "") start_str = e.get("startTime") or e.get("startDate", "")
if start_str: if start_str:
try: try:
start_dt = datetime.fromisoformat(start_str.replace("Z", "+00:00")) start_dt = datetime.fromisoformat(start_str.replace('Z', '+00:00'))
now = datetime.now(timezone.utc) now = datetime.now(timezone.utc)
if start_dt < now: if start_dt < now:
# Check if it's recently started (within 4h) — consider those "live" still # Check if it's recently started (within 4h) — consider those "live" still
@@ -402,8 +188,7 @@ def is_tradeable_event(e: dict[str, Any]) -> bool:
return True return True
def is_tradeable_market(m):
def is_tradeable_market(m: dict[str, Any]) -> bool:
accepting = m.get("acceptingOrders", False) accepting = m.get("acceptingOrders", False)
closed = m.get("closed", True) closed = m.get("closed", True)
best_ask = float(m.get("bestAsk", 0)) best_ask = float(m.get("bestAsk", 0))
@@ -423,26 +208,22 @@ def is_tradeable_market(m: dict[str, Any]) -> bool:
return True return True
# ============================================================ # ============================================================
# FORMATTING # FORMATTING
# ============================================================ # ============================================================
def prob_to_cents(p):
def prob_to_cents(p: float) -> int:
return int(round(p * 100)) return int(round(p * 100))
def format_odds(p):
def format_odds(p: float) -> str:
return f"{prob_to_cents(p)}c" return f"{prob_to_cents(p)}c"
def format_spread(bid, ask):
def format_spread(bid: float, ask: float) -> str:
spread = ask - bid spread = ask - bid
return f"{prob_to_cents(spread)}c" return f"{prob_to_cents(spread)}c"
def _get_time_data(e: dict[str, Any], tz: timezone | None = None) -> TimeData: def _get_time_data(e, tz=None):
""" """
Unified time data extraction for event timestamps. Unified time data extraction for event timestamps.
@@ -456,7 +237,11 @@ def _get_time_data(e: dict[str, Any], tz: timezone | None = None) -> TimeData:
Defaults to WIB (UTC+7). Defaults to WIB (UTC+7).
Returns: Returns:
TimeData with time_status, time_urgency, and abs_time {
"time_status": str, # e.g. "LIVE", "In 6h", "12h ago"
"time_urgency": int, # 0-3 (higher = more urgent/live)
"abs_time": str, # e.g. "Mar 25, 19:00 WIB" or "TBD"
}
""" """
tz = tz or WIB tz = tz or WIB
start_str = e.get("startTime") or e.get("startDate", "") start_str = e.get("startTime") or e.get("startDate", "")
@@ -465,13 +250,13 @@ def _get_time_data(e: dict[str, Any], tz: timezone | None = None) -> TimeData:
return {"time_status": "TBD", "time_urgency": 0, "abs_time": "TBD"} return {"time_status": "TBD", "time_urgency": 0, "abs_time": "TBD"}
try: try:
start_dt = datetime.fromisoformat(start_str.replace("Z", "+00:00")) start_dt = datetime.fromisoformat(start_str.replace('Z', '+00:00'))
now_utc = datetime.now(timezone.utc) now_utc = datetime.now(timezone.utc)
delta = start_dt - now_utc delta = start_dt - now_utc
total_sec = delta.total_seconds() total_sec = delta.total_seconds()
if total_sec <= 0: if total_sec < 0:
# Event is in the past or happening now # Event is in the past
hours_ago = abs(total_sec) / 3600 hours_ago = abs(total_sec) / 3600
if hours_ago < 1: if hours_ago < 1:
time_status = "LIVE" time_status = "LIVE"
@@ -506,24 +291,18 @@ def _get_time_data(e: dict[str, Any], tz: timezone | None = None) -> TimeData:
abs_time += "WIB" abs_time += "WIB"
else: else:
abs_time += start_dt.astimezone(tz).strftime("%Z") abs_time += start_dt.astimezone(tz).strftime("%Z")
return { return {"time_status": time_status, "time_urgency": time_urgency, "abs_time": abs_time}
"time_status": time_status,
"time_urgency": time_urgency,
"abs_time": abs_time,
}
except Exception: except Exception:
return {"time_status": "", "time_urgency": 0, "abs_time": "TBD"} return {"time_status": "", "time_urgency": 0, "abs_time": "TBD"}
def filter_events( def filter_events(events, tradeable_only=True):
events: list[dict[str, Any]], tradeable_only: bool = True
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
""" """
Classify events into match_markets and non_match_markets. Classify events into match_markets and non_match_markets.
If tradeable_only=True, also filter out non-tradeable events. If tradeable_only=True, also filter out non-tradeable events.
""" """
match_events: list[dict[str, Any]] = [] match_events = []
non_match_events: list[dict[str, Any]] = [] non_match_events = []
for e in events: for e in events:
if is_match_market(e): if is_match_market(e):
@@ -535,79 +314,53 @@ def filter_events(
return match_events, non_match_events return match_events, non_match_events
def sort_events(events: list[dict[str, Any]]) -> list[dict[str, Any]]: def sort_events(events):
return sorted(events, key=get_ml_volume, reverse=True) return sorted(events, key=get_ml_volume, reverse=True)
# ============================================================ # ============================================================
# BROWSE # BROWSE
# ============================================================ # ============================================================
def browse_events(q, matches_max=10, non_matches_max=10, tradeable_only=True):
def browse_events( result = fetch_all_pages(q)
q: str,
matches_max: int = 10,
non_matches_max: int = 10,
tradeable_only: bool = True,
sort_by: str | None = None,
max_total: int | None = None,
use_cache: bool = True,
) -> BrowseResult:
"""
Browse Polymarket events.
Args:
q: search query
matches_max: max number of match markets to return
non_matches_max: max number of non-match markets to return
tradeable_only: filter to tradeable events only
sort_by: None (fast, API order) or "volume" (full fetch, sort by volume desc)
max_total: max total events to fetch before early exit (None = no limit)
use_cache: whether to use cache (default True)
"""
use_early_exit = sort_by is None
fetch_matches_max = matches_max if use_early_exit else None
fetch_non_matches_max = non_matches_max if use_early_exit else None
result = fetch_all_pages(
q,
matches_max=fetch_matches_max,
non_matches_max=fetch_non_matches_max,
max_total=max_total,
use_cache=use_cache,
)
events = result["events"] events = result["events"]
match_events, non_match_events = filter_events(events, tradeable_only) match_events, non_match_events = filter_events(events, tradeable_only)
sorted_match = sort_events(match_events)
# Sort if requested; otherwise preserve API order
if sort_by == "volume":
match_events = sort_events(match_events)
non_match_events = sort_events(non_match_events)
return { return {
"query": q, "query": q,
"total_raw": result["total_raw"], "total_raw": result["total_raw"],
"total_fetched": len(events), "total_fetched": len(events),
"total_match": len(match_events), "total_match": len(match_events),
"total_non_match": len(non_match_events), "total_non_match": len(non_match_events),
"match_events": match_events[:matches_max], "match_events": sorted_match[:matches_max],
"non_match_events": non_match_events[:non_matches_max], "non_match_events": non_match_events[:non_matches_max],
"partial": result.get("partial", False), "partial": result.get("partial", False),
} }
# ============================================================ # ============================================================
# FORMAT — EVENT # FORMAT — EVENT
# ============================================================ # ============================================================
def format_match_event(e):
def format_match_event(e: dict[str, Any]) -> MatchEvent:
""" """
Format a match event into a canonical dict for rendering. Format a match event into a canonical dict for rendering.
All computing done here; renderers just template. All computing done here; renderers just template.
Returns: Returns:
MatchEvent with all required fields {
"title": str, # raw title
"title_clean": str, # "Team A vs Team B"
"tournament": str, # "Tournament Name" or ""
"url": str,
"time_status": str, # "LIVE", "In 6h", "12h ago"
"time_urgency": int, # 0-3
"abs_time": str, # "Mar 25, 19:00 WIB"
"team_a": str,
"team_b": str,
"odds_a": str, # "55c"
"odds_b": str,
"vol": int,
}
""" """
ml = get_ml_market(e) ml = get_ml_market(e)
outcomes = json.loads(ml.get("outcomes", "[]")) if ml else [] outcomes = json.loads(ml.get("outcomes", "[]")) if ml else []
@@ -643,12 +396,20 @@ def format_match_event(e: dict[str, Any]) -> MatchEvent:
} }
def format_non_match_event(e: dict[str, Any]) -> NonMatchEvent: def format_non_match_event(e):
""" """
Format a non-match event into a canonical dict for rendering. Format a non-match event into a canonical dict for rendering.
Returns: Returns:
NonMatchEvent with all required fields {
"title": str,
"url": str,
"time_status": str,
"time_urgency": int,
"abs_time": str,
"market_count": int,
"total_vol": int,
}
""" """
td = _get_time_data(e) td = _get_time_data(e)
total_vol = sum(float(m.get("volume", 0)) for m in e.get("markets", [])) total_vol = sum(float(m.get("volume", 0)) for m in e.get("markets", []))
@@ -669,8 +430,7 @@ def format_non_match_event(e: dict[str, Any]) -> NonMatchEvent:
# FORMAT — RENDER # FORMAT — RENDER
# ============================================================ # ============================================================
def render_match_lines(event_dict, i, mode):
def render_match_lines(event_dict: MatchEvent, i: int, mode: str) -> list[str]:
""" """
Render a formatted match event dict into lines of text. Render a formatted match event dict into lines of text.
@@ -697,7 +457,9 @@ def render_match_lines(event_dict: MatchEvent, i: int, mode: str) -> list[str]:
lines = [] lines = []
if mode == "html": if mode == "html":
lines.append(f'<b>{i}.</b> <a href="{url}">{escape_html(title_clean)}</a>') lines.append(
f"<b>{i}.</b> <a href=\"{url}\">{escape_html(title_clean)}</a>"
)
else: else:
lines.append(f"{i}. [{title_clean}]({url})") lines.append(f"{i}. [{title_clean}]({url})")
@@ -712,7 +474,7 @@ def render_match_lines(event_dict: MatchEvent, i: int, mode: str) -> list[str]:
return lines return lines
def render_non_match_lines(event_dict: NonMatchEvent, i: int, mode: str) -> list[str]: def render_non_match_lines(event_dict, i, mode):
""" """
Render a formatted non-match event dict into lines of text. Render a formatted non-match event dict into lines of text.
@@ -734,7 +496,7 @@ def render_non_match_lines(event_dict: NonMatchEvent, i: int, mode: str) -> list
lines = [] lines = []
if mode == "html": if mode == "html":
lines.append(f'<b>{i}.</b> <a href="{url}">{escape_html(title)}</a>') lines.append(f"<b>{i}.</b> <a href=\"{url}\">{escape_html(title)}</a>")
else: else:
lines.append(f"{i}. [{title}]({url})") lines.append(f"{i}. [{title}]({url})")
@@ -748,8 +510,7 @@ def render_non_match_lines(event_dict: NonMatchEvent, i: int, mode: str) -> list
# FORMAT — LEGACY # FORMAT — LEGACY
# ============================================================ # ============================================================
def format_event(e):
def format_event(e: dict[str, Any]) -> dict[str, Any]:
ml = get_ml_market(e) ml = get_ml_market(e)
outcomes = json.loads(ml.get("outcomes", "[]")) if ml else [] outcomes = json.loads(ml.get("outcomes", "[]")) if ml else []
prices = json.loads(ml.get("outcomePrices", "[]")) if ml else [] prices = json.loads(ml.get("outcomePrices", "[]")) if ml else []
@@ -771,18 +532,14 @@ def format_event(e: dict[str, Any]) -> dict[str, Any]:
"volume": vol, "volume": vol,
} }
def format_detail_event(e):
def format_detail_event(e: dict[str, Any]) -> DetailEvent:
ml = get_ml_market(e) ml = get_ml_market(e)
active_markets = [ active_markets = [
m m for m in e.get("markets", [])
for m in e.get("markets", [])
if float(m.get("volume", 0)) > 0 and is_tradeable_market(m) if float(m.get("volume", 0)) > 0 and is_tradeable_market(m)
] ]
active_markets = sorted( active_markets = sorted(active_markets, key=lambda m: float(m.get("volume", 0)), reverse=True)
active_markets, key=lambda m: float(m.get("volume", 0)), reverse=True
)
td = _get_time_data(e) td = _get_time_data(e)
@@ -812,21 +569,18 @@ def format_detail_event(e: dict[str, Any]) -> DetailEvent:
], ],
} }
# ============================================================ # ============================================================
# DISPLAY # DISPLAY
# ============================================================ # ============================================================
def get_header_date():
def get_header_date() -> str:
"""Return current date string like 'Mar 25, 2026'""" """Return current date string like 'Mar 25, 2026'"""
now_utc = datetime.now(timezone.utc) now_utc = datetime.now(timezone.utc)
utc7 = timezone(timedelta(hours=7)) utc7 = timezone(timedelta(hours=7))
now_utc7 = now_utc.astimezone(utc7) now_utc7 = now_utc.astimezone(utc7)
return now_utc7.strftime("%b %d, %Y") return now_utc7.strftime("%b %d, %Y")
def get_tournament(title):
def get_tournament(title: str) -> str:
"""Extract tournament name from event title. Title format: 'Category: Team A vs Team B (BO/X) - Tournament Name'""" """Extract tournament name from event title. Title format: 'Category: Team A vs Team B (BO/X) - Tournament Name'"""
if " - " in title: if " - " in title:
parts = title.split(" - ") parts = title.split(" - ")
@@ -834,23 +588,8 @@ def get_tournament(title: str) -> str:
return " - ".join(parts[1:]).strip() return " - ".join(parts[1:]).strip()
return "" return ""
def print_browse(match_events, non_match_events, category, total_raw, total_fetched, total_match, total_non_match, raw_mode=False, partial=False, non_matches_max=5, matches_only=False, non_matches_only=False):
def print_browse(
match_events,
non_match_events,
category,
total_raw,
total_fetched,
total_match,
total_non_match,
raw_mode=False,
partial=False,
non_matches_max=5,
matches_only=False,
non_matches_only=False,
):
from datetime import datetime, timezone, timedelta from datetime import datetime, timezone, timedelta
now_utc = datetime.now(timezone.utc) now_utc = datetime.now(timezone.utc)
utc7 = timezone(timedelta(hours=7)) utc7 = timezone(timedelta(hours=7))
now_utc7 = now_utc.astimezone(utc7) now_utc7 = now_utc.astimezone(utc7)
@@ -860,9 +599,7 @@ def print_browse(
print(f"Current time (WIB): {now_utc7.strftime('%H:%M WIB')} | {header_date}") print(f"Current time (WIB): {now_utc7.strftime('%H:%M WIB')} | {header_date}")
if raw_mode: if raw_mode:
print( print(f"Fetched: {total_fetched} / Total API: {total_raw} | Match: {total_match} | Non-match: {total_non_match}")
f"Fetched: {total_fetched} / Total API: {total_raw} | Match: {total_match} | Non-match: {total_non_match}"
)
if partial: if partial:
print(f"WARNING: Partial fetch (API error or timeout) — data may be incomplete") print(f"WARNING: Partial fetch (API error or timeout) — data may be incomplete")
@@ -896,60 +633,38 @@ def print_browse(
for line in render_non_match_lines(fd, i, mode="text"): for line in render_non_match_lines(fd, i, mode="text"):
print(line) print(line)
def print_detail(e, detail):
def print_detail(e: dict[str, Any], detail: DetailEvent) -> None:
print(f"\n{detail['title']}") print(f"\n{detail['title']}")
print(f"URL: {detail['url']}") print(f"URL: {detail['url']}")
print(f"Livestream: {detail['livestream']}") print(f"Livestream: {detail['livestream']}")
spread_str = ( spread_str = format_spread(detail["best_bid"], detail["best_ask"]) if detail["best_bid"] and detail["best_ask"] else "N/A"
format_spread(detail["best_bid"], detail["best_ask"])
if detail["best_bid"] and detail["best_ask"]
else "N/A"
)
print(f"\n{detail['time_status']}") print(f"\n{detail['time_status']}")
print( print(f"ML: {detail['outcomes'][0]} {format_odds(float(detail['prices'][0]))} vs {detail['outcomes'][1]} {format_odds(float(detail['prices'][1]))}")
f"ML: {detail['outcomes'][0]} "
f"{format_odds(float(detail['prices'][0]))} vs "
f"{detail['outcomes'][1]} {format_odds(float(detail['prices'][1]))}"
)
print(f"ML Vol: ${detail['volume']:,.0f} | {spread_str}") print(f"ML Vol: ${detail['volume']:,.0f} | {spread_str}")
print(f"\nMarkets ({len(detail['markets'])}):") print(f"\nMarkets ({len(detail['markets'])}):")
for m in detail["markets"]: for m in detail["markets"]:
spread_str = ( spread_str = format_spread(m["best_bid"], m["best_ask"]) if m["best_bid"] and m["best_ask"] else "N/A"
format_spread(m["best_bid"], m["best_ask"])
if m["best_bid"] and m["best_ask"]
else "N/A"
)
print(f" [{m['type']}]") print(f" [{m['type']}]")
print( print(f" {m['outcomes'][0]} {format_odds(float(m['prices'][0]))} vs {m['outcomes'][1]} {format_odds(float(m['prices'][1]))}")
f" {m['outcomes'][0]} "
f"{format_odds(float(m['prices'][0]))} vs "
f"{m['outcomes'][1]} {format_odds(float(m['prices'][1]))}"
)
print(f" Vol: ${m['volume']:,.0f} | {spread_str}") print(f" Vol: ${m['volume']:,.0f} | {spread_str}")
print(f" URL: {m['url']}") print(f" URL: {m['url']}")
# ============================================================ # ============================================================
# TELEGRAM # TELEGRAM
# ============================================================ # ============================================================
# Translation table built once per call; maps each HTML-sensitive
# character to its Telegram-safe entity. A single translate() pass is
# equivalent to the ordered replace chain (ampersand-first), because
# translate never re-scans the entities it emits.
def escape_html(text: str) -> str:
    """Escape HTML-sensitive characters for Telegram parse_mode=HTML."""
    entity_map = {
        "&": "&amp;",
        "<": "&lt;",
        ">": "&gt;",
        '"': "&quot;",
    }
    return text.translate(str.maketrans(entity_map))
def send_telegram_message( def send_telegram_message(bot_token, chat_id, text, timeout=10):
bot_token: str, chat_id: str, text: str, timeout: int = 10
) -> int:
"""Send a message via Telegram bot API. Returns the message ID on success. """Send a message via Telegram bot API. Returns the message ID on success.
Raises: Raises:
@@ -957,14 +672,12 @@ def send_telegram_message(
URLError/HTTPError: On network or HTTP-level failures. URLError/HTTPError: On network or HTTP-level failures.
""" """
url = f"https://api.telegram.org/bot{bot_token}/sendMessage" url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
data = urlencode( data = urlencode({
{ "chat_id": chat_id,
"chat_id": chat_id, "text": text,
"text": text, "parse_mode": "HTML",
"parse_mode": "HTML", "disable_web_page_preview": "true",
"disable_web_page_preview": "true", }).encode("utf-8")
}
).encode("utf-8")
req = Request(url, data=data, method="POST") req = Request(url, data=data, method="POST")
with urlopen(req, timeout=timeout) as resp: with urlopen(req, timeout=timeout) as resp:
result = json.loads(resp.read()) result = json.loads(resp.read())
@@ -973,23 +686,15 @@ def send_telegram_message(
return result["result"]["message_id"] return result["result"]["message_id"]
def send_to_telegram( def send_to_telegram(match_events, non_match_events, category, matches_only=False, non_matches_only=False):
match_events: list[dict[str, Any]],
non_match_events: list[dict[str, Any]],
category: str,
matches_only: bool = False,
non_matches_only: bool = False,
) -> None:
"""Send browse results to Telegram. Reads TELEGRAM_BOT_TOKEN and CHAT_ID from environment.""" """Send browse results to Telegram. Reads TELEGRAM_BOT_TOKEN and CHAT_ID from environment."""
import os import os
bot_token = os.environ.get("TELEGRAM_BOT_TOKEN") bot_token = os.environ.get("TELEGRAM_BOT_TOKEN")
chat_id = os.environ.get("CHAT_ID") chat_id = os.environ.get("CHAT_ID")
if not bot_token or not chat_id: if not bot_token or not chat_id:
raise RuntimeError("TELEGRAM_BOT_TOKEN or CHAT_ID not set in environment") raise RuntimeError("TELEGRAM_BOT_TOKEN or CHAT_ID not set in environment")
from datetime import datetime, timezone, timedelta from datetime import datetime, timezone, timedelta
now_utc = datetime.now(timezone.utc) now_utc = datetime.now(timezone.utc)
utc7 = timezone(timedelta(hours=7)) utc7 = timezone(timedelta(hours=7))
now_utc7 = now_utc.astimezone(utc7) now_utc7 = now_utc.astimezone(utc7)
@@ -1032,14 +737,7 @@ def send_to_telegram(
send_chunked(lines, send, category, header_date, show_matches, show_non_matches) send_chunked(lines, send, category, header_date, show_matches, show_non_matches)
def send_chunked( def send_chunked(all_lines, send_fn, category, header_date, show_matches, show_non_matches):
all_lines: list[str],
send_fn: Callable[[str], None],
category: str,
header_date: str,
show_matches: bool,
show_non_matches: bool,
) -> None:
""" """
Split already-built lines into Telegram-safe chunks and send them. Split already-built lines into Telegram-safe chunks and send them.
@@ -1102,83 +800,31 @@ def send_chunked(
# MAIN
# ============================================================
def main():
def main() -> None: parser = argparse.ArgumentParser(description="Browse Polymarket tradeable events by game category.")
parser = argparse.ArgumentParser( parser.add_argument("--category", default="Counter Strike",
description="Browse Polymarket tradeable events by game category." choices=list(GAME_CATEGORIES.keys()),
) help="Game category to browse")
parser.add_argument( parser.add_argument("--limit", type=int, default=5,
"--category", help="Max events per section (match + non-match). Default: 5")
default="Counter Strike", parser.add_argument("--matches", type=int, default=None,
choices=list(GAME_CATEGORIES.keys()), help="Max match markets to show. Default: --limit")
help="Game category to browse", parser.add_argument("--non-matches", type=int, default=None,
) help="Max non-match markets to show. Default: --limit")
parser.add_argument( parser.add_argument("--search", type=str, default=None,
"--limit", help="Free-text team/term search within the selected category. Overrides default query.")
type=int, parser.add_argument("--matches-only", action="store_true",
default=5, help="Show only match markets (suppress non-match section).")
help="Max events per section (match + non-match). Default: 5", parser.add_argument("--non-matches-only", action="store_true",
) help="Show only non-match markets (suppress match section).")
parser.add_argument( parser.add_argument("--list-categories", action="store_true",
"--matches", help="List available game categories and exit")
type=int, parser.add_argument("--detail", type=int, default=1,
default=None, help="Index of match event (1-indexed) to show detailed markets. Default: 1. Set to 0 to disable.")
help="Max match markets to show. Default: --limit", parser.add_argument("--raw", action="store_true",
) help="Show all events without tradeable filter (for debugging).")
parser.add_argument( parser.add_argument("--telegram", action="store_true",
"--non-matches", help="Send results to Telegram (TELEGRAM_BOT_TOKEN and CHAT_ID must be set in environment).")
type=int,
default=None,
help="Max non-match markets to show. Default: --limit",
)
parser.add_argument(
"--search",
type=str,
default=None,
help="Free-text team/term search within the selected category. Overrides default query.",
)
parser.add_argument(
"--matches-only",
action="store_true",
help="Show only match markets (suppress non-match section).",
)
parser.add_argument(
"--non-matches-only",
action="store_true",
help="Show only non-match markets (suppress match section).",
)
parser.add_argument(
"--list-categories",
action="store_true",
help="List available game categories and exit",
)
parser.add_argument(
"--detail",
type=int,
default=1,
help="Index of match event (1-indexed) to show detailed markets. Default: 1. Set to 0 to disable.",
)
parser.add_argument(
"--raw",
action="store_true",
help="Show all events without tradeable filter (for debugging).",
)
parser.add_argument(
"--no-cache",
action="store_true",
help="Disable cache and fetch fresh data from API.",
)
parser.add_argument(
"--max-total",
type=int,
default=None,
help="Max total events to fetch before early exit. Default: no limit.",
)
parser.add_argument(
"--telegram",
action="store_true",
help="Send results to Telegram (TELEGRAM_BOT_TOKEN and CHAT_ID must be set in environment).",
)
args = parser.parse_args() args = parser.parse_args()
if args.list_categories: if args.list_categories:
@@ -1198,14 +844,7 @@ def main() -> None:
else: else:
print(f"\nFetching {args.category} events...") print(f"\nFetching {args.category} events...")
result = browse_events( result = browse_events(search_term, matches_max=matches_max, non_matches_max=non_matches_max, tradeable_only=tradeable_only)
search_term,
matches_max=matches_max,
non_matches_max=non_matches_max,
tradeable_only=tradeable_only,
max_total=args.max_total,
use_cache=not args.no_cache,
)
print_browse( print_browse(
result["match_events"], result["match_events"],
@@ -1219,7 +858,7 @@ def main() -> None:
partial=result.get("partial", False), partial=result.get("partial", False),
non_matches_max=non_matches_max, non_matches_max=non_matches_max,
matches_only=args.matches_only, matches_only=args.matches_only,
non_matches_only=args.non_matches_only, non_matches_only=args.non_matches_only
) )
# Print detail for selected event if any # Print detail for selected event if any
@@ -1239,9 +878,8 @@ def main() -> None:
result["non_match_events"], result["non_match_events"],
args.category, args.category,
matches_only=args.matches_only, matches_only=args.matches_only,
non_matches_only=args.non_matches_only, non_matches_only=args.non_matches_only
) )
# Script entry point: run the CLI only when executed directly,
# never on import.
if __name__ == "__main__":
    main()

File diff suppressed because it is too large Load Diff