Add parallel fetching, caching, and max_total parameter #26

Merged
shoko merged 6 commits from feat/parallel-fetch-cache into master 2026-03-26 18:23:20 +01:00
3 changed files with 1033 additions and 268 deletions


@@ -34,7 +34,7 @@ hermes mcp add polymarket https://docs.polymarket.com/mcp
 ## Usage

 ```
-polymarket-browse [--category "Counter Strike"] [--limit 5] [--matches N] [--non-matches N] [--search "TeamName"] [--matches-only] [--non-matches-only] [--detail N] [--raw] [--telegram]
+polymarket-browse [--category "Counter Strike"] [--limit 5] [--matches N] [--non-matches N] [--search "TeamName"] [--matches-only] [--non-matches-only] [--detail N] [--raw] [--telegram] [--no-cache] [--max-total N]
 ```

 ## Arguments
@@ -49,6 +49,8 @@ polymarket-browse [--category "Counter Strike"] [--limit 5] [--matches N] [--non
 - `--detail` : Index of match event (1-indexed) to show detailed markets. Default: 1. Set to 0 to disable.
 - `--list-categories` : List available game categories and exit
 - `--raw` : Show all events without tradeable filter (for debugging). Includes fetch stats.
+- `--no-cache` : Disable caching and fetch fresh data from the API.
+- `--max-total` : Maximum total events to fetch before early exit. Default: no limit. Useful for quick snapshots.
 - `--telegram` : Send results to Telegram. Requires `BOT_TOKEN` and `CHAT_ID` in environment variables.

 ## Output Format
@@ -120,11 +122,30 @@ Use `--raw` to disable the tradeable filter and see all match markets regardless
 The script fetches **ALL pages** until the API runs out of results (up to 100 pages as a safety cap).

+### Parallel Fetching
+
+Pages are fetched in **parallel batches of 5** using ThreadPoolExecutor. This significantly reduces fetch time:
+
+| Scenario | Without Parallelization | With Parallelization |
+|----------|------------------------|---------------------|
+| 10 pages (50 events) | ~20s (2s per page × 10) | ~4s (2s per batch × 2 batches) |
+| 20 pages (100 events) | ~40s | ~8s |
+
+The script first fetches page 1 to determine total pages, then fetches remaining pages in parallel batches of 5.

 ## Rate Limiting

 - Exponential backoff: 2s → 4s → 8s → 16s → 32s
 - Max 5 retries before aborting

+## Caching
+
+Results are cached in `~/.cache/polymarket-browse/` with a **5-minute TTL** to reduce redundant API calls.
+
+- Use `--no-cache` to bypass the cache and fetch fresh data
+- Cached data is automatically used when available and not expired
+- Useful when running the script repeatedly (e.g., for monitoring)

 ## Odds Format

 All odds are shown in **cents** format:
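The timing claims in the table above follow from simple batch arithmetic: with batches of 5 concurrent requests and roughly 2 s per request, total time is about ceil(pages / 5) × 2 s. A minimal sketch of that estimate (the 2 s latency and the batch size of 5 are the assumptions stated in the table, not measured values):

```python
import math

BATCH_SIZE = 5        # mirrors MAX_PARALLEL_FETCHES in the script
PAGE_LATENCY_S = 2.0  # rough per-request latency assumed by the table

def estimated_fetch_seconds(pages: int) -> float:
    """Sequential cost is pages * latency; parallel cost is one latency per batch."""
    batches = math.ceil(pages / BATCH_SIZE)
    return batches * PAGE_LATENCY_S

print(estimated_fetch_seconds(10))  # 4.0 (vs ~20 s sequential)
print(estimated_fetch_seconds(20))  # 8.0 (vs ~40 s sequential)
```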


@@ -8,6 +8,9 @@ import html
 import json
 import time
 import argparse
+import hashlib
+import os
+from concurrent.futures import ThreadPoolExecutor, as_completed
 from datetime import datetime, timezone, timedelta
 from typing import Any, Callable, TypedDict
 from urllib.parse import urlencode
@@ -108,6 +111,48 @@ GAME_CATEGORIES = {
"Tennis": "Tennis", "Tennis": "Tennis",
} }
CACHE_DIR = os.path.join(os.path.expanduser("~"), ".cache", "polymarket-browse")
CACHE_TTL = 300 # 5 minutes default
MAX_PARALLEL_FETCHES = 5
# ============================================================
# CACHE
# ============================================================
def _get_cache_key(q: str) -> str:
return hashlib.md5(q.encode()).hexdigest()
def _get_cache_path(q: str) -> str:
os.makedirs(CACHE_DIR, exist_ok=True)
return os.path.join(CACHE_DIR, f"{_get_cache_key(q)}.json")
def _read_cache(q: str) -> dict[str, Any] | None:
cache_path = _get_cache_path(q)
if not os.path.exists(cache_path):
return None
try:
mtime = os.path.getmtime(cache_path)
age = time.time() - mtime
if age > CACHE_TTL:
return None
with open(cache_path) as f:
return json.load(f)
except Exception:
return None
def _write_cache(q: str, data: dict[str, Any]) -> None:
try:
cache_path = _get_cache_path(q)
with open(cache_path, "w") as f:
json.dump(data, f)
except Exception:
pass
# ============================================================ # ============================================================
# FETCH # FETCH
# ============================================================ # ============================================================
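For context, a round trip through these helpers would look roughly like the sketch below; the query string and payload are invented for illustration, and only the function signatures, `CACHE_TTL`, and the cache path come from the diff above.

```python
# Illustrative only: query and payload are made up.
payload = {"events": [{"title": "Team A vs Team B"}], "total_raw": 1}

_write_cache("Counter Strike", payload)  # writes ~/.cache/polymarket-browse/<md5-of-query>.json
hit = _read_cache("Counter Strike")      # returns the payload while the file is younger than CACHE_TTL (300 s)
assert hit == payload

# Once the file's mtime is older than CACHE_TTL, _read_cache returns None
# and fetch_all_pages falls back to hitting the API again.
```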
@@ -142,8 +187,16 @@ def fetch_page(
     return None

+
+def _fetch_page_with_index(q: str, page: int) -> tuple[int, dict[str, Any] | None]:
+    return page, fetch_page(q, page)
+
+
 def fetch_all_pages(
-    q: str, matches_max: int | None = None, non_matches_max: int | None = None
+    q: str,
+    matches_max: int | None = None,
+    non_matches_max: int | None = None,
+    max_total: int | None = None,
+    use_cache: bool = True,
 ) -> FetchResult:
     """
     Fetch pages until pagination ends, or until quotas are satisfied.
@@ -152,46 +205,94 @@ def fetch_all_pages(
         q: search query
         matches_max: stop early once we have this many match events (None = no limit)
         non_matches_max: stop early once we have this many non-match events (None = no limit)
+        max_total: stop early once we have this many total events (None = no limit)
+        use_cache: whether to use cache (default True)

     Returns:
         FetchResult with events, total_raw, and partial flag
     """
-    all_events = []
-    total_raw = 0
-    match_count = 0
-    non_match_count = 0
-    page = 0
-
-    while True:
-        page += 1
-        time.sleep(0.2)
-        data = fetch_page(q, page)
-        if data is None:
-            break
-
-        events = data.get("events", [])
-        total_raw = data.get("pagination", {}).get("totalResults", 0)
-        all_events.extend(events)
-
-        # Count matches/non-matches in this page
-        for e in events:
-            if is_match_market(e):
-                match_count += 1
-            else:
-                non_match_count += 1
-
-        # Stop if we got what we wanted (only when caps are set)
-        if matches_max is not None and non_matches_max is not None:
-            if match_count >= matches_max and non_match_count >= non_matches_max:
-                break
-
-        # Stop when we get 0 events (no more pages)
-        if len(events) == 0:
-            break
-
-        # Stop when we've fetched all known results
-        if len(all_events) >= total_raw:
-            break
-
-    partial = total_raw > 0 and len(all_events) < total_raw
-    return {"events": all_events, "total_raw": total_raw, "partial": partial}
+    cached = _read_cache(q) if use_cache else None
+    if cached is not None:
+        events = cached.get("events", [])
+        total_raw = cached.get("total_raw", 0)
+        if events:
+            return {"events": events, "total_raw": total_raw, "partial": False}
+
+    total_raw = 0
+    page_count = 0
+    page1_data = None
+
+    while True:
+        page_count += 1
+        data = fetch_page(q, page_count)
+        if data is None:
+            break
+        total_raw = data.get("pagination", {}).get("totalResults", 0)
+        if page_count == 1:
+            page1_data = data
+        if total_raw > 0:
+            break
+        if not data.get("events"):
+            break
+
+    if total_raw == 0 or page1_data is None:
+        return {"events": [], "total_raw": 0, "partial": False}
+
+    page1_events = page1_data.get("events", [])
+    actual_page_size = len(page1_events)
+    # Use actual events per page from API for ceiling division
+    # ceil(total_raw / actual_page_size) = (total_raw + actual_page_size - 1) // actual_page_size
+    total_pages = (total_raw + actual_page_size - 1) // actual_page_size
+    concurrency = min(MAX_PARALLEL_FETCHES, total_pages)
+    all_page_data: dict[int, list[Any]] = {1: page1_events}
+
+    if total_pages > 1:
+        with ThreadPoolExecutor(max_workers=concurrency) as executor:
+            futures = {
+                executor.submit(_fetch_page_with_index, q, page): page
+                for page in range(2, total_pages + 1)
+            }
+            for future in as_completed(futures):
+                try:
+                    page_num, data = future.result()
+                    if data is not None:
+                        all_page_data[page_num] = data.get("events", [])
+                except Exception:
+                    pass
+
+    all_events = []
+    for page_num in sorted(all_page_data.keys()):
+        all_events.extend(all_page_data[page_num])
+
+    _write_cache(q, {"events": all_events, "total_raw": total_raw})
+
+    match_count = 0
+    non_match_count = 0
+    filtered_events = []
+    total_seen = 0
+    for e in all_events:
+        is_match = is_match_market(e)
+        if is_match:
+            match_count += 1
+        else:
+            non_match_count += 1
+        filtered_events.append(e)
+
+        if matches_max is not None and non_matches_max is not None:
+            if match_count >= matches_max and non_match_count >= non_matches_max:
+                break
+
+        if max_total is not None:
+            total_seen += 1
+            if total_seen >= max_total:
+                break
+
+    partial = len(all_events) < total_raw
+    return {"events": filtered_events, "total_raw": total_raw, "partial": partial}


 # ============================================================
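A quick worked example of the page math above, with invented numbers (the real page size is whatever page 1 returns):

```python
# Suppose the API reports 103 total results and page 1 returned 20 events.
total_raw = 103
actual_page_size = 20

total_pages = (total_raw + actual_page_size - 1) // actual_page_size  # ceil(103 / 20) = 6
concurrency = min(5, total_pages)                                      # MAX_PARALLEL_FETCHES caps at 5

# Pages 2..6 are then submitted to the ThreadPoolExecutor, 5 at a time.
print(total_pages, concurrency)  # 6 5
```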
@@ -449,6 +550,8 @@ def browse_events(
     non_matches_max: int = 10,
     tradeable_only: bool = True,
     sort_by: str | None = None,
+    max_total: int | None = None,
+    use_cache: bool = True,
 ) -> BrowseResult:
     """
     Browse Polymarket events.
""" """
Browse Polymarket events. Browse Polymarket events.
@@ -459,15 +562,19 @@ def browse_events(
         non_matches_max: max number of non-match markets to return
         tradeable_only: filter to tradeable events only
         sort_by: None (fast, API order) or "volume" (full fetch, sort by volume desc)
+        max_total: max total events to fetch before early exit (None = no limit)
+        use_cache: whether to use cache (default True)
     """
-    # Pass quotas to fetch_all_pages for early-exit optimization.
-    # Only use early-exit when sort_by is None (no client-side sort needed).
     use_early_exit = sort_by is None
     fetch_matches_max = matches_max if use_early_exit else None
     fetch_non_matches_max = non_matches_max if use_early_exit else None
     result = fetch_all_pages(
-        q, matches_max=fetch_matches_max, non_matches_max=fetch_non_matches_max
+        q,
+        matches_max=fetch_matches_max,
+        non_matches_max=fetch_non_matches_max,
+        max_total=max_total,
+        use_cache=use_cache,
     )
     events = result["events"]

     match_events, non_match_events = filter_events(events, tradeable_only)
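A hedged sketch of calling the updated function: the keyword names come from the diff above, but the positional query argument is an assumption (mirroring `fetch_all_pages(q, ...)`) and the values are illustrative.

```python
# Assumed call shape, not taken verbatim from the codebase.
result = browse_events(
    "Counter Strike",    # assumed: search query, as in fetch_all_pages(q, ...)
    matches_max=5,
    non_matches_max=10,
    tradeable_only=True,
    max_total=50,        # early exit once ~50 events have been collected
    use_cache=False,     # same effect as passing --no-cache on the CLI
)
```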
@@ -1056,6 +1163,17 @@ def main() -> None:
action="store_true", action="store_true",
help="Show all events without tradeable filter (for debugging).", help="Show all events without tradeable filter (for debugging).",
) )
parser.add_argument(
"--no-cache",
action="store_true",
help="Disable cache and fetch fresh data from API.",
)
parser.add_argument(
"--max-total",
type=int,
default=None,
help="Max total events to fetch before early exit. Default: no limit.",
)
parser.add_argument( parser.add_argument(
"--telegram", "--telegram",
action="store_true", action="store_true",
@@ -1085,6 +1203,8 @@ def main() -> None:
         matches_max=matches_max,
         non_matches_max=non_matches_max,
         tradeable_only=tradeable_only,
+        max_total=args.max_total,
+        use_cache=not args.no_cache,
     )

     print_browse(

File diff suppressed because it is too large