Fix #14: Refactor print_browse/send_to_telegram into single pipeline #22

Merged
shoko merged 3 commits from fix/issue-14-refactor-browse into master 2026-03-25 20:11:07 +01:00
2 changed files with 685 additions and 115 deletions
Showing only changes of commit 3a9f8fb365 - Show all commits

View File

@@ -338,7 +338,176 @@ def browse_events(q, matches_max=10, non_matches_max=10, tradeable_only=True):
} }
# ============================================================ # ============================================================
# FORMAT # FORMAT — EVENT
# ============================================================
def format_match_event(e):
"""
Format a match event into a canonical dict for rendering.
All computing done here; renderers just template.
Returns:
{
"title": str, # raw title
"title_clean": str, # "Team A vs Team B"
"tournament": str, # "Tournament Name" or ""
"url": str,
"time_status": str, # "LIVE", "In 6h", "12h ago"
"time_urgency": int, # 0-3
"abs_time": str, # "Mar 25, 19:00 WIB"
"team_a": str,
"team_b": str,
"odds_a": str, # "55c"
"odds_b": str,
"vol": int,
}
"""
ml = get_ml_market(e)
outcomes = json.loads(ml.get("outcomes", "[]")) if ml else []
prices = json.loads(ml.get("outcomePrices", "[]")) if ml else []
td = _get_time_data(e)
title = e.get("title", "")
team_a = outcomes[0] if len(outcomes) > 0 else "?"
team_b = outcomes[1] if len(outcomes) > 1 else "?"
odds_a = format_odds(float(prices[0])) if len(prices) > 0 else "?"
odds_b = format_odds(float(prices[1])) if len(prices) > 1 else "?"
if " - " in title:
title_clean = title.split(" - ")[0].strip()
else:
title_clean = title
tournament = get_tournament(title)
return {
"title": title,
"title_clean": title_clean,
"tournament": tournament,
"url": get_event_url(e),
"time_status": td["time_status"],
"time_urgency": td["time_urgency"],
"abs_time": td["abs_time"],
"team_a": team_a,
"team_b": team_b,
"odds_a": odds_a,
"odds_b": odds_b,
"vol": get_ml_volume(e),
}
def format_non_match_event(e):
"""
Format a non-match event into a canonical dict for rendering.
Returns:
{
"title": str,
"url": str,
"time_status": str,
"time_urgency": int,
"abs_time": str,
"market_count": int,
"total_vol": int,
}
"""
td = _get_time_data(e)
total_vol = sum(float(m.get("volume", 0)) for m in e.get("markets", []))
market_count = len(e.get("markets", []))
return {
"title": e.get("title", "?"),
"url": get_event_url(e),
"time_status": td["time_status"],
"time_urgency": td["time_urgency"],
"abs_time": td["abs_time"],
"market_count": market_count,
"total_vol": int(total_vol),
}
# ============================================================
# FORMAT — RENDER
# ============================================================
def render_match_lines(event_dict, i, mode):
"""
Render a formatted match event dict into lines of text.
Args:
event_dict: canonical dict from format_match_event()
i: 1-based index for the event number
mode: "text" for plain text/Markdown, "html" for Telegram HTML
Returns:
List[str], one line per element (no trailing blank line).
Caller adds the blank line separator between events.
"""
title_clean = event_dict["title_clean"]
url = event_dict["url"]
abs_time = event_dict["abs_time"]
time_status = event_dict["time_status"]
vol = event_dict["vol"]
tournament = event_dict["tournament"]
team_a = event_dict["team_a"]
team_b = event_dict["team_b"]
odds_a = event_dict["odds_a"]
odds_b = event_dict["odds_b"]
lines = []
if mode == "html":
lines.append(
f"<b>{i}.</b> <a href=\"{url}\">{escape_html(title_clean)}</a>"
)
else:
lines.append(f"{i}. [{title_clean}]({url})")
lines.append(f" {abs_time} | {time_status}")
lines.append(f" Vol: ${vol:,.0f}")
if tournament:
lines.append(f" Tournament: {tournament}")
lines.append(f" Odds: {team_a} {odds_a} | {odds_b} {team_b}")
return lines
def render_non_match_lines(event_dict, i, mode):
"""
Render a formatted non-match event dict into lines of text.
Args:
event_dict: canonical dict from format_non_match_event()
i: 1-based index for the event number
mode: "text" for plain text/Markdown, "html" for Telegram HTML
Returns:
List[str], one line per element (no trailing blank line).
"""
title = event_dict["title"]
url = event_dict["url"]
abs_time = event_dict["abs_time"]
time_status = event_dict["time_status"]
market_count = event_dict["market_count"]
total_vol = event_dict["total_vol"]
lines = []
if mode == "html":
lines.append(f"<b>{i}.</b> <a href=\"{url}\">{escape_html(title)}</a>")
else:
lines.append(f"{i}. [{title}]({url})")
lines.append(f" {abs_time} | {time_status}")
lines.append(f" Markets: {market_count} | Total Vol: ${total_vol:,.0f}")
return lines
# ============================================================
# FORMAT — LEGACY
# ============================================================ # ============================================================
def format_event(e): def format_event(e):
@@ -425,18 +594,17 @@ def print_browse(match_events, non_match_events, category, total_raw, total_fetc
utc7 = timezone(timedelta(hours=7)) utc7 = timezone(timedelta(hours=7))
now_utc7 = now_utc.astimezone(utc7) now_utc7 = now_utc.astimezone(utc7)
header_date = get_header_date() header_date = get_header_date()
print(f"\n=== {category.upper()}{' [RAW]' if raw_mode else ''} ===") print(f"\n=== {category.upper()}{' [RAW]' if raw_mode else ''} ===")
print(f"Current time (WIB): {now_utc7.strftime('%H:%M WIB')} | {header_date}") print(f"Current time (WIB): {now_utc7.strftime('%H:%M WIB')} | {header_date}")
if raw_mode: if raw_mode:
print(f"Fetched: {total_fetched} / Total API: {total_raw} | Match: {total_match} | Non-match: {total_non_match}") print(f"Fetched: {total_fetched} / Total API: {total_raw} | Match: {total_match} | Non-match: {total_non_match}")
if partial: if partial:
print(f"WARNING: Partial fetch (API error or timeout) — data may be incomplete") print(f"WARNING: Partial fetch (API error or timeout) — data may be incomplete")
# --- MATCH MARKETS --- # Determine sections to show
if not matches_only and not non_matches_only: if not matches_only and not non_matches_only:
# Default: show both
show_matches = True show_matches = True
show_non_matches = True show_non_matches = True
elif matches_only: elif matches_only:
@@ -445,60 +613,25 @@ def print_browse(match_events, non_match_events, category, total_raw, total_fetc
else: else:
show_matches = False show_matches = False
show_non_matches = True show_non_matches = True
# Match events
if show_matches: if show_matches:
print(f"\nMATCH MARKETS") print("\nMATCH MARKETS")
if not match_events: if not match_events:
print(" No match markets found.") print(" No match markets found.")
else: else:
for i, e in enumerate(match_events, 1): for i, e in enumerate(match_events, 1):
f = format_event(e) fd = format_match_event(e)
ml = get_ml_market(e) for line in render_match_lines(fd, i, mode="text"):
outcomes = json.loads(ml.get("outcomes", "[]")) if ml else [] print(line)
prices = json.loads(ml.get("outcomePrices", "[]")) if ml else []
vol = f["volume"]
title = f["title"]
url = f["url"]
td = _get_time_data(e)
start_time_wib = td["abs_time"]
rel_time = td["time_status"]
team_a = outcomes[0] if len(outcomes) > 0 else "?"
team_b = outcomes[1] if len(outcomes) > 1 else "?"
odds_a = format_odds(float(prices[0])) if len(prices) > 0 else "?"
odds_b = format_odds(float(prices[1])) if len(prices) > 1 else "?"
if " - " in title:
title_clean = title.split(" - ")[0].strip()
else:
title_clean = title
tournament = get_tournament(title)
print(f"\n {i}. [{title_clean}]({url})")
print(f" {start_time_wib} | {rel_time}")
print(f" Vol: ${vol:,.0f}")
if tournament:
print(f" Tournament: {tournament}")
print(f" Odds: {team_a} {odds_a} | {odds_b} {team_b}")
# --- NON-MATCH MARKETS ---
if show_non_matches and non_match_events:
print(f"\nNON-MATCH MARKETS")
for i, e in enumerate(non_match_events[:non_matches_max], 1):
title = e.get("title", "?")
url = get_event_url(e)
td = _get_time_data(e)
start_time_wib = td["abs_time"]
rel_time = td["time_status"]
total_vol = sum(float(m.get("volume", 0)) for m in e.get("markets", [])) # Non-match events
market_count = len(e.get("markets", [])) if show_non_matches and non_match_events:
print("\nNON-MATCH MARKETS")
print(f"\n {i}. [{title}]({url})") for i, e in enumerate(non_match_events[:non_matches_max], 1):
print(f" {start_time_wib} | {rel_time}") fd = format_non_match_event(e)
print(f" Markets: {market_count} | Total Vol: ${total_vol:,.0f}") for line in render_non_match_lines(fd, i, mode="text"):
print(line)
def print_detail(e, detail): def print_detail(e, detail):
print(f"\n{detail['title']}") print(f"\n{detail['title']}")
@@ -566,104 +699,89 @@ def send_to_telegram(match_events, non_match_events, category, matches_only=Fals
utc7 = timezone(timedelta(hours=7)) utc7 = timezone(timedelta(hours=7))
now_utc7 = now_utc.astimezone(utc7) now_utc7 = now_utc.astimezone(utc7)
header_date = now_utc7.strftime("%b %d, %Y") header_date = now_utc7.strftime("%b %d, %Y")
# Determine sections to show # Determine sections to show
show_matches = (not matches_only and not non_matches_only) or matches_only show_matches = (not matches_only and not non_matches_only) or matches_only
show_non_matches = (not matches_only and not non_matches_only) or non_matches_only show_non_matches = (not matches_only and not non_matches_only) or non_matches_only
def send(text): def send(text):
msg_id = send_telegram_message(bot_token, chat_id, text) msg_id = send_telegram_message(bot_token, chat_id, text)
print(f" Sent msg {msg_id}") print(f" Sent msg {msg_id}")
# Build sections # Build lines
lines = [f"<b>{category.upper()}</b> | {header_date}"] lines = [f"<b>{category.upper()}</b> | {header_date}", ""]
lines.append("")
if show_matches: if show_matches:
lines.append("MATCH MARKETS") lines += ["MATCH MARKETS", ""]
lines.append("")
if not match_events: if not match_events:
lines.append(" No match markets found.") lines.append(" No match markets found.")
else: else:
for i, e in enumerate(match_events, 1): for i, e in enumerate(match_events, 1):
ml = get_ml_market(e) fd = format_match_event(e)
outcomes = json.loads(ml.get("outcomes", "[]")) if ml else [] lines += render_match_lines(fd, i, mode="html")
prices = json.loads(ml.get("outcomePrices", "[]")) if ml else []
vol = get_ml_volume(e)
title = e.get("title", "?")
url = get_event_url(e)
td = _get_time_data(e)
start_time_wib = td["abs_time"]
rel_time = td["time_status"]
team_a = outcomes[0] if len(outcomes) > 0 else "?"
team_b = outcomes[1] if len(outcomes) > 1 else "?"
odds_a = format_odds(float(prices[0])) if len(prices) > 0 else "?"
odds_b = format_odds(float(prices[1])) if len(prices) > 1 else "?"
tournament = get_tournament(title)
title_clean = title.split(" - ")[0].strip() if " - " in title else title
lines.append(f"<b>{i}.</b> <a href=\"{url}\">{escape_html(title_clean)}</a>")
lines.append(f" {start_time_wib} | {rel_time}")
lines.append(f" Vol: ${vol:,.0f}")
if tournament:
lines.append(f" Tournament: {tournament}")
lines.append(f" Odds: {team_a} {odds_a} | {odds_b} {team_b}")
lines.append("") lines.append("")
lines.append("") lines.append("")
if show_non_matches: if show_non_matches:
lines.append("NON-MATCH MARKETS") lines += ["NON-MATCH MARKETS", ""]
lines.append("")
if not non_match_events: if not non_match_events:
lines.append(" No non-match markets found.") lines.append(" No non-match markets found.")
else: else:
for i, e in enumerate(non_match_events, 1): for i, e in enumerate(non_match_events, 1):
title = e.get("title", "?") fd = format_non_match_event(e)
url = get_event_url(e) lines += render_non_match_lines(fd, i, mode="html")
td = _get_time_data(e)
start_time_wib = td["abs_time"]
rel_time = td["time_status"]
total_vol = sum(float(m.get("volume", 0)) for m in e.get("markets", []))
market_count = len(e.get("markets", []))
lines.append(f"<b>{i}.</b> <a href=\"{url}\">{escape_html(title)}</a>")
lines.append(f" {start_time_wib} | {rel_time}")
lines.append(f" Markets: {market_count} | Total Vol: ${total_vol:,.0f}")
lines.append("") lines.append("")
lines.append("")
# Chunk by 10 items (events), respecting 4096 char Telegram limit
text = "\n".join(lines) # Chunk and send
send_chunked(lines, send, category, header_date, show_matches, show_non_matches)
def send_chunked(all_lines, send_fn, category, header_date, show_matches, show_non_matches):
"""
Split already-built lines into Telegram-safe chunks and send them.
Telegram messages are capped at 4096 chars. Chunks are grouped by
section header so no event is split across messages.
Args:
all_lines: Full message lines list (built by caller).
send_fn: Closure that sends a single string and prints confirmation.
category: Category name for header.
header_date: Date string for header.
show_matches: Whether MATCH MARKETS section is present.
show_non_matches: Whether NON-MATCH MARKETS section is present.
"""
text = "\n".join(all_lines)
if len(text) <= 4096: if len(text) <= 4096:
send(text) send_fn(text)
return return
# Split into chunks of 10 events # Split into chunks of 10 events, respecting section headers
all_items = [] all_items = []
in_match = True in_match = True
for line in lines: for line in all_lines:
if line == "MATCH MARKETS": if line == "MATCH MARKETS":
in_match = True in_match = True
elif line == "NON-MATCH MARKETS": elif line == "NON-MATCH MARKETS":
in_match = False in_match = False
elif line.startswith("<b>") and ". " in line and "</a>" in line: elif line.startswith("<b>") and "</a>" in line:
# Event title line: <b>1.</b> <a href="...">Title</a>
all_items.append((in_match, line)) all_items.append((in_match, line))
chunk = [] chunk = []
chunk_len = 0
chunk_num = 1
# Header is always first
header = f"<b>{category.upper()}</b> | {header_date}\n" header = f"<b>{category.upper()}</b> | {header_date}\n"
if show_matches: if show_matches:
header += "\nMATCH MARKETS\n\n" header += "\nMATCH MARKETS\n\n"
if show_non_matches: if show_non_matches:
header += "\nNON-MATCH MARKETS\n\n" header += "\nNON-MATCH MARKETS\n\n"
for is_match, item_line in all_items: for is_match, item_line in all_items:
test_chunk = chunk + [item_line, ""] test_chunk = chunk + [item_line, ""]
test_text = header + "\n".join(chunk) + "\n".join(test_chunk) test_text = header + "\n".join(chunk) + "\n".join(test_chunk)
if len(test_text) > 4096 or len(chunk) >= 10: if len(test_text) > 4096 or len(chunk) >= 10:
# Send current chunk
msg = header + "\n".join(chunk) msg = header + "\n".join(chunk)
send(msg) send_fn(msg)
chunk = [item_line, ""] chunk = [item_line, ""]
header = f"<b>{category.upper()}</b> (cont.) | {header_date}\n" header = f"<b>{category.upper()}</b> (cont.) | {header_date}\n"
if show_matches and is_match: if show_matches and is_match:
@@ -672,10 +790,10 @@ def send_to_telegram(match_events, non_match_events, category, matches_only=Fals
header += "\nNON-MATCH MARKETS\n\n" header += "\nNON-MATCH MARKETS\n\n"
else: else:
chunk.extend([item_line, ""]) chunk.extend([item_line, ""])
if chunk: if chunk:
msg = header + "\n".join(chunk) msg = header + "\n".join(chunk)
send(msg) send_fn(msg)
# ============================================================ # ============================================================

View File

@@ -320,5 +320,457 @@ class TestTimeFunctions(unittest.TestCase):
self.assertEqual(td2["abs_time"], "Mar 25, 19:00 WIB") self.assertEqual(td2["abs_time"], "Mar 25, 19:00 WIB")
class TestFormatMatchEvent(unittest.TestCase):
"""Tests for format_match_event() canonical dict."""
def _frozen_dt(self, year, month, day, hour, minute):
return datetime(year, month, day, hour, minute,
tzinfo=timezone.utc)
def _mock_datetime(self, frozen):
class MockDatetime:
@staticmethod
def now(tz=None):
if tz is None:
return frozen
return frozen.astimezone(tz)
fromisoformat = staticmethod(datetime.fromisoformat)
def __call__(self, *a, **k):
return datetime(*a, **k)
return MockDatetime
def _make_event(self, title, ml_market=None, start_time="2026-03-25T18:00:00Z"):
import json as _json
e = {
"title": title,
"slug": "test-slug",
"startTime": start_time,
"markets": [],
}
if ml_market:
e["markets"].append(ml_market)
return e
def _make_ml_market(self, outcomes, prices, vol=50000):
import json
return {
"sportsMarketType": "moneyline",
"outcomes": json.dumps(outcomes),
"outcomePrices": json.dumps(prices),
"bestBid": str(float(prices[0]) - 0.01) if prices else "0.49",
"bestAsk": str(float(prices[0]) + 0.01) if prices else "0.51",
"volume": str(vol),
"acceptingOrders": True,
"closed": False,
}
def test_fields_present(self):
"""All canonical fields are present and non-null."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import format_match_event
e = self._make_event(
"Counter Strike: Team A vs Team B - ESL Pro League",
self._make_ml_market(['"Team A"', '"Team B"'], [0.55, 0.45]),
)
fd = format_match_event(e)
self.assertIn("title", fd)
self.assertIn("title_clean", fd)
self.assertIn("tournament", fd)
self.assertIn("url", fd)
self.assertIn("time_status", fd)
self.assertIn("time_urgency", fd)
self.assertIn("abs_time", fd)
self.assertIn("team_a", fd)
self.assertIn("team_b", fd)
self.assertIn("odds_a", fd)
self.assertIn("odds_b", fd)
self.assertIn("vol", fd)
def test_title_clean_no_tournament(self):
"""title_clean strips tournament suffix after ' - '."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import format_match_event
e = self._make_event(
"Counter Strike: Team A vs Team B - ESL Pro League",
self._make_ml_market(['"Team A"', '"Team B"'], [0.55, 0.45]),
)
fd = format_match_event(e)
self.assertEqual(fd["title_clean"], "Counter Strike: Team A vs Team B")
self.assertEqual(fd["tournament"], "ESL Pro League")
def test_title_clean_no_dash(self):
"""title_clean is unchanged when no ' - ' separator."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import format_match_event
e = self._make_event(
"Counter Strike: Team A vs Team B",
self._make_ml_market(['"Team A"', '"Team B"'], [0.55, 0.45]),
)
fd = format_match_event(e)
self.assertEqual(fd["title_clean"], "Counter Strike: Team A vs Team B")
self.assertEqual(fd["tournament"], "")
def test_missing_ml(self):
"""Returns valid dict with '?' fallbacks when no ML market."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import format_match_event
e = self._make_event("Team A vs Team B")
fd = format_match_event(e)
self.assertEqual(fd["team_a"], "?")
self.assertEqual(fd["team_b"], "?")
self.assertEqual(fd["odds_a"], "?")
self.assertEqual(fd["odds_b"], "?")
self.assertEqual(fd["vol"], 0)
def test_missing_outcomes(self):
"""Handles empty outcomes list gracefully."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import format_match_event
e = self._make_event(
"Team A vs Team B",
self._make_ml_market([], []),
)
fd = format_match_event(e)
self.assertEqual(fd["team_a"], "?")
self.assertEqual(fd["team_b"], "?")
def test_time_data_passed_through(self):
"""Time fields come from _get_time_data."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import format_match_event
e = self._make_event(
"Team A vs Team B",
self._make_ml_market(['"Team A"', '"Team B"'], [0.55, 0.45]),
start_time="2026-03-25T18:00:00Z", # 6h in future
)
fd = format_match_event(e)
self.assertEqual(fd["time_status"], "In 6h")
self.assertEqual(fd["time_urgency"], 2)
self.assertIn("WIB", fd["abs_time"])
class TestFormatNonMatchEvent(unittest.TestCase):
"""Tests for format_non_match_event() canonical dict."""
def _frozen_dt(self, year, month, day, hour, minute):
return datetime(year, month, day, hour, minute,
tzinfo=timezone.utc)
def _mock_datetime(self, frozen):
class MockDatetime:
@staticmethod
def now(tz=None):
if tz is None:
return frozen
return frozen.astimezone(tz)
fromisoformat = staticmethod(datetime.fromisoformat)
def __call__(self, *a, **k):
return datetime(*a, **k)
return MockDatetime
def test_fields_present(self):
"""All canonical fields are present."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import format_non_match_event
e = {
"title": "Will it rain in Jakarta?",
"slug": "rain-jakarta",
"startTime": "2026-03-25T18:00:00Z",
"markets": [
{"volume": "10000"},
{"volume": "5000"},
],
}
fd = format_non_match_event(e)
self.assertIn("title", fd)
self.assertIn("url", fd)
self.assertIn("time_status", fd)
self.assertIn("time_urgency", fd)
self.assertIn("abs_time", fd)
self.assertIn("market_count", fd)
self.assertIn("total_vol", fd)
def test_market_stats(self):
"""market_count and total_vol computed correctly."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import format_non_match_event
e = {
"title": "Test",
"slug": "test",
"startTime": "2026-03-25T18:00:00Z",
"markets": [
{"volume": "10000"},
{"volume": "5000"},
],
}
fd = format_non_match_event(e)
self.assertEqual(fd["market_count"], 2)
self.assertEqual(fd["total_vol"], 15000)
def test_time_passed_through(self):
"""Time fields come from _get_time_data."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import format_non_match_event
e = {
"title": "Test",
"slug": "test",
"startTime": "2026-03-25T18:00:00Z",
"markets": [],
}
fd = format_non_match_event(e)
self.assertEqual(fd["time_status"], "In 6h")
class TestRenderMatchLines(unittest.TestCase):
"""Tests for render_match_lines() text and HTML output."""
def test_text_mode_exact_lines(self):
"""text mode produces expected plain text lines."""
from browse import render_match_lines
fd = {
"title_clean": "Team A vs Team B",
"url": "https://polymarket.com/market/test",
"abs_time": "Mar 25, 19:00 WIB",
"time_status": "In 6h",
"vol": 50000,
"tournament": "ESL Pro League",
"team_a": "Team A",
"team_b": "Team B",
"odds_a": "55c",
"odds_b": "45c",
}
lines = render_match_lines(fd, 1, mode="text")
self.assertEqual(lines[0], "1. [Team A vs Team B](https://polymarket.com/market/test)")
self.assertEqual(lines[1], " Mar 25, 19:00 WIB | In 6h")
self.assertEqual(lines[2], " Vol: $50,000")
self.assertEqual(lines[3], " Tournament: ESL Pro League")
self.assertEqual(lines[4], " Odds: Team A 55c | 45c Team B")
def test_text_mode_no_tournament(self):
"""text mode omits Tournament line when tournament is empty."""
from browse import render_match_lines
fd = {
"title_clean": "Team A vs Team B",
"url": "https://polymarket.com/market/test",
"abs_time": "Mar 25, 19:00 WIB",
"time_status": "In 6h",
"vol": 50000,
"tournament": "",
"team_a": "Team A",
"team_b": "Team B",
"odds_a": "55c",
"odds_b": "45c",
}
lines = render_match_lines(fd, 2, mode="text")
self.assertEqual(len(lines), 4)
self.assertEqual(lines[0], "2. [Team A vs Team B](https://polymarket.com/market/test)")
self.assertNotIn("Tournament", lines[3])
def test_html_mode_exact(self):
"""html mode produces expected HTML lines with escape_html."""
from browse import render_match_lines
fd = {
"title_clean": "Team A & Team B vs Team C",
"url": "https://polymarket.com/market/test",
"abs_time": "Mar 25, 19:00 WIB",
"time_status": "LIVE",
"vol": 50000,
"tournament": "ESL Pro League",
"team_a": "Team A & Team B",
"team_b": "Team C",
"odds_a": "55c",
"odds_b": "45c",
}
lines = render_match_lines(fd, 1, mode="html")
self.assertEqual(lines[0], "<b>1.</b> <a href=\"https://polymarket.com/market/test\">Team A &amp; Team B vs Team C</a>")
self.assertEqual(lines[1], " Mar 25, 19:00 WIB | LIVE")
self.assertEqual(lines[2], " Vol: $50,000")
self.assertEqual(lines[3], " Tournament: ESL Pro League")
self.assertEqual(lines[4], " Odds: Team A & Team B 55c | 45c Team C")
def test_html_mode_xss_prevention(self):
"""html mode escapes < and > to prevent XSS."""
from browse import render_match_lines
fd = {
"title_clean": "<script>alert('xss')</script>",
"url": "https://polymarket.com/market/test",
"abs_time": "Mar 25, 19:00 WIB",
"time_status": "LIVE",
"vol": 1000,
"tournament": "",
"team_a": "Team A",
"team_b": "Team B",
"odds_a": "50c",
"odds_b": "50c",
}
lines = render_match_lines(fd, 1, mode="html")
self.assertIn("&lt;script&gt;", lines[0])
self.assertNotIn("<script>", lines[0])
class TestRenderNonMatchLines(unittest.TestCase):
"""Tests for render_non_match_lines() text and HTML output."""
def test_text_mode_exact_lines(self):
"""text mode produces expected plain text lines."""
from browse import render_non_match_lines
fd = {
"title": "Will it rain in Jakarta?",
"url": "https://polymarket.com/event/rain-jakarta",
"abs_time": "Mar 25, 19:00 WIB",
"time_status": "In 6h",
"market_count": 3,
"total_vol": 25000,
}
lines = render_non_match_lines(fd, 1, mode="text")
self.assertEqual(lines[0], "1. [Will it rain in Jakarta?](https://polymarket.com/event/rain-jakarta)")
self.assertEqual(lines[1], " Mar 25, 19:00 WIB | In 6h")
self.assertEqual(lines[2], " Markets: 3 | Total Vol: $25,000")
def test_html_mode_exact(self):
"""html mode produces expected HTML lines with escape_html."""
from browse import render_non_match_lines
fd = {
"title": "Rain <or> Sun?",
"url": "https://polymarket.com/event/rain-sun",
"abs_time": "Mar 25, 19:00 WIB",
"time_status": "In 6h",
"market_count": 2,
"total_vol": 10000,
}
lines = render_non_match_lines(fd, 1, mode="html")
self.assertEqual(lines[0], "<b>1.</b> <a href=\"https://polymarket.com/event/rain-sun\">Rain &lt;or&gt; Sun?</a>")
self.assertEqual(lines[1], " Mar 25, 19:00 WIB | In 6h")
self.assertEqual(lines[2], " Markets: 2 | Total Vol: $10,000")
class TestPrintBrowseIntegration(unittest.TestCase):
"""Integration tests for print_browse using the new pipeline."""
def _frozen_dt(self, year, month, day, hour, minute):
return datetime(year, month, day, hour, minute,
tzinfo=timezone.utc)
def _mock_datetime(self, frozen):
class MockDatetime:
@staticmethod
def now(tz=None):
if tz is None:
return frozen
return frozen.astimezone(tz)
fromisoformat = staticmethod(datetime.fromisoformat)
def __call__(self, *a, **k):
return datetime(*a, **k)
return MockDatetime
@patch('builtins.print')
def test_print_browse_uses_new_pipeline(self, mock_print):
"""print_browse calls format_match_event and render_match_lines."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import print_browse
match_events = [{
"title": "Counter Strike: Team A vs Team B - ESL Pro League",
"slug": "csa",
"startTime": "2026-03-25T18:00:00Z",
"markets": [{
"sportsMarketType": "moneyline",
"outcomes": '["Team A", "Team B"]',
"outcomePrices": "[0.55, 0.45]",
"bestBid": "0.54",
"bestAsk": "0.56",
"volume": "50000",
"acceptingOrders": True,
"closed": False,
}],
}]
with patch('browse.format_match_event') as mock_fmt, \
patch('browse.render_match_lines') as mock_render:
mock_fmt.return_value = {
"title_clean": "Team A vs Team B",
"url": "https://polymarket.com/market/csa",
"abs_time": "Mar 25, 19:00 WIB",
"time_status": "In 6h",
"vol": 50000,
"tournament": "ESL Pro League",
"team_a": "Team A",
"team_b": "Team B",
"odds_a": "55c",
"odds_b": "45c",
}
mock_render.return_value = [
"1. [Team A vs Team B](https://polymarket.com/market/csa)",
" Mar 25, 19:00 WIB | In 6h",
" Vol: $50,000",
" Tournament: ESL Pro League",
" Odds: Team A 55c | 45c Team B",
]
print_browse(match_events, [], "Counter Strike", 1, 1, 1, 0,
non_matches_max=5)
mock_fmt.assert_called_once_with(match_events[0])
mock_render.assert_called_once_with(mock_fmt.return_value, 1, mode="text")
@patch('builtins.print')
def test_print_browse_matches_only(self, mock_print):
"""matches_only suppresses non-match section."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import print_browse
with patch('browse.format_non_match_event') as mock_non_fmt:
print_browse([], [], "Counter Strike", 0, 0, 0, 0,
non_matches_max=5, matches_only=True)
mock_non_fmt.assert_not_called()
class TestSendChunked(unittest.TestCase):
"""Tests for send_chunked() helper."""
def test_small_message_sent_directly(self):
"""Messages under 4096 chars go through without chunking."""
sent_texts = []
def fake_send(text):
sent_texts.append(text)
lines = ["<b>COUNTER STRIKE</b> | Mar 25, 2026", "", "MATCH MARKETS", "", "1. test"]
# This fits in one message
from browse import send_chunked
send_chunked(lines, fake_send, "Counter Strike", "Mar 25, 2026",
show_matches=True, show_non_matches=False)
self.assertEqual(len(sent_texts), 1)
def test_chunked_message_gets_cont_header(self):
"""Messages over 4096 chars get continuation header."""
sent_texts = []
def fake_send(text):
sent_texts.append(text)
# Build enough content to exceed 4096 chars
# Each event line: ~260 chars. Need ~16 events + headers (~4200 chars)
lines = ["<b>COUNTER STRIKE</b> | Mar 25, 2026", ""]
for i in range(16):
lines += [f"<b>{i+1}.</b> <a href=\"https://polymarket.com/market/{i}\">Team {'X' * 250}</a>", " Mar 25, 19:00 WIB | In 6h", " Vol: $50,000", " Odds: TeamA 55c | 45c TeamB", ""]
lines.append("")
from browse import send_chunked
send_chunked(lines, fake_send, "Counter Strike", "Mar 25, 2026",
show_matches=True, show_non_matches=False)
# Should have sent more than one message (chunked)
self.assertGreater(len(sent_texts), 1)
# At least one continuation message
cont_found = any("(cont.)" in t for t in sent_texts)
self.assertTrue(cont_found, f"Expected at least one '(cont.)' message. Got {len(sent_texts)} messages.")
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()