9 Commits

Author SHA1 Message Date
shoko
a7837cec0f Merge #15: Unify duplicate time functions 2026-03-25 14:34:05 +00:00
shoko
8cde441996 Fix #15: Unify duplicate time functions into _get_time_data()
Replace three duplicated time parsing functions with a single
_get_time_data(e, tz) helper returning {time_status, time_urgency, abs_time}.

Deleted functions:
- get_match_time_status(e)  — urgency + status string
- get_match_time_str(e)    — status string only
- get_start_time_wib(e)    — (abs_time, rel_str) tuple

New unified helper:
- _get_time_data(e, tz=None) returns {time_status, time_urgency, abs_time}
- tz defaults to WIB (UTC+7, Indonesia)
- canonical rel_str format: 'LIVE', 'In 6h', '12h ago', etc.
- time_urgency: 0-3 (higher=livelier)

All call sites updated to use _get_time_data():
- format_event(), format_detail_event()
- print_browse(), print_detail()
- send_to_telegram()

Also: removed dead code in print_detail() that called get_match_time_str()
but never used the result.

Tests: 9 new tests for _get_time_data() covering TBD, future, live,
and past event scenarios. 19 tests total, all passing.

Fixes: #15
2026-03-25 13:59:54 +00:00
b2180a4a34 Merge pull request 'Fix #5: HTML injection in Telegram messages' (#20) from fix/issue-5-html-injection-telegram into master 2026-03-25 13:13:52 +01:00
shoko
d0534aedbf Fix #5: HTML injection in Telegram messages
Add escape_html() function to prevent HTML injection in Telegram
parse_mode=HTML messages. Apply escaping to event titles inserted
into <a> tags in send_to_telegram().

- Add escape_html() using stdlib html.escape()
- Escape match event titles (line 648) and non-match titles (line 676)
- Add TestHtmlInjection with 2 tests proving fix:
  - <script> tags escaped as &lt;script&gt;
  - & ampersands escaped as &amp;
- Fixes HIGH severity: titles from Polymarket API were inserted
  without escaping, allowing malformed HTML in Telegram messages
2026-03-25 11:42:42 +00:00
2703b942c1 Merge pull request 'Fix #4: Extract send() to module-level send_telegram_message() for testability' (#19) from fix/issue-4-telegram-token-refactor into master 2026-03-25 12:17:00 +01:00
shoko
f9c4bac7b8 Refactor send() to module-level send_telegram_message() for testability
Extract the nested send() function into a module-level
send_telegram_message(bot_token, chat_id, text, timeout=10)
function. This enables unit testing without hitting the real
Telegram API.

Changes:
- Add send_telegram_message() at module level in TELEGRAM section
- Replace nested send() with thin wrapper that calls
  send_telegram_message()
- Update argparse --telegram help text to use TELEGRAM_BOT_TOKEN
- Add tests/test_browse.py with 8 unit tests covering:
  - Success case (returns message_id)
  - API error (RuntimeError)
  - Invalid token (HTTPError 404)
  - Rate limit (HTTPError 429)
  - Network error (URLError)
  - Timeout (URLError)
  - Custom timeout parameter
  - HTML parse_mode in request

Ref: #4
2026-03-25 11:07:10 +00:00
shoko
c49600cd4d Fix CRITICAL: Telegram bot token exposed in process command line
Replace curl subprocess with urllib.request to prevent token leakage via
ps aux / /proc/*/cmdline. Token now stays in process memory only.

Changes:
- Remove subprocess import, add urllib.parse.urlencode and urllib.request
- Replace curl subprocess call with urlopen(Request(...))
- Change env var BOT_TOKEN -> TELEGRAM_BOT_TOKEN (clearer naming)
- Raise RuntimeError on missing env vars, API errors, or network errors
- Add 10s timeout to urlopen

Fixes #4
2026-03-25 10:46:10 +00:00
shoko
3a988943b9 docs: rename review folder to match skill structure
docs/polymarket-browse/ mirrors skills/polymarket-browse/
Future reviews for this skill can use date-based filenames in the same folder.
2026-03-25 10:02:43 +00:00
shoko
da367c594b docs: add polymarket-browse review (2026-03-25)
Security audit + code quality review of polymarket-browse skill.
Contains 8 security issues, 6 code quality issues, 2 docs issues.
Issues tracked in repo.
2026-03-25 10:00:12 +00:00
5 changed files with 453 additions and 158 deletions

1
.gitignore vendored
View File

@@ -2,3 +2,4 @@ __pycache__/
*.pyc
*.pyo
.DS_Store
.worktrees/

View File

@@ -4,11 +4,13 @@ Polymarket Event Browser
Browse tradeable Polymarket events by game category.
"""
import subprocess
import html
import json
import time
import argparse
from datetime import datetime, timezone, timedelta
from urllib.parse import urlencode
from urllib.request import urlopen, Request
# ============================================================
# CONFIG
@@ -17,6 +19,7 @@ from datetime import datetime, timezone, timedelta
PAGE_SIZE = 50
MAX_RETRIES = 5
INITIAL_RETRY_DELAY = 2 # exponential backoff starts at 2s
WIB = timezone(timedelta(hours=7)) # UTC+7 for Indonesian users
GAME_CATEGORIES = {
"All Esports": "Esports",
@@ -219,94 +222,79 @@ def format_spread(bid, ask):
spread = ask - bid
return f"{prob_to_cents(spread)}c"
def get_match_time_status(e):
def _get_time_data(e, tz=None):
"""
Return a human-readable match time status.
Returns (status_str, urgency) where urgency is 0-3 (higher = more urgent/live).
Uses startTime for actual match start time.
Displays times in WIB (UTC+7 for Indonesian users).
Unified time data extraction for event timestamps.
Uses startTime (preferred) or startDate as the event start time.
Datetime parsing and all relative calculations are UTC-based.
The tz parameter only affects the abs_time formatting.
Args:
e: Event dict with 'startTime' or 'startDate' key.
tz: datetime.timezone for abs_time formatting.
Defaults to WIB (UTC+7).
Returns:
{
"time_status": str, # e.g. "LIVE", "In 6h", "12h ago"
"time_urgency": int, # 0-3 (higher = more urgent/live)
"abs_time": str, # e.g. "Mar 25, 19:00 WIB" or "TBD"
}
"""
# Use startTime for actual match start, not startDate (which is market creation time)
tz = tz or WIB
start_str = e.get("startTime") or e.get("startDate", "")
if not start_str:
return "TBD", 0
return {"time_status": "TBD", "time_urgency": 0, "abs_time": "TBD"}
try:
start_dt = datetime.fromisoformat(start_str.replace('Z', '+00:00'))
now_utc = datetime.now(timezone.utc)
utc7 = timezone(timedelta(hours=7))
now = now_utc.astimezone(utc7)
start_utc7 = start_dt.astimezone(utc7)
delta = start_dt - now_utc
total_sec = delta.total_seconds()
if delta.total_seconds() < 0:
# Started already
hours_ago = abs(delta.total_seconds()) / 3600
if total_sec < 0:
# Event is in the past
hours_ago = abs(total_sec) / 3600
if hours_ago < 1:
return "LIVE", 3
time_status = "LIVE"
time_urgency = 3
elif hours_ago < 4:
return f"LIVE {int(hours_ago)}h", 3
time_status = f"LIVE {int(hours_ago)}h"
time_urgency = 3
elif hours_ago < 24:
return f"Started {int(hours_ago)}h ago", 1
time_status = f"{int(hours_ago)}h ago"
time_urgency = 1
else:
days = int(hours_ago / 24)
return f"{days}d ago", 0
time_status = f"{days}d ago"
time_urgency = 0
else:
# Starts in future
hours_until = delta.total_seconds() / 3600
if hours_until <= 0:
return "LIVE", 3
elif hours_until < 1:
mins = int(delta.total_seconds() / 60)
return f"In {mins}m", 3
elif hours_until < 24:
return f"In {int(hours_until)}h", 2
# Event is in the future
if total_sec < 3600:
mins = int(total_sec / 60)
time_status = f"In {mins}m"
time_urgency = 3
elif total_sec < 86400:
hours_until = int(total_sec / 3600)
time_status = f"In {hours_until}h"
time_urgency = 2
else:
days = int(hours_until / 24)
return f"In {days}d", 1
except:
return "", 0
days = int(total_sec / 86400)
time_status = f"In {days}d"
time_urgency = 1
def get_match_time_str(e):
"""
Return just the time status string (e.g. 'LIVE', 'In 6h', 'In 1d').
Uses startTime for actual match start time.
"""
start_str = e.get("startTime") or e.get("startDate", "")
if not start_str:
return "TBD"
try:
start_dt = datetime.fromisoformat(start_str.replace('Z', '+00:00'))
now_utc = datetime.now(timezone.utc)
delta = start_dt - now_utc
abs_time = start_dt.astimezone(tz).strftime("%b %d, %H:%M ")
if tz == WIB:
abs_time += "WIB"
else:
abs_time += start_dt.astimezone(tz).strftime("%Z")
return {"time_status": time_status, "time_urgency": time_urgency, "abs_time": abs_time}
except Exception:
return {"time_status": "", "time_urgency": 0, "abs_time": "TBD"}
if delta.total_seconds() < 0:
hours_ago = abs(delta.total_seconds()) / 3600
if hours_ago < 1:
return "LIVE"
elif hours_ago < 4:
return f"LIVE {int(hours_ago)}h"
elif hours_ago < 24:
return f"{int(hours_ago)}h ago"
else:
days = int(hours_ago / 24)
return f"{days}d ago"
else:
hours_until = delta.total_seconds() / 3600
if hours_until <= 0:
return "LIVE"
elif hours_until < 1:
mins = int(delta.total_seconds() / 60)
return f"In {mins}m"
elif hours_until < 24:
return f"In {int(hours_until)}h"
else:
days = int(hours_until / 24)
return f"In {days}d"
except:
return ""
def filter_events(events, tradeable_only=True):
"""
@@ -325,6 +313,7 @@ def filter_events(events, tradeable_only=True):
return match_events, non_match_events
def sort_events(events):
return sorted(events, key=get_ml_volume, reverse=True)
@@ -359,12 +348,12 @@ def format_event(e):
best_bid = float(ml.get("bestBid", 0)) if ml else 0
best_ask = float(ml.get("bestAsk", 0)) if ml else 0
vol = get_ml_volume(e)
time_status, urgency = get_match_time_status(e)
td = _get_time_data(e)
return {
"title": e.get("title", ""),
"time_status": time_status,
"time_urgency": urgency,
"time_status": td["time_status"],
"time_urgency": td["time_urgency"],
"url": get_event_url(e),
"livestream": e.get("resolutionSource"),
"outcomes": outcomes,
@@ -383,11 +372,12 @@ def format_detail_event(e):
]
active_markets = sorted(active_markets, key=lambda m: float(m.get("volume", 0)), reverse=True)
time_status, urgency = get_match_time_status(e)
td = _get_time_data(e)
return {
"title": e.get("title", ""),
"time_status": time_status,
"time_status": td["time_status"],
"abs_time": td["abs_time"],
"url": get_event_url(e),
"livestream": e.get("resolutionSource"),
"outcomes": json.loads(ml.get("outcomes", "[]")) if ml else [],
@@ -414,48 +404,6 @@ def format_detail_event(e):
# DISPLAY
# ============================================================
def get_start_time_wib(e):
"""Return (date_time_str, relative_str) for display."""
start_str = e.get("startTime") or e.get("startDate", "")
if not start_str:
return "TBD", ""
try:
start_dt = datetime.fromisoformat(start_str.replace('Z', '+00:00'))
now_utc = datetime.now(timezone.utc)
utc7 = timezone(timedelta(hours=7))
start_utc7 = start_dt.astimezone(utc7)
# Absolute: "Mar 25, 19:00 WIB"
abs_str = start_utc7.strftime("%b %d, %H:%M WIB")
# Relative: "In 5h", "In 10h", "LIVE", etc.
delta = start_dt - now_utc
if delta.total_seconds() < 0:
hours_ago = abs(delta.total_seconds()) / 3600
if hours_ago < 1:
rel_str = "LIVE"
elif hours_ago < 24:
rel_str = f"{int(hours_ago)}h ago"
else:
days = int(hours_ago / 24)
rel_str = f"{days}d ago"
else:
hours_until = delta.total_seconds() / 3600
if hours_until <= 0:
rel_str = "LIVE"
elif hours_until < 1:
mins_until = int(delta.total_seconds() / 60)
rel_str = f"In {mins_until}m"
elif hours_until < 24:
rel_str = f"In {int(hours_until)}h"
else:
days = int(hours_until / 24)
rel_str = f"In {days}d"
return abs_str, rel_str
except:
return "TBD", ""
def get_header_date():
"""Return current date string like 'Mar 25, 2026'"""
now_utc = datetime.now(timezone.utc)
@@ -511,7 +459,9 @@ def print_browse(match_events, non_match_events, category, total_raw, total_fetc
vol = f["volume"]
title = f["title"]
url = f["url"]
start_time_wib, rel_time = get_start_time_wib(e)
td = _get_time_data(e)
start_time_wib = td["abs_time"]
rel_time = td["time_status"]
team_a = outcomes[0] if len(outcomes) > 0 else "?"
team_b = outcomes[1] if len(outcomes) > 1 else "?"
@@ -539,7 +489,9 @@ def print_browse(match_events, non_match_events, category, total_raw, total_fetc
for i, e in enumerate(non_match_events[:non_matches_max], 1):
title = e.get("title", "?")
url = get_event_url(e)
start_time_wib, rel_time = get_start_time_wib(e)
td = _get_time_data(e)
start_time_wib = td["abs_time"]
rel_time = td["time_status"]
total_vol = sum(float(m.get("volume", 0)) for m in e.get("markets", []))
market_count = len(e.get("markets", []))
@@ -549,17 +501,11 @@ def print_browse(match_events, non_match_events, category, total_raw, total_fetc
print(f" Markets: {market_count} | Total Vol: ${total_vol:,.0f}")
def print_detail(e, detail):
from datetime import datetime, timezone, timedelta
now_utc = datetime.now(timezone.utc)
utc7 = timezone(timedelta(hours=7))
now_utc7 = now_utc.astimezone(utc7)
print(f"\n{detail['title']}")
print(f"URL: {detail['url']}")
print(f"Livestream: {detail['livestream']}")
spread_str = format_spread(detail["best_bid"], detail["best_ask"]) if detail["best_bid"] and detail["best_ask"] else "N/A"
time_str = get_match_time_str(e)
print(f"\n{detail['time_status']}")
print(f"ML: {detail['outcomes'][0]} {format_odds(float(detail['prices'][0]))} vs {detail['outcomes'][1]} {format_odds(float(detail['prices'][1]))}")
print(f"ML Vol: ${detail['volume']:,.0f} | {spread_str}")
@@ -576,14 +522,44 @@ def print_detail(e, detail):
# TELEGRAM
# ============================================================
def escape_html(text):
"""Escape HTML-sensitive characters for Telegram parse_mode=HTML."""
return (text
.replace("&", "&amp;")
.replace("<", "&lt;")
.replace(">", "&gt;")
.replace('"', "&quot;"))
def send_telegram_message(bot_token, chat_id, text, timeout=10):
"""Send a message via Telegram bot API. Returns the message ID on success.
Raises:
RuntimeError: If the Telegram API returns an error (e.g. invalid token, rate limit).
URLError/HTTPError: On network or HTTP-level failures.
"""
url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
data = urlencode({
"chat_id": chat_id,
"text": text,
"parse_mode": "HTML",
"disable_web_page_preview": "true",
}).encode("utf-8")
req = Request(url, data=data, method="POST")
with urlopen(req, timeout=timeout) as resp:
result = json.loads(resp.read())
if not result.get("ok"):
raise RuntimeError(f"Telegram API error: {result.get('description')}")
return result["result"]["message_id"]
def send_to_telegram(match_events, non_match_events, category, matches_only=False, non_matches_only=False):
"""Send browse results to Telegram. Reads BOT_TOKEN and CHAT_ID from environment."""
"""Send browse results to Telegram. Reads TELEGRAM_BOT_TOKEN and CHAT_ID from environment."""
import os
bot_token = os.environ.get("BOT_TOKEN")
bot_token = os.environ.get("TELEGRAM_BOT_TOKEN")
chat_id = os.environ.get("CHAT_ID")
if not bot_token or not chat_id:
print("WARNING: BOT_TOKEN or CHAT_ID not set in environment. Skipping Telegram send.")
return
raise RuntimeError("TELEGRAM_BOT_TOKEN or CHAT_ID not set in environment")
from datetime import datetime, timezone, timedelta
now_utc = datetime.now(timezone.utc)
@@ -596,19 +572,8 @@ def send_to_telegram(match_events, non_match_events, category, matches_only=Fals
show_non_matches = (not matches_only and not non_matches_only) or non_matches_only
def send(text):
result = subprocess.run(
["curl", "-s", f"https://api.telegram.org/bot{bot_token}/sendMessage",
"-d", f"chat_id={chat_id}",
"-d", f"text={text}",
"-d", "parse_mode=HTML",
"-d", "disable_web_page_preview=true"],
capture_output=True
)
resp = json.loads(result.stdout.decode())
if resp.get("ok"):
print(f" Sent msg {resp['result']['message_id']}")
else:
print(f" Error: {resp.get('description')}")
msg_id = send_telegram_message(bot_token, chat_id, text)
print(f" Sent msg {msg_id}")
# Build sections
lines = [f"<b>{category.upper()}</b> | {header_date}"]
@@ -627,14 +592,16 @@ def send_to_telegram(match_events, non_match_events, category, matches_only=Fals
vol = get_ml_volume(e)
title = e.get("title", "?")
url = get_event_url(e)
start_time_wib, rel_time = get_start_time_wib(e)
td = _get_time_data(e)
start_time_wib = td["abs_time"]
rel_time = td["time_status"]
team_a = outcomes[0] if len(outcomes) > 0 else "?"
team_b = outcomes[1] if len(outcomes) > 1 else "?"
odds_a = format_odds(float(prices[0])) if len(prices) > 0 else "?"
odds_b = format_odds(float(prices[1])) if len(prices) > 1 else "?"
tournament = get_tournament(title)
title_clean = title.split(" - ")[0].strip() if " - " in title else title
lines.append(f"<b>{i}.</b> <a href=\"{url}\">{title_clean}</a>")
lines.append(f"<b>{i}.</b> <a href=\"{url}\">{escape_html(title_clean)}</a>")
lines.append(f" {start_time_wib} | {rel_time}")
lines.append(f" Vol: ${vol:,.0f}")
if tournament:
@@ -652,10 +619,12 @@ def send_to_telegram(match_events, non_match_events, category, matches_only=Fals
for i, e in enumerate(non_match_events, 1):
title = e.get("title", "?")
url = get_event_url(e)
start_time_wib, rel_time = get_start_time_wib(e)
td = _get_time_data(e)
start_time_wib = td["abs_time"]
rel_time = td["time_status"]
total_vol = sum(float(m.get("volume", 0)) for m in e.get("markets", []))
market_count = len(e.get("markets", []))
lines.append(f"<b>{i}.</b> <a href=\"{url}\">{title}</a>")
lines.append(f"<b>{i}.</b> <a href=\"{url}\">{escape_html(title)}</a>")
lines.append(f" {start_time_wib} | {rel_time}")
lines.append(f" Markets: {market_count} | Total Vol: ${total_vol:,.0f}")
lines.append("")
@@ -737,7 +706,7 @@ def main():
parser.add_argument("--raw", action="store_true",
help="Show all events without tradeable filter (for debugging).")
parser.add_argument("--telegram", action="store_true",
help="Send results to Telegram (BOT_TOKEN and CHAT_ID must be set in environment).")
help="Send results to Telegram (TELEGRAM_BOT_TOKEN and CHAT_ID must be set in environment).")
args = parser.parse_args()
if args.list_categories:

View File

@@ -0,0 +1 @@
# Tests package

View File

@@ -0,0 +1,324 @@
"""
Unit tests for browse.py Telegram functions.
Run with: python -m pytest tests/test_browse.py -v
"""
import unittest
from unittest.mock import patch, MagicMock
import sys
import os
from datetime import datetime, timezone, timedelta
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'scripts'))
from browse import send_telegram_message
class TestSendTelegramMessage(unittest.TestCase):
"""Tests for the module-level send_telegram_message function."""
@patch('browse.urlopen')
def test_send_success(self, mock_urlopen):
"""send_telegram_message returns message_id on success."""
mock_resp = MagicMock()
mock_resp.read.return_value = b'{"ok": true, "result": {"message_id": 123}}'
mock_urlopen.return_value.__enter__.return_value = mock_resp
result = send_telegram_message("test_token", "test_chat", "hello world")
self.assertEqual(result, 123)
mock_urlopen.assert_called_once()
call_args = mock_urlopen.call_args
req = call_args[0][0]
self.assertEqual(req.full_url, "https://api.telegram.org/bottest_token/sendMessage")
self.assertEqual(req.method, "POST")
@patch('browse.urlopen')
def test_send_api_error_raises_runtime_error(self, mock_urlopen):
"""send_telegram_message raises RuntimeError when Telegram API returns ok=false."""
mock_resp = MagicMock()
mock_resp.read.return_value = b'{"ok": false, "description": "Forbidden"}'
mock_urlopen.return_value.__enter__.return_value = mock_resp
with self.assertRaises(RuntimeError) as ctx:
send_telegram_message("test_token", "test_chat", "hello")
self.assertIn("Telegram API error: Forbidden", str(ctx.exception))
@patch('browse.urlopen')
def test_send_invalid_token_raises_http_error(self, mock_urlopen):
"""send_telegram_message raises HTTPError on invalid token (404)."""
from urllib.error import HTTPError
mock_urlopen.side_effect = HTTPError(
url="https://api.telegram.org/botINVALID/sendMessage",
code=404,
msg="Not Found",
hdrs={},
fp=None
)
with self.assertRaises(HTTPError) as ctx:
send_telegram_message("INVALID", "test_chat", "hello")
self.assertEqual(ctx.exception.code, 404)
@patch('browse.urlopen')
def test_send_rate_limit_raises_http_error(self, mock_urlopen):
"""send_telegram_message raises HTTPError on rate limit (429)."""
from urllib.error import HTTPError
mock_urlopen.side_effect = HTTPError(
url="https://api.telegram.org/bottest_token/sendMessage",
code=429,
msg="Too Many Requests",
hdrs={},
fp=None
)
with self.assertRaises(HTTPError) as ctx:
send_telegram_message("test_token", "test_chat", "hello")
self.assertEqual(ctx.exception.code, 429)
@patch('browse.urlopen')
def test_send_network_error_raises_url_error(self, mock_urlopen):
"""send_telegram_message raises URLError on network failure."""
from urllib.error import URLError
mock_urlopen.side_effect = URLError("Connection refused")
with self.assertRaises(URLError) as ctx:
send_telegram_message("test_token", "test_chat", "hello")
self.assertIn("Connection refused", str(ctx.exception))
@patch('browse.urlopen')
def test_send_timeout_raises_url_error(self, mock_urlopen):
"""send_telegram_message raises URLError on timeout."""
from urllib.error import URLError
mock_urlopen.side_effect = URLError("<urlopen error TimeoutError: timed out>")
with self.assertRaises(URLError):
send_telegram_message("test_token", "test_chat", "hello")
@patch('browse.urlopen')
def test_send_custom_timeout_used(self, mock_urlopen):
"""send_telegram_message respects custom timeout parameter."""
mock_resp = MagicMock()
mock_resp.read.return_value = b'{"ok": true, "result": {"message_id": 456}}'
mock_urlopen.return_value.__enter__.return_value = mock_resp
send_telegram_message("test_token", "test_chat", "hello", timeout=30)
call_kwargs = mock_urlopen.call_args[1]
self.assertEqual(call_kwargs['timeout'], 30)
@patch('browse.urlopen')
def test_send_html_parsing_mode(self, mock_urlopen):
"""send_telegram_message sends with parse_mode=HTML."""
mock_resp = MagicMock()
mock_resp.read.return_value = b'{"ok": true, "result": {"message_id": 789}}'
mock_urlopen.return_value.__enter__.return_value = mock_resp
send_telegram_message("test_token", "test_chat", "<b>bold</b>")
call_args = mock_urlopen.call_args
req = call_args[0][0]
# Verify parse_mode=HTML is in the data
self.assertIn(b"parse_mode=HTML", req.data)
class TestHtmlInjection(unittest.TestCase):
"""Tests for HTML injection prevention in Telegram messages."""
@patch.dict('os.environ', {'TELEGRAM_BOT_TOKEN': 'test_token', 'CHAT_ID': 'test_chat'})
@patch('browse.send_telegram_message')
def test_send_to_telegram_html_injection_in_match_title(self, mock_send_msg):
"""
titles in match events are NOT escaped before inserting into HTML.
This test FAILS if HTML chars are unescaped (vulnerable),
and PASSES once escape_html() is implemented.
"""
mock_send_msg.return_value = 123
# Simulate a Polymarket event with HTML injection in the title
malicious_event = {
"title": "<script>alert('XSS')</script> - Team A vs Team B",
"slug": "test-event",
"startTime": "2027-03-26T12:00:00Z",
"markets": [{
"sportsMarketType": "moneyline",
"outcomes": '["Team A", "Team B"]',
"outcomePrices": "[0.55, 0.45]",
"bestBid": "0.54",
"bestAsk": "0.56",
"volume": 50000,
"acceptingOrders": True,
"closed": False,
}],
}
from browse import send_to_telegram
send_to_telegram([malicious_event], [], "Counter Strike")
# Check what was passed to send_telegram_message
self.assertEqual(mock_send_msg.called, True)
sent_text = mock_send_msg.call_args[0][2] # text arg (3rd positional)
# AFTER FIX: <script> should be escaped as &lt;script&gt;
# BEFORE FIX: raw <script> appears in text (vulnerable — test would fail here)
self.assertIn("&lt;script&gt;", sent_text,
"HTML injection still present — title may NOT be escaped")
self.assertIn("&lt;/script&gt;", sent_text)
@patch.dict('os.environ', {'TELEGRAM_BOT_TOKEN': 'test_token', 'CHAT_ID': 'test_chat'})
@patch('browse.send_telegram_message')
def test_send_to_telegram_ampersand_in_title(self, mock_send_msg):
"""
Ampersands in titles should be escaped as &amp; when using HTML parse_mode.
BEFORE fix: "&" appears raw in the HTML (vulnerable).
AFTER fix: "&" appears as "&amp;".
"""
mock_send_msg.return_value = 123
event_with_ampersand = {
"title": "Team A & Team B vs Team C",
"slug": "amp-test",
"startTime": "2027-03-26T12:00:00Z",
"markets": [{
"sportsMarketType": "moneyline",
"outcomes": '["Team A & Team B", "Team C"]',
"outcomePrices": "[0.50, 0.50]",
"bestBid": "0.49",
"bestAsk": "0.51",
"volume": 10000,
"acceptingOrders": True,
"closed": False,
}],
}
from browse import send_to_telegram
send_to_telegram([event_with_ampersand], [], "Dota 2")
sent_text = mock_send_msg.call_args[0][2]
# AFTER FIX: & should be escaped as &amp;
# BEFORE FIX: raw & appears (vulnerable — test would fail here)
self.assertIn("&amp;", sent_text,
"Ampersand not escaped — title may NOT be escaped")
class TestTimeFunctions(unittest.TestCase):
"""Tests for _get_time_data() unified helper.
These tests verify the helper returns correct time_status, time_urgency,
and abs_time for various event scenarios. Callers extract the fields they
need from the returned dict.
"""
def _make_event(self, start_time):
"""Helper to create a minimal event with a startTime."""
return {"startTime": start_time}
def _frozen_dt(self, year, month, day, hour, minute, second=0):
return datetime(year, month, day, hour, minute, second,
tzinfo=timezone.utc)
def _mock_datetime(self, frozen):
"""Return a mock datetime class that freezes now() to the given datetime."""
class MockDatetime:
@staticmethod
def now(tz=None):
if tz is None:
return frozen
return frozen.astimezone(tz)
fromisoformat = staticmethod(datetime.fromisoformat)
def __call__(self, *a, **k):
return datetime(*a, **k)
return MockDatetime
# === _get_time_data core tests ===
def test_get_time_data_tbd(self):
"""No startTime -> TBD/0urgency/abs TBD."""
from browse import _get_time_data
td = _get_time_data({})
self.assertEqual(td["time_status"], "TBD")
self.assertEqual(td["time_urgency"], 0)
self.assertEqual(td["abs_time"], "TBD")
def test_get_time_data_in_30m(self):
"""Starts in 30 minutes -> 'In 30m', urgency 3."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import _get_time_data
td = _get_time_data(self._make_event("2026-03-25T12:30:00Z"))
self.assertEqual(td["time_status"], "In 30m")
self.assertEqual(td["time_urgency"], 3)
self.assertIn("WIB", td["abs_time"])
def test_get_time_data_in_6h(self):
"""Starts in 6 hours -> 'In 6h', urgency 2."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import _get_time_data
td = _get_time_data(self._make_event("2026-03-25T18:00:00Z"))
self.assertEqual(td["time_status"], "In 6h")
self.assertEqual(td["time_urgency"], 2)
self.assertIn("WIB", td["abs_time"])
def test_get_time_data_in_2d(self):
"""Starts in 2 days -> 'In 2d', urgency 1."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import _get_time_data
td = _get_time_data(self._make_event("2026-03-27T12:00:00Z"))
self.assertEqual(td["time_status"], "In 2d")
self.assertEqual(td["time_urgency"], 1)
def test_get_time_data_live(self):
"""Started 30 minutes ago -> 'LIVE', urgency 3."""
frozen = self._frozen_dt(2026, 3, 25, 12, 30, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import _get_time_data
td = _get_time_data(self._make_event("2026-03-25T12:00:00Z"))
self.assertEqual(td["time_status"], "LIVE")
self.assertEqual(td["time_urgency"], 3)
self.assertIn("WIB", td["abs_time"])
def test_get_time_data_started_2h_ago(self):
"""Started 2 hours ago -> 'LIVE 2h', urgency 3."""
frozen = self._frozen_dt(2026, 3, 25, 14, 0, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import _get_time_data
td = _get_time_data(self._make_event("2026-03-25T12:00:00Z"))
self.assertEqual(td["time_status"], "LIVE 2h")
self.assertEqual(td["time_urgency"], 3)
def test_get_time_data_started_12h_ago(self):
"""Started 12 hours ago -> '12h ago', urgency 1."""
frozen = self._frozen_dt(2026, 3, 26, 0, 0, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import _get_time_data
td = _get_time_data(self._make_event("2026-03-25T12:00:00Z"))
self.assertEqual(td["time_status"], "12h ago")
self.assertEqual(td["time_urgency"], 1)
def test_get_time_data_started_2d_ago(self):
"""Started 2 days ago -> '2d ago', urgency 0."""
frozen = self._frozen_dt(2026, 3, 27, 12, 0, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import _get_time_data
td = _get_time_data(self._make_event("2026-03-25T12:00:00Z"))
self.assertEqual(td["time_status"], "2d ago")
self.assertEqual(td["time_urgency"], 0)
def test_get_time_data_abs_time_format(self):
"""abs_time is formatted correctly in WIB."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import _get_time_data
# 19:00 UTC = 02:00 WIB next day
td = _get_time_data(self._make_event("2026-03-26T02:00:00Z"))
self.assertIn("WIB", td["abs_time"])
# UTC 12:00 -> WIB 19:00 same day
td2 = _get_time_data(self._make_event("2026-03-25T12:00:00Z"))
self.assertEqual(td2["abs_time"], "Mar 25, 19:00 WIB")
if __name__ == "__main__":
unittest.main()