Compare commits
8 Commits
hermes/her
...
d018e87b35
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d018e87b35 | ||
|
|
8cde441996 | ||
|
|
d0534aedbf | ||
| 2703b942c1 | |||
|
|
f9c4bac7b8 | ||
|
|
c49600cd4d | ||
|
|
3a988943b9 | ||
|
|
da367c594b |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -2,3 +2,4 @@ __pycache__/
|
||||
*.pyc
|
||||
*.pyo
|
||||
.DS_Store
|
||||
.worktrees/
|
||||
|
||||
@@ -4,11 +4,13 @@ Polymarket Event Browser
|
||||
Browse tradeable Polymarket events by game category.
|
||||
"""
|
||||
|
||||
import subprocess
|
||||
import html
|
||||
import json
|
||||
import time
|
||||
import argparse
|
||||
from datetime import datetime, timezone, timedelta
|
||||
from urllib.parse import urlencode
|
||||
from urllib.request import urlopen, Request
|
||||
|
||||
# ============================================================
|
||||
# CONFIG
|
||||
@@ -17,6 +19,7 @@ from datetime import datetime, timezone, timedelta
|
||||
PAGE_SIZE = 50
|
||||
MAX_RETRIES = 5
|
||||
INITIAL_RETRY_DELAY = 2 # exponential backoff starts at 2s
|
||||
WIB = timezone(timedelta(hours=7)) # UTC+7 for Indonesian users
|
||||
|
||||
GAME_CATEGORIES = {
|
||||
"All Esports": "Esports",
|
||||
@@ -219,94 +222,79 @@ def format_spread(bid, ask):
|
||||
spread = ask - bid
|
||||
return f"{prob_to_cents(spread)}c"
|
||||
|
||||
def get_match_time_status(e):
|
||||
|
||||
def _get_time_data(e, tz=None):
|
||||
"""
|
||||
Return a human-readable match time status.
|
||||
Returns (status_str, urgency) where urgency is 0-3 (higher = more urgent/live).
|
||||
Uses startTime for actual match start time.
|
||||
Displays times in WIB (UTC+7 for Indonesian users).
|
||||
Unified time data extraction for event timestamps.
|
||||
|
||||
Uses startTime (preferred) or startDate as the event start time.
|
||||
Datetime parsing and all relative calculations are UTC-based.
|
||||
The tz parameter only affects the abs_time formatting.
|
||||
|
||||
Args:
|
||||
e: Event dict with 'startTime' or 'startDate' key.
|
||||
tz: datetime.timezone for abs_time formatting.
|
||||
Defaults to WIB (UTC+7).
|
||||
|
||||
Returns:
|
||||
{
|
||||
"time_status": str, # e.g. "LIVE", "In 6h", "12h ago"
|
||||
"time_urgency": int, # 0-3 (higher = more urgent/live)
|
||||
"abs_time": str, # e.g. "Mar 25, 19:00 WIB" or "TBD"
|
||||
}
|
||||
"""
|
||||
# Use startTime for actual match start, not startDate (which is market creation time)
|
||||
tz = tz or WIB
|
||||
start_str = e.get("startTime") or e.get("startDate", "")
|
||||
|
||||
if not start_str:
|
||||
return "TBD", 0
|
||||
return {"time_status": "TBD", "time_urgency": 0, "abs_time": "TBD"}
|
||||
|
||||
try:
|
||||
start_dt = datetime.fromisoformat(start_str.replace('Z', '+00:00'))
|
||||
now_utc = datetime.now(timezone.utc)
|
||||
utc7 = timezone(timedelta(hours=7))
|
||||
now = now_utc.astimezone(utc7)
|
||||
start_utc7 = start_dt.astimezone(utc7)
|
||||
|
||||
delta = start_dt - now_utc
|
||||
total_sec = delta.total_seconds()
|
||||
|
||||
if delta.total_seconds() < 0:
|
||||
# Started already
|
||||
hours_ago = abs(delta.total_seconds()) / 3600
|
||||
if total_sec < 0:
|
||||
# Event is in the past
|
||||
hours_ago = abs(total_sec) / 3600
|
||||
if hours_ago < 1:
|
||||
return "LIVE", 3
|
||||
time_status = "LIVE"
|
||||
time_urgency = 3
|
||||
elif hours_ago < 4:
|
||||
return f"LIVE {int(hours_ago)}h", 3
|
||||
time_status = f"LIVE {int(hours_ago)}h"
|
||||
time_urgency = 3
|
||||
elif hours_ago < 24:
|
||||
return f"Started {int(hours_ago)}h ago", 1
|
||||
time_status = f"{int(hours_ago)}h ago"
|
||||
time_urgency = 1
|
||||
else:
|
||||
days = int(hours_ago / 24)
|
||||
return f"{days}d ago", 0
|
||||
time_status = f"{days}d ago"
|
||||
time_urgency = 0
|
||||
else:
|
||||
# Starts in future
|
||||
hours_until = delta.total_seconds() / 3600
|
||||
if hours_until <= 0:
|
||||
return "LIVE", 3
|
||||
elif hours_until < 1:
|
||||
mins = int(delta.total_seconds() / 60)
|
||||
return f"In {mins}m", 3
|
||||
elif hours_until < 24:
|
||||
return f"In {int(hours_until)}h", 2
|
||||
# Event is in the future
|
||||
if total_sec < 3600:
|
||||
mins = int(total_sec / 60)
|
||||
time_status = f"In {mins}m"
|
||||
time_urgency = 3
|
||||
elif total_sec < 86400:
|
||||
hours_until = int(total_sec / 3600)
|
||||
time_status = f"In {hours_until}h"
|
||||
time_urgency = 2
|
||||
else:
|
||||
days = int(hours_until / 24)
|
||||
return f"In {days}d", 1
|
||||
except:
|
||||
return "", 0
|
||||
days = int(total_sec / 86400)
|
||||
time_status = f"In {days}d"
|
||||
time_urgency = 1
|
||||
|
||||
def get_match_time_str(e):
|
||||
"""
|
||||
Return just the time status string (e.g. 'LIVE', 'In 6h', 'In 1d').
|
||||
Uses startTime for actual match start time.
|
||||
"""
|
||||
start_str = e.get("startTime") or e.get("startDate", "")
|
||||
if not start_str:
|
||||
return "TBD"
|
||||
try:
|
||||
start_dt = datetime.fromisoformat(start_str.replace('Z', '+00:00'))
|
||||
now_utc = datetime.now(timezone.utc)
|
||||
delta = start_dt - now_utc
|
||||
|
||||
if delta.total_seconds() < 0:
|
||||
hours_ago = abs(delta.total_seconds()) / 3600
|
||||
if hours_ago < 1:
|
||||
return "LIVE"
|
||||
elif hours_ago < 4:
|
||||
return f"LIVE {int(hours_ago)}h"
|
||||
elif hours_ago < 24:
|
||||
return f"{int(hours_ago)}h ago"
|
||||
else:
|
||||
days = int(hours_ago / 24)
|
||||
return f"{days}d ago"
|
||||
abs_time = start_dt.astimezone(tz).strftime("%b %d, %H:%M ")
|
||||
if tz == WIB:
|
||||
abs_time += "WIB"
|
||||
else:
|
||||
hours_until = delta.total_seconds() / 3600
|
||||
if hours_until <= 0:
|
||||
return "LIVE"
|
||||
elif hours_until < 1:
|
||||
mins = int(delta.total_seconds() / 60)
|
||||
return f"In {mins}m"
|
||||
elif hours_until < 24:
|
||||
return f"In {int(hours_until)}h"
|
||||
else:
|
||||
days = int(hours_until / 24)
|
||||
return f"In {days}d"
|
||||
except:
|
||||
return ""
|
||||
abs_time += start_dt.astimezone(tz).strftime("%Z")
|
||||
return {"time_status": time_status, "time_urgency": time_urgency, "abs_time": abs_time}
|
||||
except Exception:
|
||||
return {"time_status": "", "time_urgency": 0, "abs_time": "TBD"}
|
||||
|
||||
|
||||
def filter_events(events, tradeable_only=True):
|
||||
"""
|
||||
@@ -325,6 +313,7 @@ def filter_events(events, tradeable_only=True):
|
||||
|
||||
return match_events, non_match_events
|
||||
|
||||
|
||||
def sort_events(events):
|
||||
return sorted(events, key=get_ml_volume, reverse=True)
|
||||
|
||||
@@ -349,7 +338,176 @@ def browse_events(q, matches_max=10, non_matches_max=10, tradeable_only=True):
|
||||
}
|
||||
|
||||
# ============================================================
|
||||
# FORMAT
|
||||
# FORMAT — EVENT
|
||||
# ============================================================
|
||||
|
||||
def format_match_event(e):
|
||||
"""
|
||||
Format a match event into a canonical dict for rendering.
|
||||
All computing done here; renderers just template.
|
||||
|
||||
Returns:
|
||||
{
|
||||
"title": str, # raw title
|
||||
"title_clean": str, # "Team A vs Team B"
|
||||
"tournament": str, # "Tournament Name" or ""
|
||||
"url": str,
|
||||
"time_status": str, # "LIVE", "In 6h", "12h ago"
|
||||
"time_urgency": int, # 0-3
|
||||
"abs_time": str, # "Mar 25, 19:00 WIB"
|
||||
"team_a": str,
|
||||
"team_b": str,
|
||||
"odds_a": str, # "55c"
|
||||
"odds_b": str,
|
||||
"vol": int,
|
||||
}
|
||||
"""
|
||||
ml = get_ml_market(e)
|
||||
outcomes = json.loads(ml.get("outcomes", "[]")) if ml else []
|
||||
prices = json.loads(ml.get("outcomePrices", "[]")) if ml else []
|
||||
td = _get_time_data(e)
|
||||
title = e.get("title", "")
|
||||
|
||||
team_a = outcomes[0] if len(outcomes) > 0 else "?"
|
||||
team_b = outcomes[1] if len(outcomes) > 1 else "?"
|
||||
odds_a = format_odds(float(prices[0])) if len(prices) > 0 else "?"
|
||||
odds_b = format_odds(float(prices[1])) if len(prices) > 1 else "?"
|
||||
|
||||
if " - " in title:
|
||||
title_clean = title.split(" - ")[0].strip()
|
||||
else:
|
||||
title_clean = title
|
||||
|
||||
tournament = get_tournament(title)
|
||||
|
||||
return {
|
||||
"title": title,
|
||||
"title_clean": title_clean,
|
||||
"tournament": tournament,
|
||||
"url": get_event_url(e),
|
||||
"time_status": td["time_status"],
|
||||
"time_urgency": td["time_urgency"],
|
||||
"abs_time": td["abs_time"],
|
||||
"team_a": team_a,
|
||||
"team_b": team_b,
|
||||
"odds_a": odds_a,
|
||||
"odds_b": odds_b,
|
||||
"vol": get_ml_volume(e),
|
||||
}
|
||||
|
||||
|
||||
def format_non_match_event(e):
|
||||
"""
|
||||
Format a non-match event into a canonical dict for rendering.
|
||||
|
||||
Returns:
|
||||
{
|
||||
"title": str,
|
||||
"url": str,
|
||||
"time_status": str,
|
||||
"time_urgency": int,
|
||||
"abs_time": str,
|
||||
"market_count": int,
|
||||
"total_vol": int,
|
||||
}
|
||||
"""
|
||||
td = _get_time_data(e)
|
||||
total_vol = sum(float(m.get("volume", 0)) for m in e.get("markets", []))
|
||||
market_count = len(e.get("markets", []))
|
||||
|
||||
return {
|
||||
"title": e.get("title", "?"),
|
||||
"url": get_event_url(e),
|
||||
"time_status": td["time_status"],
|
||||
"time_urgency": td["time_urgency"],
|
||||
"abs_time": td["abs_time"],
|
||||
"market_count": market_count,
|
||||
"total_vol": int(total_vol),
|
||||
}
|
||||
|
||||
|
||||
# ============================================================
|
||||
# FORMAT — RENDER
|
||||
# ============================================================
|
||||
|
||||
def render_match_lines(event_dict, i, mode):
|
||||
"""
|
||||
Render a formatted match event dict into lines of text.
|
||||
|
||||
Args:
|
||||
event_dict: canonical dict from format_match_event()
|
||||
i: 1-based index for the event number
|
||||
mode: "text" for plain text/Markdown, "html" for Telegram HTML
|
||||
|
||||
Returns:
|
||||
List[str], one line per element (no trailing blank line).
|
||||
Caller adds the blank line separator between events.
|
||||
"""
|
||||
title_clean = event_dict["title_clean"]
|
||||
url = event_dict["url"]
|
||||
abs_time = event_dict["abs_time"]
|
||||
time_status = event_dict["time_status"]
|
||||
vol = event_dict["vol"]
|
||||
tournament = event_dict["tournament"]
|
||||
team_a = event_dict["team_a"]
|
||||
team_b = event_dict["team_b"]
|
||||
odds_a = event_dict["odds_a"]
|
||||
odds_b = event_dict["odds_b"]
|
||||
|
||||
lines = []
|
||||
|
||||
if mode == "html":
|
||||
lines.append(
|
||||
f"<b>{i}.</b> <a href=\"{url}\">{escape_html(title_clean)}</a>"
|
||||
)
|
||||
else:
|
||||
lines.append(f"{i}. [{title_clean}]({url})")
|
||||
|
||||
lines.append(f" {abs_time} | {time_status}")
|
||||
lines.append(f" Vol: ${vol:,.0f}")
|
||||
|
||||
if tournament:
|
||||
lines.append(f" Tournament: {tournament}")
|
||||
|
||||
lines.append(f" Odds: {team_a} {odds_a} | {odds_b} {team_b}")
|
||||
|
||||
return lines
|
||||
|
||||
|
||||
def render_non_match_lines(event_dict, i, mode):
|
||||
"""
|
||||
Render a formatted non-match event dict into lines of text.
|
||||
|
||||
Args:
|
||||
event_dict: canonical dict from format_non_match_event()
|
||||
i: 1-based index for the event number
|
||||
mode: "text" for plain text/Markdown, "html" for Telegram HTML
|
||||
|
||||
Returns:
|
||||
List[str], one line per element (no trailing blank line).
|
||||
"""
|
||||
title = event_dict["title"]
|
||||
url = event_dict["url"]
|
||||
abs_time = event_dict["abs_time"]
|
||||
time_status = event_dict["time_status"]
|
||||
market_count = event_dict["market_count"]
|
||||
total_vol = event_dict["total_vol"]
|
||||
|
||||
lines = []
|
||||
|
||||
if mode == "html":
|
||||
lines.append(f"<b>{i}.</b> <a href=\"{url}\">{escape_html(title)}</a>")
|
||||
else:
|
||||
lines.append(f"{i}. [{title}]({url})")
|
||||
|
||||
lines.append(f" {abs_time} | {time_status}")
|
||||
lines.append(f" Markets: {market_count} | Total Vol: ${total_vol:,.0f}")
|
||||
|
||||
return lines
|
||||
|
||||
|
||||
# ============================================================
|
||||
# FORMAT — LEGACY
|
||||
# ============================================================
|
||||
|
||||
def format_event(e):
|
||||
@@ -359,12 +517,12 @@ def format_event(e):
|
||||
best_bid = float(ml.get("bestBid", 0)) if ml else 0
|
||||
best_ask = float(ml.get("bestAsk", 0)) if ml else 0
|
||||
vol = get_ml_volume(e)
|
||||
time_status, urgency = get_match_time_status(e)
|
||||
td = _get_time_data(e)
|
||||
|
||||
return {
|
||||
"title": e.get("title", ""),
|
||||
"time_status": time_status,
|
||||
"time_urgency": urgency,
|
||||
"time_status": td["time_status"],
|
||||
"time_urgency": td["time_urgency"],
|
||||
"url": get_event_url(e),
|
||||
"livestream": e.get("resolutionSource"),
|
||||
"outcomes": outcomes,
|
||||
@@ -383,11 +541,12 @@ def format_detail_event(e):
|
||||
]
|
||||
active_markets = sorted(active_markets, key=lambda m: float(m.get("volume", 0)), reverse=True)
|
||||
|
||||
time_status, urgency = get_match_time_status(e)
|
||||
td = _get_time_data(e)
|
||||
|
||||
return {
|
||||
"title": e.get("title", ""),
|
||||
"time_status": time_status,
|
||||
"time_status": td["time_status"],
|
||||
"abs_time": td["abs_time"],
|
||||
"url": get_event_url(e),
|
||||
"livestream": e.get("resolutionSource"),
|
||||
"outcomes": json.loads(ml.get("outcomes", "[]")) if ml else [],
|
||||
@@ -414,48 +573,6 @@ def format_detail_event(e):
|
||||
# DISPLAY
|
||||
# ============================================================
|
||||
|
||||
def get_start_time_wib(e):
|
||||
"""Return (date_time_str, relative_str) for display."""
|
||||
start_str = e.get("startTime") or e.get("startDate", "")
|
||||
if not start_str:
|
||||
return "TBD", ""
|
||||
try:
|
||||
start_dt = datetime.fromisoformat(start_str.replace('Z', '+00:00'))
|
||||
now_utc = datetime.now(timezone.utc)
|
||||
utc7 = timezone(timedelta(hours=7))
|
||||
start_utc7 = start_dt.astimezone(utc7)
|
||||
|
||||
# Absolute: "Mar 25, 19:00 WIB"
|
||||
abs_str = start_utc7.strftime("%b %d, %H:%M WIB")
|
||||
|
||||
# Relative: "In 5h", "In 10h", "LIVE", etc.
|
||||
delta = start_dt - now_utc
|
||||
if delta.total_seconds() < 0:
|
||||
hours_ago = abs(delta.total_seconds()) / 3600
|
||||
if hours_ago < 1:
|
||||
rel_str = "LIVE"
|
||||
elif hours_ago < 24:
|
||||
rel_str = f"{int(hours_ago)}h ago"
|
||||
else:
|
||||
days = int(hours_ago / 24)
|
||||
rel_str = f"{days}d ago"
|
||||
else:
|
||||
hours_until = delta.total_seconds() / 3600
|
||||
if hours_until <= 0:
|
||||
rel_str = "LIVE"
|
||||
elif hours_until < 1:
|
||||
mins_until = int(delta.total_seconds() / 60)
|
||||
rel_str = f"In {mins_until}m"
|
||||
elif hours_until < 24:
|
||||
rel_str = f"In {int(hours_until)}h"
|
||||
else:
|
||||
days = int(hours_until / 24)
|
||||
rel_str = f"In {days}d"
|
||||
|
||||
return abs_str, rel_str
|
||||
except:
|
||||
return "TBD", ""
|
||||
|
||||
def get_header_date():
|
||||
"""Return current date string like 'Mar 25, 2026'"""
|
||||
now_utc = datetime.now(timezone.utc)
|
||||
@@ -486,9 +603,8 @@ def print_browse(match_events, non_match_events, category, total_raw, total_fetc
|
||||
if partial:
|
||||
print(f"WARNING: Partial fetch (API error or timeout) — data may be incomplete")
|
||||
|
||||
# --- MATCH MARKETS ---
|
||||
# Determine sections to show
|
||||
if not matches_only and not non_matches_only:
|
||||
# Default: show both
|
||||
show_matches = True
|
||||
show_non_matches = True
|
||||
elif matches_only:
|
||||
@@ -498,68 +614,31 @@ def print_browse(match_events, non_match_events, category, total_raw, total_fetc
|
||||
show_matches = False
|
||||
show_non_matches = True
|
||||
|
||||
# Match events
|
||||
if show_matches:
|
||||
print(f"\nMATCH MARKETS")
|
||||
print("\nMATCH MARKETS")
|
||||
if not match_events:
|
||||
print(" No match markets found.")
|
||||
else:
|
||||
for i, e in enumerate(match_events, 1):
|
||||
f = format_event(e)
|
||||
ml = get_ml_market(e)
|
||||
outcomes = json.loads(ml.get("outcomes", "[]")) if ml else []
|
||||
prices = json.loads(ml.get("outcomePrices", "[]")) if ml else []
|
||||
vol = f["volume"]
|
||||
title = f["title"]
|
||||
url = f["url"]
|
||||
start_time_wib, rel_time = get_start_time_wib(e)
|
||||
fd = format_match_event(e)
|
||||
for line in render_match_lines(fd, i, mode="text"):
|
||||
print(line)
|
||||
|
||||
team_a = outcomes[0] if len(outcomes) > 0 else "?"
|
||||
team_b = outcomes[1] if len(outcomes) > 1 else "?"
|
||||
odds_a = format_odds(float(prices[0])) if len(prices) > 0 else "?"
|
||||
odds_b = format_odds(float(prices[1])) if len(prices) > 1 else "?"
|
||||
|
||||
if " - " in title:
|
||||
title_clean = title.split(" - ")[0].strip()
|
||||
else:
|
||||
title_clean = title
|
||||
|
||||
tournament = get_tournament(title)
|
||||
|
||||
print(f"\n {i}. [{title_clean}]({url})")
|
||||
print(f" {start_time_wib} | {rel_time}")
|
||||
print(f" Vol: ${vol:,.0f}")
|
||||
if tournament:
|
||||
print(f" Tournament: {tournament}")
|
||||
print(f" Odds: {team_a} {odds_a} | {odds_b} {team_b}")
|
||||
|
||||
# --- NON-MATCH MARKETS ---
|
||||
# Non-match events
|
||||
if show_non_matches and non_match_events:
|
||||
print(f"\nNON-MATCH MARKETS")
|
||||
|
||||
print("\nNON-MATCH MARKETS")
|
||||
for i, e in enumerate(non_match_events[:non_matches_max], 1):
|
||||
title = e.get("title", "?")
|
||||
url = get_event_url(e)
|
||||
start_time_wib, rel_time = get_start_time_wib(e)
|
||||
|
||||
total_vol = sum(float(m.get("volume", 0)) for m in e.get("markets", []))
|
||||
market_count = len(e.get("markets", []))
|
||||
|
||||
print(f"\n {i}. [{title}]({url})")
|
||||
print(f" {start_time_wib} | {rel_time}")
|
||||
print(f" Markets: {market_count} | Total Vol: ${total_vol:,.0f}")
|
||||
fd = format_non_match_event(e)
|
||||
for line in render_non_match_lines(fd, i, mode="text"):
|
||||
print(line)
|
||||
|
||||
def print_detail(e, detail):
|
||||
from datetime import datetime, timezone, timedelta
|
||||
now_utc = datetime.now(timezone.utc)
|
||||
utc7 = timezone(timedelta(hours=7))
|
||||
now_utc7 = now_utc.astimezone(utc7)
|
||||
|
||||
print(f"\n{detail['title']}")
|
||||
print(f"URL: {detail['url']}")
|
||||
print(f"Livestream: {detail['livestream']}")
|
||||
|
||||
spread_str = format_spread(detail["best_bid"], detail["best_ask"]) if detail["best_bid"] and detail["best_ask"] else "N/A"
|
||||
time_str = get_match_time_str(e)
|
||||
print(f"\n{detail['time_status']}")
|
||||
print(f"ML: {detail['outcomes'][0]} {format_odds(float(detail['prices'][0]))} vs {detail['outcomes'][1]} {format_odds(float(detail['prices'][1]))}")
|
||||
print(f"ML Vol: ${detail['volume']:,.0f} | {spread_str}")
|
||||
@@ -576,14 +655,44 @@ def print_detail(e, detail):
|
||||
# TELEGRAM
|
||||
# ============================================================
|
||||
|
||||
def escape_html(text):
|
||||
"""Escape HTML-sensitive characters for Telegram parse_mode=HTML."""
|
||||
return (text
|
||||
.replace("&", "&")
|
||||
.replace("<", "<")
|
||||
.replace(">", ">")
|
||||
.replace('"', """))
|
||||
|
||||
|
||||
def send_telegram_message(bot_token, chat_id, text, timeout=10):
|
||||
"""Send a message via Telegram bot API. Returns the message ID on success.
|
||||
|
||||
Raises:
|
||||
RuntimeError: If the Telegram API returns an error (e.g. invalid token, rate limit).
|
||||
URLError/HTTPError: On network or HTTP-level failures.
|
||||
"""
|
||||
url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
|
||||
data = urlencode({
|
||||
"chat_id": chat_id,
|
||||
"text": text,
|
||||
"parse_mode": "HTML",
|
||||
"disable_web_page_preview": "true",
|
||||
}).encode("utf-8")
|
||||
req = Request(url, data=data, method="POST")
|
||||
with urlopen(req, timeout=timeout) as resp:
|
||||
result = json.loads(resp.read())
|
||||
if not result.get("ok"):
|
||||
raise RuntimeError(f"Telegram API error: {result.get('description')}")
|
||||
return result["result"]["message_id"]
|
||||
|
||||
|
||||
def send_to_telegram(match_events, non_match_events, category, matches_only=False, non_matches_only=False):
|
||||
"""Send browse results to Telegram. Reads BOT_TOKEN and CHAT_ID from environment."""
|
||||
"""Send browse results to Telegram. Reads TELEGRAM_BOT_TOKEN and CHAT_ID from environment."""
|
||||
import os
|
||||
bot_token = os.environ.get("BOT_TOKEN")
|
||||
bot_token = os.environ.get("TELEGRAM_BOT_TOKEN")
|
||||
chat_id = os.environ.get("CHAT_ID")
|
||||
if not bot_token or not chat_id:
|
||||
print("WARNING: BOT_TOKEN or CHAT_ID not set in environment. Skipping Telegram send.")
|
||||
return
|
||||
raise RuntimeError("TELEGRAM_BOT_TOKEN or CHAT_ID not set in environment")
|
||||
|
||||
from datetime import datetime, timezone, timedelta
|
||||
now_utc = datetime.now(timezone.utc)
|
||||
@@ -596,92 +705,71 @@ def send_to_telegram(match_events, non_match_events, category, matches_only=Fals
|
||||
show_non_matches = (not matches_only and not non_matches_only) or non_matches_only
|
||||
|
||||
def send(text):
|
||||
result = subprocess.run(
|
||||
["curl", "-s", f"https://api.telegram.org/bot{bot_token}/sendMessage",
|
||||
"-d", f"chat_id={chat_id}",
|
||||
"-d", f"text={text}",
|
||||
"-d", "parse_mode=HTML",
|
||||
"-d", "disable_web_page_preview=true"],
|
||||
capture_output=True
|
||||
)
|
||||
resp = json.loads(result.stdout.decode())
|
||||
if resp.get("ok"):
|
||||
print(f" Sent msg {resp['result']['message_id']}")
|
||||
else:
|
||||
print(f" Error: {resp.get('description')}")
|
||||
msg_id = send_telegram_message(bot_token, chat_id, text)
|
||||
print(f" Sent msg {msg_id}")
|
||||
|
||||
# Build sections
|
||||
lines = [f"<b>{category.upper()}</b> | {header_date}"]
|
||||
lines.append("")
|
||||
# Build lines
|
||||
lines = [f"<b>{category.upper()}</b> | {header_date}", ""]
|
||||
|
||||
if show_matches:
|
||||
lines.append("MATCH MARKETS")
|
||||
lines.append("")
|
||||
lines += ["MATCH MARKETS", ""]
|
||||
if not match_events:
|
||||
lines.append(" No match markets found.")
|
||||
else:
|
||||
for i, e in enumerate(match_events, 1):
|
||||
ml = get_ml_market(e)
|
||||
outcomes = json.loads(ml.get("outcomes", "[]")) if ml else []
|
||||
prices = json.loads(ml.get("outcomePrices", "[]")) if ml else []
|
||||
vol = get_ml_volume(e)
|
||||
title = e.get("title", "?")
|
||||
url = get_event_url(e)
|
||||
start_time_wib, rel_time = get_start_time_wib(e)
|
||||
team_a = outcomes[0] if len(outcomes) > 0 else "?"
|
||||
team_b = outcomes[1] if len(outcomes) > 1 else "?"
|
||||
odds_a = format_odds(float(prices[0])) if len(prices) > 0 else "?"
|
||||
odds_b = format_odds(float(prices[1])) if len(prices) > 1 else "?"
|
||||
tournament = get_tournament(title)
|
||||
title_clean = title.split(" - ")[0].strip() if " - " in title else title
|
||||
lines.append(f"<b>{i}.</b> <a href=\"{url}\">{title_clean}</a>")
|
||||
lines.append(f" {start_time_wib} | {rel_time}")
|
||||
lines.append(f" Vol: ${vol:,.0f}")
|
||||
if tournament:
|
||||
lines.append(f" Tournament: {tournament}")
|
||||
lines.append(f" Odds: {team_a} {odds_a} | {odds_b} {team_b}")
|
||||
fd = format_match_event(e)
|
||||
lines += render_match_lines(fd, i, mode="html")
|
||||
lines.append("")
|
||||
lines.append("")
|
||||
|
||||
if show_non_matches:
|
||||
lines.append("NON-MATCH MARKETS")
|
||||
lines.append("")
|
||||
lines += ["NON-MATCH MARKETS", ""]
|
||||
if not non_match_events:
|
||||
lines.append(" No non-match markets found.")
|
||||
else:
|
||||
for i, e in enumerate(non_match_events, 1):
|
||||
title = e.get("title", "?")
|
||||
url = get_event_url(e)
|
||||
start_time_wib, rel_time = get_start_time_wib(e)
|
||||
total_vol = sum(float(m.get("volume", 0)) for m in e.get("markets", []))
|
||||
market_count = len(e.get("markets", []))
|
||||
lines.append(f"<b>{i}.</b> <a href=\"{url}\">{title}</a>")
|
||||
lines.append(f" {start_time_wib} | {rel_time}")
|
||||
lines.append(f" Markets: {market_count} | Total Vol: ${total_vol:,.0f}")
|
||||
fd = format_non_match_event(e)
|
||||
lines += render_non_match_lines(fd, i, mode="html")
|
||||
lines.append("")
|
||||
lines.append("")
|
||||
|
||||
# Chunk by 10 items (events), respecting 4096 char Telegram limit
|
||||
text = "\n".join(lines)
|
||||
# Chunk and send
|
||||
send_chunked(lines, send, category, header_date, show_matches, show_non_matches)
|
||||
|
||||
|
||||
def send_chunked(all_lines, send_fn, category, header_date, show_matches, show_non_matches):
|
||||
"""
|
||||
Split already-built lines into Telegram-safe chunks and send them.
|
||||
|
||||
Telegram messages are capped at 4096 chars. Chunks are grouped by
|
||||
section header so no event is split across messages.
|
||||
|
||||
Args:
|
||||
all_lines: Full message lines list (built by caller).
|
||||
send_fn: Closure that sends a single string and prints confirmation.
|
||||
category: Category name for header.
|
||||
header_date: Date string for header.
|
||||
show_matches: Whether MATCH MARKETS section is present.
|
||||
show_non_matches: Whether NON-MATCH MARKETS section is present.
|
||||
"""
|
||||
text = "\n".join(all_lines)
|
||||
if len(text) <= 4096:
|
||||
send(text)
|
||||
send_fn(text)
|
||||
return
|
||||
|
||||
# Split into chunks of 10 events
|
||||
# Split into chunks of 10 events, respecting section headers
|
||||
all_items = []
|
||||
in_match = True
|
||||
for line in lines:
|
||||
for line in all_lines:
|
||||
if line == "MATCH MARKETS":
|
||||
in_match = True
|
||||
elif line == "NON-MATCH MARKETS":
|
||||
in_match = False
|
||||
elif line.startswith("<b>") and ". " in line and "</a>" in line:
|
||||
elif line.startswith("<b>") and "</a>" in line:
|
||||
# Event title line: <b>1.</b> <a href="...">Title</a>
|
||||
all_items.append((in_match, line))
|
||||
|
||||
chunk = []
|
||||
chunk_len = 0
|
||||
chunk_num = 1
|
||||
|
||||
# Header is always first
|
||||
header = f"<b>{category.upper()}</b> | {header_date}\n"
|
||||
if show_matches:
|
||||
header += "\nMATCH MARKETS\n\n"
|
||||
@@ -692,9 +780,8 @@ def send_to_telegram(match_events, non_match_events, category, matches_only=Fals
|
||||
test_chunk = chunk + [item_line, ""]
|
||||
test_text = header + "\n".join(chunk) + "\n".join(test_chunk)
|
||||
if len(test_text) > 4096 or len(chunk) >= 10:
|
||||
# Send current chunk
|
||||
msg = header + "\n".join(chunk)
|
||||
send(msg)
|
||||
send_fn(msg)
|
||||
chunk = [item_line, ""]
|
||||
header = f"<b>{category.upper()}</b> (cont.) | {header_date}\n"
|
||||
if show_matches and is_match:
|
||||
@@ -706,7 +793,7 @@ def send_to_telegram(match_events, non_match_events, category, matches_only=Fals
|
||||
|
||||
if chunk:
|
||||
msg = header + "\n".join(chunk)
|
||||
send(msg)
|
||||
send_fn(msg)
|
||||
|
||||
|
||||
# ============================================================
|
||||
@@ -737,7 +824,7 @@ def main():
|
||||
parser.add_argument("--raw", action="store_true",
|
||||
help="Show all events without tradeable filter (for debugging).")
|
||||
parser.add_argument("--telegram", action="store_true",
|
||||
help="Send results to Telegram (BOT_TOKEN and CHAT_ID must be set in environment).")
|
||||
help="Send results to Telegram (TELEGRAM_BOT_TOKEN and CHAT_ID must be set in environment).")
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.list_categories:
|
||||
|
||||
1
skills/polymarket-browse/tests/__init__.py
Normal file
1
skills/polymarket-browse/tests/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
# Tests package
|
||||
776
skills/polymarket-browse/tests/test_browse.py
Normal file
776
skills/polymarket-browse/tests/test_browse.py
Normal file
@@ -0,0 +1,776 @@
|
||||
"""
|
||||
Unit tests for browse.py Telegram functions.
|
||||
|
||||
Run with: python -m pytest tests/test_browse.py -v
|
||||
"""
|
||||
|
||||
import unittest
|
||||
from unittest.mock import patch, MagicMock
|
||||
import sys
|
||||
import os
|
||||
from datetime import datetime, timezone, timedelta
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'scripts'))
|
||||
from browse import send_telegram_message
|
||||
|
||||
|
||||
class TestSendTelegramMessage(unittest.TestCase):
|
||||
"""Tests for the module-level send_telegram_message function."""
|
||||
|
||||
@patch('browse.urlopen')
|
||||
def test_send_success(self, mock_urlopen):
|
||||
"""send_telegram_message returns message_id on success."""
|
||||
mock_resp = MagicMock()
|
||||
mock_resp.read.return_value = b'{"ok": true, "result": {"message_id": 123}}'
|
||||
mock_urlopen.return_value.__enter__.return_value = mock_resp
|
||||
|
||||
result = send_telegram_message("test_token", "test_chat", "hello world")
|
||||
|
||||
self.assertEqual(result, 123)
|
||||
mock_urlopen.assert_called_once()
|
||||
call_args = mock_urlopen.call_args
|
||||
req = call_args[0][0]
|
||||
self.assertEqual(req.full_url, "https://api.telegram.org/bottest_token/sendMessage")
|
||||
self.assertEqual(req.method, "POST")
|
||||
|
||||
@patch('browse.urlopen')
|
||||
def test_send_api_error_raises_runtime_error(self, mock_urlopen):
|
||||
"""send_telegram_message raises RuntimeError when Telegram API returns ok=false."""
|
||||
mock_resp = MagicMock()
|
||||
mock_resp.read.return_value = b'{"ok": false, "description": "Forbidden"}'
|
||||
mock_urlopen.return_value.__enter__.return_value = mock_resp
|
||||
|
||||
with self.assertRaises(RuntimeError) as ctx:
|
||||
send_telegram_message("test_token", "test_chat", "hello")
|
||||
self.assertIn("Telegram API error: Forbidden", str(ctx.exception))
|
||||
|
||||
@patch('browse.urlopen')
|
||||
def test_send_invalid_token_raises_http_error(self, mock_urlopen):
|
||||
"""send_telegram_message raises HTTPError on invalid token (404)."""
|
||||
from urllib.error import HTTPError
|
||||
mock_urlopen.side_effect = HTTPError(
|
||||
url="https://api.telegram.org/botINVALID/sendMessage",
|
||||
code=404,
|
||||
msg="Not Found",
|
||||
hdrs={},
|
||||
fp=None
|
||||
)
|
||||
|
||||
with self.assertRaises(HTTPError) as ctx:
|
||||
send_telegram_message("INVALID", "test_chat", "hello")
|
||||
self.assertEqual(ctx.exception.code, 404)
|
||||
|
||||
@patch('browse.urlopen')
|
||||
def test_send_rate_limit_raises_http_error(self, mock_urlopen):
|
||||
"""send_telegram_message raises HTTPError on rate limit (429)."""
|
||||
from urllib.error import HTTPError
|
||||
mock_urlopen.side_effect = HTTPError(
|
||||
url="https://api.telegram.org/bottest_token/sendMessage",
|
||||
code=429,
|
||||
msg="Too Many Requests",
|
||||
hdrs={},
|
||||
fp=None
|
||||
)
|
||||
|
||||
with self.assertRaises(HTTPError) as ctx:
|
||||
send_telegram_message("test_token", "test_chat", "hello")
|
||||
self.assertEqual(ctx.exception.code, 429)
|
||||
|
||||
@patch('browse.urlopen')
|
||||
def test_send_network_error_raises_url_error(self, mock_urlopen):
|
||||
"""send_telegram_message raises URLError on network failure."""
|
||||
from urllib.error import URLError
|
||||
mock_urlopen.side_effect = URLError("Connection refused")
|
||||
|
||||
with self.assertRaises(URLError) as ctx:
|
||||
send_telegram_message("test_token", "test_chat", "hello")
|
||||
self.assertIn("Connection refused", str(ctx.exception))
|
||||
|
||||
@patch('browse.urlopen')
|
||||
def test_send_timeout_raises_url_error(self, mock_urlopen):
|
||||
"""send_telegram_message raises URLError on timeout."""
|
||||
from urllib.error import URLError
|
||||
mock_urlopen.side_effect = URLError("<urlopen error TimeoutError: timed out>")
|
||||
|
||||
with self.assertRaises(URLError):
|
||||
send_telegram_message("test_token", "test_chat", "hello")
|
||||
|
||||
@patch('browse.urlopen')
|
||||
def test_send_custom_timeout_used(self, mock_urlopen):
|
||||
"""send_telegram_message respects custom timeout parameter."""
|
||||
mock_resp = MagicMock()
|
||||
mock_resp.read.return_value = b'{"ok": true, "result": {"message_id": 456}}'
|
||||
mock_urlopen.return_value.__enter__.return_value = mock_resp
|
||||
|
||||
send_telegram_message("test_token", "test_chat", "hello", timeout=30)
|
||||
|
||||
call_kwargs = mock_urlopen.call_args[1]
|
||||
self.assertEqual(call_kwargs['timeout'], 30)
|
||||
|
||||
@patch('browse.urlopen')
|
||||
def test_send_html_parsing_mode(self, mock_urlopen):
|
||||
"""send_telegram_message sends with parse_mode=HTML."""
|
||||
mock_resp = MagicMock()
|
||||
mock_resp.read.return_value = b'{"ok": true, "result": {"message_id": 789}}'
|
||||
mock_urlopen.return_value.__enter__.return_value = mock_resp
|
||||
|
||||
send_telegram_message("test_token", "test_chat", "<b>bold</b>")
|
||||
|
||||
call_args = mock_urlopen.call_args
|
||||
req = call_args[0][0]
|
||||
# Verify parse_mode=HTML is in the data
|
||||
self.assertIn(b"parse_mode=HTML", req.data)
|
||||
|
||||
|
||||
class TestHtmlInjection(unittest.TestCase):
|
||||
"""Tests for HTML injection prevention in Telegram messages."""
|
||||
|
||||
@patch.dict('os.environ', {'TELEGRAM_BOT_TOKEN': 'test_token', 'CHAT_ID': 'test_chat'})
|
||||
@patch('browse.send_telegram_message')
|
||||
def test_send_to_telegram_html_injection_in_match_title(self, mock_send_msg):
|
||||
"""
|
||||
titles in match events are NOT escaped before inserting into HTML.
|
||||
This test FAILS if HTML chars are unescaped (vulnerable),
|
||||
and PASSES once escape_html() is implemented.
|
||||
"""
|
||||
mock_send_msg.return_value = 123
|
||||
|
||||
# Simulate a Polymarket event with HTML injection in the title
|
||||
malicious_event = {
|
||||
"title": "<script>alert('XSS')</script> - Team A vs Team B",
|
||||
"slug": "test-event",
|
||||
"startTime": "2027-03-26T12:00:00Z",
|
||||
"markets": [{
|
||||
"sportsMarketType": "moneyline",
|
||||
"outcomes": '["Team A", "Team B"]',
|
||||
"outcomePrices": "[0.55, 0.45]",
|
||||
"bestBid": "0.54",
|
||||
"bestAsk": "0.56",
|
||||
"volume": 50000,
|
||||
"acceptingOrders": True,
|
||||
"closed": False,
|
||||
}],
|
||||
}
|
||||
|
||||
from browse import send_to_telegram
|
||||
send_to_telegram([malicious_event], [], "Counter Strike")
|
||||
|
||||
# Check what was passed to send_telegram_message
|
||||
self.assertEqual(mock_send_msg.called, True)
|
||||
sent_text = mock_send_msg.call_args[0][2] # text arg (3rd positional)
|
||||
|
||||
# AFTER FIX: <script> should be escaped as <script>
|
||||
# BEFORE FIX: raw <script> appears in text (vulnerable — test would fail here)
|
||||
self.assertIn("<script>", sent_text,
|
||||
"HTML injection still present — title may NOT be escaped")
|
||||
self.assertIn("</script>", sent_text)
|
||||
|
||||
@patch.dict('os.environ', {'TELEGRAM_BOT_TOKEN': 'test_token', 'CHAT_ID': 'test_chat'})
|
||||
@patch('browse.send_telegram_message')
|
||||
def test_send_to_telegram_ampersand_in_title(self, mock_send_msg):
|
||||
"""
|
||||
Ampersands in titles should be escaped as & when using HTML parse_mode.
|
||||
BEFORE fix: "&" appears raw in the HTML (vulnerable).
|
||||
AFTER fix: "&" appears as "&".
|
||||
"""
|
||||
mock_send_msg.return_value = 123
|
||||
|
||||
event_with_ampersand = {
|
||||
"title": "Team A & Team B vs Team C",
|
||||
"slug": "amp-test",
|
||||
"startTime": "2027-03-26T12:00:00Z",
|
||||
"markets": [{
|
||||
"sportsMarketType": "moneyline",
|
||||
"outcomes": '["Team A & Team B", "Team C"]',
|
||||
"outcomePrices": "[0.50, 0.50]",
|
||||
"bestBid": "0.49",
|
||||
"bestAsk": "0.51",
|
||||
"volume": 10000,
|
||||
"acceptingOrders": True,
|
||||
"closed": False,
|
||||
}],
|
||||
}
|
||||
|
||||
from browse import send_to_telegram
|
||||
send_to_telegram([event_with_ampersand], [], "Dota 2")
|
||||
|
||||
sent_text = mock_send_msg.call_args[0][2]
|
||||
|
||||
# AFTER FIX: & should be escaped as &
|
||||
# BEFORE FIX: raw & appears (vulnerable — test would fail here)
|
||||
self.assertIn("&", sent_text,
|
||||
"Ampersand not escaped — title may NOT be escaped")
|
||||
|
||||
|
||||
class TestTimeFunctions(unittest.TestCase):
|
||||
"""Tests for _get_time_data() unified helper.
|
||||
|
||||
These tests verify the helper returns correct time_status, time_urgency,
|
||||
and abs_time for various event scenarios. Callers extract the fields they
|
||||
need from the returned dict.
|
||||
"""
|
||||
|
||||
def _make_event(self, start_time):
|
||||
"""Helper to create a minimal event with a startTime."""
|
||||
return {"startTime": start_time}
|
||||
|
||||
def _frozen_dt(self, year, month, day, hour, minute, second=0):
|
||||
return datetime(year, month, day, hour, minute, second,
|
||||
tzinfo=timezone.utc)
|
||||
|
||||
def _mock_datetime(self, frozen):
|
||||
"""Return a mock datetime class that freezes now() to the given datetime."""
|
||||
class MockDatetime:
|
||||
@staticmethod
|
||||
def now(tz=None):
|
||||
if tz is None:
|
||||
return frozen
|
||||
return frozen.astimezone(tz)
|
||||
fromisoformat = staticmethod(datetime.fromisoformat)
|
||||
def __call__(self, *a, **k):
|
||||
return datetime(*a, **k)
|
||||
return MockDatetime
|
||||
|
||||
# === _get_time_data core tests ===
|
||||
|
||||
def test_get_time_data_tbd(self):
|
||||
"""No startTime -> TBD/0urgency/abs TBD."""
|
||||
from browse import _get_time_data
|
||||
td = _get_time_data({})
|
||||
self.assertEqual(td["time_status"], "TBD")
|
||||
self.assertEqual(td["time_urgency"], 0)
|
||||
self.assertEqual(td["abs_time"], "TBD")
|
||||
|
||||
def test_get_time_data_in_30m(self):
|
||||
"""Starts in 30 minutes -> 'In 30m', urgency 3."""
|
||||
frozen = self._frozen_dt(2026, 3, 25, 12, 0, 0)
|
||||
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||
from browse import _get_time_data
|
||||
td = _get_time_data(self._make_event("2026-03-25T12:30:00Z"))
|
||||
self.assertEqual(td["time_status"], "In 30m")
|
||||
self.assertEqual(td["time_urgency"], 3)
|
||||
self.assertIn("WIB", td["abs_time"])
|
||||
|
||||
def test_get_time_data_in_6h(self):
|
||||
"""Starts in 6 hours -> 'In 6h', urgency 2."""
|
||||
frozen = self._frozen_dt(2026, 3, 25, 12, 0, 0)
|
||||
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||
from browse import _get_time_data
|
||||
td = _get_time_data(self._make_event("2026-03-25T18:00:00Z"))
|
||||
self.assertEqual(td["time_status"], "In 6h")
|
||||
self.assertEqual(td["time_urgency"], 2)
|
||||
self.assertIn("WIB", td["abs_time"])
|
||||
|
||||
def test_get_time_data_in_2d(self):
|
||||
"""Starts in 2 days -> 'In 2d', urgency 1."""
|
||||
frozen = self._frozen_dt(2026, 3, 25, 12, 0, 0)
|
||||
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||
from browse import _get_time_data
|
||||
td = _get_time_data(self._make_event("2026-03-27T12:00:00Z"))
|
||||
self.assertEqual(td["time_status"], "In 2d")
|
||||
self.assertEqual(td["time_urgency"], 1)
|
||||
|
||||
def test_get_time_data_live(self):
|
||||
"""Started 30 minutes ago -> 'LIVE', urgency 3."""
|
||||
frozen = self._frozen_dt(2026, 3, 25, 12, 30, 0)
|
||||
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||
from browse import _get_time_data
|
||||
td = _get_time_data(self._make_event("2026-03-25T12:00:00Z"))
|
||||
self.assertEqual(td["time_status"], "LIVE")
|
||||
self.assertEqual(td["time_urgency"], 3)
|
||||
self.assertIn("WIB", td["abs_time"])
|
||||
|
||||
def test_get_time_data_started_2h_ago(self):
|
||||
"""Started 2 hours ago -> 'LIVE 2h', urgency 3."""
|
||||
frozen = self._frozen_dt(2026, 3, 25, 14, 0, 0)
|
||||
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||
from browse import _get_time_data
|
||||
td = _get_time_data(self._make_event("2026-03-25T12:00:00Z"))
|
||||
self.assertEqual(td["time_status"], "LIVE 2h")
|
||||
self.assertEqual(td["time_urgency"], 3)
|
||||
|
||||
def test_get_time_data_started_12h_ago(self):
|
||||
"""Started 12 hours ago -> '12h ago', urgency 1."""
|
||||
frozen = self._frozen_dt(2026, 3, 26, 0, 0, 0)
|
||||
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||
from browse import _get_time_data
|
||||
td = _get_time_data(self._make_event("2026-03-25T12:00:00Z"))
|
||||
self.assertEqual(td["time_status"], "12h ago")
|
||||
self.assertEqual(td["time_urgency"], 1)
|
||||
|
||||
def test_get_time_data_started_2d_ago(self):
|
||||
"""Started 2 days ago -> '2d ago', urgency 0."""
|
||||
frozen = self._frozen_dt(2026, 3, 27, 12, 0, 0)
|
||||
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||
from browse import _get_time_data
|
||||
td = _get_time_data(self._make_event("2026-03-25T12:00:00Z"))
|
||||
self.assertEqual(td["time_status"], "2d ago")
|
||||
self.assertEqual(td["time_urgency"], 0)
|
||||
|
||||
def test_get_time_data_abs_time_format(self):
|
||||
"""abs_time is formatted correctly in WIB."""
|
||||
frozen = self._frozen_dt(2026, 3, 25, 12, 0, 0)
|
||||
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||
from browse import _get_time_data
|
||||
# 19:00 UTC = 02:00 WIB next day
|
||||
td = _get_time_data(self._make_event("2026-03-26T02:00:00Z"))
|
||||
self.assertIn("WIB", td["abs_time"])
|
||||
# UTC 12:00 -> WIB 19:00 same day
|
||||
td2 = _get_time_data(self._make_event("2026-03-25T12:00:00Z"))
|
||||
self.assertEqual(td2["abs_time"], "Mar 25, 19:00 WIB")
|
||||
|
||||
|
||||
class TestFormatMatchEvent(unittest.TestCase):
|
||||
"""Tests for format_match_event() canonical dict."""
|
||||
|
||||
def _frozen_dt(self, year, month, day, hour, minute):
|
||||
return datetime(year, month, day, hour, minute,
|
||||
tzinfo=timezone.utc)
|
||||
|
||||
def _mock_datetime(self, frozen):
|
||||
class MockDatetime:
|
||||
@staticmethod
|
||||
def now(tz=None):
|
||||
if tz is None:
|
||||
return frozen
|
||||
return frozen.astimezone(tz)
|
||||
fromisoformat = staticmethod(datetime.fromisoformat)
|
||||
def __call__(self, *a, **k):
|
||||
return datetime(*a, **k)
|
||||
return MockDatetime
|
||||
|
||||
def _make_event(self, title, ml_market=None, start_time="2026-03-25T18:00:00Z"):
|
||||
import json as _json
|
||||
e = {
|
||||
"title": title,
|
||||
"slug": "test-slug",
|
||||
"startTime": start_time,
|
||||
"markets": [],
|
||||
}
|
||||
if ml_market:
|
||||
e["markets"].append(ml_market)
|
||||
return e
|
||||
|
||||
def _make_ml_market(self, outcomes, prices, vol=50000):
|
||||
import json
|
||||
return {
|
||||
"sportsMarketType": "moneyline",
|
||||
"outcomes": json.dumps(outcomes),
|
||||
"outcomePrices": json.dumps(prices),
|
||||
"bestBid": str(float(prices[0]) - 0.01) if prices else "0.49",
|
||||
"bestAsk": str(float(prices[0]) + 0.01) if prices else "0.51",
|
||||
"volume": str(vol),
|
||||
"acceptingOrders": True,
|
||||
"closed": False,
|
||||
}
|
||||
|
||||
def test_fields_present(self):
|
||||
"""All canonical fields are present and non-null."""
|
||||
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
|
||||
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||
from browse import format_match_event
|
||||
e = self._make_event(
|
||||
"Counter Strike: Team A vs Team B - ESL Pro League",
|
||||
self._make_ml_market(['"Team A"', '"Team B"'], [0.55, 0.45]),
|
||||
)
|
||||
fd = format_match_event(e)
|
||||
self.assertIn("title", fd)
|
||||
self.assertIn("title_clean", fd)
|
||||
self.assertIn("tournament", fd)
|
||||
self.assertIn("url", fd)
|
||||
self.assertIn("time_status", fd)
|
||||
self.assertIn("time_urgency", fd)
|
||||
self.assertIn("abs_time", fd)
|
||||
self.assertIn("team_a", fd)
|
||||
self.assertIn("team_b", fd)
|
||||
self.assertIn("odds_a", fd)
|
||||
self.assertIn("odds_b", fd)
|
||||
self.assertIn("vol", fd)
|
||||
|
||||
def test_title_clean_no_tournament(self):
|
||||
"""title_clean strips tournament suffix after ' - '."""
|
||||
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
|
||||
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||
from browse import format_match_event
|
||||
e = self._make_event(
|
||||
"Counter Strike: Team A vs Team B - ESL Pro League",
|
||||
self._make_ml_market(['"Team A"', '"Team B"'], [0.55, 0.45]),
|
||||
)
|
||||
fd = format_match_event(e)
|
||||
self.assertEqual(fd["title_clean"], "Counter Strike: Team A vs Team B")
|
||||
self.assertEqual(fd["tournament"], "ESL Pro League")
|
||||
|
||||
def test_title_clean_no_dash(self):
|
||||
"""title_clean is unchanged when no ' - ' separator."""
|
||||
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
|
||||
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||
from browse import format_match_event
|
||||
e = self._make_event(
|
||||
"Counter Strike: Team A vs Team B",
|
||||
self._make_ml_market(['"Team A"', '"Team B"'], [0.55, 0.45]),
|
||||
)
|
||||
fd = format_match_event(e)
|
||||
self.assertEqual(fd["title_clean"], "Counter Strike: Team A vs Team B")
|
||||
self.assertEqual(fd["tournament"], "")
|
||||
|
||||
def test_missing_ml(self):
|
||||
"""Returns valid dict with '?' fallbacks when no ML market."""
|
||||
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
|
||||
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||
from browse import format_match_event
|
||||
e = self._make_event("Team A vs Team B")
|
||||
fd = format_match_event(e)
|
||||
self.assertEqual(fd["team_a"], "?")
|
||||
self.assertEqual(fd["team_b"], "?")
|
||||
self.assertEqual(fd["odds_a"], "?")
|
||||
self.assertEqual(fd["odds_b"], "?")
|
||||
self.assertEqual(fd["vol"], 0)
|
||||
|
||||
def test_missing_outcomes(self):
|
||||
"""Handles empty outcomes list gracefully."""
|
||||
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
|
||||
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||
from browse import format_match_event
|
||||
e = self._make_event(
|
||||
"Team A vs Team B",
|
||||
self._make_ml_market([], []),
|
||||
)
|
||||
fd = format_match_event(e)
|
||||
self.assertEqual(fd["team_a"], "?")
|
||||
self.assertEqual(fd["team_b"], "?")
|
||||
|
||||
def test_time_data_passed_through(self):
|
||||
"""Time fields come from _get_time_data."""
|
||||
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
|
||||
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||
from browse import format_match_event
|
||||
e = self._make_event(
|
||||
"Team A vs Team B",
|
||||
self._make_ml_market(['"Team A"', '"Team B"'], [0.55, 0.45]),
|
||||
start_time="2026-03-25T18:00:00Z", # 6h in future
|
||||
)
|
||||
fd = format_match_event(e)
|
||||
self.assertEqual(fd["time_status"], "In 6h")
|
||||
self.assertEqual(fd["time_urgency"], 2)
|
||||
self.assertIn("WIB", fd["abs_time"])
|
||||
|
||||
|
||||
class TestFormatNonMatchEvent(unittest.TestCase):
|
||||
"""Tests for format_non_match_event() canonical dict."""
|
||||
|
||||
def _frozen_dt(self, year, month, day, hour, minute):
|
||||
return datetime(year, month, day, hour, minute,
|
||||
tzinfo=timezone.utc)
|
||||
|
||||
def _mock_datetime(self, frozen):
|
||||
class MockDatetime:
|
||||
@staticmethod
|
||||
def now(tz=None):
|
||||
if tz is None:
|
||||
return frozen
|
||||
return frozen.astimezone(tz)
|
||||
fromisoformat = staticmethod(datetime.fromisoformat)
|
||||
def __call__(self, *a, **k):
|
||||
return datetime(*a, **k)
|
||||
return MockDatetime
|
||||
|
||||
def test_fields_present(self):
|
||||
"""All canonical fields are present."""
|
||||
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
|
||||
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||
from browse import format_non_match_event
|
||||
e = {
|
||||
"title": "Will it rain in Jakarta?",
|
||||
"slug": "rain-jakarta",
|
||||
"startTime": "2026-03-25T18:00:00Z",
|
||||
"markets": [
|
||||
{"volume": "10000"},
|
||||
{"volume": "5000"},
|
||||
],
|
||||
}
|
||||
fd = format_non_match_event(e)
|
||||
self.assertIn("title", fd)
|
||||
self.assertIn("url", fd)
|
||||
self.assertIn("time_status", fd)
|
||||
self.assertIn("time_urgency", fd)
|
||||
self.assertIn("abs_time", fd)
|
||||
self.assertIn("market_count", fd)
|
||||
self.assertIn("total_vol", fd)
|
||||
|
||||
def test_market_stats(self):
|
||||
"""market_count and total_vol computed correctly."""
|
||||
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
|
||||
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||
from browse import format_non_match_event
|
||||
e = {
|
||||
"title": "Test",
|
||||
"slug": "test",
|
||||
"startTime": "2026-03-25T18:00:00Z",
|
||||
"markets": [
|
||||
{"volume": "10000"},
|
||||
{"volume": "5000"},
|
||||
],
|
||||
}
|
||||
fd = format_non_match_event(e)
|
||||
self.assertEqual(fd["market_count"], 2)
|
||||
self.assertEqual(fd["total_vol"], 15000)
|
||||
|
||||
def test_time_passed_through(self):
|
||||
"""Time fields come from _get_time_data."""
|
||||
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
|
||||
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||
from browse import format_non_match_event
|
||||
e = {
|
||||
"title": "Test",
|
||||
"slug": "test",
|
||||
"startTime": "2026-03-25T18:00:00Z",
|
||||
"markets": [],
|
||||
}
|
||||
fd = format_non_match_event(e)
|
||||
self.assertEqual(fd["time_status"], "In 6h")
|
||||
|
||||
|
||||
class TestRenderMatchLines(unittest.TestCase):
|
||||
"""Tests for render_match_lines() text and HTML output."""
|
||||
|
||||
def test_text_mode_exact_lines(self):
|
||||
"""text mode produces expected plain text lines."""
|
||||
from browse import render_match_lines
|
||||
fd = {
|
||||
"title_clean": "Team A vs Team B",
|
||||
"url": "https://polymarket.com/market/test",
|
||||
"abs_time": "Mar 25, 19:00 WIB",
|
||||
"time_status": "In 6h",
|
||||
"vol": 50000,
|
||||
"tournament": "ESL Pro League",
|
||||
"team_a": "Team A",
|
||||
"team_b": "Team B",
|
||||
"odds_a": "55c",
|
||||
"odds_b": "45c",
|
||||
}
|
||||
lines = render_match_lines(fd, 1, mode="text")
|
||||
self.assertEqual(lines[0], "1. [Team A vs Team B](https://polymarket.com/market/test)")
|
||||
self.assertEqual(lines[1], " Mar 25, 19:00 WIB | In 6h")
|
||||
self.assertEqual(lines[2], " Vol: $50,000")
|
||||
self.assertEqual(lines[3], " Tournament: ESL Pro League")
|
||||
self.assertEqual(lines[4], " Odds: Team A 55c | 45c Team B")
|
||||
|
||||
def test_text_mode_no_tournament(self):
|
||||
"""text mode omits Tournament line when tournament is empty."""
|
||||
from browse import render_match_lines
|
||||
fd = {
|
||||
"title_clean": "Team A vs Team B",
|
||||
"url": "https://polymarket.com/market/test",
|
||||
"abs_time": "Mar 25, 19:00 WIB",
|
||||
"time_status": "In 6h",
|
||||
"vol": 50000,
|
||||
"tournament": "",
|
||||
"team_a": "Team A",
|
||||
"team_b": "Team B",
|
||||
"odds_a": "55c",
|
||||
"odds_b": "45c",
|
||||
}
|
||||
lines = render_match_lines(fd, 2, mode="text")
|
||||
self.assertEqual(len(lines), 4)
|
||||
self.assertEqual(lines[0], "2. [Team A vs Team B](https://polymarket.com/market/test)")
|
||||
self.assertNotIn("Tournament", lines[3])
|
||||
|
||||
def test_html_mode_exact(self):
|
||||
"""html mode produces expected HTML lines with escape_html."""
|
||||
from browse import render_match_lines
|
||||
fd = {
|
||||
"title_clean": "Team A & Team B vs Team C",
|
||||
"url": "https://polymarket.com/market/test",
|
||||
"abs_time": "Mar 25, 19:00 WIB",
|
||||
"time_status": "LIVE",
|
||||
"vol": 50000,
|
||||
"tournament": "ESL Pro League",
|
||||
"team_a": "Team A & Team B",
|
||||
"team_b": "Team C",
|
||||
"odds_a": "55c",
|
||||
"odds_b": "45c",
|
||||
}
|
||||
lines = render_match_lines(fd, 1, mode="html")
|
||||
self.assertEqual(lines[0], "<b>1.</b> <a href=\"https://polymarket.com/market/test\">Team A & Team B vs Team C</a>")
|
||||
self.assertEqual(lines[1], " Mar 25, 19:00 WIB | LIVE")
|
||||
self.assertEqual(lines[2], " Vol: $50,000")
|
||||
self.assertEqual(lines[3], " Tournament: ESL Pro League")
|
||||
self.assertEqual(lines[4], " Odds: Team A & Team B 55c | 45c Team C")
|
||||
|
||||
def test_html_mode_xss_prevention(self):
|
||||
"""html mode escapes < and > to prevent XSS."""
|
||||
from browse import render_match_lines
|
||||
fd = {
|
||||
"title_clean": "<script>alert('xss')</script>",
|
||||
"url": "https://polymarket.com/market/test",
|
||||
"abs_time": "Mar 25, 19:00 WIB",
|
||||
"time_status": "LIVE",
|
||||
"vol": 1000,
|
||||
"tournament": "",
|
||||
"team_a": "Team A",
|
||||
"team_b": "Team B",
|
||||
"odds_a": "50c",
|
||||
"odds_b": "50c",
|
||||
}
|
||||
lines = render_match_lines(fd, 1, mode="html")
|
||||
self.assertIn("<script>", lines[0])
|
||||
self.assertNotIn("<script>", lines[0])
|
||||
|
||||
|
||||
class TestRenderNonMatchLines(unittest.TestCase):
|
||||
"""Tests for render_non_match_lines() text and HTML output."""
|
||||
|
||||
def test_text_mode_exact_lines(self):
|
||||
"""text mode produces expected plain text lines."""
|
||||
from browse import render_non_match_lines
|
||||
fd = {
|
||||
"title": "Will it rain in Jakarta?",
|
||||
"url": "https://polymarket.com/event/rain-jakarta",
|
||||
"abs_time": "Mar 25, 19:00 WIB",
|
||||
"time_status": "In 6h",
|
||||
"market_count": 3,
|
||||
"total_vol": 25000,
|
||||
}
|
||||
lines = render_non_match_lines(fd, 1, mode="text")
|
||||
self.assertEqual(lines[0], "1. [Will it rain in Jakarta?](https://polymarket.com/event/rain-jakarta)")
|
||||
self.assertEqual(lines[1], " Mar 25, 19:00 WIB | In 6h")
|
||||
self.assertEqual(lines[2], " Markets: 3 | Total Vol: $25,000")
|
||||
|
||||
def test_html_mode_exact(self):
|
||||
"""html mode produces expected HTML lines with escape_html."""
|
||||
from browse import render_non_match_lines
|
||||
fd = {
|
||||
"title": "Rain <or> Sun?",
|
||||
"url": "https://polymarket.com/event/rain-sun",
|
||||
"abs_time": "Mar 25, 19:00 WIB",
|
||||
"time_status": "In 6h",
|
||||
"market_count": 2,
|
||||
"total_vol": 10000,
|
||||
}
|
||||
lines = render_non_match_lines(fd, 1, mode="html")
|
||||
self.assertEqual(lines[0], "<b>1.</b> <a href=\"https://polymarket.com/event/rain-sun\">Rain <or> Sun?</a>")
|
||||
self.assertEqual(lines[1], " Mar 25, 19:00 WIB | In 6h")
|
||||
self.assertEqual(lines[2], " Markets: 2 | Total Vol: $10,000")
|
||||
|
||||
|
||||
class TestPrintBrowseIntegration(unittest.TestCase):
|
||||
"""Integration tests for print_browse using the new pipeline."""
|
||||
|
||||
def _frozen_dt(self, year, month, day, hour, minute):
|
||||
return datetime(year, month, day, hour, minute,
|
||||
tzinfo=timezone.utc)
|
||||
|
||||
def _mock_datetime(self, frozen):
|
||||
class MockDatetime:
|
||||
@staticmethod
|
||||
def now(tz=None):
|
||||
if tz is None:
|
||||
return frozen
|
||||
return frozen.astimezone(tz)
|
||||
fromisoformat = staticmethod(datetime.fromisoformat)
|
||||
def __call__(self, *a, **k):
|
||||
return datetime(*a, **k)
|
||||
return MockDatetime
|
||||
|
||||
@patch('builtins.print')
|
||||
def test_print_browse_uses_new_pipeline(self, mock_print):
|
||||
"""print_browse calls format_match_event and render_match_lines."""
|
||||
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
|
||||
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||
from browse import print_browse
|
||||
match_events = [{
|
||||
"title": "Counter Strike: Team A vs Team B - ESL Pro League",
|
||||
"slug": "csa",
|
||||
"startTime": "2026-03-25T18:00:00Z",
|
||||
"markets": [{
|
||||
"sportsMarketType": "moneyline",
|
||||
"outcomes": '["Team A", "Team B"]',
|
||||
"outcomePrices": "[0.55, 0.45]",
|
||||
"bestBid": "0.54",
|
||||
"bestAsk": "0.56",
|
||||
"volume": "50000",
|
||||
"acceptingOrders": True,
|
||||
"closed": False,
|
||||
}],
|
||||
}]
|
||||
with patch('browse.format_match_event') as mock_fmt, \
|
||||
patch('browse.render_match_lines') as mock_render:
|
||||
mock_fmt.return_value = {
|
||||
"title_clean": "Team A vs Team B",
|
||||
"url": "https://polymarket.com/market/csa",
|
||||
"abs_time": "Mar 25, 19:00 WIB",
|
||||
"time_status": "In 6h",
|
||||
"vol": 50000,
|
||||
"tournament": "ESL Pro League",
|
||||
"team_a": "Team A",
|
||||
"team_b": "Team B",
|
||||
"odds_a": "55c",
|
||||
"odds_b": "45c",
|
||||
}
|
||||
mock_render.return_value = [
|
||||
"1. [Team A vs Team B](https://polymarket.com/market/csa)",
|
||||
" Mar 25, 19:00 WIB | In 6h",
|
||||
" Vol: $50,000",
|
||||
" Tournament: ESL Pro League",
|
||||
" Odds: Team A 55c | 45c Team B",
|
||||
]
|
||||
print_browse(match_events, [], "Counter Strike", 1, 1, 1, 0,
|
||||
non_matches_max=5)
|
||||
|
||||
mock_fmt.assert_called_once_with(match_events[0])
|
||||
mock_render.assert_called_once_with(mock_fmt.return_value, 1, mode="text")
|
||||
|
||||
@patch('builtins.print')
|
||||
def test_print_browse_matches_only(self, mock_print):
|
||||
"""matches_only suppresses non-match section."""
|
||||
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
|
||||
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||
from browse import print_browse
|
||||
with patch('browse.format_non_match_event') as mock_non_fmt:
|
||||
print_browse([], [], "Counter Strike", 0, 0, 0, 0,
|
||||
non_matches_max=5, matches_only=True)
|
||||
mock_non_fmt.assert_not_called()
|
||||
|
||||
|
||||
class TestSendChunked(unittest.TestCase):
|
||||
"""Tests for send_chunked() helper."""
|
||||
|
||||
def test_small_message_sent_directly(self):
|
||||
"""Messages under 4096 chars go through without chunking."""
|
||||
sent_texts = []
|
||||
def fake_send(text):
|
||||
sent_texts.append(text)
|
||||
|
||||
lines = ["<b>COUNTER STRIKE</b> | Mar 25, 2026", "", "MATCH MARKETS", "", "1. test"]
|
||||
# This fits in one message
|
||||
from browse import send_chunked
|
||||
send_chunked(lines, fake_send, "Counter Strike", "Mar 25, 2026",
|
||||
show_matches=True, show_non_matches=False)
|
||||
self.assertEqual(len(sent_texts), 1)
|
||||
|
||||
def test_chunked_message_gets_cont_header(self):
|
||||
"""Messages over 4096 chars get continuation header."""
|
||||
sent_texts = []
|
||||
def fake_send(text):
|
||||
sent_texts.append(text)
|
||||
|
||||
# Build enough content to exceed 4096 chars
|
||||
# Each event line: ~260 chars. Need ~16 events + headers (~4200 chars)
|
||||
lines = ["<b>COUNTER STRIKE</b> | Mar 25, 2026", ""]
|
||||
for i in range(16):
|
||||
lines += [f"<b>{i+1}.</b> <a href=\"https://polymarket.com/market/{i}\">Team {'X' * 250}</a>", " Mar 25, 19:00 WIB | In 6h", " Vol: $50,000", " Odds: TeamA 55c | 45c TeamB", ""]
|
||||
lines.append("")
|
||||
|
||||
from browse import send_chunked
|
||||
send_chunked(lines, fake_send, "Counter Strike", "Mar 25, 2026",
|
||||
show_matches=True, show_non_matches=False)
|
||||
|
||||
# Should have sent more than one message (chunked)
|
||||
self.assertGreater(len(sent_texts), 1)
|
||||
# At least one continuation message
|
||||
cont_found = any("(cont.)" in t for t in sent_texts)
|
||||
self.assertTrue(cont_found, f"Expected at least one '(cont.)' message. Got {len(sent_texts)} messages.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user