Compare commits
8 Commits
hermes/her
...
d018e87b35
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d018e87b35 | ||
|
|
8cde441996 | ||
|
|
d0534aedbf | ||
| 2703b942c1 | |||
|
|
f9c4bac7b8 | ||
|
|
c49600cd4d | ||
|
|
3a988943b9 | ||
|
|
da367c594b |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -2,3 +2,4 @@ __pycache__/
|
|||||||
*.pyc
|
*.pyc
|
||||||
*.pyo
|
*.pyo
|
||||||
.DS_Store
|
.DS_Store
|
||||||
|
.worktrees/
|
||||||
|
|||||||
@@ -4,11 +4,13 @@ Polymarket Event Browser
|
|||||||
Browse tradeable Polymarket events by game category.
|
Browse tradeable Polymarket events by game category.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import subprocess
|
import html
|
||||||
import json
|
import json
|
||||||
import time
|
import time
|
||||||
import argparse
|
import argparse
|
||||||
from datetime import datetime, timezone, timedelta
|
from datetime import datetime, timezone, timedelta
|
||||||
|
from urllib.parse import urlencode
|
||||||
|
from urllib.request import urlopen, Request
|
||||||
|
|
||||||
# ============================================================
|
# ============================================================
|
||||||
# CONFIG
|
# CONFIG
|
||||||
@@ -17,6 +19,7 @@ from datetime import datetime, timezone, timedelta
|
|||||||
PAGE_SIZE = 50
|
PAGE_SIZE = 50
|
||||||
MAX_RETRIES = 5
|
MAX_RETRIES = 5
|
||||||
INITIAL_RETRY_DELAY = 2 # exponential backoff starts at 2s
|
INITIAL_RETRY_DELAY = 2 # exponential backoff starts at 2s
|
||||||
|
WIB = timezone(timedelta(hours=7)) # UTC+7 for Indonesian users
|
||||||
|
|
||||||
GAME_CATEGORIES = {
|
GAME_CATEGORIES = {
|
||||||
"All Esports": "Esports",
|
"All Esports": "Esports",
|
||||||
@@ -219,94 +222,79 @@ def format_spread(bid, ask):
|
|||||||
spread = ask - bid
|
spread = ask - bid
|
||||||
return f"{prob_to_cents(spread)}c"
|
return f"{prob_to_cents(spread)}c"
|
||||||
|
|
||||||
def get_match_time_status(e):
|
|
||||||
|
def _get_time_data(e, tz=None):
|
||||||
"""
|
"""
|
||||||
Return a human-readable match time status.
|
Unified time data extraction for event timestamps.
|
||||||
Returns (status_str, urgency) where urgency is 0-3 (higher = more urgent/live).
|
|
||||||
Uses startTime for actual match start time.
|
Uses startTime (preferred) or startDate as the event start time.
|
||||||
Displays times in WIB (UTC+7 for Indonesian users).
|
Datetime parsing and all relative calculations are UTC-based.
|
||||||
|
The tz parameter only affects the abs_time formatting.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
e: Event dict with 'startTime' or 'startDate' key.
|
||||||
|
tz: datetime.timezone for abs_time formatting.
|
||||||
|
Defaults to WIB (UTC+7).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
{
|
||||||
|
"time_status": str, # e.g. "LIVE", "In 6h", "12h ago"
|
||||||
|
"time_urgency": int, # 0-3 (higher = more urgent/live)
|
||||||
|
"abs_time": str, # e.g. "Mar 25, 19:00 WIB" or "TBD"
|
||||||
|
}
|
||||||
"""
|
"""
|
||||||
# Use startTime for actual match start, not startDate (which is market creation time)
|
tz = tz or WIB
|
||||||
start_str = e.get("startTime") or e.get("startDate", "")
|
start_str = e.get("startTime") or e.get("startDate", "")
|
||||||
|
|
||||||
if not start_str:
|
if not start_str:
|
||||||
return "TBD", 0
|
return {"time_status": "TBD", "time_urgency": 0, "abs_time": "TBD"}
|
||||||
|
|
||||||
try:
|
try:
|
||||||
start_dt = datetime.fromisoformat(start_str.replace('Z', '+00:00'))
|
start_dt = datetime.fromisoformat(start_str.replace('Z', '+00:00'))
|
||||||
now_utc = datetime.now(timezone.utc)
|
now_utc = datetime.now(timezone.utc)
|
||||||
utc7 = timezone(timedelta(hours=7))
|
|
||||||
now = now_utc.astimezone(utc7)
|
|
||||||
start_utc7 = start_dt.astimezone(utc7)
|
|
||||||
|
|
||||||
delta = start_dt - now_utc
|
delta = start_dt - now_utc
|
||||||
|
total_sec = delta.total_seconds()
|
||||||
|
|
||||||
if delta.total_seconds() < 0:
|
if total_sec < 0:
|
||||||
# Started already
|
# Event is in the past
|
||||||
hours_ago = abs(delta.total_seconds()) / 3600
|
hours_ago = abs(total_sec) / 3600
|
||||||
if hours_ago < 1:
|
if hours_ago < 1:
|
||||||
return "LIVE", 3
|
time_status = "LIVE"
|
||||||
|
time_urgency = 3
|
||||||
elif hours_ago < 4:
|
elif hours_ago < 4:
|
||||||
return f"LIVE {int(hours_ago)}h", 3
|
time_status = f"LIVE {int(hours_ago)}h"
|
||||||
|
time_urgency = 3
|
||||||
elif hours_ago < 24:
|
elif hours_ago < 24:
|
||||||
return f"Started {int(hours_ago)}h ago", 1
|
time_status = f"{int(hours_ago)}h ago"
|
||||||
|
time_urgency = 1
|
||||||
else:
|
else:
|
||||||
days = int(hours_ago / 24)
|
days = int(hours_ago / 24)
|
||||||
return f"{days}d ago", 0
|
time_status = f"{days}d ago"
|
||||||
|
time_urgency = 0
|
||||||
else:
|
else:
|
||||||
# Starts in future
|
# Event is in the future
|
||||||
hours_until = delta.total_seconds() / 3600
|
if total_sec < 3600:
|
||||||
if hours_until <= 0:
|
mins = int(total_sec / 60)
|
||||||
return "LIVE", 3
|
time_status = f"In {mins}m"
|
||||||
elif hours_until < 1:
|
time_urgency = 3
|
||||||
mins = int(delta.total_seconds() / 60)
|
elif total_sec < 86400:
|
||||||
return f"In {mins}m", 3
|
hours_until = int(total_sec / 3600)
|
||||||
elif hours_until < 24:
|
time_status = f"In {hours_until}h"
|
||||||
return f"In {int(hours_until)}h", 2
|
time_urgency = 2
|
||||||
else:
|
else:
|
||||||
days = int(hours_until / 24)
|
days = int(total_sec / 86400)
|
||||||
return f"In {days}d", 1
|
time_status = f"In {days}d"
|
||||||
except:
|
time_urgency = 1
|
||||||
return "", 0
|
|
||||||
|
|
||||||
def get_match_time_str(e):
|
abs_time = start_dt.astimezone(tz).strftime("%b %d, %H:%M ")
|
||||||
"""
|
if tz == WIB:
|
||||||
Return just the time status string (e.g. 'LIVE', 'In 6h', 'In 1d').
|
abs_time += "WIB"
|
||||||
Uses startTime for actual match start time.
|
else:
|
||||||
"""
|
abs_time += start_dt.astimezone(tz).strftime("%Z")
|
||||||
start_str = e.get("startTime") or e.get("startDate", "")
|
return {"time_status": time_status, "time_urgency": time_urgency, "abs_time": abs_time}
|
||||||
if not start_str:
|
except Exception:
|
||||||
return "TBD"
|
return {"time_status": "", "time_urgency": 0, "abs_time": "TBD"}
|
||||||
try:
|
|
||||||
start_dt = datetime.fromisoformat(start_str.replace('Z', '+00:00'))
|
|
||||||
now_utc = datetime.now(timezone.utc)
|
|
||||||
delta = start_dt - now_utc
|
|
||||||
|
|
||||||
if delta.total_seconds() < 0:
|
|
||||||
hours_ago = abs(delta.total_seconds()) / 3600
|
|
||||||
if hours_ago < 1:
|
|
||||||
return "LIVE"
|
|
||||||
elif hours_ago < 4:
|
|
||||||
return f"LIVE {int(hours_ago)}h"
|
|
||||||
elif hours_ago < 24:
|
|
||||||
return f"{int(hours_ago)}h ago"
|
|
||||||
else:
|
|
||||||
days = int(hours_ago / 24)
|
|
||||||
return f"{days}d ago"
|
|
||||||
else:
|
|
||||||
hours_until = delta.total_seconds() / 3600
|
|
||||||
if hours_until <= 0:
|
|
||||||
return "LIVE"
|
|
||||||
elif hours_until < 1:
|
|
||||||
mins = int(delta.total_seconds() / 60)
|
|
||||||
return f"In {mins}m"
|
|
||||||
elif hours_until < 24:
|
|
||||||
return f"In {int(hours_until)}h"
|
|
||||||
else:
|
|
||||||
days = int(hours_until / 24)
|
|
||||||
return f"In {days}d"
|
|
||||||
except:
|
|
||||||
return ""
|
|
||||||
|
|
||||||
def filter_events(events, tradeable_only=True):
|
def filter_events(events, tradeable_only=True):
|
||||||
"""
|
"""
|
||||||
@@ -325,6 +313,7 @@ def filter_events(events, tradeable_only=True):
|
|||||||
|
|
||||||
return match_events, non_match_events
|
return match_events, non_match_events
|
||||||
|
|
||||||
|
|
||||||
def sort_events(events):
|
def sort_events(events):
|
||||||
return sorted(events, key=get_ml_volume, reverse=True)
|
return sorted(events, key=get_ml_volume, reverse=True)
|
||||||
|
|
||||||
@@ -349,7 +338,176 @@ def browse_events(q, matches_max=10, non_matches_max=10, tradeable_only=True):
|
|||||||
}
|
}
|
||||||
|
|
||||||
# ============================================================
|
# ============================================================
|
||||||
# FORMAT
|
# FORMAT — EVENT
|
||||||
|
# ============================================================
|
||||||
|
|
||||||
|
def format_match_event(e):
|
||||||
|
"""
|
||||||
|
Format a match event into a canonical dict for rendering.
|
||||||
|
All computing done here; renderers just template.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
{
|
||||||
|
"title": str, # raw title
|
||||||
|
"title_clean": str, # "Team A vs Team B"
|
||||||
|
"tournament": str, # "Tournament Name" or ""
|
||||||
|
"url": str,
|
||||||
|
"time_status": str, # "LIVE", "In 6h", "12h ago"
|
||||||
|
"time_urgency": int, # 0-3
|
||||||
|
"abs_time": str, # "Mar 25, 19:00 WIB"
|
||||||
|
"team_a": str,
|
||||||
|
"team_b": str,
|
||||||
|
"odds_a": str, # "55c"
|
||||||
|
"odds_b": str,
|
||||||
|
"vol": int,
|
||||||
|
}
|
||||||
|
"""
|
||||||
|
ml = get_ml_market(e)
|
||||||
|
outcomes = json.loads(ml.get("outcomes", "[]")) if ml else []
|
||||||
|
prices = json.loads(ml.get("outcomePrices", "[]")) if ml else []
|
||||||
|
td = _get_time_data(e)
|
||||||
|
title = e.get("title", "")
|
||||||
|
|
||||||
|
team_a = outcomes[0] if len(outcomes) > 0 else "?"
|
||||||
|
team_b = outcomes[1] if len(outcomes) > 1 else "?"
|
||||||
|
odds_a = format_odds(float(prices[0])) if len(prices) > 0 else "?"
|
||||||
|
odds_b = format_odds(float(prices[1])) if len(prices) > 1 else "?"
|
||||||
|
|
||||||
|
if " - " in title:
|
||||||
|
title_clean = title.split(" - ")[0].strip()
|
||||||
|
else:
|
||||||
|
title_clean = title
|
||||||
|
|
||||||
|
tournament = get_tournament(title)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"title": title,
|
||||||
|
"title_clean": title_clean,
|
||||||
|
"tournament": tournament,
|
||||||
|
"url": get_event_url(e),
|
||||||
|
"time_status": td["time_status"],
|
||||||
|
"time_urgency": td["time_urgency"],
|
||||||
|
"abs_time": td["abs_time"],
|
||||||
|
"team_a": team_a,
|
||||||
|
"team_b": team_b,
|
||||||
|
"odds_a": odds_a,
|
||||||
|
"odds_b": odds_b,
|
||||||
|
"vol": get_ml_volume(e),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def format_non_match_event(e):
|
||||||
|
"""
|
||||||
|
Format a non-match event into a canonical dict for rendering.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
{
|
||||||
|
"title": str,
|
||||||
|
"url": str,
|
||||||
|
"time_status": str,
|
||||||
|
"time_urgency": int,
|
||||||
|
"abs_time": str,
|
||||||
|
"market_count": int,
|
||||||
|
"total_vol": int,
|
||||||
|
}
|
||||||
|
"""
|
||||||
|
td = _get_time_data(e)
|
||||||
|
total_vol = sum(float(m.get("volume", 0)) for m in e.get("markets", []))
|
||||||
|
market_count = len(e.get("markets", []))
|
||||||
|
|
||||||
|
return {
|
||||||
|
"title": e.get("title", "?"),
|
||||||
|
"url": get_event_url(e),
|
||||||
|
"time_status": td["time_status"],
|
||||||
|
"time_urgency": td["time_urgency"],
|
||||||
|
"abs_time": td["abs_time"],
|
||||||
|
"market_count": market_count,
|
||||||
|
"total_vol": int(total_vol),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================
|
||||||
|
# FORMAT — RENDER
|
||||||
|
# ============================================================
|
||||||
|
|
||||||
|
def render_match_lines(event_dict, i, mode):
|
||||||
|
"""
|
||||||
|
Render a formatted match event dict into lines of text.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
event_dict: canonical dict from format_match_event()
|
||||||
|
i: 1-based index for the event number
|
||||||
|
mode: "text" for plain text/Markdown, "html" for Telegram HTML
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List[str], one line per element (no trailing blank line).
|
||||||
|
Caller adds the blank line separator between events.
|
||||||
|
"""
|
||||||
|
title_clean = event_dict["title_clean"]
|
||||||
|
url = event_dict["url"]
|
||||||
|
abs_time = event_dict["abs_time"]
|
||||||
|
time_status = event_dict["time_status"]
|
||||||
|
vol = event_dict["vol"]
|
||||||
|
tournament = event_dict["tournament"]
|
||||||
|
team_a = event_dict["team_a"]
|
||||||
|
team_b = event_dict["team_b"]
|
||||||
|
odds_a = event_dict["odds_a"]
|
||||||
|
odds_b = event_dict["odds_b"]
|
||||||
|
|
||||||
|
lines = []
|
||||||
|
|
||||||
|
if mode == "html":
|
||||||
|
lines.append(
|
||||||
|
f"<b>{i}.</b> <a href=\"{url}\">{escape_html(title_clean)}</a>"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
lines.append(f"{i}. [{title_clean}]({url})")
|
||||||
|
|
||||||
|
lines.append(f" {abs_time} | {time_status}")
|
||||||
|
lines.append(f" Vol: ${vol:,.0f}")
|
||||||
|
|
||||||
|
if tournament:
|
||||||
|
lines.append(f" Tournament: {tournament}")
|
||||||
|
|
||||||
|
lines.append(f" Odds: {team_a} {odds_a} | {odds_b} {team_b}")
|
||||||
|
|
||||||
|
return lines
|
||||||
|
|
||||||
|
|
||||||
|
def render_non_match_lines(event_dict, i, mode):
|
||||||
|
"""
|
||||||
|
Render a formatted non-match event dict into lines of text.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
event_dict: canonical dict from format_non_match_event()
|
||||||
|
i: 1-based index for the event number
|
||||||
|
mode: "text" for plain text/Markdown, "html" for Telegram HTML
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List[str], one line per element (no trailing blank line).
|
||||||
|
"""
|
||||||
|
title = event_dict["title"]
|
||||||
|
url = event_dict["url"]
|
||||||
|
abs_time = event_dict["abs_time"]
|
||||||
|
time_status = event_dict["time_status"]
|
||||||
|
market_count = event_dict["market_count"]
|
||||||
|
total_vol = event_dict["total_vol"]
|
||||||
|
|
||||||
|
lines = []
|
||||||
|
|
||||||
|
if mode == "html":
|
||||||
|
lines.append(f"<b>{i}.</b> <a href=\"{url}\">{escape_html(title)}</a>")
|
||||||
|
else:
|
||||||
|
lines.append(f"{i}. [{title}]({url})")
|
||||||
|
|
||||||
|
lines.append(f" {abs_time} | {time_status}")
|
||||||
|
lines.append(f" Markets: {market_count} | Total Vol: ${total_vol:,.0f}")
|
||||||
|
|
||||||
|
return lines
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================
|
||||||
|
# FORMAT — LEGACY
|
||||||
# ============================================================
|
# ============================================================
|
||||||
|
|
||||||
def format_event(e):
|
def format_event(e):
|
||||||
@@ -359,12 +517,12 @@ def format_event(e):
|
|||||||
best_bid = float(ml.get("bestBid", 0)) if ml else 0
|
best_bid = float(ml.get("bestBid", 0)) if ml else 0
|
||||||
best_ask = float(ml.get("bestAsk", 0)) if ml else 0
|
best_ask = float(ml.get("bestAsk", 0)) if ml else 0
|
||||||
vol = get_ml_volume(e)
|
vol = get_ml_volume(e)
|
||||||
time_status, urgency = get_match_time_status(e)
|
td = _get_time_data(e)
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"title": e.get("title", ""),
|
"title": e.get("title", ""),
|
||||||
"time_status": time_status,
|
"time_status": td["time_status"],
|
||||||
"time_urgency": urgency,
|
"time_urgency": td["time_urgency"],
|
||||||
"url": get_event_url(e),
|
"url": get_event_url(e),
|
||||||
"livestream": e.get("resolutionSource"),
|
"livestream": e.get("resolutionSource"),
|
||||||
"outcomes": outcomes,
|
"outcomes": outcomes,
|
||||||
@@ -383,11 +541,12 @@ def format_detail_event(e):
|
|||||||
]
|
]
|
||||||
active_markets = sorted(active_markets, key=lambda m: float(m.get("volume", 0)), reverse=True)
|
active_markets = sorted(active_markets, key=lambda m: float(m.get("volume", 0)), reverse=True)
|
||||||
|
|
||||||
time_status, urgency = get_match_time_status(e)
|
td = _get_time_data(e)
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"title": e.get("title", ""),
|
"title": e.get("title", ""),
|
||||||
"time_status": time_status,
|
"time_status": td["time_status"],
|
||||||
|
"abs_time": td["abs_time"],
|
||||||
"url": get_event_url(e),
|
"url": get_event_url(e),
|
||||||
"livestream": e.get("resolutionSource"),
|
"livestream": e.get("resolutionSource"),
|
||||||
"outcomes": json.loads(ml.get("outcomes", "[]")) if ml else [],
|
"outcomes": json.loads(ml.get("outcomes", "[]")) if ml else [],
|
||||||
@@ -414,48 +573,6 @@ def format_detail_event(e):
|
|||||||
# DISPLAY
|
# DISPLAY
|
||||||
# ============================================================
|
# ============================================================
|
||||||
|
|
||||||
def get_start_time_wib(e):
|
|
||||||
"""Return (date_time_str, relative_str) for display."""
|
|
||||||
start_str = e.get("startTime") or e.get("startDate", "")
|
|
||||||
if not start_str:
|
|
||||||
return "TBD", ""
|
|
||||||
try:
|
|
||||||
start_dt = datetime.fromisoformat(start_str.replace('Z', '+00:00'))
|
|
||||||
now_utc = datetime.now(timezone.utc)
|
|
||||||
utc7 = timezone(timedelta(hours=7))
|
|
||||||
start_utc7 = start_dt.astimezone(utc7)
|
|
||||||
|
|
||||||
# Absolute: "Mar 25, 19:00 WIB"
|
|
||||||
abs_str = start_utc7.strftime("%b %d, %H:%M WIB")
|
|
||||||
|
|
||||||
# Relative: "In 5h", "In 10h", "LIVE", etc.
|
|
||||||
delta = start_dt - now_utc
|
|
||||||
if delta.total_seconds() < 0:
|
|
||||||
hours_ago = abs(delta.total_seconds()) / 3600
|
|
||||||
if hours_ago < 1:
|
|
||||||
rel_str = "LIVE"
|
|
||||||
elif hours_ago < 24:
|
|
||||||
rel_str = f"{int(hours_ago)}h ago"
|
|
||||||
else:
|
|
||||||
days = int(hours_ago / 24)
|
|
||||||
rel_str = f"{days}d ago"
|
|
||||||
else:
|
|
||||||
hours_until = delta.total_seconds() / 3600
|
|
||||||
if hours_until <= 0:
|
|
||||||
rel_str = "LIVE"
|
|
||||||
elif hours_until < 1:
|
|
||||||
mins_until = int(delta.total_seconds() / 60)
|
|
||||||
rel_str = f"In {mins_until}m"
|
|
||||||
elif hours_until < 24:
|
|
||||||
rel_str = f"In {int(hours_until)}h"
|
|
||||||
else:
|
|
||||||
days = int(hours_until / 24)
|
|
||||||
rel_str = f"In {days}d"
|
|
||||||
|
|
||||||
return abs_str, rel_str
|
|
||||||
except:
|
|
||||||
return "TBD", ""
|
|
||||||
|
|
||||||
def get_header_date():
|
def get_header_date():
|
||||||
"""Return current date string like 'Mar 25, 2026'"""
|
"""Return current date string like 'Mar 25, 2026'"""
|
||||||
now_utc = datetime.now(timezone.utc)
|
now_utc = datetime.now(timezone.utc)
|
||||||
@@ -486,9 +603,8 @@ def print_browse(match_events, non_match_events, category, total_raw, total_fetc
|
|||||||
if partial:
|
if partial:
|
||||||
print(f"WARNING: Partial fetch (API error or timeout) — data may be incomplete")
|
print(f"WARNING: Partial fetch (API error or timeout) — data may be incomplete")
|
||||||
|
|
||||||
# --- MATCH MARKETS ---
|
# Determine sections to show
|
||||||
if not matches_only and not non_matches_only:
|
if not matches_only and not non_matches_only:
|
||||||
# Default: show both
|
|
||||||
show_matches = True
|
show_matches = True
|
||||||
show_non_matches = True
|
show_non_matches = True
|
||||||
elif matches_only:
|
elif matches_only:
|
||||||
@@ -498,68 +614,31 @@ def print_browse(match_events, non_match_events, category, total_raw, total_fetc
|
|||||||
show_matches = False
|
show_matches = False
|
||||||
show_non_matches = True
|
show_non_matches = True
|
||||||
|
|
||||||
|
# Match events
|
||||||
if show_matches:
|
if show_matches:
|
||||||
print(f"\nMATCH MARKETS")
|
print("\nMATCH MARKETS")
|
||||||
if not match_events:
|
if not match_events:
|
||||||
print(" No match markets found.")
|
print(" No match markets found.")
|
||||||
else:
|
else:
|
||||||
for i, e in enumerate(match_events, 1):
|
for i, e in enumerate(match_events, 1):
|
||||||
f = format_event(e)
|
fd = format_match_event(e)
|
||||||
ml = get_ml_market(e)
|
for line in render_match_lines(fd, i, mode="text"):
|
||||||
outcomes = json.loads(ml.get("outcomes", "[]")) if ml else []
|
print(line)
|
||||||
prices = json.loads(ml.get("outcomePrices", "[]")) if ml else []
|
|
||||||
vol = f["volume"]
|
|
||||||
title = f["title"]
|
|
||||||
url = f["url"]
|
|
||||||
start_time_wib, rel_time = get_start_time_wib(e)
|
|
||||||
|
|
||||||
team_a = outcomes[0] if len(outcomes) > 0 else "?"
|
# Non-match events
|
||||||
team_b = outcomes[1] if len(outcomes) > 1 else "?"
|
|
||||||
odds_a = format_odds(float(prices[0])) if len(prices) > 0 else "?"
|
|
||||||
odds_b = format_odds(float(prices[1])) if len(prices) > 1 else "?"
|
|
||||||
|
|
||||||
if " - " in title:
|
|
||||||
title_clean = title.split(" - ")[0].strip()
|
|
||||||
else:
|
|
||||||
title_clean = title
|
|
||||||
|
|
||||||
tournament = get_tournament(title)
|
|
||||||
|
|
||||||
print(f"\n {i}. [{title_clean}]({url})")
|
|
||||||
print(f" {start_time_wib} | {rel_time}")
|
|
||||||
print(f" Vol: ${vol:,.0f}")
|
|
||||||
if tournament:
|
|
||||||
print(f" Tournament: {tournament}")
|
|
||||||
print(f" Odds: {team_a} {odds_a} | {odds_b} {team_b}")
|
|
||||||
|
|
||||||
# --- NON-MATCH MARKETS ---
|
|
||||||
if show_non_matches and non_match_events:
|
if show_non_matches and non_match_events:
|
||||||
print(f"\nNON-MATCH MARKETS")
|
print("\nNON-MATCH MARKETS")
|
||||||
|
|
||||||
for i, e in enumerate(non_match_events[:non_matches_max], 1):
|
for i, e in enumerate(non_match_events[:non_matches_max], 1):
|
||||||
title = e.get("title", "?")
|
fd = format_non_match_event(e)
|
||||||
url = get_event_url(e)
|
for line in render_non_match_lines(fd, i, mode="text"):
|
||||||
start_time_wib, rel_time = get_start_time_wib(e)
|
print(line)
|
||||||
|
|
||||||
total_vol = sum(float(m.get("volume", 0)) for m in e.get("markets", []))
|
|
||||||
market_count = len(e.get("markets", []))
|
|
||||||
|
|
||||||
print(f"\n {i}. [{title}]({url})")
|
|
||||||
print(f" {start_time_wib} | {rel_time}")
|
|
||||||
print(f" Markets: {market_count} | Total Vol: ${total_vol:,.0f}")
|
|
||||||
|
|
||||||
def print_detail(e, detail):
|
def print_detail(e, detail):
|
||||||
from datetime import datetime, timezone, timedelta
|
|
||||||
now_utc = datetime.now(timezone.utc)
|
|
||||||
utc7 = timezone(timedelta(hours=7))
|
|
||||||
now_utc7 = now_utc.astimezone(utc7)
|
|
||||||
|
|
||||||
print(f"\n{detail['title']}")
|
print(f"\n{detail['title']}")
|
||||||
print(f"URL: {detail['url']}")
|
print(f"URL: {detail['url']}")
|
||||||
print(f"Livestream: {detail['livestream']}")
|
print(f"Livestream: {detail['livestream']}")
|
||||||
|
|
||||||
spread_str = format_spread(detail["best_bid"], detail["best_ask"]) if detail["best_bid"] and detail["best_ask"] else "N/A"
|
spread_str = format_spread(detail["best_bid"], detail["best_ask"]) if detail["best_bid"] and detail["best_ask"] else "N/A"
|
||||||
time_str = get_match_time_str(e)
|
|
||||||
print(f"\n{detail['time_status']}")
|
print(f"\n{detail['time_status']}")
|
||||||
print(f"ML: {detail['outcomes'][0]} {format_odds(float(detail['prices'][0]))} vs {detail['outcomes'][1]} {format_odds(float(detail['prices'][1]))}")
|
print(f"ML: {detail['outcomes'][0]} {format_odds(float(detail['prices'][0]))} vs {detail['outcomes'][1]} {format_odds(float(detail['prices'][1]))}")
|
||||||
print(f"ML Vol: ${detail['volume']:,.0f} | {spread_str}")
|
print(f"ML Vol: ${detail['volume']:,.0f} | {spread_str}")
|
||||||
@@ -576,14 +655,44 @@ def print_detail(e, detail):
|
|||||||
# TELEGRAM
|
# TELEGRAM
|
||||||
# ============================================================
|
# ============================================================
|
||||||
|
|
||||||
|
def escape_html(text):
|
||||||
|
"""Escape HTML-sensitive characters for Telegram parse_mode=HTML."""
|
||||||
|
return (text
|
||||||
|
.replace("&", "&")
|
||||||
|
.replace("<", "<")
|
||||||
|
.replace(">", ">")
|
||||||
|
.replace('"', """))
|
||||||
|
|
||||||
|
|
||||||
|
def send_telegram_message(bot_token, chat_id, text, timeout=10):
|
||||||
|
"""Send a message via Telegram bot API. Returns the message ID on success.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
RuntimeError: If the Telegram API returns an error (e.g. invalid token, rate limit).
|
||||||
|
URLError/HTTPError: On network or HTTP-level failures.
|
||||||
|
"""
|
||||||
|
url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
|
||||||
|
data = urlencode({
|
||||||
|
"chat_id": chat_id,
|
||||||
|
"text": text,
|
||||||
|
"parse_mode": "HTML",
|
||||||
|
"disable_web_page_preview": "true",
|
||||||
|
}).encode("utf-8")
|
||||||
|
req = Request(url, data=data, method="POST")
|
||||||
|
with urlopen(req, timeout=timeout) as resp:
|
||||||
|
result = json.loads(resp.read())
|
||||||
|
if not result.get("ok"):
|
||||||
|
raise RuntimeError(f"Telegram API error: {result.get('description')}")
|
||||||
|
return result["result"]["message_id"]
|
||||||
|
|
||||||
|
|
||||||
def send_to_telegram(match_events, non_match_events, category, matches_only=False, non_matches_only=False):
|
def send_to_telegram(match_events, non_match_events, category, matches_only=False, non_matches_only=False):
|
||||||
"""Send browse results to Telegram. Reads BOT_TOKEN and CHAT_ID from environment."""
|
"""Send browse results to Telegram. Reads TELEGRAM_BOT_TOKEN and CHAT_ID from environment."""
|
||||||
import os
|
import os
|
||||||
bot_token = os.environ.get("BOT_TOKEN")
|
bot_token = os.environ.get("TELEGRAM_BOT_TOKEN")
|
||||||
chat_id = os.environ.get("CHAT_ID")
|
chat_id = os.environ.get("CHAT_ID")
|
||||||
if not bot_token or not chat_id:
|
if not bot_token or not chat_id:
|
||||||
print("WARNING: BOT_TOKEN or CHAT_ID not set in environment. Skipping Telegram send.")
|
raise RuntimeError("TELEGRAM_BOT_TOKEN or CHAT_ID not set in environment")
|
||||||
return
|
|
||||||
|
|
||||||
from datetime import datetime, timezone, timedelta
|
from datetime import datetime, timezone, timedelta
|
||||||
now_utc = datetime.now(timezone.utc)
|
now_utc = datetime.now(timezone.utc)
|
||||||
@@ -596,92 +705,71 @@ def send_to_telegram(match_events, non_match_events, category, matches_only=Fals
|
|||||||
show_non_matches = (not matches_only and not non_matches_only) or non_matches_only
|
show_non_matches = (not matches_only and not non_matches_only) or non_matches_only
|
||||||
|
|
||||||
def send(text):
|
def send(text):
|
||||||
result = subprocess.run(
|
msg_id = send_telegram_message(bot_token, chat_id, text)
|
||||||
["curl", "-s", f"https://api.telegram.org/bot{bot_token}/sendMessage",
|
print(f" Sent msg {msg_id}")
|
||||||
"-d", f"chat_id={chat_id}",
|
|
||||||
"-d", f"text={text}",
|
|
||||||
"-d", "parse_mode=HTML",
|
|
||||||
"-d", "disable_web_page_preview=true"],
|
|
||||||
capture_output=True
|
|
||||||
)
|
|
||||||
resp = json.loads(result.stdout.decode())
|
|
||||||
if resp.get("ok"):
|
|
||||||
print(f" Sent msg {resp['result']['message_id']}")
|
|
||||||
else:
|
|
||||||
print(f" Error: {resp.get('description')}")
|
|
||||||
|
|
||||||
# Build sections
|
# Build lines
|
||||||
lines = [f"<b>{category.upper()}</b> | {header_date}"]
|
lines = [f"<b>{category.upper()}</b> | {header_date}", ""]
|
||||||
lines.append("")
|
|
||||||
|
|
||||||
if show_matches:
|
if show_matches:
|
||||||
lines.append("MATCH MARKETS")
|
lines += ["MATCH MARKETS", ""]
|
||||||
lines.append("")
|
|
||||||
if not match_events:
|
if not match_events:
|
||||||
lines.append(" No match markets found.")
|
lines.append(" No match markets found.")
|
||||||
else:
|
else:
|
||||||
for i, e in enumerate(match_events, 1):
|
for i, e in enumerate(match_events, 1):
|
||||||
ml = get_ml_market(e)
|
fd = format_match_event(e)
|
||||||
outcomes = json.loads(ml.get("outcomes", "[]")) if ml else []
|
lines += render_match_lines(fd, i, mode="html")
|
||||||
prices = json.loads(ml.get("outcomePrices", "[]")) if ml else []
|
|
||||||
vol = get_ml_volume(e)
|
|
||||||
title = e.get("title", "?")
|
|
||||||
url = get_event_url(e)
|
|
||||||
start_time_wib, rel_time = get_start_time_wib(e)
|
|
||||||
team_a = outcomes[0] if len(outcomes) > 0 else "?"
|
|
||||||
team_b = outcomes[1] if len(outcomes) > 1 else "?"
|
|
||||||
odds_a = format_odds(float(prices[0])) if len(prices) > 0 else "?"
|
|
||||||
odds_b = format_odds(float(prices[1])) if len(prices) > 1 else "?"
|
|
||||||
tournament = get_tournament(title)
|
|
||||||
title_clean = title.split(" - ")[0].strip() if " - " in title else title
|
|
||||||
lines.append(f"<b>{i}.</b> <a href=\"{url}\">{title_clean}</a>")
|
|
||||||
lines.append(f" {start_time_wib} | {rel_time}")
|
|
||||||
lines.append(f" Vol: ${vol:,.0f}")
|
|
||||||
if tournament:
|
|
||||||
lines.append(f" Tournament: {tournament}")
|
|
||||||
lines.append(f" Odds: {team_a} {odds_a} | {odds_b} {team_b}")
|
|
||||||
lines.append("")
|
lines.append("")
|
||||||
lines.append("")
|
lines.append("")
|
||||||
|
|
||||||
if show_non_matches:
|
if show_non_matches:
|
||||||
lines.append("NON-MATCH MARKETS")
|
lines += ["NON-MATCH MARKETS", ""]
|
||||||
lines.append("")
|
|
||||||
if not non_match_events:
|
if not non_match_events:
|
||||||
lines.append(" No non-match markets found.")
|
lines.append(" No non-match markets found.")
|
||||||
else:
|
else:
|
||||||
for i, e in enumerate(non_match_events, 1):
|
for i, e in enumerate(non_match_events, 1):
|
||||||
title = e.get("title", "?")
|
fd = format_non_match_event(e)
|
||||||
url = get_event_url(e)
|
lines += render_non_match_lines(fd, i, mode="html")
|
||||||
start_time_wib, rel_time = get_start_time_wib(e)
|
lines.append("")
|
||||||
total_vol = sum(float(m.get("volume", 0)) for m in e.get("markets", []))
|
|
||||||
market_count = len(e.get("markets", []))
|
|
||||||
lines.append(f"<b>{i}.</b> <a href=\"{url}\">{title}</a>")
|
|
||||||
lines.append(f" {start_time_wib} | {rel_time}")
|
|
||||||
lines.append(f" Markets: {market_count} | Total Vol: ${total_vol:,.0f}")
|
|
||||||
lines.append("")
|
lines.append("")
|
||||||
|
|
||||||
# Chunk by 10 items (events), respecting 4096 char Telegram limit
|
# Chunk and send
|
||||||
text = "\n".join(lines)
|
send_chunked(lines, send, category, header_date, show_matches, show_non_matches)
|
||||||
|
|
||||||
|
|
||||||
|
def send_chunked(all_lines, send_fn, category, header_date, show_matches, show_non_matches):
|
||||||
|
"""
|
||||||
|
Split already-built lines into Telegram-safe chunks and send them.
|
||||||
|
|
||||||
|
Telegram messages are capped at 4096 chars. Chunks are grouped by
|
||||||
|
section header so no event is split across messages.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
all_lines: Full message lines list (built by caller).
|
||||||
|
send_fn: Closure that sends a single string and prints confirmation.
|
||||||
|
category: Category name for header.
|
||||||
|
header_date: Date string for header.
|
||||||
|
show_matches: Whether MATCH MARKETS section is present.
|
||||||
|
show_non_matches: Whether NON-MATCH MARKETS section is present.
|
||||||
|
"""
|
||||||
|
text = "\n".join(all_lines)
|
||||||
if len(text) <= 4096:
|
if len(text) <= 4096:
|
||||||
send(text)
|
send_fn(text)
|
||||||
return
|
return
|
||||||
|
|
||||||
# Split into chunks of 10 events
|
# Split into chunks of 10 events, respecting section headers
|
||||||
all_items = []
|
all_items = []
|
||||||
in_match = True
|
in_match = True
|
||||||
for line in lines:
|
for line in all_lines:
|
||||||
if line == "MATCH MARKETS":
|
if line == "MATCH MARKETS":
|
||||||
in_match = True
|
in_match = True
|
||||||
elif line == "NON-MATCH MARKETS":
|
elif line == "NON-MATCH MARKETS":
|
||||||
in_match = False
|
in_match = False
|
||||||
elif line.startswith("<b>") and ". " in line and "</a>" in line:
|
elif line.startswith("<b>") and "</a>" in line:
|
||||||
|
# Event title line: <b>1.</b> <a href="...">Title</a>
|
||||||
all_items.append((in_match, line))
|
all_items.append((in_match, line))
|
||||||
|
|
||||||
chunk = []
|
chunk = []
|
||||||
chunk_len = 0
|
|
||||||
chunk_num = 1
|
|
||||||
|
|
||||||
# Header is always first
|
|
||||||
header = f"<b>{category.upper()}</b> | {header_date}\n"
|
header = f"<b>{category.upper()}</b> | {header_date}\n"
|
||||||
if show_matches:
|
if show_matches:
|
||||||
header += "\nMATCH MARKETS\n\n"
|
header += "\nMATCH MARKETS\n\n"
|
||||||
@@ -692,9 +780,8 @@ def send_to_telegram(match_events, non_match_events, category, matches_only=Fals
|
|||||||
test_chunk = chunk + [item_line, ""]
|
test_chunk = chunk + [item_line, ""]
|
||||||
test_text = header + "\n".join(chunk) + "\n".join(test_chunk)
|
test_text = header + "\n".join(chunk) + "\n".join(test_chunk)
|
||||||
if len(test_text) > 4096 or len(chunk) >= 10:
|
if len(test_text) > 4096 or len(chunk) >= 10:
|
||||||
# Send current chunk
|
|
||||||
msg = header + "\n".join(chunk)
|
msg = header + "\n".join(chunk)
|
||||||
send(msg)
|
send_fn(msg)
|
||||||
chunk = [item_line, ""]
|
chunk = [item_line, ""]
|
||||||
header = f"<b>{category.upper()}</b> (cont.) | {header_date}\n"
|
header = f"<b>{category.upper()}</b> (cont.) | {header_date}\n"
|
||||||
if show_matches and is_match:
|
if show_matches and is_match:
|
||||||
@@ -706,7 +793,7 @@ def send_to_telegram(match_events, non_match_events, category, matches_only=Fals
|
|||||||
|
|
||||||
if chunk:
|
if chunk:
|
||||||
msg = header + "\n".join(chunk)
|
msg = header + "\n".join(chunk)
|
||||||
send(msg)
|
send_fn(msg)
|
||||||
|
|
||||||
|
|
||||||
# ============================================================
|
# ============================================================
|
||||||
@@ -737,7 +824,7 @@ def main():
|
|||||||
parser.add_argument("--raw", action="store_true",
|
parser.add_argument("--raw", action="store_true",
|
||||||
help="Show all events without tradeable filter (for debugging).")
|
help="Show all events without tradeable filter (for debugging).")
|
||||||
parser.add_argument("--telegram", action="store_true",
|
parser.add_argument("--telegram", action="store_true",
|
||||||
help="Send results to Telegram (BOT_TOKEN and CHAT_ID must be set in environment).")
|
help="Send results to Telegram (TELEGRAM_BOT_TOKEN and CHAT_ID must be set in environment).")
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
if args.list_categories:
|
if args.list_categories:
|
||||||
|
|||||||
1
skills/polymarket-browse/tests/__init__.py
Normal file
1
skills/polymarket-browse/tests/__init__.py
Normal file
@@ -0,0 +1 @@
|
|||||||
|
# Tests package
|
||||||
776
skills/polymarket-browse/tests/test_browse.py
Normal file
776
skills/polymarket-browse/tests/test_browse.py
Normal file
@@ -0,0 +1,776 @@
|
|||||||
|
"""
|
||||||
|
Unit tests for browse.py Telegram functions.
|
||||||
|
|
||||||
|
Run with: python -m pytest tests/test_browse.py -v
|
||||||
|
"""
|
||||||
|
|
||||||
|
import unittest
|
||||||
|
from unittest.mock import patch, MagicMock
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
from datetime import datetime, timezone, timedelta
|
||||||
|
|
||||||
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'scripts'))
|
||||||
|
from browse import send_telegram_message
|
||||||
|
|
||||||
|
|
||||||
|
class TestSendTelegramMessage(unittest.TestCase):
|
||||||
|
"""Tests for the module-level send_telegram_message function."""
|
||||||
|
|
||||||
|
@patch('browse.urlopen')
|
||||||
|
def test_send_success(self, mock_urlopen):
|
||||||
|
"""send_telegram_message returns message_id on success."""
|
||||||
|
mock_resp = MagicMock()
|
||||||
|
mock_resp.read.return_value = b'{"ok": true, "result": {"message_id": 123}}'
|
||||||
|
mock_urlopen.return_value.__enter__.return_value = mock_resp
|
||||||
|
|
||||||
|
result = send_telegram_message("test_token", "test_chat", "hello world")
|
||||||
|
|
||||||
|
self.assertEqual(result, 123)
|
||||||
|
mock_urlopen.assert_called_once()
|
||||||
|
call_args = mock_urlopen.call_args
|
||||||
|
req = call_args[0][0]
|
||||||
|
self.assertEqual(req.full_url, "https://api.telegram.org/bottest_token/sendMessage")
|
||||||
|
self.assertEqual(req.method, "POST")
|
||||||
|
|
||||||
|
@patch('browse.urlopen')
|
||||||
|
def test_send_api_error_raises_runtime_error(self, mock_urlopen):
|
||||||
|
"""send_telegram_message raises RuntimeError when Telegram API returns ok=false."""
|
||||||
|
mock_resp = MagicMock()
|
||||||
|
mock_resp.read.return_value = b'{"ok": false, "description": "Forbidden"}'
|
||||||
|
mock_urlopen.return_value.__enter__.return_value = mock_resp
|
||||||
|
|
||||||
|
with self.assertRaises(RuntimeError) as ctx:
|
||||||
|
send_telegram_message("test_token", "test_chat", "hello")
|
||||||
|
self.assertIn("Telegram API error: Forbidden", str(ctx.exception))
|
||||||
|
|
||||||
|
@patch('browse.urlopen')
|
||||||
|
def test_send_invalid_token_raises_http_error(self, mock_urlopen):
|
||||||
|
"""send_telegram_message raises HTTPError on invalid token (404)."""
|
||||||
|
from urllib.error import HTTPError
|
||||||
|
mock_urlopen.side_effect = HTTPError(
|
||||||
|
url="https://api.telegram.org/botINVALID/sendMessage",
|
||||||
|
code=404,
|
||||||
|
msg="Not Found",
|
||||||
|
hdrs={},
|
||||||
|
fp=None
|
||||||
|
)
|
||||||
|
|
||||||
|
with self.assertRaises(HTTPError) as ctx:
|
||||||
|
send_telegram_message("INVALID", "test_chat", "hello")
|
||||||
|
self.assertEqual(ctx.exception.code, 404)
|
||||||
|
|
||||||
|
@patch('browse.urlopen')
|
||||||
|
def test_send_rate_limit_raises_http_error(self, mock_urlopen):
|
||||||
|
"""send_telegram_message raises HTTPError on rate limit (429)."""
|
||||||
|
from urllib.error import HTTPError
|
||||||
|
mock_urlopen.side_effect = HTTPError(
|
||||||
|
url="https://api.telegram.org/bottest_token/sendMessage",
|
||||||
|
code=429,
|
||||||
|
msg="Too Many Requests",
|
||||||
|
hdrs={},
|
||||||
|
fp=None
|
||||||
|
)
|
||||||
|
|
||||||
|
with self.assertRaises(HTTPError) as ctx:
|
||||||
|
send_telegram_message("test_token", "test_chat", "hello")
|
||||||
|
self.assertEqual(ctx.exception.code, 429)
|
||||||
|
|
||||||
|
@patch('browse.urlopen')
|
||||||
|
def test_send_network_error_raises_url_error(self, mock_urlopen):
|
||||||
|
"""send_telegram_message raises URLError on network failure."""
|
||||||
|
from urllib.error import URLError
|
||||||
|
mock_urlopen.side_effect = URLError("Connection refused")
|
||||||
|
|
||||||
|
with self.assertRaises(URLError) as ctx:
|
||||||
|
send_telegram_message("test_token", "test_chat", "hello")
|
||||||
|
self.assertIn("Connection refused", str(ctx.exception))
|
||||||
|
|
||||||
|
@patch('browse.urlopen')
|
||||||
|
def test_send_timeout_raises_url_error(self, mock_urlopen):
|
||||||
|
"""send_telegram_message raises URLError on timeout."""
|
||||||
|
from urllib.error import URLError
|
||||||
|
mock_urlopen.side_effect = URLError("<urlopen error TimeoutError: timed out>")
|
||||||
|
|
||||||
|
with self.assertRaises(URLError):
|
||||||
|
send_telegram_message("test_token", "test_chat", "hello")
|
||||||
|
|
||||||
|
@patch('browse.urlopen')
|
||||||
|
def test_send_custom_timeout_used(self, mock_urlopen):
|
||||||
|
"""send_telegram_message respects custom timeout parameter."""
|
||||||
|
mock_resp = MagicMock()
|
||||||
|
mock_resp.read.return_value = b'{"ok": true, "result": {"message_id": 456}}'
|
||||||
|
mock_urlopen.return_value.__enter__.return_value = mock_resp
|
||||||
|
|
||||||
|
send_telegram_message("test_token", "test_chat", "hello", timeout=30)
|
||||||
|
|
||||||
|
call_kwargs = mock_urlopen.call_args[1]
|
||||||
|
self.assertEqual(call_kwargs['timeout'], 30)
|
||||||
|
|
||||||
|
@patch('browse.urlopen')
|
||||||
|
def test_send_html_parsing_mode(self, mock_urlopen):
|
||||||
|
"""send_telegram_message sends with parse_mode=HTML."""
|
||||||
|
mock_resp = MagicMock()
|
||||||
|
mock_resp.read.return_value = b'{"ok": true, "result": {"message_id": 789}}'
|
||||||
|
mock_urlopen.return_value.__enter__.return_value = mock_resp
|
||||||
|
|
||||||
|
send_telegram_message("test_token", "test_chat", "<b>bold</b>")
|
||||||
|
|
||||||
|
call_args = mock_urlopen.call_args
|
||||||
|
req = call_args[0][0]
|
||||||
|
# Verify parse_mode=HTML is in the data
|
||||||
|
self.assertIn(b"parse_mode=HTML", req.data)
|
||||||
|
|
||||||
|
|
||||||
|
class TestHtmlInjection(unittest.TestCase):
|
||||||
|
"""Tests for HTML injection prevention in Telegram messages."""
|
||||||
|
|
||||||
|
@patch.dict('os.environ', {'TELEGRAM_BOT_TOKEN': 'test_token', 'CHAT_ID': 'test_chat'})
|
||||||
|
@patch('browse.send_telegram_message')
|
||||||
|
def test_send_to_telegram_html_injection_in_match_title(self, mock_send_msg):
|
||||||
|
"""
|
||||||
|
titles in match events are NOT escaped before inserting into HTML.
|
||||||
|
This test FAILS if HTML chars are unescaped (vulnerable),
|
||||||
|
and PASSES once escape_html() is implemented.
|
||||||
|
"""
|
||||||
|
mock_send_msg.return_value = 123
|
||||||
|
|
||||||
|
# Simulate a Polymarket event with HTML injection in the title
|
||||||
|
malicious_event = {
|
||||||
|
"title": "<script>alert('XSS')</script> - Team A vs Team B",
|
||||||
|
"slug": "test-event",
|
||||||
|
"startTime": "2027-03-26T12:00:00Z",
|
||||||
|
"markets": [{
|
||||||
|
"sportsMarketType": "moneyline",
|
||||||
|
"outcomes": '["Team A", "Team B"]',
|
||||||
|
"outcomePrices": "[0.55, 0.45]",
|
||||||
|
"bestBid": "0.54",
|
||||||
|
"bestAsk": "0.56",
|
||||||
|
"volume": 50000,
|
||||||
|
"acceptingOrders": True,
|
||||||
|
"closed": False,
|
||||||
|
}],
|
||||||
|
}
|
||||||
|
|
||||||
|
from browse import send_to_telegram
|
||||||
|
send_to_telegram([malicious_event], [], "Counter Strike")
|
||||||
|
|
||||||
|
# Check what was passed to send_telegram_message
|
||||||
|
self.assertEqual(mock_send_msg.called, True)
|
||||||
|
sent_text = mock_send_msg.call_args[0][2] # text arg (3rd positional)
|
||||||
|
|
||||||
|
# AFTER FIX: <script> should be escaped as <script>
|
||||||
|
# BEFORE FIX: raw <script> appears in text (vulnerable — test would fail here)
|
||||||
|
self.assertIn("<script>", sent_text,
|
||||||
|
"HTML injection still present — title may NOT be escaped")
|
||||||
|
self.assertIn("</script>", sent_text)
|
||||||
|
|
||||||
|
@patch.dict('os.environ', {'TELEGRAM_BOT_TOKEN': 'test_token', 'CHAT_ID': 'test_chat'})
|
||||||
|
@patch('browse.send_telegram_message')
|
||||||
|
def test_send_to_telegram_ampersand_in_title(self, mock_send_msg):
|
||||||
|
"""
|
||||||
|
Ampersands in titles should be escaped as & when using HTML parse_mode.
|
||||||
|
BEFORE fix: "&" appears raw in the HTML (vulnerable).
|
||||||
|
AFTER fix: "&" appears as "&".
|
||||||
|
"""
|
||||||
|
mock_send_msg.return_value = 123
|
||||||
|
|
||||||
|
event_with_ampersand = {
|
||||||
|
"title": "Team A & Team B vs Team C",
|
||||||
|
"slug": "amp-test",
|
||||||
|
"startTime": "2027-03-26T12:00:00Z",
|
||||||
|
"markets": [{
|
||||||
|
"sportsMarketType": "moneyline",
|
||||||
|
"outcomes": '["Team A & Team B", "Team C"]',
|
||||||
|
"outcomePrices": "[0.50, 0.50]",
|
||||||
|
"bestBid": "0.49",
|
||||||
|
"bestAsk": "0.51",
|
||||||
|
"volume": 10000,
|
||||||
|
"acceptingOrders": True,
|
||||||
|
"closed": False,
|
||||||
|
}],
|
||||||
|
}
|
||||||
|
|
||||||
|
from browse import send_to_telegram
|
||||||
|
send_to_telegram([event_with_ampersand], [], "Dota 2")
|
||||||
|
|
||||||
|
sent_text = mock_send_msg.call_args[0][2]
|
||||||
|
|
||||||
|
# AFTER FIX: & should be escaped as &
|
||||||
|
# BEFORE FIX: raw & appears (vulnerable — test would fail here)
|
||||||
|
self.assertIn("&", sent_text,
|
||||||
|
"Ampersand not escaped — title may NOT be escaped")
|
||||||
|
|
||||||
|
|
||||||
|
class TestTimeFunctions(unittest.TestCase):
|
||||||
|
"""Tests for _get_time_data() unified helper.
|
||||||
|
|
||||||
|
These tests verify the helper returns correct time_status, time_urgency,
|
||||||
|
and abs_time for various event scenarios. Callers extract the fields they
|
||||||
|
need from the returned dict.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def _make_event(self, start_time):
|
||||||
|
"""Helper to create a minimal event with a startTime."""
|
||||||
|
return {"startTime": start_time}
|
||||||
|
|
||||||
|
def _frozen_dt(self, year, month, day, hour, minute, second=0):
|
||||||
|
return datetime(year, month, day, hour, minute, second,
|
||||||
|
tzinfo=timezone.utc)
|
||||||
|
|
||||||
|
def _mock_datetime(self, frozen):
|
||||||
|
"""Return a mock datetime class that freezes now() to the given datetime."""
|
||||||
|
class MockDatetime:
|
||||||
|
@staticmethod
|
||||||
|
def now(tz=None):
|
||||||
|
if tz is None:
|
||||||
|
return frozen
|
||||||
|
return frozen.astimezone(tz)
|
||||||
|
fromisoformat = staticmethod(datetime.fromisoformat)
|
||||||
|
def __call__(self, *a, **k):
|
||||||
|
return datetime(*a, **k)
|
||||||
|
return MockDatetime
|
||||||
|
|
||||||
|
# === _get_time_data core tests ===
|
||||||
|
|
||||||
|
def test_get_time_data_tbd(self):
|
||||||
|
"""No startTime -> TBD/0urgency/abs TBD."""
|
||||||
|
from browse import _get_time_data
|
||||||
|
td = _get_time_data({})
|
||||||
|
self.assertEqual(td["time_status"], "TBD")
|
||||||
|
self.assertEqual(td["time_urgency"], 0)
|
||||||
|
self.assertEqual(td["abs_time"], "TBD")
|
||||||
|
|
||||||
|
def test_get_time_data_in_30m(self):
|
||||||
|
"""Starts in 30 minutes -> 'In 30m', urgency 3."""
|
||||||
|
frozen = self._frozen_dt(2026, 3, 25, 12, 0, 0)
|
||||||
|
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||||
|
from browse import _get_time_data
|
||||||
|
td = _get_time_data(self._make_event("2026-03-25T12:30:00Z"))
|
||||||
|
self.assertEqual(td["time_status"], "In 30m")
|
||||||
|
self.assertEqual(td["time_urgency"], 3)
|
||||||
|
self.assertIn("WIB", td["abs_time"])
|
||||||
|
|
||||||
|
def test_get_time_data_in_6h(self):
|
||||||
|
"""Starts in 6 hours -> 'In 6h', urgency 2."""
|
||||||
|
frozen = self._frozen_dt(2026, 3, 25, 12, 0, 0)
|
||||||
|
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||||
|
from browse import _get_time_data
|
||||||
|
td = _get_time_data(self._make_event("2026-03-25T18:00:00Z"))
|
||||||
|
self.assertEqual(td["time_status"], "In 6h")
|
||||||
|
self.assertEqual(td["time_urgency"], 2)
|
||||||
|
self.assertIn("WIB", td["abs_time"])
|
||||||
|
|
||||||
|
def test_get_time_data_in_2d(self):
|
||||||
|
"""Starts in 2 days -> 'In 2d', urgency 1."""
|
||||||
|
frozen = self._frozen_dt(2026, 3, 25, 12, 0, 0)
|
||||||
|
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||||
|
from browse import _get_time_data
|
||||||
|
td = _get_time_data(self._make_event("2026-03-27T12:00:00Z"))
|
||||||
|
self.assertEqual(td["time_status"], "In 2d")
|
||||||
|
self.assertEqual(td["time_urgency"], 1)
|
||||||
|
|
||||||
|
def test_get_time_data_live(self):
|
||||||
|
"""Started 30 minutes ago -> 'LIVE', urgency 3."""
|
||||||
|
frozen = self._frozen_dt(2026, 3, 25, 12, 30, 0)
|
||||||
|
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||||
|
from browse import _get_time_data
|
||||||
|
td = _get_time_data(self._make_event("2026-03-25T12:00:00Z"))
|
||||||
|
self.assertEqual(td["time_status"], "LIVE")
|
||||||
|
self.assertEqual(td["time_urgency"], 3)
|
||||||
|
self.assertIn("WIB", td["abs_time"])
|
||||||
|
|
||||||
|
def test_get_time_data_started_2h_ago(self):
|
||||||
|
"""Started 2 hours ago -> 'LIVE 2h', urgency 3."""
|
||||||
|
frozen = self._frozen_dt(2026, 3, 25, 14, 0, 0)
|
||||||
|
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||||
|
from browse import _get_time_data
|
||||||
|
td = _get_time_data(self._make_event("2026-03-25T12:00:00Z"))
|
||||||
|
self.assertEqual(td["time_status"], "LIVE 2h")
|
||||||
|
self.assertEqual(td["time_urgency"], 3)
|
||||||
|
|
||||||
|
def test_get_time_data_started_12h_ago(self):
|
||||||
|
"""Started 12 hours ago -> '12h ago', urgency 1."""
|
||||||
|
frozen = self._frozen_dt(2026, 3, 26, 0, 0, 0)
|
||||||
|
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||||
|
from browse import _get_time_data
|
||||||
|
td = _get_time_data(self._make_event("2026-03-25T12:00:00Z"))
|
||||||
|
self.assertEqual(td["time_status"], "12h ago")
|
||||||
|
self.assertEqual(td["time_urgency"], 1)
|
||||||
|
|
||||||
|
def test_get_time_data_started_2d_ago(self):
|
||||||
|
"""Started 2 days ago -> '2d ago', urgency 0."""
|
||||||
|
frozen = self._frozen_dt(2026, 3, 27, 12, 0, 0)
|
||||||
|
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||||
|
from browse import _get_time_data
|
||||||
|
td = _get_time_data(self._make_event("2026-03-25T12:00:00Z"))
|
||||||
|
self.assertEqual(td["time_status"], "2d ago")
|
||||||
|
self.assertEqual(td["time_urgency"], 0)
|
||||||
|
|
||||||
|
def test_get_time_data_abs_time_format(self):
|
||||||
|
"""abs_time is formatted correctly in WIB."""
|
||||||
|
frozen = self._frozen_dt(2026, 3, 25, 12, 0, 0)
|
||||||
|
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||||
|
from browse import _get_time_data
|
||||||
|
# 19:00 UTC = 02:00 WIB next day
|
||||||
|
td = _get_time_data(self._make_event("2026-03-26T02:00:00Z"))
|
||||||
|
self.assertIn("WIB", td["abs_time"])
|
||||||
|
# UTC 12:00 -> WIB 19:00 same day
|
||||||
|
td2 = _get_time_data(self._make_event("2026-03-25T12:00:00Z"))
|
||||||
|
self.assertEqual(td2["abs_time"], "Mar 25, 19:00 WIB")
|
||||||
|
|
||||||
|
|
||||||
|
class TestFormatMatchEvent(unittest.TestCase):
|
||||||
|
"""Tests for format_match_event() canonical dict."""
|
||||||
|
|
||||||
|
def _frozen_dt(self, year, month, day, hour, minute):
|
||||||
|
return datetime(year, month, day, hour, minute,
|
||||||
|
tzinfo=timezone.utc)
|
||||||
|
|
||||||
|
def _mock_datetime(self, frozen):
|
||||||
|
class MockDatetime:
|
||||||
|
@staticmethod
|
||||||
|
def now(tz=None):
|
||||||
|
if tz is None:
|
||||||
|
return frozen
|
||||||
|
return frozen.astimezone(tz)
|
||||||
|
fromisoformat = staticmethod(datetime.fromisoformat)
|
||||||
|
def __call__(self, *a, **k):
|
||||||
|
return datetime(*a, **k)
|
||||||
|
return MockDatetime
|
||||||
|
|
||||||
|
def _make_event(self, title, ml_market=None, start_time="2026-03-25T18:00:00Z"):
|
||||||
|
import json as _json
|
||||||
|
e = {
|
||||||
|
"title": title,
|
||||||
|
"slug": "test-slug",
|
||||||
|
"startTime": start_time,
|
||||||
|
"markets": [],
|
||||||
|
}
|
||||||
|
if ml_market:
|
||||||
|
e["markets"].append(ml_market)
|
||||||
|
return e
|
||||||
|
|
||||||
|
def _make_ml_market(self, outcomes, prices, vol=50000):
|
||||||
|
import json
|
||||||
|
return {
|
||||||
|
"sportsMarketType": "moneyline",
|
||||||
|
"outcomes": json.dumps(outcomes),
|
||||||
|
"outcomePrices": json.dumps(prices),
|
||||||
|
"bestBid": str(float(prices[0]) - 0.01) if prices else "0.49",
|
||||||
|
"bestAsk": str(float(prices[0]) + 0.01) if prices else "0.51",
|
||||||
|
"volume": str(vol),
|
||||||
|
"acceptingOrders": True,
|
||||||
|
"closed": False,
|
||||||
|
}
|
||||||
|
|
||||||
|
def test_fields_present(self):
|
||||||
|
"""All canonical fields are present and non-null."""
|
||||||
|
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
|
||||||
|
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||||
|
from browse import format_match_event
|
||||||
|
e = self._make_event(
|
||||||
|
"Counter Strike: Team A vs Team B - ESL Pro League",
|
||||||
|
self._make_ml_market(['"Team A"', '"Team B"'], [0.55, 0.45]),
|
||||||
|
)
|
||||||
|
fd = format_match_event(e)
|
||||||
|
self.assertIn("title", fd)
|
||||||
|
self.assertIn("title_clean", fd)
|
||||||
|
self.assertIn("tournament", fd)
|
||||||
|
self.assertIn("url", fd)
|
||||||
|
self.assertIn("time_status", fd)
|
||||||
|
self.assertIn("time_urgency", fd)
|
||||||
|
self.assertIn("abs_time", fd)
|
||||||
|
self.assertIn("team_a", fd)
|
||||||
|
self.assertIn("team_b", fd)
|
||||||
|
self.assertIn("odds_a", fd)
|
||||||
|
self.assertIn("odds_b", fd)
|
||||||
|
self.assertIn("vol", fd)
|
||||||
|
|
||||||
|
def test_title_clean_no_tournament(self):
|
||||||
|
"""title_clean strips tournament suffix after ' - '."""
|
||||||
|
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
|
||||||
|
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||||
|
from browse import format_match_event
|
||||||
|
e = self._make_event(
|
||||||
|
"Counter Strike: Team A vs Team B - ESL Pro League",
|
||||||
|
self._make_ml_market(['"Team A"', '"Team B"'], [0.55, 0.45]),
|
||||||
|
)
|
||||||
|
fd = format_match_event(e)
|
||||||
|
self.assertEqual(fd["title_clean"], "Counter Strike: Team A vs Team B")
|
||||||
|
self.assertEqual(fd["tournament"], "ESL Pro League")
|
||||||
|
|
||||||
|
def test_title_clean_no_dash(self):
|
||||||
|
"""title_clean is unchanged when no ' - ' separator."""
|
||||||
|
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
|
||||||
|
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||||
|
from browse import format_match_event
|
||||||
|
e = self._make_event(
|
||||||
|
"Counter Strike: Team A vs Team B",
|
||||||
|
self._make_ml_market(['"Team A"', '"Team B"'], [0.55, 0.45]),
|
||||||
|
)
|
||||||
|
fd = format_match_event(e)
|
||||||
|
self.assertEqual(fd["title_clean"], "Counter Strike: Team A vs Team B")
|
||||||
|
self.assertEqual(fd["tournament"], "")
|
||||||
|
|
||||||
|
def test_missing_ml(self):
|
||||||
|
"""Returns valid dict with '?' fallbacks when no ML market."""
|
||||||
|
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
|
||||||
|
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||||
|
from browse import format_match_event
|
||||||
|
e = self._make_event("Team A vs Team B")
|
||||||
|
fd = format_match_event(e)
|
||||||
|
self.assertEqual(fd["team_a"], "?")
|
||||||
|
self.assertEqual(fd["team_b"], "?")
|
||||||
|
self.assertEqual(fd["odds_a"], "?")
|
||||||
|
self.assertEqual(fd["odds_b"], "?")
|
||||||
|
self.assertEqual(fd["vol"], 0)
|
||||||
|
|
||||||
|
def test_missing_outcomes(self):
|
||||||
|
"""Handles empty outcomes list gracefully."""
|
||||||
|
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
|
||||||
|
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||||
|
from browse import format_match_event
|
||||||
|
e = self._make_event(
|
||||||
|
"Team A vs Team B",
|
||||||
|
self._make_ml_market([], []),
|
||||||
|
)
|
||||||
|
fd = format_match_event(e)
|
||||||
|
self.assertEqual(fd["team_a"], "?")
|
||||||
|
self.assertEqual(fd["team_b"], "?")
|
||||||
|
|
||||||
|
def test_time_data_passed_through(self):
|
||||||
|
"""Time fields come from _get_time_data."""
|
||||||
|
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
|
||||||
|
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||||
|
from browse import format_match_event
|
||||||
|
e = self._make_event(
|
||||||
|
"Team A vs Team B",
|
||||||
|
self._make_ml_market(['"Team A"', '"Team B"'], [0.55, 0.45]),
|
||||||
|
start_time="2026-03-25T18:00:00Z", # 6h in future
|
||||||
|
)
|
||||||
|
fd = format_match_event(e)
|
||||||
|
self.assertEqual(fd["time_status"], "In 6h")
|
||||||
|
self.assertEqual(fd["time_urgency"], 2)
|
||||||
|
self.assertIn("WIB", fd["abs_time"])
|
||||||
|
|
||||||
|
|
||||||
|
class TestFormatNonMatchEvent(unittest.TestCase):
|
||||||
|
"""Tests for format_non_match_event() canonical dict."""
|
||||||
|
|
||||||
|
def _frozen_dt(self, year, month, day, hour, minute):
|
||||||
|
return datetime(year, month, day, hour, minute,
|
||||||
|
tzinfo=timezone.utc)
|
||||||
|
|
||||||
|
def _mock_datetime(self, frozen):
|
||||||
|
class MockDatetime:
|
||||||
|
@staticmethod
|
||||||
|
def now(tz=None):
|
||||||
|
if tz is None:
|
||||||
|
return frozen
|
||||||
|
return frozen.astimezone(tz)
|
||||||
|
fromisoformat = staticmethod(datetime.fromisoformat)
|
||||||
|
def __call__(self, *a, **k):
|
||||||
|
return datetime(*a, **k)
|
||||||
|
return MockDatetime
|
||||||
|
|
||||||
|
def test_fields_present(self):
|
||||||
|
"""All canonical fields are present."""
|
||||||
|
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
|
||||||
|
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||||
|
from browse import format_non_match_event
|
||||||
|
e = {
|
||||||
|
"title": "Will it rain in Jakarta?",
|
||||||
|
"slug": "rain-jakarta",
|
||||||
|
"startTime": "2026-03-25T18:00:00Z",
|
||||||
|
"markets": [
|
||||||
|
{"volume": "10000"},
|
||||||
|
{"volume": "5000"},
|
||||||
|
],
|
||||||
|
}
|
||||||
|
fd = format_non_match_event(e)
|
||||||
|
self.assertIn("title", fd)
|
||||||
|
self.assertIn("url", fd)
|
||||||
|
self.assertIn("time_status", fd)
|
||||||
|
self.assertIn("time_urgency", fd)
|
||||||
|
self.assertIn("abs_time", fd)
|
||||||
|
self.assertIn("market_count", fd)
|
||||||
|
self.assertIn("total_vol", fd)
|
||||||
|
|
||||||
|
def test_market_stats(self):
|
||||||
|
"""market_count and total_vol computed correctly."""
|
||||||
|
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
|
||||||
|
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||||
|
from browse import format_non_match_event
|
||||||
|
e = {
|
||||||
|
"title": "Test",
|
||||||
|
"slug": "test",
|
||||||
|
"startTime": "2026-03-25T18:00:00Z",
|
||||||
|
"markets": [
|
||||||
|
{"volume": "10000"},
|
||||||
|
{"volume": "5000"},
|
||||||
|
],
|
||||||
|
}
|
||||||
|
fd = format_non_match_event(e)
|
||||||
|
self.assertEqual(fd["market_count"], 2)
|
||||||
|
self.assertEqual(fd["total_vol"], 15000)
|
||||||
|
|
||||||
|
def test_time_passed_through(self):
|
||||||
|
"""Time fields come from _get_time_data."""
|
||||||
|
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
|
||||||
|
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||||
|
from browse import format_non_match_event
|
||||||
|
e = {
|
||||||
|
"title": "Test",
|
||||||
|
"slug": "test",
|
||||||
|
"startTime": "2026-03-25T18:00:00Z",
|
||||||
|
"markets": [],
|
||||||
|
}
|
||||||
|
fd = format_non_match_event(e)
|
||||||
|
self.assertEqual(fd["time_status"], "In 6h")
|
||||||
|
|
||||||
|
|
||||||
|
class TestRenderMatchLines(unittest.TestCase):
|
||||||
|
"""Tests for render_match_lines() text and HTML output."""
|
||||||
|
|
||||||
|
def test_text_mode_exact_lines(self):
|
||||||
|
"""text mode produces expected plain text lines."""
|
||||||
|
from browse import render_match_lines
|
||||||
|
fd = {
|
||||||
|
"title_clean": "Team A vs Team B",
|
||||||
|
"url": "https://polymarket.com/market/test",
|
||||||
|
"abs_time": "Mar 25, 19:00 WIB",
|
||||||
|
"time_status": "In 6h",
|
||||||
|
"vol": 50000,
|
||||||
|
"tournament": "ESL Pro League",
|
||||||
|
"team_a": "Team A",
|
||||||
|
"team_b": "Team B",
|
||||||
|
"odds_a": "55c",
|
||||||
|
"odds_b": "45c",
|
||||||
|
}
|
||||||
|
lines = render_match_lines(fd, 1, mode="text")
|
||||||
|
self.assertEqual(lines[0], "1. [Team A vs Team B](https://polymarket.com/market/test)")
|
||||||
|
self.assertEqual(lines[1], " Mar 25, 19:00 WIB | In 6h")
|
||||||
|
self.assertEqual(lines[2], " Vol: $50,000")
|
||||||
|
self.assertEqual(lines[3], " Tournament: ESL Pro League")
|
||||||
|
self.assertEqual(lines[4], " Odds: Team A 55c | 45c Team B")
|
||||||
|
|
||||||
|
def test_text_mode_no_tournament(self):
|
||||||
|
"""text mode omits Tournament line when tournament is empty."""
|
||||||
|
from browse import render_match_lines
|
||||||
|
fd = {
|
||||||
|
"title_clean": "Team A vs Team B",
|
||||||
|
"url": "https://polymarket.com/market/test",
|
||||||
|
"abs_time": "Mar 25, 19:00 WIB",
|
||||||
|
"time_status": "In 6h",
|
||||||
|
"vol": 50000,
|
||||||
|
"tournament": "",
|
||||||
|
"team_a": "Team A",
|
||||||
|
"team_b": "Team B",
|
||||||
|
"odds_a": "55c",
|
||||||
|
"odds_b": "45c",
|
||||||
|
}
|
||||||
|
lines = render_match_lines(fd, 2, mode="text")
|
||||||
|
self.assertEqual(len(lines), 4)
|
||||||
|
self.assertEqual(lines[0], "2. [Team A vs Team B](https://polymarket.com/market/test)")
|
||||||
|
self.assertNotIn("Tournament", lines[3])
|
||||||
|
|
||||||
|
def test_html_mode_exact(self):
|
||||||
|
"""html mode produces expected HTML lines with escape_html."""
|
||||||
|
from browse import render_match_lines
|
||||||
|
fd = {
|
||||||
|
"title_clean": "Team A & Team B vs Team C",
|
||||||
|
"url": "https://polymarket.com/market/test",
|
||||||
|
"abs_time": "Mar 25, 19:00 WIB",
|
||||||
|
"time_status": "LIVE",
|
||||||
|
"vol": 50000,
|
||||||
|
"tournament": "ESL Pro League",
|
||||||
|
"team_a": "Team A & Team B",
|
||||||
|
"team_b": "Team C",
|
||||||
|
"odds_a": "55c",
|
||||||
|
"odds_b": "45c",
|
||||||
|
}
|
||||||
|
lines = render_match_lines(fd, 1, mode="html")
|
||||||
|
self.assertEqual(lines[0], "<b>1.</b> <a href=\"https://polymarket.com/market/test\">Team A & Team B vs Team C</a>")
|
||||||
|
self.assertEqual(lines[1], " Mar 25, 19:00 WIB | LIVE")
|
||||||
|
self.assertEqual(lines[2], " Vol: $50,000")
|
||||||
|
self.assertEqual(lines[3], " Tournament: ESL Pro League")
|
||||||
|
self.assertEqual(lines[4], " Odds: Team A & Team B 55c | 45c Team C")
|
||||||
|
|
||||||
|
def test_html_mode_xss_prevention(self):
|
||||||
|
"""html mode escapes < and > to prevent XSS."""
|
||||||
|
from browse import render_match_lines
|
||||||
|
fd = {
|
||||||
|
"title_clean": "<script>alert('xss')</script>",
|
||||||
|
"url": "https://polymarket.com/market/test",
|
||||||
|
"abs_time": "Mar 25, 19:00 WIB",
|
||||||
|
"time_status": "LIVE",
|
||||||
|
"vol": 1000,
|
||||||
|
"tournament": "",
|
||||||
|
"team_a": "Team A",
|
||||||
|
"team_b": "Team B",
|
||||||
|
"odds_a": "50c",
|
||||||
|
"odds_b": "50c",
|
||||||
|
}
|
||||||
|
lines = render_match_lines(fd, 1, mode="html")
|
||||||
|
self.assertIn("<script>", lines[0])
|
||||||
|
self.assertNotIn("<script>", lines[0])
|
||||||
|
|
||||||
|
|
||||||
|
class TestRenderNonMatchLines(unittest.TestCase):
|
||||||
|
"""Tests for render_non_match_lines() text and HTML output."""
|
||||||
|
|
||||||
|
def test_text_mode_exact_lines(self):
|
||||||
|
"""text mode produces expected plain text lines."""
|
||||||
|
from browse import render_non_match_lines
|
||||||
|
fd = {
|
||||||
|
"title": "Will it rain in Jakarta?",
|
||||||
|
"url": "https://polymarket.com/event/rain-jakarta",
|
||||||
|
"abs_time": "Mar 25, 19:00 WIB",
|
||||||
|
"time_status": "In 6h",
|
||||||
|
"market_count": 3,
|
||||||
|
"total_vol": 25000,
|
||||||
|
}
|
||||||
|
lines = render_non_match_lines(fd, 1, mode="text")
|
||||||
|
self.assertEqual(lines[0], "1. [Will it rain in Jakarta?](https://polymarket.com/event/rain-jakarta)")
|
||||||
|
self.assertEqual(lines[1], " Mar 25, 19:00 WIB | In 6h")
|
||||||
|
self.assertEqual(lines[2], " Markets: 3 | Total Vol: $25,000")
|
||||||
|
|
||||||
|
def test_html_mode_exact(self):
|
||||||
|
"""html mode produces expected HTML lines with escape_html."""
|
||||||
|
from browse import render_non_match_lines
|
||||||
|
fd = {
|
||||||
|
"title": "Rain <or> Sun?",
|
||||||
|
"url": "https://polymarket.com/event/rain-sun",
|
||||||
|
"abs_time": "Mar 25, 19:00 WIB",
|
||||||
|
"time_status": "In 6h",
|
||||||
|
"market_count": 2,
|
||||||
|
"total_vol": 10000,
|
||||||
|
}
|
||||||
|
lines = render_non_match_lines(fd, 1, mode="html")
|
||||||
|
self.assertEqual(lines[0], "<b>1.</b> <a href=\"https://polymarket.com/event/rain-sun\">Rain <or> Sun?</a>")
|
||||||
|
self.assertEqual(lines[1], " Mar 25, 19:00 WIB | In 6h")
|
||||||
|
self.assertEqual(lines[2], " Markets: 2 | Total Vol: $10,000")
|
||||||
|
|
||||||
|
|
||||||
|
class TestPrintBrowseIntegration(unittest.TestCase):
|
||||||
|
"""Integration tests for print_browse using the new pipeline."""
|
||||||
|
|
||||||
|
def _frozen_dt(self, year, month, day, hour, minute):
|
||||||
|
return datetime(year, month, day, hour, minute,
|
||||||
|
tzinfo=timezone.utc)
|
||||||
|
|
||||||
|
def _mock_datetime(self, frozen):
|
||||||
|
class MockDatetime:
|
||||||
|
@staticmethod
|
||||||
|
def now(tz=None):
|
||||||
|
if tz is None:
|
||||||
|
return frozen
|
||||||
|
return frozen.astimezone(tz)
|
||||||
|
fromisoformat = staticmethod(datetime.fromisoformat)
|
||||||
|
def __call__(self, *a, **k):
|
||||||
|
return datetime(*a, **k)
|
||||||
|
return MockDatetime
|
||||||
|
|
||||||
|
@patch('builtins.print')
|
||||||
|
def test_print_browse_uses_new_pipeline(self, mock_print):
|
||||||
|
"""print_browse calls format_match_event and render_match_lines."""
|
||||||
|
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
|
||||||
|
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||||
|
from browse import print_browse
|
||||||
|
match_events = [{
|
||||||
|
"title": "Counter Strike: Team A vs Team B - ESL Pro League",
|
||||||
|
"slug": "csa",
|
||||||
|
"startTime": "2026-03-25T18:00:00Z",
|
||||||
|
"markets": [{
|
||||||
|
"sportsMarketType": "moneyline",
|
||||||
|
"outcomes": '["Team A", "Team B"]',
|
||||||
|
"outcomePrices": "[0.55, 0.45]",
|
||||||
|
"bestBid": "0.54",
|
||||||
|
"bestAsk": "0.56",
|
||||||
|
"volume": "50000",
|
||||||
|
"acceptingOrders": True,
|
||||||
|
"closed": False,
|
||||||
|
}],
|
||||||
|
}]
|
||||||
|
with patch('browse.format_match_event') as mock_fmt, \
|
||||||
|
patch('browse.render_match_lines') as mock_render:
|
||||||
|
mock_fmt.return_value = {
|
||||||
|
"title_clean": "Team A vs Team B",
|
||||||
|
"url": "https://polymarket.com/market/csa",
|
||||||
|
"abs_time": "Mar 25, 19:00 WIB",
|
||||||
|
"time_status": "In 6h",
|
||||||
|
"vol": 50000,
|
||||||
|
"tournament": "ESL Pro League",
|
||||||
|
"team_a": "Team A",
|
||||||
|
"team_b": "Team B",
|
||||||
|
"odds_a": "55c",
|
||||||
|
"odds_b": "45c",
|
||||||
|
}
|
||||||
|
mock_render.return_value = [
|
||||||
|
"1. [Team A vs Team B](https://polymarket.com/market/csa)",
|
||||||
|
" Mar 25, 19:00 WIB | In 6h",
|
||||||
|
" Vol: $50,000",
|
||||||
|
" Tournament: ESL Pro League",
|
||||||
|
" Odds: Team A 55c | 45c Team B",
|
||||||
|
]
|
||||||
|
print_browse(match_events, [], "Counter Strike", 1, 1, 1, 0,
|
||||||
|
non_matches_max=5)
|
||||||
|
|
||||||
|
mock_fmt.assert_called_once_with(match_events[0])
|
||||||
|
mock_render.assert_called_once_with(mock_fmt.return_value, 1, mode="text")
|
||||||
|
|
||||||
|
@patch('builtins.print')
|
||||||
|
def test_print_browse_matches_only(self, mock_print):
|
||||||
|
"""matches_only suppresses non-match section."""
|
||||||
|
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
|
||||||
|
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||||
|
from browse import print_browse
|
||||||
|
with patch('browse.format_non_match_event') as mock_non_fmt:
|
||||||
|
print_browse([], [], "Counter Strike", 0, 0, 0, 0,
|
||||||
|
non_matches_max=5, matches_only=True)
|
||||||
|
mock_non_fmt.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
|
class TestSendChunked(unittest.TestCase):
|
||||||
|
"""Tests for send_chunked() helper."""
|
||||||
|
|
||||||
|
def test_small_message_sent_directly(self):
|
||||||
|
"""Messages under 4096 chars go through without chunking."""
|
||||||
|
sent_texts = []
|
||||||
|
def fake_send(text):
|
||||||
|
sent_texts.append(text)
|
||||||
|
|
||||||
|
lines = ["<b>COUNTER STRIKE</b> | Mar 25, 2026", "", "MATCH MARKETS", "", "1. test"]
|
||||||
|
# This fits in one message
|
||||||
|
from browse import send_chunked
|
||||||
|
send_chunked(lines, fake_send, "Counter Strike", "Mar 25, 2026",
|
||||||
|
show_matches=True, show_non_matches=False)
|
||||||
|
self.assertEqual(len(sent_texts), 1)
|
||||||
|
|
||||||
|
def test_chunked_message_gets_cont_header(self):
|
||||||
|
"""Messages over 4096 chars get continuation header."""
|
||||||
|
sent_texts = []
|
||||||
|
def fake_send(text):
|
||||||
|
sent_texts.append(text)
|
||||||
|
|
||||||
|
# Build enough content to exceed 4096 chars
|
||||||
|
# Each event line: ~260 chars. Need ~16 events + headers (~4200 chars)
|
||||||
|
lines = ["<b>COUNTER STRIKE</b> | Mar 25, 2026", ""]
|
||||||
|
for i in range(16):
|
||||||
|
lines += [f"<b>{i+1}.</b> <a href=\"https://polymarket.com/market/{i}\">Team {'X' * 250}</a>", " Mar 25, 19:00 WIB | In 6h", " Vol: $50,000", " Odds: TeamA 55c | 45c TeamB", ""]
|
||||||
|
lines.append("")
|
||||||
|
|
||||||
|
from browse import send_chunked
|
||||||
|
send_chunked(lines, fake_send, "Counter Strike", "Mar 25, 2026",
|
||||||
|
show_matches=True, show_non_matches=False)
|
||||||
|
|
||||||
|
# Should have sent more than one message (chunked)
|
||||||
|
self.assertGreater(len(sent_texts), 1)
|
||||||
|
# At least one continuation message
|
||||||
|
cont_found = any("(cont.)" in t for t in sent_texts)
|
||||||
|
self.assertTrue(cont_found, f"Expected at least one '(cont.)' message. Got {len(sent_texts)} messages.")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
Reference in New Issue
Block a user