Compare commits
2 Commits
a7837cec0f
...
hermes/her
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
27c8cb3597 | ||
|
|
4a33d6924e |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -2,4 +2,3 @@ __pycache__/
|
||||
*.pyc
|
||||
*.pyo
|
||||
.DS_Store
|
||||
.worktrees/
|
||||
|
||||
@@ -4,13 +4,11 @@ Polymarket Event Browser
|
||||
Browse tradeable Polymarket events by game category.
|
||||
"""
|
||||
|
||||
import html
|
||||
import subprocess
|
||||
import json
|
||||
import time
|
||||
import argparse
|
||||
from datetime import datetime, timezone, timedelta
|
||||
from urllib.parse import urlencode
|
||||
from urllib.request import urlopen, Request
|
||||
|
||||
# ============================================================
|
||||
# CONFIG
|
||||
@@ -19,7 +17,6 @@ from urllib.request import urlopen, Request
|
||||
PAGE_SIZE = 50
|
||||
MAX_RETRIES = 5
|
||||
INITIAL_RETRY_DELAY = 2 # exponential backoff starts at 2s
|
||||
WIB = timezone(timedelta(hours=7)) # UTC+7 for Indonesian users
|
||||
|
||||
GAME_CATEGORIES = {
|
||||
"All Esports": "Esports",
|
||||
@@ -222,79 +219,94 @@ def format_spread(bid, ask):
|
||||
spread = ask - bid
|
||||
return f"{prob_to_cents(spread)}c"
|
||||
|
||||
|
||||
def _get_time_data(e, tz=None):
|
||||
def get_match_time_status(e):
|
||||
"""
|
||||
Unified time data extraction for event timestamps.
|
||||
|
||||
Uses startTime (preferred) or startDate as the event start time.
|
||||
Datetime parsing and all relative calculations are UTC-based.
|
||||
The tz parameter only affects the abs_time formatting.
|
||||
|
||||
Args:
|
||||
e: Event dict with 'startTime' or 'startDate' key.
|
||||
tz: datetime.timezone for abs_time formatting.
|
||||
Defaults to WIB (UTC+7).
|
||||
|
||||
Returns:
|
||||
{
|
||||
"time_status": str, # e.g. "LIVE", "In 6h", "12h ago"
|
||||
"time_urgency": int, # 0-3 (higher = more urgent/live)
|
||||
"abs_time": str, # e.g. "Mar 25, 19:00 WIB" or "TBD"
|
||||
}
|
||||
Return a human-readable match time status.
|
||||
Returns (status_str, urgency) where urgency is 0-3 (higher = more urgent/live).
|
||||
Uses startTime for actual match start time.
|
||||
Displays times in WIB (UTC+7 for Indonesian users).
|
||||
"""
|
||||
tz = tz or WIB
|
||||
# Use startTime for actual match start, not startDate (which is market creation time)
|
||||
start_str = e.get("startTime") or e.get("startDate", "")
|
||||
|
||||
if not start_str:
|
||||
return {"time_status": "TBD", "time_urgency": 0, "abs_time": "TBD"}
|
||||
return "TBD", 0
|
||||
|
||||
try:
|
||||
start_dt = datetime.fromisoformat(start_str.replace('Z', '+00:00'))
|
||||
now_utc = datetime.now(timezone.utc)
|
||||
delta = start_dt - now_utc
|
||||
total_sec = delta.total_seconds()
|
||||
utc7 = timezone(timedelta(hours=7))
|
||||
now = now_utc.astimezone(utc7)
|
||||
start_utc7 = start_dt.astimezone(utc7)
|
||||
|
||||
if total_sec < 0:
|
||||
# Event is in the past
|
||||
hours_ago = abs(total_sec) / 3600
|
||||
delta = start_dt - now_utc
|
||||
|
||||
if delta.total_seconds() < 0:
|
||||
# Started already
|
||||
hours_ago = abs(delta.total_seconds()) / 3600
|
||||
if hours_ago < 1:
|
||||
time_status = "LIVE"
|
||||
time_urgency = 3
|
||||
return "LIVE", 3
|
||||
elif hours_ago < 4:
|
||||
time_status = f"LIVE {int(hours_ago)}h"
|
||||
time_urgency = 3
|
||||
return f"LIVE {int(hours_ago)}h", 3
|
||||
elif hours_ago < 24:
|
||||
time_status = f"{int(hours_ago)}h ago"
|
||||
time_urgency = 1
|
||||
return f"Started {int(hours_ago)}h ago", 1
|
||||
else:
|
||||
days = int(hours_ago / 24)
|
||||
time_status = f"{days}d ago"
|
||||
time_urgency = 0
|
||||
return f"{days}d ago", 0
|
||||
else:
|
||||
# Event is in the future
|
||||
if total_sec < 3600:
|
||||
mins = int(total_sec / 60)
|
||||
time_status = f"In {mins}m"
|
||||
time_urgency = 3
|
||||
elif total_sec < 86400:
|
||||
hours_until = int(total_sec / 3600)
|
||||
time_status = f"In {hours_until}h"
|
||||
time_urgency = 2
|
||||
# Starts in future
|
||||
hours_until = delta.total_seconds() / 3600
|
||||
if hours_until <= 0:
|
||||
return "LIVE", 3
|
||||
elif hours_until < 1:
|
||||
mins = int(delta.total_seconds() / 60)
|
||||
return f"In {mins}m", 3
|
||||
elif hours_until < 24:
|
||||
return f"In {int(hours_until)}h", 2
|
||||
else:
|
||||
days = int(total_sec / 86400)
|
||||
time_status = f"In {days}d"
|
||||
time_urgency = 1
|
||||
days = int(hours_until / 24)
|
||||
return f"In {days}d", 1
|
||||
except:
|
||||
return "", 0
|
||||
|
||||
abs_time = start_dt.astimezone(tz).strftime("%b %d, %H:%M ")
|
||||
if tz == WIB:
|
||||
abs_time += "WIB"
|
||||
else:
|
||||
abs_time += start_dt.astimezone(tz).strftime("%Z")
|
||||
return {"time_status": time_status, "time_urgency": time_urgency, "abs_time": abs_time}
|
||||
except Exception:
|
||||
return {"time_status": "", "time_urgency": 0, "abs_time": "TBD"}
|
||||
def get_match_time_str(e):
|
||||
"""
|
||||
Return just the time status string (e.g. 'LIVE', 'In 6h', 'In 1d').
|
||||
Uses startTime for actual match start time.
|
||||
"""
|
||||
start_str = e.get("startTime") or e.get("startDate", "")
|
||||
if not start_str:
|
||||
return "TBD"
|
||||
try:
|
||||
start_dt = datetime.fromisoformat(start_str.replace('Z', '+00:00'))
|
||||
now_utc = datetime.now(timezone.utc)
|
||||
delta = start_dt - now_utc
|
||||
|
||||
if delta.total_seconds() < 0:
|
||||
hours_ago = abs(delta.total_seconds()) / 3600
|
||||
if hours_ago < 1:
|
||||
return "LIVE"
|
||||
elif hours_ago < 4:
|
||||
return f"LIVE {int(hours_ago)}h"
|
||||
elif hours_ago < 24:
|
||||
return f"{int(hours_ago)}h ago"
|
||||
else:
|
||||
days = int(hours_ago / 24)
|
||||
return f"{days}d ago"
|
||||
else:
|
||||
hours_until = delta.total_seconds() / 3600
|
||||
if hours_until <= 0:
|
||||
return "LIVE"
|
||||
elif hours_until < 1:
|
||||
mins = int(delta.total_seconds() / 60)
|
||||
return f"In {mins}m"
|
||||
elif hours_until < 24:
|
||||
return f"In {int(hours_until)}h"
|
||||
else:
|
||||
days = int(hours_until / 24)
|
||||
return f"In {days}d"
|
||||
except:
|
||||
return ""
|
||||
|
||||
def filter_events(events, tradeable_only=True):
|
||||
"""
|
||||
@@ -313,7 +325,6 @@ def filter_events(events, tradeable_only=True):
|
||||
|
||||
return match_events, non_match_events
|
||||
|
||||
|
||||
def sort_events(events):
|
||||
return sorted(events, key=get_ml_volume, reverse=True)
|
||||
|
||||
@@ -348,12 +359,12 @@ def format_event(e):
|
||||
best_bid = float(ml.get("bestBid", 0)) if ml else 0
|
||||
best_ask = float(ml.get("bestAsk", 0)) if ml else 0
|
||||
vol = get_ml_volume(e)
|
||||
td = _get_time_data(e)
|
||||
time_status, urgency = get_match_time_status(e)
|
||||
|
||||
return {
|
||||
"title": e.get("title", ""),
|
||||
"time_status": td["time_status"],
|
||||
"time_urgency": td["time_urgency"],
|
||||
"time_status": time_status,
|
||||
"time_urgency": urgency,
|
||||
"url": get_event_url(e),
|
||||
"livestream": e.get("resolutionSource"),
|
||||
"outcomes": outcomes,
|
||||
@@ -372,12 +383,11 @@ def format_detail_event(e):
|
||||
]
|
||||
active_markets = sorted(active_markets, key=lambda m: float(m.get("volume", 0)), reverse=True)
|
||||
|
||||
td = _get_time_data(e)
|
||||
time_status, urgency = get_match_time_status(e)
|
||||
|
||||
return {
|
||||
"title": e.get("title", ""),
|
||||
"time_status": td["time_status"],
|
||||
"abs_time": td["abs_time"],
|
||||
"time_status": time_status,
|
||||
"url": get_event_url(e),
|
||||
"livestream": e.get("resolutionSource"),
|
||||
"outcomes": json.loads(ml.get("outcomes", "[]")) if ml else [],
|
||||
@@ -404,6 +414,48 @@ def format_detail_event(e):
|
||||
# DISPLAY
|
||||
# ============================================================
|
||||
|
||||
def get_start_time_wib(e):
|
||||
"""Return (date_time_str, relative_str) for display."""
|
||||
start_str = e.get("startTime") or e.get("startDate", "")
|
||||
if not start_str:
|
||||
return "TBD", ""
|
||||
try:
|
||||
start_dt = datetime.fromisoformat(start_str.replace('Z', '+00:00'))
|
||||
now_utc = datetime.now(timezone.utc)
|
||||
utc7 = timezone(timedelta(hours=7))
|
||||
start_utc7 = start_dt.astimezone(utc7)
|
||||
|
||||
# Absolute: "Mar 25, 19:00 WIB"
|
||||
abs_str = start_utc7.strftime("%b %d, %H:%M WIB")
|
||||
|
||||
# Relative: "In 5h", "In 10h", "LIVE", etc.
|
||||
delta = start_dt - now_utc
|
||||
if delta.total_seconds() < 0:
|
||||
hours_ago = abs(delta.total_seconds()) / 3600
|
||||
if hours_ago < 1:
|
||||
rel_str = "LIVE"
|
||||
elif hours_ago < 24:
|
||||
rel_str = f"{int(hours_ago)}h ago"
|
||||
else:
|
||||
days = int(hours_ago / 24)
|
||||
rel_str = f"{days}d ago"
|
||||
else:
|
||||
hours_until = delta.total_seconds() / 3600
|
||||
if hours_until <= 0:
|
||||
rel_str = "LIVE"
|
||||
elif hours_until < 1:
|
||||
mins_until = int(delta.total_seconds() / 60)
|
||||
rel_str = f"In {mins_until}m"
|
||||
elif hours_until < 24:
|
||||
rel_str = f"In {int(hours_until)}h"
|
||||
else:
|
||||
days = int(hours_until / 24)
|
||||
rel_str = f"In {days}d"
|
||||
|
||||
return abs_str, rel_str
|
||||
except:
|
||||
return "TBD", ""
|
||||
|
||||
def get_header_date():
|
||||
"""Return current date string like 'Mar 25, 2026'"""
|
||||
now_utc = datetime.now(timezone.utc)
|
||||
@@ -459,9 +511,7 @@ def print_browse(match_events, non_match_events, category, total_raw, total_fetc
|
||||
vol = f["volume"]
|
||||
title = f["title"]
|
||||
url = f["url"]
|
||||
td = _get_time_data(e)
|
||||
start_time_wib = td["abs_time"]
|
||||
rel_time = td["time_status"]
|
||||
start_time_wib, rel_time = get_start_time_wib(e)
|
||||
|
||||
team_a = outcomes[0] if len(outcomes) > 0 else "?"
|
||||
team_b = outcomes[1] if len(outcomes) > 1 else "?"
|
||||
@@ -489,9 +539,7 @@ def print_browse(match_events, non_match_events, category, total_raw, total_fetc
|
||||
for i, e in enumerate(non_match_events[:non_matches_max], 1):
|
||||
title = e.get("title", "?")
|
||||
url = get_event_url(e)
|
||||
td = _get_time_data(e)
|
||||
start_time_wib = td["abs_time"]
|
||||
rel_time = td["time_status"]
|
||||
start_time_wib, rel_time = get_start_time_wib(e)
|
||||
|
||||
total_vol = sum(float(m.get("volume", 0)) for m in e.get("markets", []))
|
||||
market_count = len(e.get("markets", []))
|
||||
@@ -501,11 +549,17 @@ def print_browse(match_events, non_match_events, category, total_raw, total_fetc
|
||||
print(f" Markets: {market_count} | Total Vol: ${total_vol:,.0f}")
|
||||
|
||||
def print_detail(e, detail):
|
||||
from datetime import datetime, timezone, timedelta
|
||||
now_utc = datetime.now(timezone.utc)
|
||||
utc7 = timezone(timedelta(hours=7))
|
||||
now_utc7 = now_utc.astimezone(utc7)
|
||||
|
||||
print(f"\n{detail['title']}")
|
||||
print(f"URL: {detail['url']}")
|
||||
print(f"Livestream: {detail['livestream']}")
|
||||
|
||||
spread_str = format_spread(detail["best_bid"], detail["best_ask"]) if detail["best_bid"] and detail["best_ask"] else "N/A"
|
||||
time_str = get_match_time_str(e)
|
||||
print(f"\n{detail['time_status']}")
|
||||
print(f"ML: {detail['outcomes'][0]} {format_odds(float(detail['prices'][0]))} vs {detail['outcomes'][1]} {format_odds(float(detail['prices'][1]))}")
|
||||
print(f"ML Vol: ${detail['volume']:,.0f} | {spread_str}")
|
||||
@@ -522,44 +576,14 @@ def print_detail(e, detail):
|
||||
# TELEGRAM
|
||||
# ============================================================
|
||||
|
||||
def escape_html(text):
|
||||
"""Escape HTML-sensitive characters for Telegram parse_mode=HTML."""
|
||||
return (text
|
||||
.replace("&", "&")
|
||||
.replace("<", "<")
|
||||
.replace(">", ">")
|
||||
.replace('"', """))
|
||||
|
||||
|
||||
def send_telegram_message(bot_token, chat_id, text, timeout=10):
|
||||
"""Send a message via Telegram bot API. Returns the message ID on success.
|
||||
|
||||
Raises:
|
||||
RuntimeError: If the Telegram API returns an error (e.g. invalid token, rate limit).
|
||||
URLError/HTTPError: On network or HTTP-level failures.
|
||||
"""
|
||||
url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
|
||||
data = urlencode({
|
||||
"chat_id": chat_id,
|
||||
"text": text,
|
||||
"parse_mode": "HTML",
|
||||
"disable_web_page_preview": "true",
|
||||
}).encode("utf-8")
|
||||
req = Request(url, data=data, method="POST")
|
||||
with urlopen(req, timeout=timeout) as resp:
|
||||
result = json.loads(resp.read())
|
||||
if not result.get("ok"):
|
||||
raise RuntimeError(f"Telegram API error: {result.get('description')}")
|
||||
return result["result"]["message_id"]
|
||||
|
||||
|
||||
def send_to_telegram(match_events, non_match_events, category, matches_only=False, non_matches_only=False):
|
||||
"""Send browse results to Telegram. Reads TELEGRAM_BOT_TOKEN and CHAT_ID from environment."""
|
||||
"""Send browse results to Telegram. Reads BOT_TOKEN and CHAT_ID from environment."""
|
||||
import os
|
||||
bot_token = os.environ.get("TELEGRAM_BOT_TOKEN")
|
||||
bot_token = os.environ.get("BOT_TOKEN")
|
||||
chat_id = os.environ.get("CHAT_ID")
|
||||
if not bot_token or not chat_id:
|
||||
raise RuntimeError("TELEGRAM_BOT_TOKEN or CHAT_ID not set in environment")
|
||||
print("WARNING: BOT_TOKEN or CHAT_ID not set in environment. Skipping Telegram send.")
|
||||
return
|
||||
|
||||
from datetime import datetime, timezone, timedelta
|
||||
now_utc = datetime.now(timezone.utc)
|
||||
@@ -572,8 +596,19 @@ def send_to_telegram(match_events, non_match_events, category, matches_only=Fals
|
||||
show_non_matches = (not matches_only and not non_matches_only) or non_matches_only
|
||||
|
||||
def send(text):
|
||||
msg_id = send_telegram_message(bot_token, chat_id, text)
|
||||
print(f" Sent msg {msg_id}")
|
||||
result = subprocess.run(
|
||||
["curl", "-s", f"https://api.telegram.org/bot{bot_token}/sendMessage",
|
||||
"-d", f"chat_id={chat_id}",
|
||||
"-d", f"text={text}",
|
||||
"-d", "parse_mode=HTML",
|
||||
"-d", "disable_web_page_preview=true"],
|
||||
capture_output=True
|
||||
)
|
||||
resp = json.loads(result.stdout.decode())
|
||||
if resp.get("ok"):
|
||||
print(f" Sent msg {resp['result']['message_id']}")
|
||||
else:
|
||||
print(f" Error: {resp.get('description')}")
|
||||
|
||||
# Build sections
|
||||
lines = [f"<b>{category.upper()}</b> | {header_date}"]
|
||||
@@ -592,16 +627,14 @@ def send_to_telegram(match_events, non_match_events, category, matches_only=Fals
|
||||
vol = get_ml_volume(e)
|
||||
title = e.get("title", "?")
|
||||
url = get_event_url(e)
|
||||
td = _get_time_data(e)
|
||||
start_time_wib = td["abs_time"]
|
||||
rel_time = td["time_status"]
|
||||
start_time_wib, rel_time = get_start_time_wib(e)
|
||||
team_a = outcomes[0] if len(outcomes) > 0 else "?"
|
||||
team_b = outcomes[1] if len(outcomes) > 1 else "?"
|
||||
odds_a = format_odds(float(prices[0])) if len(prices) > 0 else "?"
|
||||
odds_b = format_odds(float(prices[1])) if len(prices) > 1 else "?"
|
||||
tournament = get_tournament(title)
|
||||
title_clean = title.split(" - ")[0].strip() if " - " in title else title
|
||||
lines.append(f"<b>{i}.</b> <a href=\"{url}\">{escape_html(title_clean)}</a>")
|
||||
lines.append(f"<b>{i}.</b> <a href=\"{url}\">{title_clean}</a>")
|
||||
lines.append(f" {start_time_wib} | {rel_time}")
|
||||
lines.append(f" Vol: ${vol:,.0f}")
|
||||
if tournament:
|
||||
@@ -619,12 +652,10 @@ def send_to_telegram(match_events, non_match_events, category, matches_only=Fals
|
||||
for i, e in enumerate(non_match_events, 1):
|
||||
title = e.get("title", "?")
|
||||
url = get_event_url(e)
|
||||
td = _get_time_data(e)
|
||||
start_time_wib = td["abs_time"]
|
||||
rel_time = td["time_status"]
|
||||
start_time_wib, rel_time = get_start_time_wib(e)
|
||||
total_vol = sum(float(m.get("volume", 0)) for m in e.get("markets", []))
|
||||
market_count = len(e.get("markets", []))
|
||||
lines.append(f"<b>{i}.</b> <a href=\"{url}\">{escape_html(title)}</a>")
|
||||
lines.append(f"<b>{i}.</b> <a href=\"{url}\">{title}</a>")
|
||||
lines.append(f" {start_time_wib} | {rel_time}")
|
||||
lines.append(f" Markets: {market_count} | Total Vol: ${total_vol:,.0f}")
|
||||
lines.append("")
|
||||
@@ -706,7 +737,7 @@ def main():
|
||||
parser.add_argument("--raw", action="store_true",
|
||||
help="Show all events without tradeable filter (for debugging).")
|
||||
parser.add_argument("--telegram", action="store_true",
|
||||
help="Send results to Telegram (TELEGRAM_BOT_TOKEN and CHAT_ID must be set in environment).")
|
||||
help="Send results to Telegram (BOT_TOKEN and CHAT_ID must be set in environment).")
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.list_categories:
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
# Tests package
|
||||
@@ -1,324 +0,0 @@
|
||||
"""
|
||||
Unit tests for browse.py Telegram functions.
|
||||
|
||||
Run with: python -m pytest tests/test_browse.py -v
|
||||
"""
|
||||
|
||||
import unittest
|
||||
from unittest.mock import patch, MagicMock
|
||||
import sys
|
||||
import os
|
||||
from datetime import datetime, timezone, timedelta
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'scripts'))
|
||||
from browse import send_telegram_message
|
||||
|
||||
|
||||
class TestSendTelegramMessage(unittest.TestCase):
|
||||
"""Tests for the module-level send_telegram_message function."""
|
||||
|
||||
@patch('browse.urlopen')
|
||||
def test_send_success(self, mock_urlopen):
|
||||
"""send_telegram_message returns message_id on success."""
|
||||
mock_resp = MagicMock()
|
||||
mock_resp.read.return_value = b'{"ok": true, "result": {"message_id": 123}}'
|
||||
mock_urlopen.return_value.__enter__.return_value = mock_resp
|
||||
|
||||
result = send_telegram_message("test_token", "test_chat", "hello world")
|
||||
|
||||
self.assertEqual(result, 123)
|
||||
mock_urlopen.assert_called_once()
|
||||
call_args = mock_urlopen.call_args
|
||||
req = call_args[0][0]
|
||||
self.assertEqual(req.full_url, "https://api.telegram.org/bottest_token/sendMessage")
|
||||
self.assertEqual(req.method, "POST")
|
||||
|
||||
@patch('browse.urlopen')
|
||||
def test_send_api_error_raises_runtime_error(self, mock_urlopen):
|
||||
"""send_telegram_message raises RuntimeError when Telegram API returns ok=false."""
|
||||
mock_resp = MagicMock()
|
||||
mock_resp.read.return_value = b'{"ok": false, "description": "Forbidden"}'
|
||||
mock_urlopen.return_value.__enter__.return_value = mock_resp
|
||||
|
||||
with self.assertRaises(RuntimeError) as ctx:
|
||||
send_telegram_message("test_token", "test_chat", "hello")
|
||||
self.assertIn("Telegram API error: Forbidden", str(ctx.exception))
|
||||
|
||||
@patch('browse.urlopen')
|
||||
def test_send_invalid_token_raises_http_error(self, mock_urlopen):
|
||||
"""send_telegram_message raises HTTPError on invalid token (404)."""
|
||||
from urllib.error import HTTPError
|
||||
mock_urlopen.side_effect = HTTPError(
|
||||
url="https://api.telegram.org/botINVALID/sendMessage",
|
||||
code=404,
|
||||
msg="Not Found",
|
||||
hdrs={},
|
||||
fp=None
|
||||
)
|
||||
|
||||
with self.assertRaises(HTTPError) as ctx:
|
||||
send_telegram_message("INVALID", "test_chat", "hello")
|
||||
self.assertEqual(ctx.exception.code, 404)
|
||||
|
||||
@patch('browse.urlopen')
|
||||
def test_send_rate_limit_raises_http_error(self, mock_urlopen):
|
||||
"""send_telegram_message raises HTTPError on rate limit (429)."""
|
||||
from urllib.error import HTTPError
|
||||
mock_urlopen.side_effect = HTTPError(
|
||||
url="https://api.telegram.org/bottest_token/sendMessage",
|
||||
code=429,
|
||||
msg="Too Many Requests",
|
||||
hdrs={},
|
||||
fp=None
|
||||
)
|
||||
|
||||
with self.assertRaises(HTTPError) as ctx:
|
||||
send_telegram_message("test_token", "test_chat", "hello")
|
||||
self.assertEqual(ctx.exception.code, 429)
|
||||
|
||||
@patch('browse.urlopen')
|
||||
def test_send_network_error_raises_url_error(self, mock_urlopen):
|
||||
"""send_telegram_message raises URLError on network failure."""
|
||||
from urllib.error import URLError
|
||||
mock_urlopen.side_effect = URLError("Connection refused")
|
||||
|
||||
with self.assertRaises(URLError) as ctx:
|
||||
send_telegram_message("test_token", "test_chat", "hello")
|
||||
self.assertIn("Connection refused", str(ctx.exception))
|
||||
|
||||
@patch('browse.urlopen')
|
||||
def test_send_timeout_raises_url_error(self, mock_urlopen):
|
||||
"""send_telegram_message raises URLError on timeout."""
|
||||
from urllib.error import URLError
|
||||
mock_urlopen.side_effect = URLError("<urlopen error TimeoutError: timed out>")
|
||||
|
||||
with self.assertRaises(URLError):
|
||||
send_telegram_message("test_token", "test_chat", "hello")
|
||||
|
||||
@patch('browse.urlopen')
|
||||
def test_send_custom_timeout_used(self, mock_urlopen):
|
||||
"""send_telegram_message respects custom timeout parameter."""
|
||||
mock_resp = MagicMock()
|
||||
mock_resp.read.return_value = b'{"ok": true, "result": {"message_id": 456}}'
|
||||
mock_urlopen.return_value.__enter__.return_value = mock_resp
|
||||
|
||||
send_telegram_message("test_token", "test_chat", "hello", timeout=30)
|
||||
|
||||
call_kwargs = mock_urlopen.call_args[1]
|
||||
self.assertEqual(call_kwargs['timeout'], 30)
|
||||
|
||||
@patch('browse.urlopen')
|
||||
def test_send_html_parsing_mode(self, mock_urlopen):
|
||||
"""send_telegram_message sends with parse_mode=HTML."""
|
||||
mock_resp = MagicMock()
|
||||
mock_resp.read.return_value = b'{"ok": true, "result": {"message_id": 789}}'
|
||||
mock_urlopen.return_value.__enter__.return_value = mock_resp
|
||||
|
||||
send_telegram_message("test_token", "test_chat", "<b>bold</b>")
|
||||
|
||||
call_args = mock_urlopen.call_args
|
||||
req = call_args[0][0]
|
||||
# Verify parse_mode=HTML is in the data
|
||||
self.assertIn(b"parse_mode=HTML", req.data)
|
||||
|
||||
|
||||
class TestHtmlInjection(unittest.TestCase):
|
||||
"""Tests for HTML injection prevention in Telegram messages."""
|
||||
|
||||
@patch.dict('os.environ', {'TELEGRAM_BOT_TOKEN': 'test_token', 'CHAT_ID': 'test_chat'})
|
||||
@patch('browse.send_telegram_message')
|
||||
def test_send_to_telegram_html_injection_in_match_title(self, mock_send_msg):
|
||||
"""
|
||||
titles in match events are NOT escaped before inserting into HTML.
|
||||
This test FAILS if HTML chars are unescaped (vulnerable),
|
||||
and PASSES once escape_html() is implemented.
|
||||
"""
|
||||
mock_send_msg.return_value = 123
|
||||
|
||||
# Simulate a Polymarket event with HTML injection in the title
|
||||
malicious_event = {
|
||||
"title": "<script>alert('XSS')</script> - Team A vs Team B",
|
||||
"slug": "test-event",
|
||||
"startTime": "2027-03-26T12:00:00Z",
|
||||
"markets": [{
|
||||
"sportsMarketType": "moneyline",
|
||||
"outcomes": '["Team A", "Team B"]',
|
||||
"outcomePrices": "[0.55, 0.45]",
|
||||
"bestBid": "0.54",
|
||||
"bestAsk": "0.56",
|
||||
"volume": 50000,
|
||||
"acceptingOrders": True,
|
||||
"closed": False,
|
||||
}],
|
||||
}
|
||||
|
||||
from browse import send_to_telegram
|
||||
send_to_telegram([malicious_event], [], "Counter Strike")
|
||||
|
||||
# Check what was passed to send_telegram_message
|
||||
self.assertEqual(mock_send_msg.called, True)
|
||||
sent_text = mock_send_msg.call_args[0][2] # text arg (3rd positional)
|
||||
|
||||
# AFTER FIX: <script> should be escaped as <script>
|
||||
# BEFORE FIX: raw <script> appears in text (vulnerable — test would fail here)
|
||||
self.assertIn("<script>", sent_text,
|
||||
"HTML injection still present — title may NOT be escaped")
|
||||
self.assertIn("</script>", sent_text)
|
||||
|
||||
@patch.dict('os.environ', {'TELEGRAM_BOT_TOKEN': 'test_token', 'CHAT_ID': 'test_chat'})
|
||||
@patch('browse.send_telegram_message')
|
||||
def test_send_to_telegram_ampersand_in_title(self, mock_send_msg):
|
||||
"""
|
||||
Ampersands in titles should be escaped as & when using HTML parse_mode.
|
||||
BEFORE fix: "&" appears raw in the HTML (vulnerable).
|
||||
AFTER fix: "&" appears as "&".
|
||||
"""
|
||||
mock_send_msg.return_value = 123
|
||||
|
||||
event_with_ampersand = {
|
||||
"title": "Team A & Team B vs Team C",
|
||||
"slug": "amp-test",
|
||||
"startTime": "2027-03-26T12:00:00Z",
|
||||
"markets": [{
|
||||
"sportsMarketType": "moneyline",
|
||||
"outcomes": '["Team A & Team B", "Team C"]',
|
||||
"outcomePrices": "[0.50, 0.50]",
|
||||
"bestBid": "0.49",
|
||||
"bestAsk": "0.51",
|
||||
"volume": 10000,
|
||||
"acceptingOrders": True,
|
||||
"closed": False,
|
||||
}],
|
||||
}
|
||||
|
||||
from browse import send_to_telegram
|
||||
send_to_telegram([event_with_ampersand], [], "Dota 2")
|
||||
|
||||
sent_text = mock_send_msg.call_args[0][2]
|
||||
|
||||
# AFTER FIX: & should be escaped as &
|
||||
# BEFORE FIX: raw & appears (vulnerable — test would fail here)
|
||||
self.assertIn("&", sent_text,
|
||||
"Ampersand not escaped — title may NOT be escaped")
|
||||
|
||||
|
||||
class TestTimeFunctions(unittest.TestCase):
|
||||
"""Tests for _get_time_data() unified helper.
|
||||
|
||||
These tests verify the helper returns correct time_status, time_urgency,
|
||||
and abs_time for various event scenarios. Callers extract the fields they
|
||||
need from the returned dict.
|
||||
"""
|
||||
|
||||
def _make_event(self, start_time):
|
||||
"""Helper to create a minimal event with a startTime."""
|
||||
return {"startTime": start_time}
|
||||
|
||||
def _frozen_dt(self, year, month, day, hour, minute, second=0):
|
||||
return datetime(year, month, day, hour, minute, second,
|
||||
tzinfo=timezone.utc)
|
||||
|
||||
def _mock_datetime(self, frozen):
|
||||
"""Return a mock datetime class that freezes now() to the given datetime."""
|
||||
class MockDatetime:
|
||||
@staticmethod
|
||||
def now(tz=None):
|
||||
if tz is None:
|
||||
return frozen
|
||||
return frozen.astimezone(tz)
|
||||
fromisoformat = staticmethod(datetime.fromisoformat)
|
||||
def __call__(self, *a, **k):
|
||||
return datetime(*a, **k)
|
||||
return MockDatetime
|
||||
|
||||
# === _get_time_data core tests ===
|
||||
|
||||
def test_get_time_data_tbd(self):
|
||||
"""No startTime -> TBD/0urgency/abs TBD."""
|
||||
from browse import _get_time_data
|
||||
td = _get_time_data({})
|
||||
self.assertEqual(td["time_status"], "TBD")
|
||||
self.assertEqual(td["time_urgency"], 0)
|
||||
self.assertEqual(td["abs_time"], "TBD")
|
||||
|
||||
def test_get_time_data_in_30m(self):
|
||||
"""Starts in 30 minutes -> 'In 30m', urgency 3."""
|
||||
frozen = self._frozen_dt(2026, 3, 25, 12, 0, 0)
|
||||
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||
from browse import _get_time_data
|
||||
td = _get_time_data(self._make_event("2026-03-25T12:30:00Z"))
|
||||
self.assertEqual(td["time_status"], "In 30m")
|
||||
self.assertEqual(td["time_urgency"], 3)
|
||||
self.assertIn("WIB", td["abs_time"])
|
||||
|
||||
def test_get_time_data_in_6h(self):
|
||||
"""Starts in 6 hours -> 'In 6h', urgency 2."""
|
||||
frozen = self._frozen_dt(2026, 3, 25, 12, 0, 0)
|
||||
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||
from browse import _get_time_data
|
||||
td = _get_time_data(self._make_event("2026-03-25T18:00:00Z"))
|
||||
self.assertEqual(td["time_status"], "In 6h")
|
||||
self.assertEqual(td["time_urgency"], 2)
|
||||
self.assertIn("WIB", td["abs_time"])
|
||||
|
||||
def test_get_time_data_in_2d(self):
|
||||
"""Starts in 2 days -> 'In 2d', urgency 1."""
|
||||
frozen = self._frozen_dt(2026, 3, 25, 12, 0, 0)
|
||||
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||
from browse import _get_time_data
|
||||
td = _get_time_data(self._make_event("2026-03-27T12:00:00Z"))
|
||||
self.assertEqual(td["time_status"], "In 2d")
|
||||
self.assertEqual(td["time_urgency"], 1)
|
||||
|
||||
def test_get_time_data_live(self):
|
||||
"""Started 30 minutes ago -> 'LIVE', urgency 3."""
|
||||
frozen = self._frozen_dt(2026, 3, 25, 12, 30, 0)
|
||||
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||
from browse import _get_time_data
|
||||
td = _get_time_data(self._make_event("2026-03-25T12:00:00Z"))
|
||||
self.assertEqual(td["time_status"], "LIVE")
|
||||
self.assertEqual(td["time_urgency"], 3)
|
||||
self.assertIn("WIB", td["abs_time"])
|
||||
|
||||
def test_get_time_data_started_2h_ago(self):
|
||||
"""Started 2 hours ago -> 'LIVE 2h', urgency 3."""
|
||||
frozen = self._frozen_dt(2026, 3, 25, 14, 0, 0)
|
||||
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||
from browse import _get_time_data
|
||||
td = _get_time_data(self._make_event("2026-03-25T12:00:00Z"))
|
||||
self.assertEqual(td["time_status"], "LIVE 2h")
|
||||
self.assertEqual(td["time_urgency"], 3)
|
||||
|
||||
def test_get_time_data_started_12h_ago(self):
|
||||
"""Started 12 hours ago -> '12h ago', urgency 1."""
|
||||
frozen = self._frozen_dt(2026, 3, 26, 0, 0, 0)
|
||||
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||
from browse import _get_time_data
|
||||
td = _get_time_data(self._make_event("2026-03-25T12:00:00Z"))
|
||||
self.assertEqual(td["time_status"], "12h ago")
|
||||
self.assertEqual(td["time_urgency"], 1)
|
||||
|
||||
def test_get_time_data_started_2d_ago(self):
|
||||
"""Started 2 days ago -> '2d ago', urgency 0."""
|
||||
frozen = self._frozen_dt(2026, 3, 27, 12, 0, 0)
|
||||
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||
from browse import _get_time_data
|
||||
td = _get_time_data(self._make_event("2026-03-25T12:00:00Z"))
|
||||
self.assertEqual(td["time_status"], "2d ago")
|
||||
self.assertEqual(td["time_urgency"], 0)
|
||||
|
||||
def test_get_time_data_abs_time_format(self):
|
||||
"""abs_time is formatted correctly in WIB."""
|
||||
frozen = self._frozen_dt(2026, 3, 25, 12, 0, 0)
|
||||
with patch('browse.datetime', self._mock_datetime(frozen)):
|
||||
from browse import _get_time_data
|
||||
# 19:00 UTC = 02:00 WIB next day
|
||||
td = _get_time_data(self._make_event("2026-03-26T02:00:00Z"))
|
||||
self.assertIn("WIB", td["abs_time"])
|
||||
# UTC 12:00 -> WIB 19:00 same day
|
||||
td2 = _get_time_data(self._make_event("2026-03-25T12:00:00Z"))
|
||||
self.assertEqual(td2["abs_time"], "Mar 25, 19:00 WIB")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user