17 Commits

Author SHA1 Message Date
shoko
53c268511a Add test for event happening exactly now shows LIVE
Added test_get_time_data_live_exactly_now to prevent regression.
2026-03-26 17:46:53 +00:00
shoko
7f3b885521 Fix: Event happening exactly now shows 'LIVE' instead of 'In 0m'
Changed condition from total_sec < 0 to total_sec <= 0 to catch the edge case where an event is happening right now.
2026-03-26 17:43:28 +00:00
shoko
e0ae6bbd14 Remove incorrect '100 pages safety cap' claim from SKILL.md
This limit doesn't exist in the code.
2026-03-26 17:37:41 +00:00
f468a12ea4 Merge pull request 'Add parallel fetching, caching, and max_total parameter' (#26) from feat/parallel-fetch-cache into master 2026-03-26 18:23:20 +01:00
shoko
c75d123dfd Update SKILL.md with new caching and parallel fetching documentation 2026-03-26 17:19:03 +00:00
shoko
9d1e328f53 Make page size calculation dynamic based on first API response
- Uses actual event count from page 1 to calculate total_pages
- Removes hardcoded '5' for events per page
- API changes to page size will be handled automatically
- Updated tests to match real API behavior (5 events per page)
2026-03-26 17:15:28 +00:00
shoko
09f3cb9066 Add comment explaining total_pages ceiling division calculation 2026-03-26 17:06:25 +00:00
shoko
1ae60f5661 Fix total_pages calculation bug and add tests
- Fixed total_pages calculation: API returns 5 events/page, not PAGE_SIZE
- This was causing partial=false positives when max_total was used
- Updated tests to use correct pagination values
2026-03-26 16:54:41 +00:00
shoko
bab373ab8f Add unit tests for parallelization, cache, and max_total
- TestParallelFetchConcurrency: verify batch size of 5 and concurrency limit
- TestCacheFunctions: test cache read/write error handling
- TestMaxTotalParameter: test max_total event limiting
2026-03-26 16:43:13 +00:00
shoko
eafbdba4a5 Add parallel fetching, caching, and max_total parameter
- Parallel page fetching with ThreadPoolExecutor (concurrency=5)
- File-based cache with 5 min TTL in ~/.cache/polymarket-browse/
- New --no-cache flag to bypass cache
- New --max-total parameter for early exit
- Updated tests to work with new implementation
2026-03-26 16:29:25 +00:00
0a7911653b Merge pull request 'Fix line lengths in browse.py' (#24) from fix/line-lengths into master 2026-03-26 16:42:53 +01:00
bae69043f8 Merge pull request 'Add type hints to browse.py' (#23) from feat/add-type-hints into master 2026-03-26 16:42:08 +01:00
shoko
d6c0342c0f Fix line lengths in browse.py
Split 2 lines that exceeded 120 chars in print_detail function.
All 62 tests pass.
2026-03-26 15:40:21 +00:00
shoko
ce526b1aa3 Add type hints to browse.py
- Added TypedDict classes for typed event/market structures
- Added type annotations to all functions
- Used Python 3.10+ union syntax (str | None, dict[str, Any])
- All 62 tests pass
2026-03-26 15:35:18 +00:00
ae50fd14f0 Merge pull request 'Fix #14: Refactor print_browse/send_to_telegram into single pipeline' (#22) from fix/issue-14-refactor-browse into master 2026-03-25 20:11:07 +01:00
shoko
c348d6daa1 tests: Add unit tests for browse_events, fetch_all_pages, filter_events, is_match_market, get_ml_market, get_ml_volume, sort_events
New test classes:
- TestIsMatchMarket: 5 tests for is_match_market() classification
- TestGetMlMarket: 5 tests for get_ml_market() and get_ml_volume()
- TestFilterEvents: 5 tests for filter_events() and sort_events()
- TestFetchAllPages: 4 tests for fetch_all_pages() early-exit logic
- TestBrowseEvents: 5 tests for browse_events() sort_by parameter

Total: 24 new tests (62 total, all passing)
2026-03-25 19:08:36 +00:00
shoko
764c75e712 Fix: Switch fetch_page from subprocess to urllib, add early-exit to fetch_all_pages, add sort_by to browse_events
- fetch_page: replace subprocess.run(curl) with urllib (stdlib, cleaner)
- fetch_all_pages: add matches_max/non_matches_max params for early-exit.
  When both are set, stop fetching once quotas are satisfied.
- browse_events: add sort_by param (None='fast' early-exit, 'volume'=full fetch+sort).
  Early-exit only used when sort_by=None (no client-side sort needed).
- Remove subprocess import (no longer needed after migration)
2026-03-25 18:53:11 +00:00
3 changed files with 1725 additions and 275 deletions

View File

@@ -34,7 +34,7 @@ hermes mcp add polymarket https://docs.polymarket.com/mcp
## Usage
```
polymarket-browse [--category "Counter Strike"] [--limit 5] [--matches N] [--non-matches N] [--search "TeamName"] [--matches-only] [--non-matches-only] [--detail N] [--raw] [--telegram]
polymarket-browse [--category "Counter Strike"] [--limit 5] [--matches N] [--non-matches N] [--search "TeamName"] [--matches-only] [--non-matches-only] [--detail N] [--raw] [--telegram] [--no-cache] [--max-total N]
```
## Arguments
@@ -49,6 +49,8 @@ polymarket-browse [--category "Counter Strike"] [--limit 5] [--matches N] [--non
- `--detail` : Index of match event (1-indexed) to show detailed markets. Default: 1. Set to 0 to disable.
- `--list-categories` : List available game categories and exit
- `--raw` : Show all events without tradeable filter (for debugging). Includes fetch stats.
- `--no-cache` : Disable caching and fetch fresh data from the API.
- `--max-total` : Maximum total events to fetch before early exit. Default: no limit. Useful for quick snapshots.
- `--telegram` : Send results to Telegram. Requires `BOT_TOKEN` and `CHAT_ID` in environment variables.
## Output Format
@@ -118,13 +120,32 @@ Use `--raw` to disable the tradeable filter and see all match markets regardless
## Pagination
The script fetches **ALL pages** until the API runs out of results (up to 100 pages as a safety cap).
The script fetches **ALL pages** until the API runs out of results.
### Parallel Fetching
Pages are fetched in **parallel batches of 5** using ThreadPoolExecutor. This significantly reduces fetch time:
| Scenario | Without Parallelization | With Parallelization |
|----------|------------------------|---------------------|
| 10 pages (50 events) | ~20s (2s per page × 10) | ~4s (2s per batch × 2 batches) |
| 20 pages (100 events) | ~40s | ~8s |
The script first fetches page 1 to determine total pages, then fetches remaining pages in parallel batches of 5.
## Rate Limiting
- Exponential backoff: 2s → 4s → 8s → 16s → 32s
- Max 5 retries before aborting
## Caching
Results are cached in `~/.cache/polymarket-browse/` with a **5-minute TTL** to reduce redundant API calls.
- Use `--no-cache` to bypass the cache and fetch fresh data
- Cached data is automatically used when available and not expired
- Useful when running the script repeatedly (e.g., for monitoring)
## Odds Format
All odds are shown in **cents** format:

View File

@@ -8,10 +8,88 @@ import html
import json
import time
import argparse
import hashlib
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone, timedelta
from typing import Any, Callable, TypedDict
from urllib.parse import urlencode
from urllib.request import urlopen, Request
class TimeData(TypedDict):
time_status: str
time_urgency: int
abs_time: str
class MatchEvent(TypedDict):
title: str
title_clean: str
tournament: str
url: str
time_status: str
time_urgency: int
abs_time: str
team_a: str
team_b: str
odds_a: str
odds_b: str
vol: float
class NonMatchEvent(TypedDict):
title: str
url: str
time_status: str
time_urgency: int
abs_time: str
market_count: int
total_vol: int
class Market(TypedDict):
type: str
question: str
outcomes: list[str]
prices: list[str]
best_bid: float
best_ask: float
volume: float
url: str
class DetailEvent(TypedDict):
title: str
time_status: str
abs_time: str
url: str
livestream: str | None
outcomes: list[str]
prices: list[str]
best_bid: float
best_ask: float
volume: float
markets: list[Market]
class BrowseResult(TypedDict):
query: str
total_raw: int
total_fetched: int
total_match: int
total_non_match: int
match_events: list[Any]
non_match_events: list[Any]
partial: bool
class FetchResult(TypedDict):
events: list[Any]
total_raw: int
partial: bool
# ============================================================
# CONFIG
# ============================================================
@@ -33,72 +111,200 @@ GAME_CATEGORIES = {
"Tennis": "Tennis",
}
CACHE_DIR = os.path.join(os.path.expanduser("~"), ".cache", "polymarket-browse")
CACHE_TTL = 300 # 5 minutes default
MAX_PARALLEL_FETCHES = 5
# ============================================================
# CACHE
# ============================================================
def _get_cache_key(q: str) -> str:
return hashlib.md5(q.encode()).hexdigest()
def _get_cache_path(q: str) -> str:
os.makedirs(CACHE_DIR, exist_ok=True)
return os.path.join(CACHE_DIR, f"{_get_cache_key(q)}.json")
def _read_cache(q: str) -> dict[str, Any] | None:
cache_path = _get_cache_path(q)
if not os.path.exists(cache_path):
return None
try:
mtime = os.path.getmtime(cache_path)
age = time.time() - mtime
if age > CACHE_TTL:
return None
with open(cache_path) as f:
return json.load(f)
except Exception:
return None
def _write_cache(q: str, data: dict[str, Any]) -> None:
try:
cache_path = _get_cache_path(q)
with open(cache_path, "w") as f:
json.dump(data, f)
except Exception:
pass
# ============================================================
# FETCH
# ============================================================
def fetch_page(q, page=1, max_retries=MAX_RETRIES, initial_delay=INITIAL_RETRY_DELAY):
def fetch_page(
q: str,
page: int = 1,
max_retries: int = MAX_RETRIES,
initial_delay: float = INITIAL_RETRY_DELAY,
) -> dict[str, Any] | None:
base = "https://gamma-api.polymarket.com/public-search"
url = (f"{base}?q={q.replace(' ', '%20')}&limit={PAGE_SIZE}&page={page}"
f"&search_profiles=false&search_tags=false"
f"&keep_closed_markets=0&events_status=active&cache=false")
url = (
f"{base}?q={q.replace(' ', '%20')}&limit={PAGE_SIZE}&page={page}"
f"&search_profiles=false&search_tags=false"
f"&keep_closed_markets=0&events_status=active&cache=false"
)
delay = initial_delay
for attempt in range(max_retries):
time.sleep(delay)
r = subprocess.run(
["curl", "-s", url, "--max-time", "10", "-H", "User-Agent: curl/7.88.1"],
capture_output=True
)
if r.returncode == 0 and len(r.stdout) > 0:
try:
return json.loads(r.stdout.decode('utf-8'))
except json.JSONDecodeError:
if attempt < max_retries - 1:
delay *= 2 # Exponential backoff
continue
return None
else:
# Rate limit or other error - exponential backoff
if attempt > 0:
time.sleep(delay)
try:
req = Request(url, headers={"User-Agent": "Mozilla/5.0"})
with urlopen(req, timeout=10) as r:
return json.loads(r.read())
except Exception:
if attempt < max_retries - 1:
delay *= 2
continue
return None
return None
def fetch_all_pages(q, max_pages=100):
def _fetch_page_with_index(q: str, page: int) -> tuple[int, dict[str, Any] | None]:
return page, fetch_page(q, page)
def fetch_all_pages(
q: str,
matches_max: int | None = None,
non_matches_max: int | None = None,
max_total: int | None = None,
use_cache: bool = True,
) -> FetchResult:
"""
Fetch ALL pages until pagination ends.
max_pages is a safety cap to prevent infinite loops.
Fetch pages until pagination ends, or until quotas are satisfied.
Args:
q: search query
matches_max: stop early once we have this many match events (None = no limit)
non_matches_max: stop early once we have this many non-match events (None = no limit)
max_total: stop early once we have this many total events (None = no limit)
use_cache: whether to use cache (default True)
Returns:
FetchResult with events, total_raw, and partial flag
"""
all_events = []
cached = _read_cache(q) if use_cache else None
if cached is not None:
events = cached.get("events", [])
total_raw = cached.get("total_raw", 0)
if events:
return {"events": events, "total_raw": total_raw, "partial": False}
total_raw = 0
for page in range(1, max_pages + 1):
time.sleep(0.2) # small delay between pages (API rate limit is generous)
data = fetch_page(q, page)
page_count = 0
page1_data = None
while True:
page_count += 1
data = fetch_page(q, page_count)
if data is None:
break
events = data.get("events", [])
total_raw = data.get("pagination", {}).get("totalResults", 0)
all_events.extend(events)
# Stop when we get 0 events (no more pages),
# OR when we've fetched >= total results
if len(events) == 0:
if page_count == 1:
page1_data = data
if total_raw > 0:
break
if len(all_events) >= total_raw:
if not data.get("events"):
break
partial = (total_raw > 0 and len(all_events) < total_raw)
return {"events": all_events, "total_raw": total_raw, "partial": partial}
if total_raw == 0 or page1_data is None:
return {"events": [], "total_raw": 0, "partial": False}
page1_events = page1_data.get("events", [])
actual_page_size = len(page1_events)
# Use actual events per page from API for ceiling division
# ceil(total_raw / actual_page_size) = (total_raw + actual_page_size - 1) // actual_page_size
total_pages = (total_raw + actual_page_size - 1) // actual_page_size
concurrency = min(MAX_PARALLEL_FETCHES, total_pages)
all_page_data: dict[int, list[Any]] = {1: page1_events}
if total_pages > 1:
with ThreadPoolExecutor(max_workers=concurrency) as executor:
futures = {
executor.submit(_fetch_page_with_index, q, page): page
for page in range(2, total_pages + 1)
}
for future in as_completed(futures):
try:
page_num, data = future.result()
if data is not None:
all_page_data[page_num] = data.get("events", [])
except Exception:
pass
all_events = []
for page_num in sorted(all_page_data.keys()):
all_events.extend(all_page_data[page_num])
_write_cache(q, {"events": all_events, "total_raw": total_raw})
match_count = 0
non_match_count = 0
filtered_events = []
total_seen = 0
for e in all_events:
is_match = is_match_market(e)
if is_match:
match_count += 1
else:
non_match_count += 1
filtered_events.append(e)
if matches_max is not None and non_matches_max is not None:
if match_count >= matches_max and non_match_count >= non_matches_max:
break
if max_total is not None:
total_seen += 1
if total_seen >= max_total:
break
partial = len(all_events) < total_raw
return {"events": filtered_events, "total_raw": total_raw, "partial": partial}
# ============================================================
# FILTERS
# ============================================================
def is_match_market(e):
def is_match_market(e: dict[str, Any]) -> bool:
return (e.get("seriesSlug") and e.get("gameId")) or " vs " in e.get("title", "")
def get_event_url(e):
def get_event_url(e: dict[str, Any]) -> str:
"""Return the correct Polymarket URL for an event.
Match markets use /market/, non-match events use /event/.
"""
@@ -108,17 +314,20 @@ def get_event_url(e):
else:
return f"https://polymarket.com/event/{slug}"
def get_ml_market(e):
def get_ml_market(e: dict[str, Any]) -> dict[str, Any] | None:
for m in e.get("markets", []):
if m.get("sportsMarketType") == "moneyline":
return m
return None
def get_ml_volume(e):
def get_ml_volume(e: dict[str, Any]) -> float:
ml = get_ml_market(e)
return float(ml.get("volume", 0)) if ml else 0.0
def is_bo2_tie(e):
def is_bo2_tie(e: dict[str, Any]) -> bool:
"""
Detect if this is a BO2 that ended in a tie (1-1).
Returns True if all child_moneyline markets are closed (match is over but tied).
@@ -127,7 +336,11 @@ def is_bo2_tie(e):
if "BO2" not in title:
return False
child_markets = [m for m in e.get("markets", []) if m.get("sportsMarketType") == "child_moneyline"]
child_markets = [
m
for m in e.get("markets", [])
if m.get("sportsMarketType") == "child_moneyline"
]
if len(child_markets) != 2:
return False
@@ -135,7 +348,8 @@ def is_bo2_tie(e):
all_closed = all(m.get("closed", False) for m in child_markets)
return all_closed
def is_tradeable_event(e):
def is_tradeable_event(e: dict[str, Any]) -> bool:
ml = get_ml_market(e)
if not ml:
return False
@@ -165,7 +379,7 @@ def is_tradeable_event(e):
end_str = e.get("endDate", "")
if end_str:
try:
end_dt = datetime.fromisoformat(end_str.replace('Z', '+00:00'))
end_dt = datetime.fromisoformat(end_str.replace("Z", "+00:00"))
now = datetime.now(timezone.utc)
if end_dt < now:
return False
@@ -176,7 +390,7 @@ def is_tradeable_event(e):
start_str = e.get("startTime") or e.get("startDate", "")
if start_str:
try:
start_dt = datetime.fromisoformat(start_str.replace('Z', '+00:00'))
start_dt = datetime.fromisoformat(start_str.replace("Z", "+00:00"))
now = datetime.now(timezone.utc)
if start_dt < now:
# Check if it's recently started (within 4h) — consider those "live" still
@@ -188,7 +402,8 @@ def is_tradeable_event(e):
return True
def is_tradeable_market(m):
def is_tradeable_market(m: dict[str, Any]) -> bool:
accepting = m.get("acceptingOrders", False)
closed = m.get("closed", True)
best_ask = float(m.get("bestAsk", 0))
@@ -208,22 +423,26 @@ def is_tradeable_market(m):
return True
# ============================================================
# FORMATTING
# ============================================================
def prob_to_cents(p):
def prob_to_cents(p: float) -> int:
return int(round(p * 100))
def format_odds(p):
def format_odds(p: float) -> str:
return f"{prob_to_cents(p)}c"
def format_spread(bid, ask):
def format_spread(bid: float, ask: float) -> str:
spread = ask - bid
return f"{prob_to_cents(spread)}c"
def _get_time_data(e, tz=None):
def _get_time_data(e: dict[str, Any], tz: timezone | None = None) -> TimeData:
"""
Unified time data extraction for event timestamps.
@@ -237,11 +456,7 @@ def _get_time_data(e, tz=None):
Defaults to WIB (UTC+7).
Returns:
{
"time_status": str, # e.g. "LIVE", "In 6h", "12h ago"
"time_urgency": int, # 0-3 (higher = more urgent/live)
"abs_time": str, # e.g. "Mar 25, 19:00 WIB" or "TBD"
}
TimeData with time_status, time_urgency, and abs_time
"""
tz = tz or WIB
start_str = e.get("startTime") or e.get("startDate", "")
@@ -250,13 +465,13 @@ def _get_time_data(e, tz=None):
return {"time_status": "TBD", "time_urgency": 0, "abs_time": "TBD"}
try:
start_dt = datetime.fromisoformat(start_str.replace('Z', '+00:00'))
start_dt = datetime.fromisoformat(start_str.replace("Z", "+00:00"))
now_utc = datetime.now(timezone.utc)
delta = start_dt - now_utc
total_sec = delta.total_seconds()
if total_sec < 0:
# Event is in the past
if total_sec <= 0:
# Event is in the past or happening now
hours_ago = abs(total_sec) / 3600
if hours_ago < 1:
time_status = "LIVE"
@@ -291,18 +506,24 @@ def _get_time_data(e, tz=None):
abs_time += "WIB"
else:
abs_time += start_dt.astimezone(tz).strftime("%Z")
return {"time_status": time_status, "time_urgency": time_urgency, "abs_time": abs_time}
return {
"time_status": time_status,
"time_urgency": time_urgency,
"abs_time": abs_time,
}
except Exception:
return {"time_status": "", "time_urgency": 0, "abs_time": "TBD"}
def filter_events(events, tradeable_only=True):
def filter_events(
events: list[dict[str, Any]], tradeable_only: bool = True
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
"""
Classify events into match_markets and non_match_markets.
If tradeable_only=True, also filter out non-tradeable events.
"""
match_events = []
non_match_events = []
match_events: list[dict[str, Any]] = []
non_match_events: list[dict[str, Any]] = []
for e in events:
if is_match_market(e):
@@ -314,53 +535,79 @@ def filter_events(events, tradeable_only=True):
return match_events, non_match_events
def sort_events(events):
def sort_events(events: list[dict[str, Any]]) -> list[dict[str, Any]]:
return sorted(events, key=get_ml_volume, reverse=True)
# ============================================================
# BROWSE
# ============================================================
def browse_events(q, matches_max=10, non_matches_max=10, tradeable_only=True):
result = fetch_all_pages(q)
def browse_events(
q: str,
matches_max: int = 10,
non_matches_max: int = 10,
tradeable_only: bool = True,
sort_by: str | None = None,
max_total: int | None = None,
use_cache: bool = True,
) -> BrowseResult:
"""
Browse Polymarket events.
Args:
q: search query
matches_max: max number of match markets to return
non_matches_max: max number of non-match markets to return
tradeable_only: filter to tradeable events only
sort_by: None (fast, API order) or "volume" (full fetch, sort by volume desc)
max_total: max total events to fetch before early exit (None = no limit)
use_cache: whether to use cache (default True)
"""
use_early_exit = sort_by is None
fetch_matches_max = matches_max if use_early_exit else None
fetch_non_matches_max = non_matches_max if use_early_exit else None
result = fetch_all_pages(
q,
matches_max=fetch_matches_max,
non_matches_max=fetch_non_matches_max,
max_total=max_total,
use_cache=use_cache,
)
events = result["events"]
match_events, non_match_events = filter_events(events, tradeable_only)
sorted_match = sort_events(match_events)
# Sort if requested; otherwise preserve API order
if sort_by == "volume":
match_events = sort_events(match_events)
non_match_events = sort_events(non_match_events)
return {
"query": q,
"total_raw": result["total_raw"],
"total_fetched": len(events),
"total_match": len(match_events),
"total_non_match": len(non_match_events),
"match_events": sorted_match[:matches_max],
"match_events": match_events[:matches_max],
"non_match_events": non_match_events[:non_matches_max],
"partial": result.get("partial", False),
}
# ============================================================
# FORMAT — EVENT
# ============================================================
def format_match_event(e):
def format_match_event(e: dict[str, Any]) -> MatchEvent:
"""
Format a match event into a canonical dict for rendering.
All computing done here; renderers just template.
Returns:
{
"title": str, # raw title
"title_clean": str, # "Team A vs Team B"
"tournament": str, # "Tournament Name" or ""
"url": str,
"time_status": str, # "LIVE", "In 6h", "12h ago"
"time_urgency": int, # 0-3
"abs_time": str, # "Mar 25, 19:00 WIB"
"team_a": str,
"team_b": str,
"odds_a": str, # "55c"
"odds_b": str,
"vol": int,
}
MatchEvent with all required fields
"""
ml = get_ml_market(e)
outcomes = json.loads(ml.get("outcomes", "[]")) if ml else []
@@ -396,20 +643,12 @@ def format_match_event(e):
}
def format_non_match_event(e):
def format_non_match_event(e: dict[str, Any]) -> NonMatchEvent:
"""
Format a non-match event into a canonical dict for rendering.
Returns:
{
"title": str,
"url": str,
"time_status": str,
"time_urgency": int,
"abs_time": str,
"market_count": int,
"total_vol": int,
}
NonMatchEvent with all required fields
"""
td = _get_time_data(e)
total_vol = sum(float(m.get("volume", 0)) for m in e.get("markets", []))
@@ -430,7 +669,8 @@ def format_non_match_event(e):
# FORMAT — RENDER
# ============================================================
def render_match_lines(event_dict, i, mode):
def render_match_lines(event_dict: MatchEvent, i: int, mode: str) -> list[str]:
"""
Render a formatted match event dict into lines of text.
@@ -457,9 +697,7 @@ def render_match_lines(event_dict, i, mode):
lines = []
if mode == "html":
lines.append(
f"<b>{i}.</b> <a href=\"{url}\">{escape_html(title_clean)}</a>"
)
lines.append(f'<b>{i}.</b> <a href="{url}">{escape_html(title_clean)}</a>')
else:
lines.append(f"{i}. [{title_clean}]({url})")
@@ -474,7 +712,7 @@ def render_match_lines(event_dict, i, mode):
return lines
def render_non_match_lines(event_dict, i, mode):
def render_non_match_lines(event_dict: NonMatchEvent, i: int, mode: str) -> list[str]:
"""
Render a formatted non-match event dict into lines of text.
@@ -496,7 +734,7 @@ def render_non_match_lines(event_dict, i, mode):
lines = []
if mode == "html":
lines.append(f"<b>{i}.</b> <a href=\"{url}\">{escape_html(title)}</a>")
lines.append(f'<b>{i}.</b> <a href="{url}">{escape_html(title)}</a>')
else:
lines.append(f"{i}. [{title}]({url})")
@@ -510,7 +748,8 @@ def render_non_match_lines(event_dict, i, mode):
# FORMAT — LEGACY
# ============================================================
def format_event(e):
def format_event(e: dict[str, Any]) -> dict[str, Any]:
ml = get_ml_market(e)
outcomes = json.loads(ml.get("outcomes", "[]")) if ml else []
prices = json.loads(ml.get("outcomePrices", "[]")) if ml else []
@@ -532,14 +771,18 @@ def format_event(e):
"volume": vol,
}
def format_detail_event(e):
def format_detail_event(e: dict[str, Any]) -> DetailEvent:
ml = get_ml_market(e)
active_markets = [
m for m in e.get("markets", [])
m
for m in e.get("markets", [])
if float(m.get("volume", 0)) > 0 and is_tradeable_market(m)
]
active_markets = sorted(active_markets, key=lambda m: float(m.get("volume", 0)), reverse=True)
active_markets = sorted(
active_markets, key=lambda m: float(m.get("volume", 0)), reverse=True
)
td = _get_time_data(e)
@@ -569,18 +812,21 @@ def format_detail_event(e):
],
}
# ============================================================
# DISPLAY
# ============================================================
def get_header_date():
def get_header_date() -> str:
"""Return current date string like 'Mar 25, 2026'"""
now_utc = datetime.now(timezone.utc)
utc7 = timezone(timedelta(hours=7))
now_utc7 = now_utc.astimezone(utc7)
return now_utc7.strftime("%b %d, %Y")
def get_tournament(title):
def get_tournament(title: str) -> str:
"""Extract tournament name from event title. Title format: 'Category: Team A vs Team B (BO/X) - Tournament Name'"""
if " - " in title:
parts = title.split(" - ")
@@ -588,8 +834,23 @@ def get_tournament(title):
return " - ".join(parts[1:]).strip()
return ""
def print_browse(match_events, non_match_events, category, total_raw, total_fetched, total_match, total_non_match, raw_mode=False, partial=False, non_matches_max=5, matches_only=False, non_matches_only=False):
def print_browse(
match_events,
non_match_events,
category,
total_raw,
total_fetched,
total_match,
total_non_match,
raw_mode=False,
partial=False,
non_matches_max=5,
matches_only=False,
non_matches_only=False,
):
from datetime import datetime, timezone, timedelta
now_utc = datetime.now(timezone.utc)
utc7 = timezone(timedelta(hours=7))
now_utc7 = now_utc.astimezone(utc7)
@@ -599,7 +860,9 @@ def print_browse(match_events, non_match_events, category, total_raw, total_fetc
print(f"Current time (WIB): {now_utc7.strftime('%H:%M WIB')} | {header_date}")
if raw_mode:
print(f"Fetched: {total_fetched} / Total API: {total_raw} | Match: {total_match} | Non-match: {total_non_match}")
print(
f"Fetched: {total_fetched} / Total API: {total_raw} | Match: {total_match} | Non-match: {total_non_match}"
)
if partial:
print(f"WARNING: Partial fetch (API error or timeout) — data may be incomplete")
@@ -633,38 +896,60 @@ def print_browse(match_events, non_match_events, category, total_raw, total_fetc
for line in render_non_match_lines(fd, i, mode="text"):
print(line)
def print_detail(e, detail):
def print_detail(e: dict[str, Any], detail: DetailEvent) -> None:
print(f"\n{detail['title']}")
print(f"URL: {detail['url']}")
print(f"Livestream: {detail['livestream']}")
spread_str = format_spread(detail["best_bid"], detail["best_ask"]) if detail["best_bid"] and detail["best_ask"] else "N/A"
spread_str = (
format_spread(detail["best_bid"], detail["best_ask"])
if detail["best_bid"] and detail["best_ask"]
else "N/A"
)
print(f"\n{detail['time_status']}")
print(f"ML: {detail['outcomes'][0]} {format_odds(float(detail['prices'][0]))} vs {detail['outcomes'][1]} {format_odds(float(detail['prices'][1]))}")
print(
f"ML: {detail['outcomes'][0]} "
f"{format_odds(float(detail['prices'][0]))} vs "
f"{detail['outcomes'][1]} {format_odds(float(detail['prices'][1]))}"
)
print(f"ML Vol: ${detail['volume']:,.0f} | {spread_str}")
print(f"\nMarkets ({len(detail['markets'])}):")
for m in detail["markets"]:
spread_str = format_spread(m["best_bid"], m["best_ask"]) if m["best_bid"] and m["best_ask"] else "N/A"
spread_str = (
format_spread(m["best_bid"], m["best_ask"])
if m["best_bid"] and m["best_ask"]
else "N/A"
)
print(f" [{m['type']}]")
print(f" {m['outcomes'][0]} {format_odds(float(m['prices'][0]))} vs {m['outcomes'][1]} {format_odds(float(m['prices'][1]))}")
print(
f" {m['outcomes'][0]} "
f"{format_odds(float(m['prices'][0]))} vs "
f"{m['outcomes'][1]} {format_odds(float(m['prices'][1]))}"
)
print(f" Vol: ${m['volume']:,.0f} | {spread_str}")
print(f" URL: {m['url']}")
# ============================================================
# TELEGRAM
# ============================================================
def escape_html(text):
def escape_html(text: str) -> str:
"""Escape HTML-sensitive characters for Telegram parse_mode=HTML."""
return (text
.replace("&", "&amp;")
return (
text.replace("&", "&amp;")
.replace("<", "&lt;")
.replace(">", "&gt;")
.replace('"', "&quot;"))
.replace('"', "&quot;")
)
def send_telegram_message(bot_token, chat_id, text, timeout=10):
def send_telegram_message(
bot_token: str, chat_id: str, text: str, timeout: int = 10
) -> int:
"""Send a message via Telegram bot API. Returns the message ID on success.
Raises:
@@ -672,12 +957,14 @@ def send_telegram_message(bot_token, chat_id, text, timeout=10):
URLError/HTTPError: On network or HTTP-level failures.
"""
url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
data = urlencode({
"chat_id": chat_id,
"text": text,
"parse_mode": "HTML",
"disable_web_page_preview": "true",
}).encode("utf-8")
data = urlencode(
{
"chat_id": chat_id,
"text": text,
"parse_mode": "HTML",
"disable_web_page_preview": "true",
}
).encode("utf-8")
req = Request(url, data=data, method="POST")
with urlopen(req, timeout=timeout) as resp:
result = json.loads(resp.read())
@@ -686,15 +973,23 @@ def send_telegram_message(bot_token, chat_id, text, timeout=10):
return result["result"]["message_id"]
def send_to_telegram(match_events, non_match_events, category, matches_only=False, non_matches_only=False):
def send_to_telegram(
match_events: list[dict[str, Any]],
non_match_events: list[dict[str, Any]],
category: str,
matches_only: bool = False,
non_matches_only: bool = False,
) -> None:
"""Send browse results to Telegram. Reads TELEGRAM_BOT_TOKEN and CHAT_ID from environment."""
import os
bot_token = os.environ.get("TELEGRAM_BOT_TOKEN")
chat_id = os.environ.get("CHAT_ID")
if not bot_token or not chat_id:
raise RuntimeError("TELEGRAM_BOT_TOKEN or CHAT_ID not set in environment")
from datetime import datetime, timezone, timedelta
now_utc = datetime.now(timezone.utc)
utc7 = timezone(timedelta(hours=7))
now_utc7 = now_utc.astimezone(utc7)
@@ -737,7 +1032,14 @@ def send_to_telegram(match_events, non_match_events, category, matches_only=Fals
send_chunked(lines, send, category, header_date, show_matches, show_non_matches)
def send_chunked(all_lines, send_fn, category, header_date, show_matches, show_non_matches):
def send_chunked(
all_lines: list[str],
send_fn: Callable[[str], None],
category: str,
header_date: str,
show_matches: bool,
show_non_matches: bool,
) -> None:
"""
Split already-built lines into Telegram-safe chunks and send them.
@@ -800,31 +1102,83 @@ def send_chunked(all_lines, send_fn, category, header_date, show_matches, show_n
# MAIN
# ============================================================
def main():
parser = argparse.ArgumentParser(description="Browse Polymarket tradeable events by game category.")
parser.add_argument("--category", default="Counter Strike",
choices=list(GAME_CATEGORIES.keys()),
help="Game category to browse")
parser.add_argument("--limit", type=int, default=5,
help="Max events per section (match + non-match). Default: 5")
parser.add_argument("--matches", type=int, default=None,
help="Max match markets to show. Default: --limit")
parser.add_argument("--non-matches", type=int, default=None,
help="Max non-match markets to show. Default: --limit")
parser.add_argument("--search", type=str, default=None,
help="Free-text team/term search within the selected category. Overrides default query.")
parser.add_argument("--matches-only", action="store_true",
help="Show only match markets (suppress non-match section).")
parser.add_argument("--non-matches-only", action="store_true",
help="Show only non-match markets (suppress match section).")
parser.add_argument("--list-categories", action="store_true",
help="List available game categories and exit")
parser.add_argument("--detail", type=int, default=1,
help="Index of match event (1-indexed) to show detailed markets. Default: 1. Set to 0 to disable.")
parser.add_argument("--raw", action="store_true",
help="Show all events without tradeable filter (for debugging).")
parser.add_argument("--telegram", action="store_true",
help="Send results to Telegram (TELEGRAM_BOT_TOKEN and CHAT_ID must be set in environment).")
def main() -> None:
parser = argparse.ArgumentParser(
description="Browse Polymarket tradeable events by game category."
)
parser.add_argument(
"--category",
default="Counter Strike",
choices=list(GAME_CATEGORIES.keys()),
help="Game category to browse",
)
parser.add_argument(
"--limit",
type=int,
default=5,
help="Max events per section (match + non-match). Default: 5",
)
parser.add_argument(
"--matches",
type=int,
default=None,
help="Max match markets to show. Default: --limit",
)
parser.add_argument(
"--non-matches",
type=int,
default=None,
help="Max non-match markets to show. Default: --limit",
)
parser.add_argument(
"--search",
type=str,
default=None,
help="Free-text team/term search within the selected category. Overrides default query.",
)
parser.add_argument(
"--matches-only",
action="store_true",
help="Show only match markets (suppress non-match section).",
)
parser.add_argument(
"--non-matches-only",
action="store_true",
help="Show only non-match markets (suppress match section).",
)
parser.add_argument(
"--list-categories",
action="store_true",
help="List available game categories and exit",
)
parser.add_argument(
"--detail",
type=int,
default=1,
help="Index of match event (1-indexed) to show detailed markets. Default: 1. Set to 0 to disable.",
)
parser.add_argument(
"--raw",
action="store_true",
help="Show all events without tradeable filter (for debugging).",
)
parser.add_argument(
"--no-cache",
action="store_true",
help="Disable cache and fetch fresh data from API.",
)
parser.add_argument(
"--max-total",
type=int,
default=None,
help="Max total events to fetch before early exit. Default: no limit.",
)
parser.add_argument(
"--telegram",
action="store_true",
help="Send results to Telegram (TELEGRAM_BOT_TOKEN and CHAT_ID must be set in environment).",
)
args = parser.parse_args()
if args.list_categories:
@@ -844,7 +1198,14 @@ def main():
else:
print(f"\nFetching {args.category} events...")
result = browse_events(search_term, matches_max=matches_max, non_matches_max=non_matches_max, tradeable_only=tradeable_only)
result = browse_events(
search_term,
matches_max=matches_max,
non_matches_max=non_matches_max,
tradeable_only=tradeable_only,
max_total=args.max_total,
use_cache=not args.no_cache,
)
print_browse(
result["match_events"],
@@ -858,7 +1219,7 @@ def main():
partial=result.get("partial", False),
non_matches_max=non_matches_max,
matches_only=args.matches_only,
non_matches_only=args.non_matches_only
non_matches_only=args.non_matches_only,
)
# Print detail for selected event if any
@@ -878,8 +1239,9 @@ def main():
result["non_match_events"],
args.category,
matches_only=args.matches_only,
non_matches_only=args.non_matches_only
non_matches_only=args.non_matches_only,
)
if __name__ == "__main__":
main()

File diff suppressed because it is too large Load Diff