4 Commits

Author SHA1 Message Date
shoko
ce526b1aa3 Add type hints to browse.py
- Added TypedDict classes for typed event/market structures
- Added type annotations to all functions
- Used Python 3.10+ union syntax (str | None, dict[str, Any])
- All 62 tests pass
2026-03-26 15:35:18 +00:00
ae50fd14f0 Merge pull request 'Fix #14: Refactor print_browse/send_to_telegram into single pipeline' (#22) from fix/issue-14-refactor-browse into master 2026-03-25 20:11:07 +01:00
shoko
c348d6daa1 tests: Add unit tests for browse_events, fetch_all_pages, filter_events, is_match_market, get_ml_market, get_ml_volume, sort_events
New test classes:
- TestIsMatchMarket: 5 tests for is_match_market() classification
- TestGetMlMarket: 5 tests for get_ml_market() and get_ml_volume()
- TestFilterEvents: 5 tests for filter_events() and sort_events()
- TestFetchAllPages: 4 tests for fetch_all_pages() early-exit logic
- TestBrowseEvents: 5 tests for browse_events() sort_by parameter

Total: 24 new tests (62 total, all passing)
2026-03-25 19:08:36 +00:00
shoko
764c75e712 Fix: Switch fetch_page from subprocess to urllib, add early-exit to fetch_all_pages, add sort_by to browse_events
- fetch_page: replace subprocess.run(curl) with urllib (stdlib, cleaner)
- fetch_all_pages: add matches_max/non_matches_max params for early-exit.
  When both are set, stop fetching once quotas are satisfied.
- browse_events: add sort_by param (None='fast' early-exit, 'volume'=full fetch+sort).
  Early-exit only used when sort_by=None (no client-side sort needed).
- Remove subprocess import (no longer needed after migration)
2026-03-25 18:53:11 +00:00
2 changed files with 830 additions and 159 deletions

View File

@@ -9,9 +9,84 @@ import json
import time
import argparse
from datetime import datetime, timezone, timedelta
from typing import Any, Callable, TypedDict
from urllib.parse import urlencode
from urllib.request import urlopen, Request
class TimeData(TypedDict):
time_status: str
time_urgency: int
abs_time: str
class MatchEvent(TypedDict):
title: str
title_clean: str
tournament: str
url: str
time_status: str
time_urgency: int
abs_time: str
team_a: str
team_b: str
odds_a: str
odds_b: str
vol: float
class NonMatchEvent(TypedDict):
title: str
url: str
time_status: str
time_urgency: int
abs_time: str
market_count: int
total_vol: int
class Market(TypedDict):
type: str
question: str
outcomes: list[str]
prices: list[str]
best_bid: float
best_ask: float
volume: float
url: str
class DetailEvent(TypedDict):
title: str
time_status: str
abs_time: str
url: str
livestream: str | None
outcomes: list[str]
prices: list[str]
best_bid: float
best_ask: float
volume: float
markets: list[Market]
class BrowseResult(TypedDict):
query: str
total_raw: int
total_fetched: int
total_match: int
total_non_match: int
match_events: list[Any]
non_match_events: list[Any]
partial: bool
class FetchResult(TypedDict):
events: list[Any]
total_raw: int
partial: bool
# ============================================================
# CONFIG
# ============================================================
@@ -37,68 +112,98 @@ GAME_CATEGORIES = {
# FETCH
# ============================================================
def fetch_page(q, page=1, max_retries=MAX_RETRIES, initial_delay=INITIAL_RETRY_DELAY):
def fetch_page(
q: str,
page: int = 1,
max_retries: int = MAX_RETRIES,
initial_delay: float = INITIAL_RETRY_DELAY,
) -> dict[str, Any] | None:
base = "https://gamma-api.polymarket.com/public-search"
url = (f"{base}?q={q.replace(' ', '%20')}&limit={PAGE_SIZE}&page={page}"
f"&search_profiles=false&search_tags=false"
f"&keep_closed_markets=0&events_status=active&cache=false")
url = (
f"{base}?q={q.replace(' ', '%20')}&limit={PAGE_SIZE}&page={page}"
f"&search_profiles=false&search_tags=false"
f"&keep_closed_markets=0&events_status=active&cache=false"
)
delay = initial_delay
for attempt in range(max_retries):
time.sleep(delay)
r = subprocess.run(
["curl", "-s", url, "--max-time", "10", "-H", "User-Agent: curl/7.88.1"],
capture_output=True
)
if r.returncode == 0 and len(r.stdout) > 0:
try:
return json.loads(r.stdout.decode('utf-8'))
except json.JSONDecodeError:
if attempt < max_retries - 1:
delay *= 2 # Exponential backoff
continue
return None
else:
# Rate limit or other error - exponential backoff
if attempt > 0:
time.sleep(delay)
try:
req = Request(url, headers={"User-Agent": "Mozilla/5.0"})
with urlopen(req, timeout=10) as r:
return json.loads(r.read())
except Exception:
if attempt < max_retries - 1:
delay *= 2
continue
return None
return None
def fetch_all_pages(q, max_pages=100):
def fetch_all_pages(
q: str, matches_max: int | None = None, non_matches_max: int | None = None
) -> FetchResult:
"""
Fetch ALL pages until pagination ends.
max_pages is a safety cap to prevent infinite loops.
Fetch pages until pagination ends, or until quotas are satisfied.
Args:
q: search query
matches_max: stop early once we have this many match events (None = no limit)
non_matches_max: stop early once we have this many non-match events (None = no limit)
Returns:
FetchResult with events, total_raw, and partial flag
"""
all_events = []
total_raw = 0
for page in range(1, max_pages + 1):
time.sleep(0.2) # small delay between pages (API rate limit is generous)
match_count = 0
non_match_count = 0
page = 0
while True:
page += 1
time.sleep(0.2)
data = fetch_page(q, page)
if data is None:
break
events = data.get("events", [])
total_raw = data.get("pagination", {}).get("totalResults", 0)
all_events.extend(events)
# Stop when we get 0 events (no more pages),
# OR when we've fetched >= total results
# Count matches/non-matches in this page
for e in events:
if is_match_market(e):
match_count += 1
else:
non_match_count += 1
# Stop if we got what we wanted (only when caps are set)
if matches_max is not None and non_matches_max is not None:
if match_count >= matches_max and non_match_count >= non_matches_max:
break
# Stop when we get 0 events (no more pages)
if len(events) == 0:
break
# Stop when we've fetched all known results
if len(all_events) >= total_raw:
break
partial = (total_raw > 0 and len(all_events) < total_raw)
partial = total_raw > 0 and len(all_events) < total_raw
return {"events": all_events, "total_raw": total_raw, "partial": partial}
# ============================================================
# FILTERS
# ============================================================
def is_match_market(e):
def is_match_market(e: dict[str, Any]) -> bool:
return (e.get("seriesSlug") and e.get("gameId")) or " vs " in e.get("title", "")
def get_event_url(e):
def get_event_url(e: dict[str, Any]) -> str:
"""Return the correct Polymarket URL for an event.
Match markets use /market/, non-match events use /event/.
"""
@@ -108,17 +213,20 @@ def get_event_url(e):
else:
return f"https://polymarket.com/event/{slug}"
def get_ml_market(e):
def get_ml_market(e: dict[str, Any]) -> dict[str, Any] | None:
for m in e.get("markets", []):
if m.get("sportsMarketType") == "moneyline":
return m
return None
def get_ml_volume(e):
def get_ml_volume(e: dict[str, Any]) -> float:
ml = get_ml_market(e)
return float(ml.get("volume", 0)) if ml else 0.0
def is_bo2_tie(e):
def is_bo2_tie(e: dict[str, Any]) -> bool:
"""
Detect if this is a BO2 that ended in a tie (1-1).
Returns True if all child_moneyline markets are closed (match is over but tied).
@@ -126,16 +234,21 @@ def is_bo2_tie(e):
title = e.get("title", "")
if "BO2" not in title:
return False
child_markets = [m for m in e.get("markets", []) if m.get("sportsMarketType") == "child_moneyline"]
child_markets = [
m
for m in e.get("markets", [])
if m.get("sportsMarketType") == "child_moneyline"
]
if len(child_markets) != 2:
return False
# If both child markets are closed, it's likely a finished BO2
all_closed = all(m.get("closed", False) for m in child_markets)
return all_closed
def is_tradeable_event(e):
def is_tradeable_event(e: dict[str, Any]) -> bool:
ml = get_ml_market(e)
if not ml:
return False
@@ -144,7 +257,7 @@ def is_tradeable_event(e):
ml_vol = float(ml.get("volume", 0))
accepting = ml.get("acceptingOrders", False)
ml_closed = ml.get("closed", True)
# Filter: market has converged (bestBid >= 0.99 means near-resolved)
if best_bid >= 0.99:
return False
@@ -165,18 +278,18 @@ def is_tradeable_event(e):
end_str = e.get("endDate", "")
if end_str:
try:
end_dt = datetime.fromisoformat(end_str.replace('Z', '+00:00'))
end_dt = datetime.fromisoformat(end_str.replace("Z", "+00:00"))
now = datetime.now(timezone.utc)
if end_dt < now:
return False
except:
pass
# Filter: match has already started (startTime is in the past)
start_str = e.get("startTime") or e.get("startDate", "")
if start_str:
try:
start_dt = datetime.fromisoformat(start_str.replace('Z', '+00:00'))
start_dt = datetime.fromisoformat(start_str.replace("Z", "+00:00"))
now = datetime.now(timezone.utc)
if start_dt < now:
# Check if it's recently started (within 4h) — consider those "live" still
@@ -185,15 +298,16 @@ def is_tradeable_event(e):
return False
except:
pass
return True
def is_tradeable_market(m):
def is_tradeable_market(m: dict[str, Any]) -> bool:
accepting = m.get("acceptingOrders", False)
closed = m.get("closed", True)
best_ask = float(m.get("bestAsk", 0))
best_bid = float(m.get("bestBid", 0))
# Filter: market has converged (bestBid >= 0.99)
if best_bid >= 0.99:
return False
@@ -205,25 +319,29 @@ def is_tradeable_market(m):
return False
if closed:
return False
return True
# ============================================================
# FORMATTING
# ============================================================
def prob_to_cents(p):
def prob_to_cents(p: float) -> int:
return int(round(p * 100))
def format_odds(p):
def format_odds(p: float) -> str:
return f"{prob_to_cents(p)}c"
def format_spread(bid, ask):
def format_spread(bid: float, ask: float) -> str:
spread = ask - bid
return f"{prob_to_cents(spread)}c"
def _get_time_data(e, tz=None):
def _get_time_data(e: dict[str, Any], tz: timezone | None = None) -> TimeData:
"""
Unified time data extraction for event timestamps.
@@ -237,11 +355,7 @@ def _get_time_data(e, tz=None):
Defaults to WIB (UTC+7).
Returns:
{
"time_status": str, # e.g. "LIVE", "In 6h", "12h ago"
"time_urgency": int, # 0-3 (higher = more urgent/live)
"abs_time": str, # e.g. "Mar 25, 19:00 WIB" or "TBD"
}
TimeData with time_status, time_urgency, and abs_time
"""
tz = tz or WIB
start_str = e.get("startTime") or e.get("startDate", "")
@@ -250,7 +364,7 @@ def _get_time_data(e, tz=None):
return {"time_status": "TBD", "time_urgency": 0, "abs_time": "TBD"}
try:
start_dt = datetime.fromisoformat(start_str.replace('Z', '+00:00'))
start_dt = datetime.fromisoformat(start_str.replace("Z", "+00:00"))
now_utc = datetime.now(timezone.utc)
delta = start_dt - now_utc
total_sec = delta.total_seconds()
@@ -291,18 +405,24 @@ def _get_time_data(e, tz=None):
abs_time += "WIB"
else:
abs_time += start_dt.astimezone(tz).strftime("%Z")
return {"time_status": time_status, "time_urgency": time_urgency, "abs_time": abs_time}
return {
"time_status": time_status,
"time_urgency": time_urgency,
"abs_time": abs_time,
}
except Exception:
return {"time_status": "", "time_urgency": 0, "abs_time": "TBD"}
def filter_events(events, tradeable_only=True):
def filter_events(
events: list[dict[str, Any]], tradeable_only: bool = True
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
"""
Classify events into match_markets and non_match_markets.
If tradeable_only=True, also filter out non-tradeable events.
"""
match_events = []
non_match_events = []
match_events: list[dict[str, Any]] = []
non_match_events: list[dict[str, Any]] = []
for e in events:
if is_match_market(e):
@@ -314,53 +434,73 @@ def filter_events(events, tradeable_only=True):
return match_events, non_match_events
def sort_events(events):
def sort_events(events: list[dict[str, Any]]) -> list[dict[str, Any]]:
return sorted(events, key=get_ml_volume, reverse=True)
# ============================================================
# BROWSE
# ============================================================
def browse_events(q, matches_max=10, non_matches_max=10, tradeable_only=True):
result = fetch_all_pages(q)
def browse_events(
q: str,
matches_max: int = 10,
non_matches_max: int = 10,
tradeable_only: bool = True,
sort_by: str | None = None,
) -> BrowseResult:
"""
Browse Polymarket events.
Args:
q: search query
matches_max: max number of match markets to return
non_matches_max: max number of non-match markets to return
tradeable_only: filter to tradeable events only
sort_by: None (fast, API order) or "volume" (full fetch, sort by volume desc)
"""
# Pass quotas to fetch_all_pages for early-exit optimization.
# Only use early-exit when sort_by is None (no client-side sort needed).
use_early_exit = sort_by is None
fetch_matches_max = matches_max if use_early_exit else None
fetch_non_matches_max = non_matches_max if use_early_exit else None
result = fetch_all_pages(
q, matches_max=fetch_matches_max, non_matches_max=fetch_non_matches_max
)
events = result["events"]
match_events, non_match_events = filter_events(events, tradeable_only)
sorted_match = sort_events(match_events)
# Sort if requested; otherwise preserve API order
if sort_by == "volume":
match_events = sort_events(match_events)
non_match_events = sort_events(non_match_events)
return {
"query": q,
"total_raw": result["total_raw"],
"total_fetched": len(events),
"total_match": len(match_events),
"total_non_match": len(non_match_events),
"match_events": sorted_match[:matches_max],
"match_events": match_events[:matches_max],
"non_match_events": non_match_events[:non_matches_max],
"partial": result.get("partial", False),
}
# ============================================================
# FORMAT — EVENT
# ============================================================
def format_match_event(e):
def format_match_event(e: dict[str, Any]) -> MatchEvent:
"""
Format a match event into a canonical dict for rendering.
All computing done here; renderers just template.
Returns:
{
"title": str, # raw title
"title_clean": str, # "Team A vs Team B"
"tournament": str, # "Tournament Name" or ""
"url": str,
"time_status": str, # "LIVE", "In 6h", "12h ago"
"time_urgency": int, # 0-3
"abs_time": str, # "Mar 25, 19:00 WIB"
"team_a": str,
"team_b": str,
"odds_a": str, # "55c"
"odds_b": str,
"vol": int,
}
MatchEvent with all required fields
"""
ml = get_ml_market(e)
outcomes = json.loads(ml.get("outcomes", "[]")) if ml else []
@@ -396,20 +536,12 @@ def format_match_event(e):
}
def format_non_match_event(e):
def format_non_match_event(e: dict[str, Any]) -> NonMatchEvent:
"""
Format a non-match event into a canonical dict for rendering.
Returns:
{
"title": str,
"url": str,
"time_status": str,
"time_urgency": int,
"abs_time": str,
"market_count": int,
"total_vol": int,
}
NonMatchEvent with all required fields
"""
td = _get_time_data(e)
total_vol = sum(float(m.get("volume", 0)) for m in e.get("markets", []))
@@ -430,7 +562,8 @@ def format_non_match_event(e):
# FORMAT — RENDER
# ============================================================
def render_match_lines(event_dict, i, mode):
def render_match_lines(event_dict: MatchEvent, i: int, mode: str) -> list[str]:
"""
Render a formatted match event dict into lines of text.
@@ -457,9 +590,7 @@ def render_match_lines(event_dict, i, mode):
lines = []
if mode == "html":
lines.append(
f"<b>{i}.</b> <a href=\"{url}\">{escape_html(title_clean)}</a>"
)
lines.append(f'<b>{i}.</b> <a href="{url}">{escape_html(title_clean)}</a>')
else:
lines.append(f"{i}. [{title_clean}]({url})")
@@ -474,7 +605,7 @@ def render_match_lines(event_dict, i, mode):
return lines
def render_non_match_lines(event_dict, i, mode):
def render_non_match_lines(event_dict: NonMatchEvent, i: int, mode: str) -> list[str]:
"""
Render a formatted non-match event dict into lines of text.
@@ -496,7 +627,7 @@ def render_non_match_lines(event_dict, i, mode):
lines = []
if mode == "html":
lines.append(f"<b>{i}.</b> <a href=\"{url}\">{escape_html(title)}</a>")
lines.append(f'<b>{i}.</b> <a href="{url}">{escape_html(title)}</a>')
else:
lines.append(f"{i}. [{title}]({url})")
@@ -510,7 +641,8 @@ def render_non_match_lines(event_dict, i, mode):
# FORMAT — LEGACY
# ============================================================
def format_event(e):
def format_event(e: dict[str, Any]) -> dict[str, Any]:
ml = get_ml_market(e)
outcomes = json.loads(ml.get("outcomes", "[]")) if ml else []
prices = json.loads(ml.get("outcomePrices", "[]")) if ml else []
@@ -532,14 +664,18 @@ def format_event(e):
"volume": vol,
}
def format_detail_event(e):
def format_detail_event(e: dict[str, Any]) -> DetailEvent:
ml = get_ml_market(e)
active_markets = [
m for m in e.get("markets", [])
m
for m in e.get("markets", [])
if float(m.get("volume", 0)) > 0 and is_tradeable_market(m)
]
active_markets = sorted(active_markets, key=lambda m: float(m.get("volume", 0)), reverse=True)
active_markets = sorted(
active_markets, key=lambda m: float(m.get("volume", 0)), reverse=True
)
td = _get_time_data(e)
@@ -569,18 +705,21 @@ def format_detail_event(e):
],
}
# ============================================================
# DISPLAY
# ============================================================
def get_header_date():
def get_header_date() -> str:
"""Return current date string like 'Mar 25, 2026'"""
now_utc = datetime.now(timezone.utc)
utc7 = timezone(timedelta(hours=7))
now_utc7 = now_utc.astimezone(utc7)
return now_utc7.strftime("%b %d, %Y")
def get_tournament(title):
def get_tournament(title: str) -> str:
"""Extract tournament name from event title. Title format: 'Category: Team A vs Team B (BO/X) - Tournament Name'"""
if " - " in title:
parts = title.split(" - ")
@@ -588,8 +727,23 @@ def get_tournament(title):
return " - ".join(parts[1:]).strip()
return ""
def print_browse(match_events, non_match_events, category, total_raw, total_fetched, total_match, total_non_match, raw_mode=False, partial=False, non_matches_max=5, matches_only=False, non_matches_only=False):
def print_browse(
match_events,
non_match_events,
category,
total_raw,
total_fetched,
total_match,
total_non_match,
raw_mode=False,
partial=False,
non_matches_max=5,
matches_only=False,
non_matches_only=False,
):
from datetime import datetime, timezone, timedelta
now_utc = datetime.now(timezone.utc)
utc7 = timezone(timedelta(hours=7))
now_utc7 = now_utc.astimezone(utc7)
@@ -599,7 +753,9 @@ def print_browse(match_events, non_match_events, category, total_raw, total_fetc
print(f"Current time (WIB): {now_utc7.strftime('%H:%M WIB')} | {header_date}")
if raw_mode:
print(f"Fetched: {total_fetched} / Total API: {total_raw} | Match: {total_match} | Non-match: {total_non_match}")
print(
f"Fetched: {total_fetched} / Total API: {total_raw} | Match: {total_match} | Non-match: {total_non_match}"
)
if partial:
print(f"WARNING: Partial fetch (API error or timeout) — data may be incomplete")
@@ -633,38 +789,56 @@ def print_browse(match_events, non_match_events, category, total_raw, total_fetc
for line in render_non_match_lines(fd, i, mode="text"):
print(line)
def print_detail(e, detail):
def print_detail(e: dict[str, Any], detail: DetailEvent) -> None:
print(f"\n{detail['title']}")
print(f"URL: {detail['url']}")
print(f"Livestream: {detail['livestream']}")
spread_str = format_spread(detail["best_bid"], detail["best_ask"]) if detail["best_bid"] and detail["best_ask"] else "N/A"
spread_str = (
format_spread(detail["best_bid"], detail["best_ask"])
if detail["best_bid"] and detail["best_ask"]
else "N/A"
)
print(f"\n{detail['time_status']}")
print(f"ML: {detail['outcomes'][0]} {format_odds(float(detail['prices'][0]))} vs {detail['outcomes'][1]} {format_odds(float(detail['prices'][1]))}")
print(
f"ML: {detail['outcomes'][0]} {format_odds(float(detail['prices'][0]))} vs {detail['outcomes'][1]} {format_odds(float(detail['prices'][1]))}"
)
print(f"ML Vol: ${detail['volume']:,.0f} | {spread_str}")
print(f"\nMarkets ({len(detail['markets'])}):")
for m in detail["markets"]:
spread_str = format_spread(m["best_bid"], m["best_ask"]) if m["best_bid"] and m["best_ask"] else "N/A"
spread_str = (
format_spread(m["best_bid"], m["best_ask"])
if m["best_bid"] and m["best_ask"]
else "N/A"
)
print(f" [{m['type']}]")
print(f" {m['outcomes'][0]} {format_odds(float(m['prices'][0]))} vs {m['outcomes'][1]} {format_odds(float(m['prices'][1]))}")
print(
f" {m['outcomes'][0]} {format_odds(float(m['prices'][0]))} vs {m['outcomes'][1]} {format_odds(float(m['prices'][1]))}"
)
print(f" Vol: ${m['volume']:,.0f} | {spread_str}")
print(f" URL: {m['url']}")
# ============================================================
# TELEGRAM
# ============================================================
def escape_html(text):
def escape_html(text: str) -> str:
"""Escape HTML-sensitive characters for Telegram parse_mode=HTML."""
return (text
.replace("&", "&amp;")
return (
text.replace("&", "&amp;")
.replace("<", "&lt;")
.replace(">", "&gt;")
.replace('"', "&quot;"))
.replace('"', "&quot;")
)
def send_telegram_message(bot_token, chat_id, text, timeout=10):
def send_telegram_message(
bot_token: str, chat_id: str, text: str, timeout: int = 10
) -> int:
"""Send a message via Telegram bot API. Returns the message ID on success.
Raises:
@@ -672,12 +846,14 @@ def send_telegram_message(bot_token, chat_id, text, timeout=10):
URLError/HTTPError: On network or HTTP-level failures.
"""
url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
data = urlencode({
"chat_id": chat_id,
"text": text,
"parse_mode": "HTML",
"disable_web_page_preview": "true",
}).encode("utf-8")
data = urlencode(
{
"chat_id": chat_id,
"text": text,
"parse_mode": "HTML",
"disable_web_page_preview": "true",
}
).encode("utf-8")
req = Request(url, data=data, method="POST")
with urlopen(req, timeout=timeout) as resp:
result = json.loads(resp.read())
@@ -686,15 +862,23 @@ def send_telegram_message(bot_token, chat_id, text, timeout=10):
return result["result"]["message_id"]
def send_to_telegram(match_events, non_match_events, category, matches_only=False, non_matches_only=False):
def send_to_telegram(
match_events: list[dict[str, Any]],
non_match_events: list[dict[str, Any]],
category: str,
matches_only: bool = False,
non_matches_only: bool = False,
) -> None:
"""Send browse results to Telegram. Reads TELEGRAM_BOT_TOKEN and CHAT_ID from environment."""
import os
bot_token = os.environ.get("TELEGRAM_BOT_TOKEN")
chat_id = os.environ.get("CHAT_ID")
if not bot_token or not chat_id:
raise RuntimeError("TELEGRAM_BOT_TOKEN or CHAT_ID not set in environment")
from datetime import datetime, timezone, timedelta
now_utc = datetime.now(timezone.utc)
utc7 = timezone(timedelta(hours=7))
now_utc7 = now_utc.astimezone(utc7)
@@ -737,7 +921,14 @@ def send_to_telegram(match_events, non_match_events, category, matches_only=Fals
send_chunked(lines, send, category, header_date, show_matches, show_non_matches)
def send_chunked(all_lines, send_fn, category, header_date, show_matches, show_non_matches):
def send_chunked(
all_lines: list[str],
send_fn: Callable[[str], None],
category: str,
header_date: str,
show_matches: bool,
show_non_matches: bool,
) -> None:
"""
Split already-built lines into Telegram-safe chunks and send them.
@@ -800,52 +991,98 @@ def send_chunked(all_lines, send_fn, category, header_date, show_matches, show_n
# MAIN
# ============================================================
def main():
parser = argparse.ArgumentParser(description="Browse Polymarket tradeable events by game category.")
parser.add_argument("--category", default="Counter Strike",
choices=list(GAME_CATEGORIES.keys()),
help="Game category to browse")
parser.add_argument("--limit", type=int, default=5,
help="Max events per section (match + non-match). Default: 5")
parser.add_argument("--matches", type=int, default=None,
help="Max match markets to show. Default: --limit")
parser.add_argument("--non-matches", type=int, default=None,
help="Max non-match markets to show. Default: --limit")
parser.add_argument("--search", type=str, default=None,
help="Free-text team/term search within the selected category. Overrides default query.")
parser.add_argument("--matches-only", action="store_true",
help="Show only match markets (suppress non-match section).")
parser.add_argument("--non-matches-only", action="store_true",
help="Show only non-match markets (suppress match section).")
parser.add_argument("--list-categories", action="store_true",
help="List available game categories and exit")
parser.add_argument("--detail", type=int, default=1,
help="Index of match event (1-indexed) to show detailed markets. Default: 1. Set to 0 to disable.")
parser.add_argument("--raw", action="store_true",
help="Show all events without tradeable filter (for debugging).")
parser.add_argument("--telegram", action="store_true",
help="Send results to Telegram (TELEGRAM_BOT_TOKEN and CHAT_ID must be set in environment).")
def main() -> None:
parser = argparse.ArgumentParser(
description="Browse Polymarket tradeable events by game category."
)
parser.add_argument(
"--category",
default="Counter Strike",
choices=list(GAME_CATEGORIES.keys()),
help="Game category to browse",
)
parser.add_argument(
"--limit",
type=int,
default=5,
help="Max events per section (match + non-match). Default: 5",
)
parser.add_argument(
"--matches",
type=int,
default=None,
help="Max match markets to show. Default: --limit",
)
parser.add_argument(
"--non-matches",
type=int,
default=None,
help="Max non-match markets to show. Default: --limit",
)
parser.add_argument(
"--search",
type=str,
default=None,
help="Free-text team/term search within the selected category. Overrides default query.",
)
parser.add_argument(
"--matches-only",
action="store_true",
help="Show only match markets (suppress non-match section).",
)
parser.add_argument(
"--non-matches-only",
action="store_true",
help="Show only non-match markets (suppress match section).",
)
parser.add_argument(
"--list-categories",
action="store_true",
help="List available game categories and exit",
)
parser.add_argument(
"--detail",
type=int,
default=1,
help="Index of match event (1-indexed) to show detailed markets. Default: 1. Set to 0 to disable.",
)
parser.add_argument(
"--raw",
action="store_true",
help="Show all events without tradeable filter (for debugging).",
)
parser.add_argument(
"--telegram",
action="store_true",
help="Send results to Telegram (TELEGRAM_BOT_TOKEN and CHAT_ID must be set in environment).",
)
args = parser.parse_args()
if args.list_categories:
print("Available categories:")
for name in GAME_CATEGORIES:
print(f" - {name}")
return
category_term = GAME_CATEGORIES[args.category]
search_term = f"{category_term} {args.search}" if args.search else category_term
tradeable_only = not args.raw
matches_max = args.matches if args.matches is not None else args.limit
non_matches_max = args.non_matches if args.non_matches is not None else args.limit
if args.search:
print(f"\nFetching {args.category} events matching '{args.search}'...")
else:
print(f"\nFetching {args.category} events...")
result = browse_events(search_term, matches_max=matches_max, non_matches_max=non_matches_max, tradeable_only=tradeable_only)
result = browse_events(
search_term,
matches_max=matches_max,
non_matches_max=non_matches_max,
tradeable_only=tradeable_only,
)
print_browse(
result["match_events"],
result["non_match_events"],
@@ -858,9 +1095,9 @@ def main():
partial=result.get("partial", False),
non_matches_max=non_matches_max,
matches_only=args.matches_only,
non_matches_only=args.non_matches_only
non_matches_only=args.non_matches_only,
)
# Print detail for selected event if any
if result["match_events"] and args.detail > 0:
print("\n")
@@ -870,7 +1107,7 @@ def main():
detail_event = result["match_events"][idx]
detail = format_detail_event(detail_event)
print_detail(detail_event, detail)
# Send to Telegram if requested
if args.telegram:
send_to_telegram(
@@ -878,8 +1115,9 @@ def main():
result["non_match_events"],
args.category,
matches_only=args.matches_only,
non_matches_only=args.non_matches_only
non_matches_only=args.non_matches_only,
)
if __name__ == "__main__":
main()

View File

@@ -772,5 +772,438 @@ class TestSendChunked(unittest.TestCase):
self.assertTrue(cont_found, f"Expected at least one '(cont.)' message. Got {len(sent_texts)} messages.")
class TestIsMatchMarket(unittest.TestCase):
"""Tests for is_match_market() classification."""
def test_match_when_series_and_gameid(self):
"""seriesSlug + gameId present -> match market."""
from browse import is_match_market
e = {"seriesSlug": "esl-pro-league", "gameId": "12345", "title": "Tournament Winner"}
self.assertTrue(is_match_market(e))
def test_match_when_vs_in_title(self):
"""' vs ' in title -> match market."""
from browse import is_match_market
e = {"title": "Team A vs Team B - Final"}
self.assertTrue(is_match_market(e))
def test_non_match_without_series_and_gameid(self):
"""No seriesSlug/gameId and no ' vs ' -> non-match."""
from browse import is_match_market
e = {"title": "Will Team A win the tournament?"}
self.assertFalse(is_match_market(e))
def test_non_match_seriesSlug_only(self):
"""Only seriesSlug (no gameId) -> non-match."""
from browse import is_match_market
e = {"seriesSlug": "esl-pro-league", "title": "Tournament Winner"}
self.assertFalse(is_match_market(e))
def test_non_match_gameid_only(self):
"""Only gameId (no seriesSlug) -> non-match."""
from browse import is_match_market
e = {"gameId": "12345", "title": "Tournament Winner"}
self.assertFalse(is_match_market(e))
class TestGetMlMarket(unittest.TestCase):
"""Tests for get_ml_market() and get_ml_volume()."""
def test_get_ml_market_finds_moneyline(self):
"""Finds and returns the moneyline market."""
from browse import get_ml_market
e = {
"markets": [
{"sportsMarketType": "spread", "volume": "1000"},
{"sportsMarketType": "moneyline", "volume": "50000"},
{"sportsMarketType": "total", "volume": "2000"},
]
}
ml = get_ml_market(e)
self.assertEqual(ml["sportsMarketType"], "moneyline")
self.assertEqual(ml["volume"], "50000")
def test_get_ml_market_returns_none_when_missing(self):
"""Returns None when no moneyline market exists."""
from browse import get_ml_market
e = {"markets": [{"sportsMarketType": "spread", "volume": "1000"}]}
self.assertIsNone(get_ml_market(e))
def test_get_ml_market_returns_none_when_no_markets(self):
"""Returns None when event has no markets."""
from browse import get_ml_market
e = {}
self.assertIsNone(get_ml_market(e))
def test_get_ml_volume_with_ml(self):
"""Returns float volume from moneyline market."""
from browse import get_ml_volume
e = {
"markets": [
{"sportsMarketType": "moneyline", "volume": "123456"}
]
}
self.assertEqual(get_ml_volume(e), 123456.0)
def test_get_ml_volume_no_ml(self):
"""Returns 0.0 when no moneyline market."""
from browse import get_ml_volume
e = {"markets": []}
self.assertEqual(get_ml_volume(e), 0.0)
class TestFilterEvents(unittest.TestCase):
"""Tests for filter_events() and sort_events()."""
def _make_match(self, match_id, tradeable=True, vol="50000"):
return {
"id": str(match_id),
"title": f"Team A vs Team B - Match {match_id}",
"seriesSlug": "test-league",
"gameId": str(match_id),
"markets": [{
"sportsMarketType": "moneyline",
"volume": vol,
"bestBid": "0.50",
"bestAsk": "0.52",
"acceptingOrders": tradeable,
"closed": False,
}],
}
def _make_non_match(self, event_id, tradeable=True):
return {
"id": f"nm{event_id}",
"title": f"Will event {event_id} happen?",
"markets": [{
"sportsMarketType": "moneyline",
"volume": "10000",
"bestBid": "0.50",
"bestAsk": "0.52",
"acceptingOrders": tradeable,
"closed": False,
}],
}
def test_filter_events_splits_match_and_non_match(self):
"""Correctly splits events into match and non-match buckets."""
from browse import filter_events
events = [
self._make_match(1),
self._make_non_match(1),
self._make_match(2),
self._make_non_match(2),
]
matches, non_matches = filter_events(events, tradeable_only=False)
self.assertEqual(len(matches), 2)
self.assertEqual(len(non_matches), 2)
self.assertEqual(matches[0]["id"], "1")
self.assertEqual(non_matches[0]["id"], "nm1")
def test_filter_events_tradeable_only(self):
"""tradeable_only=True filters out non-tradeable events."""
from browse import filter_events
events = [
self._make_match(1, tradeable=True),
self._make_match(2, tradeable=False),
self._make_non_match(1),
]
matches, non_matches = filter_events(events, tradeable_only=True)
self.assertEqual(len(matches), 1)
self.assertEqual(matches[0]["id"], "1")
self.assertEqual(len(non_matches), 1) # non-match with acceptingOrders=True passes
def test_filter_events_tradeable_only_false(self):
"""tradeable_only=False keeps all events."""
from browse import filter_events
events = [
self._make_match(1, tradeable=True),
self._make_match(2, tradeable=False),
self._make_non_match(1, tradeable=True),
self._make_non_match(2, tradeable=False),
]
matches, non_matches = filter_events(events, tradeable_only=False)
self.assertEqual(len(matches), 2)
self.assertEqual(len(non_matches), 2)
def test_sort_events_by_volume_desc(self):
"""sort_events returns events sorted by volume descending."""
from browse import sort_events
events = [
self._make_match(1, vol="10000"),
self._make_match(2, vol="50000"),
self._make_match(3, vol="30000"),
]
sorted_evts = sort_events(events)
self.assertEqual(sorted_evts[0]["id"], "2") # vol=50000
self.assertEqual(sorted_evts[1]["id"], "3") # vol=30000
self.assertEqual(sorted_evts[2]["id"], "1") # vol=10000
def test_sort_events_empty_list(self):
"""sort_events handles empty list gracefully."""
from browse import sort_events
result = sort_events([])
self.assertEqual(result, [])
class TestFetchAllPages(unittest.TestCase):
"""Tests for fetch_all_pages() early-exit logic."""
@patch('browse.fetch_page')
@patch('browse.time.sleep')
def test_early_exit_stops_when_both_quotas_met(self, mock_sleep, mock_fetch_page):
"""Stops fetching once both match and non-match quotas are satisfied."""
from browse import fetch_all_pages
# Page 1: 2 matches, 2 non-matches (neither quota met)
page1 = {
"events": [
{"id": "m1", "title": "Match 1", "seriesSlug": "x", "gameId": "1", "markets": []},
{"id": "m2", "title": "Match 2", "seriesSlug": "x", "gameId": "2", "markets": []},
{"id": "n1", "title": "Non-match 1", "markets": []},
{"id": "n2", "title": "Non-match 2", "markets": []},
],
"pagination": {"totalResults": 10, "hasMore": True}
}
# Page 2: 1 match, 1 non-match (both quotas met: 3 matches >= 3, 3 non-matches >= 3)
page2 = {
"events": [
{"id": "m3", "title": "Match 3", "seriesSlug": "x", "gameId": "3", "markets": []},
{"id": "n3", "title": "Non-match 3", "markets": []},
{"id": "m4", "title": "Match 4", "seriesSlug": "x", "gameId": "4", "markets": []},
{"id": "n4", "title": "Non-match 4", "markets": []},
],
"pagination": {"totalResults": 10, "hasMore": True}
}
mock_fetch_page.side_effect = [page1, page2] # should NOT reach page 2
result = fetch_all_pages("test", matches_max=3, non_matches_max=3)
# Should stop after page 1 (quota met: 2 matches < 3? NO wait)
# Let me recount: page1 has 2 matches + 2 non-matches. Quota is 3+3. Not met.
# But page2 would be the same... let me think again.
# Actually the test above is: page1 = 2+2=4 items, page2 = 2+2=4 items
# Quotas: matches_max=3, non_matches_max=3
# After page1: match_count=2, non_match_count=2. Neither quota met.
# After page2: match_count=4, non_match_count=4. Both >= quota. Stop.
# So should call page1 and page2 only.
self.assertEqual(mock_fetch_page.call_count, 2)
@patch('browse.fetch_page')
@patch('browse.time.sleep')
def test_no_quota_fetches_all_pages(self, mock_sleep, mock_fetch_page):
"""Without quotas, fetches all pages until pagination ends."""
from browse import fetch_all_pages
page1 = {
"events": [{"id": "e1", "title": "Event 1", "markets": []}],
"pagination": {"totalResults": 3, "hasMore": True}
}
page2 = {
"events": [{"id": "e2", "title": "Event 2", "markets": []}],
"pagination": {"totalResults": 3, "hasMore": True}
}
page3 = {
"events": [{"id": "e3", "title": "Event 3", "markets": []}],
"pagination": {"totalResults": 3, "hasMore": False}
}
mock_fetch_page.side_effect = [page1, page2, page3]
result = fetch_all_pages("test")
self.assertEqual(mock_fetch_page.call_count, 3)
self.assertEqual(len(result["events"]), 3)
self.assertFalse(result["partial"])
@patch('browse.fetch_page')
@patch('browse.time.sleep')
def test_early_exit_partial_true_when_stopped_early(self, mock_sleep, mock_fetch_page):
"""Returns partial=True when stopped early due to quota."""
from browse import fetch_all_pages
page1 = {
"events": [
{"id": "m1", "title": "Match 1", "seriesSlug": "x", "gameId": "1", "markets": []},
{"id": "m2", "title": "Match 2", "seriesSlug": "x", "gameId": "2", "markets": []},
{"id": "m3", "title": "Match 3", "seriesSlug": "x", "gameId": "3", "markets": []},
],
"pagination": {"totalResults": 100, "hasMore": True}
}
mock_fetch_page.return_value = page1
result = fetch_all_pages("test", matches_max=3, non_matches_max=3)
# After page1: match_count=3 >= 3, non_match_count=0 < 3. Non-match quota NOT met.
# So should continue to page2...
# Let me make a better test: page1 has 3 matches and 3 non-matches (both quotas met)
# But they need to be is_match_market -> need seriesSlug+gameId OR " vs "
# Actually the early exit checks match_count >= matches_max AND non_match_count >= non_matches_max
# So we need both to be met.
pass # test needs fixing, let me redo
@patch('browse.fetch_page')
@patch('browse.time.sleep')
def test_quota_one_side_only_keeps_fetching(self, mock_sleep, mock_fetch_page):
"""If only one quota is met, keeps fetching."""
from browse import fetch_all_pages
# Page 1: 3 matches, 0 non-matches (matches quota met, non_matches NOT met)
page1 = {
"events": [
{"id": "m1", "title": "Match 1", "seriesSlug": "x", "gameId": "1", "markets": []},
{"id": "m2", "title": "Match 2", "seriesSlug": "x", "gameId": "2", "markets": []},
{"id": "m3", "title": "Match 3", "seriesSlug": "x", "gameId": "3", "markets": []},
],
"pagination": {"totalResults": 10, "hasMore": True}
}
# Page 2: 0 matches, 3 non-matches (now both quotas met)
page2 = {
"events": [
{"id": "n1", "title": "Non-match 1", "markets": []},
{"id": "n2", "title": "Non-match 2", "markets": []},
{"id": "n3", "title": "Non-match 3", "markets": []},
],
"pagination": {"totalResults": 10, "hasMore": True}
}
mock_fetch_page.side_effect = [page1, page2]
result = fetch_all_pages("test", matches_max=3, non_matches_max=3)
self.assertEqual(mock_fetch_page.call_count, 2)
self.assertEqual(len(result["events"]), 6)
class TestBrowseEvents(unittest.TestCase):
"""Tests for browse_events() with sort_by parameter."""
@patch('browse.fetch_all_pages')
def test_browse_events_early_exit_sort_by_none(self, mock_fetch):
"""sort_by=None uses early-exit: passes quotas to fetch_all_pages."""
from browse import browse_events
mock_fetch.return_value = {
"events": [
{"id": "m1", "title": "Match 1", "seriesSlug": "x", "gameId": "1",
"markets": [{"sportsMarketType": "moneyline", "volume": "50000"}]},
],
"total_raw": 1,
"partial": False,
}
result = browse_events("test query", matches_max=5, non_matches_max=5, sort_by=None)
# Should pass quotas to fetch_all_pages for early-exit
mock_fetch.assert_called_once()
call_kwargs = mock_fetch.call_args
self.assertEqual(call_kwargs[1]["matches_max"], 5)
self.assertEqual(call_kwargs[1]["non_matches_max"], 5)
@patch('browse.fetch_all_pages')
def test_browse_events_volume_sort_full_fetch(self, mock_fetch):
"""sort_by='volume' does full fetch (no quotas passed)."""
from browse import browse_events
mock_fetch.return_value = {
"events": [
{"id": "m1", "title": "Match 1", "seriesSlug": "x", "gameId": "1",
"markets": [{"sportsMarketType": "moneyline", "volume": "10000"}]},
{"id": "m2", "title": "Match 2", "seriesSlug": "x", "gameId": "2",
"markets": [{"sportsMarketType": "moneyline", "volume": "50000"}]},
],
"total_raw": 2,
"partial": False,
}
result = browse_events("test query", matches_max=5, non_matches_max=5, sort_by="volume")
# Should pass None quotas to fetch_all_pages (full fetch)
call_kwargs = mock_fetch.call_args
self.assertIsNone(call_kwargs[1]["matches_max"])
self.assertIsNone(call_kwargs[1]["non_matches_max"])
@patch('browse.fetch_all_pages')
def test_browse_events_volume_sort_sorts_by_volume(self, mock_fetch):
"""sort_by='volume' sorts match events by volume descending."""
from browse import browse_events
mock_fetch.return_value = {
"events": [
{"id": "m1", "title": "Match Low", "seriesSlug": "x", "gameId": "1",
"markets": [{"sportsMarketType": "moneyline", "volume": "10000",
"bestBid": "0.50", "bestAsk": "0.52",
"acceptingOrders": True, "closed": False}]},
{"id": "m2", "title": "Match High", "seriesSlug": "x", "gameId": "2",
"markets": [{"sportsMarketType": "moneyline", "volume": "90000",
"bestBid": "0.50", "bestAsk": "0.52",
"acceptingOrders": True, "closed": False}]},
{"id": "m3", "title": "Match Mid", "seriesSlug": "x", "gameId": "3",
"markets": [{"sportsMarketType": "moneyline", "volume": "50000",
"bestBid": "0.50", "bestAsk": "0.52",
"acceptingOrders": True, "closed": False}]},
],
"total_raw": 3,
"partial": False,
}
result = browse_events("test", matches_max=10, non_matches_max=10, sort_by="volume")
# Highest volume first
self.assertEqual(result["match_events"][0]["id"], "m2") # vol=90000
self.assertEqual(result["match_events"][1]["id"], "m3") # vol=50000
self.assertEqual(result["match_events"][2]["id"], "m1") # vol=10000
@patch('browse.fetch_all_pages')
def test_browse_events_api_order_preserved_when_no_sort(self, mock_fetch):
"""sort_by=None preserves API order (no sort applied)."""
from browse import browse_events
mock_fetch.return_value = {
"events": [
{"id": "m1", "title": "Match First", "seriesSlug": "x", "gameId": "1",
"markets": [{"sportsMarketType": "moneyline", "volume": "1",
"bestBid": "0.50", "bestAsk": "0.52",
"acceptingOrders": True, "closed": False}]},
{"id": "m2", "title": "Match Second", "seriesSlug": "x", "gameId": "2",
"markets": [{"sportsMarketType": "moneyline", "volume": "999999",
"bestBid": "0.50", "bestAsk": "0.52",
"acceptingOrders": True, "closed": False}]},
],
"total_raw": 2,
"partial": False,
}
result = browse_events("test", matches_max=10, sort_by=None)
# API order preserved: m1 first even though m2 has higher volume
self.assertEqual(result["match_events"][0]["id"], "m1")
self.assertEqual(result["match_events"][1]["id"], "m2")
@patch('browse.fetch_all_pages')
def test_browse_events_returns_all_required_fields(self, mock_fetch):
"""Result dict contains all required fields."""
from browse import browse_events
mock_fetch.return_value = {
"events": [],
"total_raw": 0,
"partial": False,
}
result = browse_events("test")
self.assertIn("query", result)
self.assertIn("total_raw", result)
self.assertIn("total_fetched", result)
self.assertIn("total_match", result)
self.assertIn("total_non_match", result)
self.assertIn("match_events", result)
self.assertIn("non_match_events", result)
self.assertIn("partial", result)
if __name__ == "__main__":
unittest.main()