Add parallel fetching, caching, and max_total parameter

- Parallel page fetching with ThreadPoolExecutor (concurrency=5)
- File-based cache with 5 min TTL in ~/.cache/polymarket-browse/
- New --no-cache flag to bypass cache
- New --max-total parameter for early exit
- Updated tests to work with new implementation
This commit is contained in:
shoko
2026-03-26 16:29:25 +00:00
parent 0a7911653b
commit eafbdba4a5
2 changed files with 638 additions and 266 deletions

View File

@@ -8,6 +8,9 @@ import html
import json import json
import time import time
import argparse import argparse
import hashlib
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone, timedelta from datetime import datetime, timezone, timedelta
from typing import Any, Callable, TypedDict from typing import Any, Callable, TypedDict
from urllib.parse import urlencode from urllib.parse import urlencode
@@ -108,6 +111,48 @@ GAME_CATEGORIES = {
"Tennis": "Tennis", "Tennis": "Tennis",
} }
CACHE_DIR = os.path.join(os.path.expanduser("~"), ".cache", "polymarket-browse")
CACHE_TTL = 300 # 5 minutes default
MAX_PARALLEL_FETCHES = 5
# ============================================================
# CACHE
# ============================================================
def _get_cache_key(q: str) -> str:
return hashlib.md5(q.encode()).hexdigest()
def _get_cache_path(q: str) -> str:
os.makedirs(CACHE_DIR, exist_ok=True)
return os.path.join(CACHE_DIR, f"{_get_cache_key(q)}.json")
def _read_cache(q: str) -> dict[str, Any] | None:
cache_path = _get_cache_path(q)
if not os.path.exists(cache_path):
return None
try:
mtime = os.path.getmtime(cache_path)
age = time.time() - mtime
if age > CACHE_TTL:
return None
with open(cache_path) as f:
return json.load(f)
except Exception:
return None
def _write_cache(q: str, data: dict[str, Any]) -> None:
try:
cache_path = _get_cache_path(q)
with open(cache_path, "w") as f:
json.dump(data, f)
except Exception:
pass
# ============================================================ # ============================================================
# FETCH # FETCH
# ============================================================ # ============================================================
@@ -142,8 +187,16 @@ def fetch_page(
return None return None
def _fetch_page_with_index(q: str, page: int) -> tuple[int, dict[str, Any] | None]:
return page, fetch_page(q, page)
def fetch_all_pages( def fetch_all_pages(
q: str, matches_max: int | None = None, non_matches_max: int | None = None q: str,
matches_max: int | None = None,
non_matches_max: int | None = None,
max_total: int | None = None,
use_cache: bool = True,
) -> FetchResult: ) -> FetchResult:
""" """
Fetch pages until pagination ends, or until quotas are satisfied. Fetch pages until pagination ends, or until quotas are satisfied.
@@ -152,46 +205,85 @@ def fetch_all_pages(
q: search query q: search query
matches_max: stop early once we have this many match events (None = no limit) matches_max: stop early once we have this many match events (None = no limit)
non_matches_max: stop early once we have this many non-match events (None = no limit) non_matches_max: stop early once we have this many non-match events (None = no limit)
max_total: stop early once we have this many total events (None = no limit)
use_cache: whether to use cache (default True)
Returns: Returns:
FetchResult with events, total_raw, and partial flag FetchResult with events, total_raw, and partial flag
""" """
all_events = [] cached = _read_cache(q) if use_cache else None
if cached is not None:
events = cached.get("events", [])
total_raw = cached.get("total_raw", 0)
if events:
return {"events": events, "total_raw": total_raw, "partial": False}
total_raw = 0 total_raw = 0
match_count = 0 page_count = 0
non_match_count = 0
page = 0
while True: while True:
page += 1 page_count += 1
time.sleep(0.2) data = fetch_page(q, page_count)
data = fetch_page(q, page)
if data is None: if data is None:
break break
events = data.get("events", [])
total_raw = data.get("pagination", {}).get("totalResults", 0) total_raw = data.get("pagination", {}).get("totalResults", 0)
all_events.extend(events) if total_raw > 0:
break
if not data.get("events"):
break
# Count matches/non-matches in this page if total_raw == 0:
for e in events: return {"events": [], "total_raw": 0, "partial": False}
if is_match_market(e):
match_count += 1 total_pages = (total_raw + PAGE_SIZE - 1) // PAGE_SIZE
else: concurrency = min(MAX_PARALLEL_FETCHES, total_pages)
non_match_count += 1
all_page_data: dict[int, list[Any]] = {}
with ThreadPoolExecutor(max_workers=concurrency) as executor:
futures = {
executor.submit(_fetch_page_with_index, q, page): page
for page in range(1, total_pages + 1)
}
for future in as_completed(futures):
try:
page_num, data = future.result()
if data is not None:
all_page_data[page_num] = data.get("events", [])
except Exception:
pass
all_events = []
for page_num in sorted(all_page_data.keys()):
all_events.extend(all_page_data[page_num])
_write_cache(q, {"events": all_events, "total_raw": total_raw})
match_count = 0
non_match_count = 0
filtered_events = []
total_seen = 0
for e in all_events:
is_match = is_match_market(e)
if is_match:
match_count += 1
else:
non_match_count += 1
filtered_events.append(e)
# Stop if we got what we wanted (only when caps are set)
if matches_max is not None and non_matches_max is not None: if matches_max is not None and non_matches_max is not None:
if match_count >= matches_max and non_match_count >= non_matches_max: if match_count >= matches_max and non_match_count >= non_matches_max:
break break
# Stop when we get 0 events (no more pages) if max_total is not None:
if len(events) == 0: total_seen += 1
break if total_seen >= max_total:
# Stop when we've fetched all known results break
if len(all_events) >= total_raw:
break
partial = total_raw > 0 and len(all_events) < total_raw partial = len(all_events) < total_raw
return {"events": all_events, "total_raw": total_raw, "partial": partial} return {"events": filtered_events, "total_raw": total_raw, "partial": partial}
# ============================================================ # ============================================================
@@ -449,6 +541,8 @@ def browse_events(
non_matches_max: int = 10, non_matches_max: int = 10,
tradeable_only: bool = True, tradeable_only: bool = True,
sort_by: str | None = None, sort_by: str | None = None,
max_total: int | None = None,
use_cache: bool = True,
) -> BrowseResult: ) -> BrowseResult:
""" """
Browse Polymarket events. Browse Polymarket events.
@@ -459,15 +553,19 @@ def browse_events(
non_matches_max: max number of non-match markets to return non_matches_max: max number of non-match markets to return
tradeable_only: filter to tradeable events only tradeable_only: filter to tradeable events only
sort_by: None (fast, API order) or "volume" (full fetch, sort by volume desc) sort_by: None (fast, API order) or "volume" (full fetch, sort by volume desc)
max_total: max total events to fetch before early exit (None = no limit)
use_cache: whether to use cache (default True)
""" """
# Pass quotas to fetch_all_pages for early-exit optimization.
# Only use early-exit when sort_by is None (no client-side sort needed).
use_early_exit = sort_by is None use_early_exit = sort_by is None
fetch_matches_max = matches_max if use_early_exit else None fetch_matches_max = matches_max if use_early_exit else None
fetch_non_matches_max = non_matches_max if use_early_exit else None fetch_non_matches_max = non_matches_max if use_early_exit else None
result = fetch_all_pages( result = fetch_all_pages(
q, matches_max=fetch_matches_max, non_matches_max=fetch_non_matches_max q,
matches_max=fetch_matches_max,
non_matches_max=fetch_non_matches_max,
max_total=max_total,
use_cache=use_cache,
) )
events = result["events"] events = result["events"]
match_events, non_match_events = filter_events(events, tradeable_only) match_events, non_match_events = filter_events(events, tradeable_only)
@@ -1056,6 +1154,17 @@ def main() -> None:
action="store_true", action="store_true",
help="Show all events without tradeable filter (for debugging).", help="Show all events without tradeable filter (for debugging).",
) )
parser.add_argument(
"--no-cache",
action="store_true",
help="Disable cache and fetch fresh data from API.",
)
parser.add_argument(
"--max-total",
type=int,
default=None,
help="Max total events to fetch before early exit. Default: no limit.",
)
parser.add_argument( parser.add_argument(
"--telegram", "--telegram",
action="store_true", action="store_true",
@@ -1085,6 +1194,8 @@ def main() -> None:
matches_max=matches_max, matches_max=matches_max,
non_matches_max=non_matches_max, non_matches_max=non_matches_max,
tradeable_only=tradeable_only, tradeable_only=tradeable_only,
max_total=args.max_total,
use_cache=not args.no_cache,
) )
print_browse( print_browse(

File diff suppressed because it is too large Load Diff