Fix #14: Refactor print_browse/send_to_telegram into single pipeline

Replace duplicate inline formatting with unified format+render pipeline.

New functions:
- format_match_event(e) — canonical dict for match events
- format_non_match_event(e) — canonical dict for non-match events
- render_match_lines(event_dict, i, mode) — text/HTML renderer
- render_non_match_lines(event_dict, i, mode) — text/HTML renderer
- send_chunked(...) — extracted Telegram chunking logic

Also fixed send_chunked() chunking bug: the original '. ' in line
check never matched event lines (period is followed by '</b>' not space).

Tests: 38 total, all passing.

Fixes: #14
This commit is contained in:
shoko
2026-03-25 17:47:03 +00:00
parent 8cde441996
commit d018e87b35
2 changed files with 685 additions and 115 deletions

View File

@@ -320,5 +320,457 @@ class TestTimeFunctions(unittest.TestCase):
self.assertEqual(td2["abs_time"], "Mar 25, 19:00 WIB")
class TestFormatMatchEvent(unittest.TestCase):
"""Tests for format_match_event() canonical dict."""
def _frozen_dt(self, year, month, day, hour, minute):
return datetime(year, month, day, hour, minute,
tzinfo=timezone.utc)
def _mock_datetime(self, frozen):
class MockDatetime:
@staticmethod
def now(tz=None):
if tz is None:
return frozen
return frozen.astimezone(tz)
fromisoformat = staticmethod(datetime.fromisoformat)
def __call__(self, *a, **k):
return datetime(*a, **k)
return MockDatetime
def _make_event(self, title, ml_market=None, start_time="2026-03-25T18:00:00Z"):
import json as _json
e = {
"title": title,
"slug": "test-slug",
"startTime": start_time,
"markets": [],
}
if ml_market:
e["markets"].append(ml_market)
return e
def _make_ml_market(self, outcomes, prices, vol=50000):
import json
return {
"sportsMarketType": "moneyline",
"outcomes": json.dumps(outcomes),
"outcomePrices": json.dumps(prices),
"bestBid": str(float(prices[0]) - 0.01) if prices else "0.49",
"bestAsk": str(float(prices[0]) + 0.01) if prices else "0.51",
"volume": str(vol),
"acceptingOrders": True,
"closed": False,
}
def test_fields_present(self):
"""All canonical fields are present and non-null."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import format_match_event
e = self._make_event(
"Counter Strike: Team A vs Team B - ESL Pro League",
self._make_ml_market(['"Team A"', '"Team B"'], [0.55, 0.45]),
)
fd = format_match_event(e)
self.assertIn("title", fd)
self.assertIn("title_clean", fd)
self.assertIn("tournament", fd)
self.assertIn("url", fd)
self.assertIn("time_status", fd)
self.assertIn("time_urgency", fd)
self.assertIn("abs_time", fd)
self.assertIn("team_a", fd)
self.assertIn("team_b", fd)
self.assertIn("odds_a", fd)
self.assertIn("odds_b", fd)
self.assertIn("vol", fd)
def test_title_clean_no_tournament(self):
"""title_clean strips tournament suffix after ' - '."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import format_match_event
e = self._make_event(
"Counter Strike: Team A vs Team B - ESL Pro League",
self._make_ml_market(['"Team A"', '"Team B"'], [0.55, 0.45]),
)
fd = format_match_event(e)
self.assertEqual(fd["title_clean"], "Counter Strike: Team A vs Team B")
self.assertEqual(fd["tournament"], "ESL Pro League")
def test_title_clean_no_dash(self):
"""title_clean is unchanged when no ' - ' separator."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import format_match_event
e = self._make_event(
"Counter Strike: Team A vs Team B",
self._make_ml_market(['"Team A"', '"Team B"'], [0.55, 0.45]),
)
fd = format_match_event(e)
self.assertEqual(fd["title_clean"], "Counter Strike: Team A vs Team B")
self.assertEqual(fd["tournament"], "")
def test_missing_ml(self):
"""Returns valid dict with '?' fallbacks when no ML market."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import format_match_event
e = self._make_event("Team A vs Team B")
fd = format_match_event(e)
self.assertEqual(fd["team_a"], "?")
self.assertEqual(fd["team_b"], "?")
self.assertEqual(fd["odds_a"], "?")
self.assertEqual(fd["odds_b"], "?")
self.assertEqual(fd["vol"], 0)
def test_missing_outcomes(self):
"""Handles empty outcomes list gracefully."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import format_match_event
e = self._make_event(
"Team A vs Team B",
self._make_ml_market([], []),
)
fd = format_match_event(e)
self.assertEqual(fd["team_a"], "?")
self.assertEqual(fd["team_b"], "?")
def test_time_data_passed_through(self):
"""Time fields come from _get_time_data."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import format_match_event
e = self._make_event(
"Team A vs Team B",
self._make_ml_market(['"Team A"', '"Team B"'], [0.55, 0.45]),
start_time="2026-03-25T18:00:00Z", # 6h in future
)
fd = format_match_event(e)
self.assertEqual(fd["time_status"], "In 6h")
self.assertEqual(fd["time_urgency"], 2)
self.assertIn("WIB", fd["abs_time"])
class TestFormatNonMatchEvent(unittest.TestCase):
"""Tests for format_non_match_event() canonical dict."""
def _frozen_dt(self, year, month, day, hour, minute):
return datetime(year, month, day, hour, minute,
tzinfo=timezone.utc)
def _mock_datetime(self, frozen):
class MockDatetime:
@staticmethod
def now(tz=None):
if tz is None:
return frozen
return frozen.astimezone(tz)
fromisoformat = staticmethod(datetime.fromisoformat)
def __call__(self, *a, **k):
return datetime(*a, **k)
return MockDatetime
def test_fields_present(self):
"""All canonical fields are present."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import format_non_match_event
e = {
"title": "Will it rain in Jakarta?",
"slug": "rain-jakarta",
"startTime": "2026-03-25T18:00:00Z",
"markets": [
{"volume": "10000"},
{"volume": "5000"},
],
}
fd = format_non_match_event(e)
self.assertIn("title", fd)
self.assertIn("url", fd)
self.assertIn("time_status", fd)
self.assertIn("time_urgency", fd)
self.assertIn("abs_time", fd)
self.assertIn("market_count", fd)
self.assertIn("total_vol", fd)
def test_market_stats(self):
"""market_count and total_vol computed correctly."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import format_non_match_event
e = {
"title": "Test",
"slug": "test",
"startTime": "2026-03-25T18:00:00Z",
"markets": [
{"volume": "10000"},
{"volume": "5000"},
],
}
fd = format_non_match_event(e)
self.assertEqual(fd["market_count"], 2)
self.assertEqual(fd["total_vol"], 15000)
def test_time_passed_through(self):
"""Time fields come from _get_time_data."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import format_non_match_event
e = {
"title": "Test",
"slug": "test",
"startTime": "2026-03-25T18:00:00Z",
"markets": [],
}
fd = format_non_match_event(e)
self.assertEqual(fd["time_status"], "In 6h")
class TestRenderMatchLines(unittest.TestCase):
"""Tests for render_match_lines() text and HTML output."""
def test_text_mode_exact_lines(self):
"""text mode produces expected plain text lines."""
from browse import render_match_lines
fd = {
"title_clean": "Team A vs Team B",
"url": "https://polymarket.com/market/test",
"abs_time": "Mar 25, 19:00 WIB",
"time_status": "In 6h",
"vol": 50000,
"tournament": "ESL Pro League",
"team_a": "Team A",
"team_b": "Team B",
"odds_a": "55c",
"odds_b": "45c",
}
lines = render_match_lines(fd, 1, mode="text")
self.assertEqual(lines[0], "1. [Team A vs Team B](https://polymarket.com/market/test)")
self.assertEqual(lines[1], " Mar 25, 19:00 WIB | In 6h")
self.assertEqual(lines[2], " Vol: $50,000")
self.assertEqual(lines[3], " Tournament: ESL Pro League")
self.assertEqual(lines[4], " Odds: Team A 55c | 45c Team B")
def test_text_mode_no_tournament(self):
"""text mode omits Tournament line when tournament is empty."""
from browse import render_match_lines
fd = {
"title_clean": "Team A vs Team B",
"url": "https://polymarket.com/market/test",
"abs_time": "Mar 25, 19:00 WIB",
"time_status": "In 6h",
"vol": 50000,
"tournament": "",
"team_a": "Team A",
"team_b": "Team B",
"odds_a": "55c",
"odds_b": "45c",
}
lines = render_match_lines(fd, 2, mode="text")
self.assertEqual(len(lines), 4)
self.assertEqual(lines[0], "2. [Team A vs Team B](https://polymarket.com/market/test)")
self.assertNotIn("Tournament", lines[3])
def test_html_mode_exact(self):
"""html mode produces expected HTML lines with escape_html."""
from browse import render_match_lines
fd = {
"title_clean": "Team A & Team B vs Team C",
"url": "https://polymarket.com/market/test",
"abs_time": "Mar 25, 19:00 WIB",
"time_status": "LIVE",
"vol": 50000,
"tournament": "ESL Pro League",
"team_a": "Team A & Team B",
"team_b": "Team C",
"odds_a": "55c",
"odds_b": "45c",
}
lines = render_match_lines(fd, 1, mode="html")
self.assertEqual(lines[0], "<b>1.</b> <a href=\"https://polymarket.com/market/test\">Team A &amp; Team B vs Team C</a>")
self.assertEqual(lines[1], " Mar 25, 19:00 WIB | LIVE")
self.assertEqual(lines[2], " Vol: $50,000")
self.assertEqual(lines[3], " Tournament: ESL Pro League")
self.assertEqual(lines[4], " Odds: Team A & Team B 55c | 45c Team C")
def test_html_mode_xss_prevention(self):
"""html mode escapes < and > to prevent XSS."""
from browse import render_match_lines
fd = {
"title_clean": "<script>alert('xss')</script>",
"url": "https://polymarket.com/market/test",
"abs_time": "Mar 25, 19:00 WIB",
"time_status": "LIVE",
"vol": 1000,
"tournament": "",
"team_a": "Team A",
"team_b": "Team B",
"odds_a": "50c",
"odds_b": "50c",
}
lines = render_match_lines(fd, 1, mode="html")
self.assertIn("&lt;script&gt;", lines[0])
self.assertNotIn("<script>", lines[0])
class TestRenderNonMatchLines(unittest.TestCase):
"""Tests for render_non_match_lines() text and HTML output."""
def test_text_mode_exact_lines(self):
"""text mode produces expected plain text lines."""
from browse import render_non_match_lines
fd = {
"title": "Will it rain in Jakarta?",
"url": "https://polymarket.com/event/rain-jakarta",
"abs_time": "Mar 25, 19:00 WIB",
"time_status": "In 6h",
"market_count": 3,
"total_vol": 25000,
}
lines = render_non_match_lines(fd, 1, mode="text")
self.assertEqual(lines[0], "1. [Will it rain in Jakarta?](https://polymarket.com/event/rain-jakarta)")
self.assertEqual(lines[1], " Mar 25, 19:00 WIB | In 6h")
self.assertEqual(lines[2], " Markets: 3 | Total Vol: $25,000")
def test_html_mode_exact(self):
"""html mode produces expected HTML lines with escape_html."""
from browse import render_non_match_lines
fd = {
"title": "Rain <or> Sun?",
"url": "https://polymarket.com/event/rain-sun",
"abs_time": "Mar 25, 19:00 WIB",
"time_status": "In 6h",
"market_count": 2,
"total_vol": 10000,
}
lines = render_non_match_lines(fd, 1, mode="html")
self.assertEqual(lines[0], "<b>1.</b> <a href=\"https://polymarket.com/event/rain-sun\">Rain &lt;or&gt; Sun?</a>")
self.assertEqual(lines[1], " Mar 25, 19:00 WIB | In 6h")
self.assertEqual(lines[2], " Markets: 2 | Total Vol: $10,000")
class TestPrintBrowseIntegration(unittest.TestCase):
"""Integration tests for print_browse using the new pipeline."""
def _frozen_dt(self, year, month, day, hour, minute):
return datetime(year, month, day, hour, minute,
tzinfo=timezone.utc)
def _mock_datetime(self, frozen):
class MockDatetime:
@staticmethod
def now(tz=None):
if tz is None:
return frozen
return frozen.astimezone(tz)
fromisoformat = staticmethod(datetime.fromisoformat)
def __call__(self, *a, **k):
return datetime(*a, **k)
return MockDatetime
@patch('builtins.print')
def test_print_browse_uses_new_pipeline(self, mock_print):
"""print_browse calls format_match_event and render_match_lines."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import print_browse
match_events = [{
"title": "Counter Strike: Team A vs Team B - ESL Pro League",
"slug": "csa",
"startTime": "2026-03-25T18:00:00Z",
"markets": [{
"sportsMarketType": "moneyline",
"outcomes": '["Team A", "Team B"]',
"outcomePrices": "[0.55, 0.45]",
"bestBid": "0.54",
"bestAsk": "0.56",
"volume": "50000",
"acceptingOrders": True,
"closed": False,
}],
}]
with patch('browse.format_match_event') as mock_fmt, \
patch('browse.render_match_lines') as mock_render:
mock_fmt.return_value = {
"title_clean": "Team A vs Team B",
"url": "https://polymarket.com/market/csa",
"abs_time": "Mar 25, 19:00 WIB",
"time_status": "In 6h",
"vol": 50000,
"tournament": "ESL Pro League",
"team_a": "Team A",
"team_b": "Team B",
"odds_a": "55c",
"odds_b": "45c",
}
mock_render.return_value = [
"1. [Team A vs Team B](https://polymarket.com/market/csa)",
" Mar 25, 19:00 WIB | In 6h",
" Vol: $50,000",
" Tournament: ESL Pro League",
" Odds: Team A 55c | 45c Team B",
]
print_browse(match_events, [], "Counter Strike", 1, 1, 1, 0,
non_matches_max=5)
mock_fmt.assert_called_once_with(match_events[0])
mock_render.assert_called_once_with(mock_fmt.return_value, 1, mode="text")
@patch('builtins.print')
def test_print_browse_matches_only(self, mock_print):
"""matches_only suppresses non-match section."""
frozen = self._frozen_dt(2026, 3, 25, 12, 0)
with patch('browse.datetime', self._mock_datetime(frozen)):
from browse import print_browse
with patch('browse.format_non_match_event') as mock_non_fmt:
print_browse([], [], "Counter Strike", 0, 0, 0, 0,
non_matches_max=5, matches_only=True)
mock_non_fmt.assert_not_called()
class TestSendChunked(unittest.TestCase):
"""Tests for send_chunked() helper."""
def test_small_message_sent_directly(self):
"""Messages under 4096 chars go through without chunking."""
sent_texts = []
def fake_send(text):
sent_texts.append(text)
lines = ["<b>COUNTER STRIKE</b> | Mar 25, 2026", "", "MATCH MARKETS", "", "1. test"]
# This fits in one message
from browse import send_chunked
send_chunked(lines, fake_send, "Counter Strike", "Mar 25, 2026",
show_matches=True, show_non_matches=False)
self.assertEqual(len(sent_texts), 1)
def test_chunked_message_gets_cont_header(self):
"""Messages over 4096 chars get continuation header."""
sent_texts = []
def fake_send(text):
sent_texts.append(text)
# Build enough content to exceed 4096 chars
# Each event line: ~260 chars. Need ~16 events + headers (~4200 chars)
lines = ["<b>COUNTER STRIKE</b> | Mar 25, 2026", ""]
for i in range(16):
lines += [f"<b>{i+1}.</b> <a href=\"https://polymarket.com/market/{i}\">Team {'X' * 250}</a>", " Mar 25, 19:00 WIB | In 6h", " Vol: $50,000", " Odds: TeamA 55c | 45c TeamB", ""]
lines.append("")
from browse import send_chunked
send_chunked(lines, fake_send, "Counter Strike", "Mar 25, 2026",
show_matches=True, show_non_matches=False)
# Should have sent more than one message (chunked)
self.assertGreater(len(sent_texts), 1)
# At least one continuation message
cont_found = any("(cont.)" in t for t in sent_texts)
self.assertTrue(cont_found, f"Expected at least one '(cont.)' message. Got {len(sent_texts)} messages.")
if __name__ == "__main__":
unittest.main()