1 Commits

Author SHA1 Message Date
shoko
d018e87b35 Fix #14: Refactor print_browse/send_to_telegram into single pipeline
Replace duplicate inline formatting with unified format+render pipeline.

New functions:
- format_match_event(e) — canonical dict for match events
- format_non_match_event(e) — canonical dict for non-match events
- render_match_lines(event_dict, i, mode) — text/HTML renderer
- render_non_match_lines(event_dict, i, mode) — text/HTML renderer
- send_chunked(...) — extracted Telegram chunking logic

Also fixed send_chunked() chunking bug: the original '. ' in line
check never matched event lines (period is followed by '</b>' not space).

Tests: 38 total, all passing.

Fixes: #14
2026-03-25 17:47:03 +00:00
3 changed files with 275 additions and 1725 deletions

View File

@@ -34,7 +34,7 @@ hermes mcp add polymarket https://docs.polymarket.com/mcp
## Usage
```
polymarket-browse [--category "Counter Strike"] [--limit 5] [--matches N] [--non-matches N] [--search "TeamName"] [--matches-only] [--non-matches-only] [--detail N] [--raw] [--telegram] [--no-cache] [--max-total N]
polymarket-browse [--category "Counter Strike"] [--limit 5] [--matches N] [--non-matches N] [--search "TeamName"] [--matches-only] [--non-matches-only] [--detail N] [--raw] [--telegram]
```
## Arguments
@@ -49,8 +49,6 @@ polymarket-browse [--category "Counter Strike"] [--limit 5] [--matches N] [--non
- `--detail` : Index of match event (1-indexed) to show detailed markets. Default: 1. Set to 0 to disable.
- `--list-categories` : List available game categories and exit
- `--raw` : Show all events without tradeable filter (for debugging). Includes fetch stats.
- `--no-cache` : Disable caching and fetch fresh data from the API.
- `--max-total` : Maximum total events to fetch before early exit. Default: no limit. Useful for quick snapshots.
- `--telegram` : Send results to Telegram. Requires `BOT_TOKEN` and `CHAT_ID` in environment variables.
## Output Format
@@ -120,32 +118,13 @@ Use `--raw` to disable the tradeable filter and see all match markets regardless
## Pagination
The script fetches **ALL pages** until the API runs out of results.
### Parallel Fetching
Pages are fetched in **parallel batches of 5** using ThreadPoolExecutor. This significantly reduces fetch time:
| Scenario | Without Parallelization | With Parallelization |
|----------|------------------------|---------------------|
| 10 pages (50 events) | ~20s (2s per page × 10) | ~4s (2s per batch × 2 batches) |
| 20 pages (100 events) | ~40s | ~8s |
The script first fetches page 1 to determine total pages, then fetches remaining pages in parallel batches of 5.
The script fetches **ALL pages** until the API runs out of results (up to 100 pages as a safety cap).
## Rate Limiting
- Exponential backoff: 2s → 4s → 8s → 16s → 32s
- Max 5 retries before aborting
## Caching
Results are cached in `~/.cache/polymarket-browse/` with a **5-minute TTL** to reduce redundant API calls.
- Use `--no-cache` to bypass the cache and fetch fresh data
- Cached data is automatically used when available and not expired
- Useful when running the script repeatedly (e.g., for monitoring)
## Odds Format
All odds are shown in **cents** format:

View File

@@ -8,88 +8,10 @@ import html
import json
import time
import argparse
import hashlib
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone, timedelta
from typing import Any, Callable, TypedDict
from urllib.parse import urlencode
from urllib.request import urlopen, Request
class TimeData(TypedDict):
time_status: str
time_urgency: int
abs_time: str
class MatchEvent(TypedDict):
title: str
title_clean: str
tournament: str
url: str
time_status: str
time_urgency: int
abs_time: str
team_a: str
team_b: str
odds_a: str
odds_b: str
vol: float
class NonMatchEvent(TypedDict):
title: str
url: str
time_status: str
time_urgency: int
abs_time: str
market_count: int
total_vol: int
class Market(TypedDict):
type: str
question: str
outcomes: list[str]
prices: list[str]
best_bid: float
best_ask: float
volume: float
url: str
class DetailEvent(TypedDict):
title: str
time_status: str
abs_time: str
url: str
livestream: str | None
outcomes: list[str]
prices: list[str]
best_bid: float
best_ask: float
volume: float
markets: list[Market]
class BrowseResult(TypedDict):
query: str
total_raw: int
total_fetched: int
total_match: int
total_non_match: int
match_events: list[Any]
non_match_events: list[Any]
partial: bool
class FetchResult(TypedDict):
events: list[Any]
total_raw: int
partial: bool
# ============================================================
# CONFIG
# ============================================================
@@ -111,200 +33,72 @@ GAME_CATEGORIES = {
"Tennis": "Tennis",
}
CACHE_DIR = os.path.join(os.path.expanduser("~"), ".cache", "polymarket-browse")
CACHE_TTL = 300 # 5 minutes default
MAX_PARALLEL_FETCHES = 5
# ============================================================
# CACHE
# ============================================================
def _get_cache_key(q: str) -> str:
return hashlib.md5(q.encode()).hexdigest()
def _get_cache_path(q: str) -> str:
os.makedirs(CACHE_DIR, exist_ok=True)
return os.path.join(CACHE_DIR, f"{_get_cache_key(q)}.json")
def _read_cache(q: str) -> dict[str, Any] | None:
cache_path = _get_cache_path(q)
if not os.path.exists(cache_path):
return None
try:
mtime = os.path.getmtime(cache_path)
age = time.time() - mtime
if age > CACHE_TTL:
return None
with open(cache_path) as f:
return json.load(f)
except Exception:
return None
def _write_cache(q: str, data: dict[str, Any]) -> None:
try:
cache_path = _get_cache_path(q)
with open(cache_path, "w") as f:
json.dump(data, f)
except Exception:
pass
# ============================================================
# FETCH
# ============================================================
def fetch_page(
q: str,
page: int = 1,
max_retries: int = MAX_RETRIES,
initial_delay: float = INITIAL_RETRY_DELAY,
) -> dict[str, Any] | None:
def fetch_page(q, page=1, max_retries=MAX_RETRIES, initial_delay=INITIAL_RETRY_DELAY):
base = "https://gamma-api.polymarket.com/public-search"
url = (
f"{base}?q={q.replace(' ', '%20')}&limit={PAGE_SIZE}&page={page}"
f"&search_profiles=false&search_tags=false"
f"&keep_closed_markets=0&events_status=active&cache=false"
)
url = (f"{base}?q={q.replace(' ', '%20')}&limit={PAGE_SIZE}&page={page}"
f"&search_profiles=false&search_tags=false"
f"&keep_closed_markets=0&events_status=active&cache=false")
delay = initial_delay
for attempt in range(max_retries):
if attempt > 0:
time.sleep(delay)
try:
req = Request(url, headers={"User-Agent": "Mozilla/5.0"})
with urlopen(req, timeout=10) as r:
return json.loads(r.read())
except Exception:
time.sleep(delay)
r = subprocess.run(
["curl", "-s", url, "--max-time", "10", "-H", "User-Agent: curl/7.88.1"],
capture_output=True
)
if r.returncode == 0 and len(r.stdout) > 0:
try:
return json.loads(r.stdout.decode('utf-8'))
except json.JSONDecodeError:
if attempt < max_retries - 1:
delay *= 2 # Exponential backoff
continue
return None
else:
# Rate limit or other error - exponential backoff
if attempt < max_retries - 1:
delay *= 2
continue
return None
return None
def _fetch_page_with_index(q: str, page: int) -> tuple[int, dict[str, Any] | None]:
return page, fetch_page(q, page)
def fetch_all_pages(
q: str,
matches_max: int | None = None,
non_matches_max: int | None = None,
max_total: int | None = None,
use_cache: bool = True,
) -> FetchResult:
def fetch_all_pages(q, max_pages=100):
"""
Fetch pages until pagination ends, or until quotas are satisfied.
Args:
q: search query
matches_max: stop early once we have this many match events (None = no limit)
non_matches_max: stop early once we have this many non-match events (None = no limit)
max_total: stop early once we have this many total events (None = no limit)
use_cache: whether to use cache (default True)
Returns:
FetchResult with events, total_raw, and partial flag
Fetch ALL pages until pagination ends.
max_pages is a safety cap to prevent infinite loops.
"""
cached = _read_cache(q) if use_cache else None
if cached is not None:
events = cached.get("events", [])
total_raw = cached.get("total_raw", 0)
if events:
return {"events": events, "total_raw": total_raw, "partial": False}
all_events = []
total_raw = 0
page_count = 0
page1_data = None
while True:
page_count += 1
data = fetch_page(q, page_count)
for page in range(1, max_pages + 1):
time.sleep(0.2) # small delay between pages (API rate limit is generous)
data = fetch_page(q, page)
if data is None:
break
events = data.get("events", [])
total_raw = data.get("pagination", {}).get("totalResults", 0)
if page_count == 1:
page1_data = data
if total_raw > 0:
all_events.extend(events)
# Stop when we get 0 events (no more pages),
# OR when we've fetched >= total results
if len(events) == 0:
break
if not data.get("events"):
if len(all_events) >= total_raw:
break
if total_raw == 0 or page1_data is None:
return {"events": [], "total_raw": 0, "partial": False}
page1_events = page1_data.get("events", [])
actual_page_size = len(page1_events)
# Use actual events per page from API for ceiling division
# ceil(total_raw / actual_page_size) = (total_raw + actual_page_size - 1) // actual_page_size
total_pages = (total_raw + actual_page_size - 1) // actual_page_size
concurrency = min(MAX_PARALLEL_FETCHES, total_pages)
all_page_data: dict[int, list[Any]] = {1: page1_events}
if total_pages > 1:
with ThreadPoolExecutor(max_workers=concurrency) as executor:
futures = {
executor.submit(_fetch_page_with_index, q, page): page
for page in range(2, total_pages + 1)
}
for future in as_completed(futures):
try:
page_num, data = future.result()
if data is not None:
all_page_data[page_num] = data.get("events", [])
except Exception:
pass
all_events = []
for page_num in sorted(all_page_data.keys()):
all_events.extend(all_page_data[page_num])
_write_cache(q, {"events": all_events, "total_raw": total_raw})
match_count = 0
non_match_count = 0
filtered_events = []
total_seen = 0
for e in all_events:
is_match = is_match_market(e)
if is_match:
match_count += 1
else:
non_match_count += 1
filtered_events.append(e)
if matches_max is not None and non_matches_max is not None:
if match_count >= matches_max and non_match_count >= non_matches_max:
break
if max_total is not None:
total_seen += 1
if total_seen >= max_total:
break
partial = len(all_events) < total_raw
return {"events": filtered_events, "total_raw": total_raw, "partial": partial}
partial = (total_raw > 0 and len(all_events) < total_raw)
return {"events": all_events, "total_raw": total_raw, "partial": partial}
# ============================================================
# FILTERS
# ============================================================
def is_match_market(e: dict[str, Any]) -> bool:
def is_match_market(e):
return (e.get("seriesSlug") and e.get("gameId")) or " vs " in e.get("title", "")
def get_event_url(e: dict[str, Any]) -> str:
def get_event_url(e):
"""Return the correct Polymarket URL for an event.
Match markets use /market/, non-match events use /event/.
"""
@@ -314,20 +108,17 @@ def get_event_url(e: dict[str, Any]) -> str:
else:
return f"https://polymarket.com/event/{slug}"
def get_ml_market(e: dict[str, Any]) -> dict[str, Any] | None:
def get_ml_market(e):
for m in e.get("markets", []):
if m.get("sportsMarketType") == "moneyline":
return m
return None
def get_ml_volume(e: dict[str, Any]) -> float:
def get_ml_volume(e):
ml = get_ml_market(e)
return float(ml.get("volume", 0)) if ml else 0.0
def is_bo2_tie(e: dict[str, Any]) -> bool:
def is_bo2_tie(e):
"""
Detect if this is a BO2 that ended in a tie (1-1).
Returns True if all child_moneyline markets are closed (match is over but tied).
@@ -336,11 +127,7 @@ def is_bo2_tie(e: dict[str, Any]) -> bool:
if "BO2" not in title:
return False
child_markets = [
m
for m in e.get("markets", [])
if m.get("sportsMarketType") == "child_moneyline"
]
child_markets = [m for m in e.get("markets", []) if m.get("sportsMarketType") == "child_moneyline"]
if len(child_markets) != 2:
return False
@@ -348,8 +135,7 @@ def is_bo2_tie(e: dict[str, Any]) -> bool:
all_closed = all(m.get("closed", False) for m in child_markets)
return all_closed
def is_tradeable_event(e: dict[str, Any]) -> bool:
def is_tradeable_event(e):
ml = get_ml_market(e)
if not ml:
return False
@@ -379,7 +165,7 @@ def is_tradeable_event(e: dict[str, Any]) -> bool:
end_str = e.get("endDate", "")
if end_str:
try:
end_dt = datetime.fromisoformat(end_str.replace("Z", "+00:00"))
end_dt = datetime.fromisoformat(end_str.replace('Z', '+00:00'))
now = datetime.now(timezone.utc)
if end_dt < now:
return False
@@ -390,7 +176,7 @@ def is_tradeable_event(e: dict[str, Any]) -> bool:
start_str = e.get("startTime") or e.get("startDate", "")
if start_str:
try:
start_dt = datetime.fromisoformat(start_str.replace("Z", "+00:00"))
start_dt = datetime.fromisoformat(start_str.replace('Z', '+00:00'))
now = datetime.now(timezone.utc)
if start_dt < now:
# Check if it's recently started (within 4h) — consider those "live" still
@@ -402,8 +188,7 @@ def is_tradeable_event(e: dict[str, Any]) -> bool:
return True
def is_tradeable_market(m: dict[str, Any]) -> bool:
def is_tradeable_market(m):
accepting = m.get("acceptingOrders", False)
closed = m.get("closed", True)
best_ask = float(m.get("bestAsk", 0))
@@ -423,26 +208,22 @@ def is_tradeable_market(m: dict[str, Any]) -> bool:
return True
# ============================================================
# FORMATTING
# ============================================================
def prob_to_cents(p: float) -> int:
def prob_to_cents(p):
return int(round(p * 100))
def format_odds(p: float) -> str:
def format_odds(p):
return f"{prob_to_cents(p)}c"
def format_spread(bid: float, ask: float) -> str:
def format_spread(bid, ask):
spread = ask - bid
return f"{prob_to_cents(spread)}c"
def _get_time_data(e: dict[str, Any], tz: timezone | None = None) -> TimeData:
def _get_time_data(e, tz=None):
"""
Unified time data extraction for event timestamps.
@@ -456,7 +237,11 @@ def _get_time_data(e: dict[str, Any], tz: timezone | None = None) -> TimeData:
Defaults to WIB (UTC+7).
Returns:
TimeData with time_status, time_urgency, and abs_time
{
"time_status": str, # e.g. "LIVE", "In 6h", "12h ago"
"time_urgency": int, # 0-3 (higher = more urgent/live)
"abs_time": str, # e.g. "Mar 25, 19:00 WIB" or "TBD"
}
"""
tz = tz or WIB
start_str = e.get("startTime") or e.get("startDate", "")
@@ -465,13 +250,13 @@ def _get_time_data(e: dict[str, Any], tz: timezone | None = None) -> TimeData:
return {"time_status": "TBD", "time_urgency": 0, "abs_time": "TBD"}
try:
start_dt = datetime.fromisoformat(start_str.replace("Z", "+00:00"))
start_dt = datetime.fromisoformat(start_str.replace('Z', '+00:00'))
now_utc = datetime.now(timezone.utc)
delta = start_dt - now_utc
total_sec = delta.total_seconds()
if total_sec <= 0:
# Event is in the past or happening now
if total_sec < 0:
# Event is in the past
hours_ago = abs(total_sec) / 3600
if hours_ago < 1:
time_status = "LIVE"
@@ -506,24 +291,18 @@ def _get_time_data(e: dict[str, Any], tz: timezone | None = None) -> TimeData:
abs_time += "WIB"
else:
abs_time += start_dt.astimezone(tz).strftime("%Z")
return {
"time_status": time_status,
"time_urgency": time_urgency,
"abs_time": abs_time,
}
return {"time_status": time_status, "time_urgency": time_urgency, "abs_time": abs_time}
except Exception:
return {"time_status": "", "time_urgency": 0, "abs_time": "TBD"}
def filter_events(
events: list[dict[str, Any]], tradeable_only: bool = True
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
def filter_events(events, tradeable_only=True):
"""
Classify events into match_markets and non_match_markets.
If tradeable_only=True, also filter out non-tradeable events.
"""
match_events: list[dict[str, Any]] = []
non_match_events: list[dict[str, Any]] = []
match_events = []
non_match_events = []
for e in events:
if is_match_market(e):
@@ -535,79 +314,53 @@ def filter_events(
return match_events, non_match_events
def sort_events(events: list[dict[str, Any]]) -> list[dict[str, Any]]:
def sort_events(events):
return sorted(events, key=get_ml_volume, reverse=True)
# ============================================================
# BROWSE
# ============================================================
def browse_events(
q: str,
matches_max: int = 10,
non_matches_max: int = 10,
tradeable_only: bool = True,
sort_by: str | None = None,
max_total: int | None = None,
use_cache: bool = True,
) -> BrowseResult:
"""
Browse Polymarket events.
Args:
q: search query
matches_max: max number of match markets to return
non_matches_max: max number of non-match markets to return
tradeable_only: filter to tradeable events only
sort_by: None (fast, API order) or "volume" (full fetch, sort by volume desc)
max_total: max total events to fetch before early exit (None = no limit)
use_cache: whether to use cache (default True)
"""
use_early_exit = sort_by is None
fetch_matches_max = matches_max if use_early_exit else None
fetch_non_matches_max = non_matches_max if use_early_exit else None
result = fetch_all_pages(
q,
matches_max=fetch_matches_max,
non_matches_max=fetch_non_matches_max,
max_total=max_total,
use_cache=use_cache,
)
def browse_events(q, matches_max=10, non_matches_max=10, tradeable_only=True):
result = fetch_all_pages(q)
events = result["events"]
match_events, non_match_events = filter_events(events, tradeable_only)
# Sort if requested; otherwise preserve API order
if sort_by == "volume":
match_events = sort_events(match_events)
non_match_events = sort_events(non_match_events)
sorted_match = sort_events(match_events)
return {
"query": q,
"total_raw": result["total_raw"],
"total_fetched": len(events),
"total_match": len(match_events),
"total_non_match": len(non_match_events),
"match_events": match_events[:matches_max],
"match_events": sorted_match[:matches_max],
"non_match_events": non_match_events[:non_matches_max],
"partial": result.get("partial", False),
}
# ============================================================
# FORMAT — EVENT
# ============================================================
def format_match_event(e: dict[str, Any]) -> MatchEvent:
def format_match_event(e):
"""
Format a match event into a canonical dict for rendering.
All computing done here; renderers just template.
Returns:
MatchEvent with all required fields
{
"title": str, # raw title
"title_clean": str, # "Team A vs Team B"
"tournament": str, # "Tournament Name" or ""
"url": str,
"time_status": str, # "LIVE", "In 6h", "12h ago"
"time_urgency": int, # 0-3
"abs_time": str, # "Mar 25, 19:00 WIB"
"team_a": str,
"team_b": str,
"odds_a": str, # "55c"
"odds_b": str,
"vol": int,
}
"""
ml = get_ml_market(e)
outcomes = json.loads(ml.get("outcomes", "[]")) if ml else []
@@ -643,12 +396,20 @@ def format_match_event(e: dict[str, Any]) -> MatchEvent:
}
def format_non_match_event(e: dict[str, Any]) -> NonMatchEvent:
def format_non_match_event(e):
"""
Format a non-match event into a canonical dict for rendering.
Returns:
NonMatchEvent with all required fields
{
"title": str,
"url": str,
"time_status": str,
"time_urgency": int,
"abs_time": str,
"market_count": int,
"total_vol": int,
}
"""
td = _get_time_data(e)
total_vol = sum(float(m.get("volume", 0)) for m in e.get("markets", []))
@@ -669,8 +430,7 @@ def format_non_match_event(e: dict[str, Any]) -> NonMatchEvent:
# FORMAT — RENDER
# ============================================================
def render_match_lines(event_dict: MatchEvent, i: int, mode: str) -> list[str]:
def render_match_lines(event_dict, i, mode):
"""
Render a formatted match event dict into lines of text.
@@ -697,7 +457,9 @@ def render_match_lines(event_dict: MatchEvent, i: int, mode: str) -> list[str]:
lines = []
if mode == "html":
lines.append(f'<b>{i}.</b> <a href="{url}">{escape_html(title_clean)}</a>')
lines.append(
f"<b>{i}.</b> <a href=\"{url}\">{escape_html(title_clean)}</a>"
)
else:
lines.append(f"{i}. [{title_clean}]({url})")
@@ -712,7 +474,7 @@ def render_match_lines(event_dict: MatchEvent, i: int, mode: str) -> list[str]:
return lines
def render_non_match_lines(event_dict: NonMatchEvent, i: int, mode: str) -> list[str]:
def render_non_match_lines(event_dict, i, mode):
"""
Render a formatted non-match event dict into lines of text.
@@ -734,7 +496,7 @@ def render_non_match_lines(event_dict: NonMatchEvent, i: int, mode: str) -> list
lines = []
if mode == "html":
lines.append(f'<b>{i}.</b> <a href="{url}">{escape_html(title)}</a>')
lines.append(f"<b>{i}.</b> <a href=\"{url}\">{escape_html(title)}</a>")
else:
lines.append(f"{i}. [{title}]({url})")
@@ -748,8 +510,7 @@ def render_non_match_lines(event_dict: NonMatchEvent, i: int, mode: str) -> list
# FORMAT — LEGACY
# ============================================================
def format_event(e: dict[str, Any]) -> dict[str, Any]:
def format_event(e):
ml = get_ml_market(e)
outcomes = json.loads(ml.get("outcomes", "[]")) if ml else []
prices = json.loads(ml.get("outcomePrices", "[]")) if ml else []
@@ -771,18 +532,14 @@ def format_event(e: dict[str, Any]) -> dict[str, Any]:
"volume": vol,
}
def format_detail_event(e: dict[str, Any]) -> DetailEvent:
def format_detail_event(e):
ml = get_ml_market(e)
active_markets = [
m
for m in e.get("markets", [])
m for m in e.get("markets", [])
if float(m.get("volume", 0)) > 0 and is_tradeable_market(m)
]
active_markets = sorted(
active_markets, key=lambda m: float(m.get("volume", 0)), reverse=True
)
active_markets = sorted(active_markets, key=lambda m: float(m.get("volume", 0)), reverse=True)
td = _get_time_data(e)
@@ -812,21 +569,18 @@ def format_detail_event(e: dict[str, Any]) -> DetailEvent:
],
}
# ============================================================
# DISPLAY
# ============================================================
def get_header_date() -> str:
def get_header_date():
"""Return current date string like 'Mar 25, 2026'"""
now_utc = datetime.now(timezone.utc)
utc7 = timezone(timedelta(hours=7))
now_utc7 = now_utc.astimezone(utc7)
return now_utc7.strftime("%b %d, %Y")
def get_tournament(title: str) -> str:
def get_tournament(title):
"""Extract tournament name from event title. Title format: 'Category: Team A vs Team B (BO/X) - Tournament Name'"""
if " - " in title:
parts = title.split(" - ")
@@ -834,23 +588,8 @@ def get_tournament(title: str) -> str:
return " - ".join(parts[1:]).strip()
return ""
def print_browse(
match_events,
non_match_events,
category,
total_raw,
total_fetched,
total_match,
total_non_match,
raw_mode=False,
partial=False,
non_matches_max=5,
matches_only=False,
non_matches_only=False,
):
def print_browse(match_events, non_match_events, category, total_raw, total_fetched, total_match, total_non_match, raw_mode=False, partial=False, non_matches_max=5, matches_only=False, non_matches_only=False):
from datetime import datetime, timezone, timedelta
now_utc = datetime.now(timezone.utc)
utc7 = timezone(timedelta(hours=7))
now_utc7 = now_utc.astimezone(utc7)
@@ -860,9 +599,7 @@ def print_browse(
print(f"Current time (WIB): {now_utc7.strftime('%H:%M WIB')} | {header_date}")
if raw_mode:
print(
f"Fetched: {total_fetched} / Total API: {total_raw} | Match: {total_match} | Non-match: {total_non_match}"
)
print(f"Fetched: {total_fetched} / Total API: {total_raw} | Match: {total_match} | Non-match: {total_non_match}")
if partial:
print(f"WARNING: Partial fetch (API error or timeout) — data may be incomplete")
@@ -896,60 +633,38 @@ def print_browse(
for line in render_non_match_lines(fd, i, mode="text"):
print(line)
def print_detail(e: dict[str, Any], detail: DetailEvent) -> None:
def print_detail(e, detail):
print(f"\n{detail['title']}")
print(f"URL: {detail['url']}")
print(f"Livestream: {detail['livestream']}")
spread_str = (
format_spread(detail["best_bid"], detail["best_ask"])
if detail["best_bid"] and detail["best_ask"]
else "N/A"
)
spread_str = format_spread(detail["best_bid"], detail["best_ask"]) if detail["best_bid"] and detail["best_ask"] else "N/A"
print(f"\n{detail['time_status']}")
print(
f"ML: {detail['outcomes'][0]} "
f"{format_odds(float(detail['prices'][0]))} vs "
f"{detail['outcomes'][1]} {format_odds(float(detail['prices'][1]))}"
)
print(f"ML: {detail['outcomes'][0]} {format_odds(float(detail['prices'][0]))} vs {detail['outcomes'][1]} {format_odds(float(detail['prices'][1]))}")
print(f"ML Vol: ${detail['volume']:,.0f} | {spread_str}")
print(f"\nMarkets ({len(detail['markets'])}):")
for m in detail["markets"]:
spread_str = (
format_spread(m["best_bid"], m["best_ask"])
if m["best_bid"] and m["best_ask"]
else "N/A"
)
spread_str = format_spread(m["best_bid"], m["best_ask"]) if m["best_bid"] and m["best_ask"] else "N/A"
print(f" [{m['type']}]")
print(
f" {m['outcomes'][0]} "
f"{format_odds(float(m['prices'][0]))} vs "
f"{m['outcomes'][1]} {format_odds(float(m['prices'][1]))}"
)
print(f" {m['outcomes'][0]} {format_odds(float(m['prices'][0]))} vs {m['outcomes'][1]} {format_odds(float(m['prices'][1]))}")
print(f" Vol: ${m['volume']:,.0f} | {spread_str}")
print(f" URL: {m['url']}")
# ============================================================
# TELEGRAM
# ============================================================
def escape_html(text: str) -> str:
def escape_html(text):
"""Escape HTML-sensitive characters for Telegram parse_mode=HTML."""
return (
text.replace("&", "&amp;")
return (text
.replace("&", "&amp;")
.replace("<", "&lt;")
.replace(">", "&gt;")
.replace('"', "&quot;")
)
.replace('"', "&quot;"))
def send_telegram_message(
bot_token: str, chat_id: str, text: str, timeout: int = 10
) -> int:
def send_telegram_message(bot_token, chat_id, text, timeout=10):
"""Send a message via Telegram bot API. Returns the message ID on success.
Raises:
@@ -957,14 +672,12 @@ def send_telegram_message(
URLError/HTTPError: On network or HTTP-level failures.
"""
url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
data = urlencode(
{
"chat_id": chat_id,
"text": text,
"parse_mode": "HTML",
"disable_web_page_preview": "true",
}
).encode("utf-8")
data = urlencode({
"chat_id": chat_id,
"text": text,
"parse_mode": "HTML",
"disable_web_page_preview": "true",
}).encode("utf-8")
req = Request(url, data=data, method="POST")
with urlopen(req, timeout=timeout) as resp:
result = json.loads(resp.read())
@@ -973,23 +686,15 @@ def send_telegram_message(
return result["result"]["message_id"]
def send_to_telegram(
match_events: list[dict[str, Any]],
non_match_events: list[dict[str, Any]],
category: str,
matches_only: bool = False,
non_matches_only: bool = False,
) -> None:
def send_to_telegram(match_events, non_match_events, category, matches_only=False, non_matches_only=False):
"""Send browse results to Telegram. Reads TELEGRAM_BOT_TOKEN and CHAT_ID from environment."""
import os
bot_token = os.environ.get("TELEGRAM_BOT_TOKEN")
chat_id = os.environ.get("CHAT_ID")
if not bot_token or not chat_id:
raise RuntimeError("TELEGRAM_BOT_TOKEN or CHAT_ID not set in environment")
from datetime import datetime, timezone, timedelta
now_utc = datetime.now(timezone.utc)
utc7 = timezone(timedelta(hours=7))
now_utc7 = now_utc.astimezone(utc7)
@@ -1032,14 +737,7 @@ def send_to_telegram(
send_chunked(lines, send, category, header_date, show_matches, show_non_matches)
def send_chunked(
all_lines: list[str],
send_fn: Callable[[str], None],
category: str,
header_date: str,
show_matches: bool,
show_non_matches: bool,
) -> None:
def send_chunked(all_lines, send_fn, category, header_date, show_matches, show_non_matches):
"""
Split already-built lines into Telegram-safe chunks and send them.
@@ -1102,83 +800,31 @@ def send_chunked(
# MAIN
# ============================================================
def main() -> None:
parser = argparse.ArgumentParser(
description="Browse Polymarket tradeable events by game category."
)
parser.add_argument(
"--category",
default="Counter Strike",
choices=list(GAME_CATEGORIES.keys()),
help="Game category to browse",
)
parser.add_argument(
"--limit",
type=int,
default=5,
help="Max events per section (match + non-match). Default: 5",
)
parser.add_argument(
"--matches",
type=int,
default=None,
help="Max match markets to show. Default: --limit",
)
parser.add_argument(
"--non-matches",
type=int,
default=None,
help="Max non-match markets to show. Default: --limit",
)
parser.add_argument(
"--search",
type=str,
default=None,
help="Free-text team/term search within the selected category. Overrides default query.",
)
parser.add_argument(
"--matches-only",
action="store_true",
help="Show only match markets (suppress non-match section).",
)
parser.add_argument(
"--non-matches-only",
action="store_true",
help="Show only non-match markets (suppress match section).",
)
parser.add_argument(
"--list-categories",
action="store_true",
help="List available game categories and exit",
)
parser.add_argument(
"--detail",
type=int,
default=1,
help="Index of match event (1-indexed) to show detailed markets. Default: 1. Set to 0 to disable.",
)
parser.add_argument(
"--raw",
action="store_true",
help="Show all events without tradeable filter (for debugging).",
)
parser.add_argument(
"--no-cache",
action="store_true",
help="Disable cache and fetch fresh data from API.",
)
parser.add_argument(
"--max-total",
type=int,
default=None,
help="Max total events to fetch before early exit. Default: no limit.",
)
parser.add_argument(
"--telegram",
action="store_true",
help="Send results to Telegram (TELEGRAM_BOT_TOKEN and CHAT_ID must be set in environment).",
)
def main():
parser = argparse.ArgumentParser(description="Browse Polymarket tradeable events by game category.")
parser.add_argument("--category", default="Counter Strike",
choices=list(GAME_CATEGORIES.keys()),
help="Game category to browse")
parser.add_argument("--limit", type=int, default=5,
help="Max events per section (match + non-match). Default: 5")
parser.add_argument("--matches", type=int, default=None,
help="Max match markets to show. Default: --limit")
parser.add_argument("--non-matches", type=int, default=None,
help="Max non-match markets to show. Default: --limit")
parser.add_argument("--search", type=str, default=None,
help="Free-text team/term search within the selected category. Overrides default query.")
parser.add_argument("--matches-only", action="store_true",
help="Show only match markets (suppress non-match section).")
parser.add_argument("--non-matches-only", action="store_true",
help="Show only non-match markets (suppress match section).")
parser.add_argument("--list-categories", action="store_true",
help="List available game categories and exit")
parser.add_argument("--detail", type=int, default=1,
help="Index of match event (1-indexed) to show detailed markets. Default: 1. Set to 0 to disable.")
parser.add_argument("--raw", action="store_true",
help="Show all events without tradeable filter (for debugging).")
parser.add_argument("--telegram", action="store_true",
help="Send results to Telegram (TELEGRAM_BOT_TOKEN and CHAT_ID must be set in environment).")
args = parser.parse_args()
if args.list_categories:
@@ -1198,14 +844,7 @@ def main() -> None:
else:
print(f"\nFetching {args.category} events...")
result = browse_events(
search_term,
matches_max=matches_max,
non_matches_max=non_matches_max,
tradeable_only=tradeable_only,
max_total=args.max_total,
use_cache=not args.no_cache,
)
result = browse_events(search_term, matches_max=matches_max, non_matches_max=non_matches_max, tradeable_only=tradeable_only)
print_browse(
result["match_events"],
@@ -1219,7 +858,7 @@ def main() -> None:
partial=result.get("partial", False),
non_matches_max=non_matches_max,
matches_only=args.matches_only,
non_matches_only=args.non_matches_only,
non_matches_only=args.non_matches_only
)
# Print detail for selected event if any
@@ -1239,9 +878,8 @@ def main() -> None:
result["non_match_events"],
args.category,
matches_only=args.matches_only,
non_matches_only=args.non_matches_only,
non_matches_only=args.non_matches_only
)
if __name__ == "__main__":
main()

File diff suppressed because it is too large Load Diff