diff --git a/skills/polymarket-browse/scripts/browse.py b/skills/polymarket-browse/scripts/browse.py index daf9d6f..51620eb 100644 --- a/skills/polymarket-browse/scripts/browse.py +++ b/skills/polymarket-browse/scripts/browse.py @@ -338,7 +338,176 @@ def browse_events(q, matches_max=10, non_matches_max=10, tradeable_only=True): } # ============================================================ -# FORMAT +# FORMAT — EVENT +# ============================================================ + +def format_match_event(e): + """ + Format a match event into a canonical dict for rendering. + All computing done here; renderers just template. + + Returns: + { + "title": str, # raw title + "title_clean": str, # "Team A vs Team B" + "tournament": str, # "Tournament Name" or "" + "url": str, + "time_status": str, # "LIVE", "In 6h", "12h ago" + "time_urgency": int, # 0-3 + "abs_time": str, # "Mar 25, 19:00 WIB" + "team_a": str, + "team_b": str, + "odds_a": str, # "55c" + "odds_b": str, + "vol": int, + } + """ + ml = get_ml_market(e) + outcomes = json.loads(ml.get("outcomes", "[]")) if ml else [] + prices = json.loads(ml.get("outcomePrices", "[]")) if ml else [] + td = _get_time_data(e) + title = e.get("title", "") + + team_a = outcomes[0] if len(outcomes) > 0 else "?" + team_b = outcomes[1] if len(outcomes) > 1 else "?" + odds_a = format_odds(float(prices[0])) if len(prices) > 0 else "?" + odds_b = format_odds(float(prices[1])) if len(prices) > 1 else "?" + + if " - " in title: + title_clean = title.split(" - ")[0].strip() + else: + title_clean = title + + tournament = get_tournament(title) + + return { + "title": title, + "title_clean": title_clean, + "tournament": tournament, + "url": get_event_url(e), + "time_status": td["time_status"], + "time_urgency": td["time_urgency"], + "abs_time": td["abs_time"], + "team_a": team_a, + "team_b": team_b, + "odds_a": odds_a, + "odds_b": odds_b, + "vol": get_ml_volume(e), + } + + +def format_non_match_event(e): + """ + Format a non-match event into a canonical dict for rendering. + + Returns: + { + "title": str, + "url": str, + "time_status": str, + "time_urgency": int, + "abs_time": str, + "market_count": int, + "total_vol": int, + } + """ + td = _get_time_data(e) + total_vol = sum(float(m.get("volume", 0)) for m in e.get("markets", [])) + market_count = len(e.get("markets", [])) + + return { + "title": e.get("title", "?"), + "url": get_event_url(e), + "time_status": td["time_status"], + "time_urgency": td["time_urgency"], + "abs_time": td["abs_time"], + "market_count": market_count, + "total_vol": int(total_vol), + } + + +# ============================================================ +# FORMAT — RENDER +# ============================================================ + +def render_match_lines(event_dict, i, mode): + """ + Render a formatted match event dict into lines of text. + + Args: + event_dict: canonical dict from format_match_event() + i: 1-based index for the event number + mode: "text" for plain text/Markdown, "html" for Telegram HTML + + Returns: + List[str], one line per element (no trailing blank line). + Caller adds the blank line separator between events. + """ + title_clean = event_dict["title_clean"] + url = event_dict["url"] + abs_time = event_dict["abs_time"] + time_status = event_dict["time_status"] + vol = event_dict["vol"] + tournament = event_dict["tournament"] + team_a = event_dict["team_a"] + team_b = event_dict["team_b"] + odds_a = event_dict["odds_a"] + odds_b = event_dict["odds_b"] + + lines = [] + + if mode == "html": + lines.append( + f"{i}. {escape_html(title_clean)}" + ) + else: + lines.append(f"{i}. [{title_clean}]({url})") + + lines.append(f" {abs_time} | {time_status}") + lines.append(f" Vol: ${vol:,.0f}") + + if tournament: + lines.append(f" Tournament: {tournament}") + + lines.append(f" Odds: {team_a} {odds_a} | {odds_b} {team_b}") + + return lines + + +def render_non_match_lines(event_dict, i, mode): + """ + Render a formatted non-match event dict into lines of text. + + Args: + event_dict: canonical dict from format_non_match_event() + i: 1-based index for the event number + mode: "text" for plain text/Markdown, "html" for Telegram HTML + + Returns: + List[str], one line per element (no trailing blank line). + """ + title = event_dict["title"] + url = event_dict["url"] + abs_time = event_dict["abs_time"] + time_status = event_dict["time_status"] + market_count = event_dict["market_count"] + total_vol = event_dict["total_vol"] + + lines = [] + + if mode == "html": + lines.append(f"{i}. {escape_html(title)}") + else: + lines.append(f"{i}. [{title}]({url})") + + lines.append(f" {abs_time} | {time_status}") + lines.append(f" Markets: {market_count} | Total Vol: ${total_vol:,.0f}") + + return lines + + +# ============================================================ +# FORMAT — LEGACY # ============================================================ def format_event(e): @@ -425,18 +594,17 @@ def print_browse(match_events, non_match_events, category, total_raw, total_fetc utc7 = timezone(timedelta(hours=7)) now_utc7 = now_utc.astimezone(utc7) header_date = get_header_date() - + print(f"\n=== {category.upper()}{' [RAW]' if raw_mode else ''} ===") print(f"Current time (WIB): {now_utc7.strftime('%H:%M WIB')} | {header_date}") - + if raw_mode: print(f"Fetched: {total_fetched} / Total API: {total_raw} | Match: {total_match} | Non-match: {total_non_match}") if partial: print(f"WARNING: Partial fetch (API error or timeout) — data may be incomplete") - - # --- MATCH MARKETS --- + + # Determine sections to show if not matches_only and not non_matches_only: - # Default: show both show_matches = True show_non_matches = True elif matches_only: @@ -445,60 +613,25 @@ def print_browse(match_events, non_match_events, category, total_raw, total_fetc else: show_matches = False show_non_matches = True - + + # Match events if show_matches: - print(f"\nMATCH MARKETS") + print("\nMATCH MARKETS") if not match_events: print(" No match markets found.") else: for i, e in enumerate(match_events, 1): - f = format_event(e) - ml = get_ml_market(e) - outcomes = json.loads(ml.get("outcomes", "[]")) if ml else [] - prices = json.loads(ml.get("outcomePrices", "[]")) if ml else [] - vol = f["volume"] - title = f["title"] - url = f["url"] - td = _get_time_data(e) - start_time_wib = td["abs_time"] - rel_time = td["time_status"] - - team_a = outcomes[0] if len(outcomes) > 0 else "?" - team_b = outcomes[1] if len(outcomes) > 1 else "?" - odds_a = format_odds(float(prices[0])) if len(prices) > 0 else "?" - odds_b = format_odds(float(prices[1])) if len(prices) > 1 else "?" - - if " - " in title: - title_clean = title.split(" - ")[0].strip() - else: - title_clean = title - - tournament = get_tournament(title) - - print(f"\n {i}. [{title_clean}]({url})") - print(f" {start_time_wib} | {rel_time}") - print(f" Vol: ${vol:,.0f}") - if tournament: - print(f" Tournament: {tournament}") - print(f" Odds: {team_a} {odds_a} | {odds_b} {team_b}") - - # --- NON-MATCH MARKETS --- - if show_non_matches and non_match_events: - print(f"\nNON-MATCH MARKETS") - - for i, e in enumerate(non_match_events[:non_matches_max], 1): - title = e.get("title", "?") - url = get_event_url(e) - td = _get_time_data(e) - start_time_wib = td["abs_time"] - rel_time = td["time_status"] + fd = format_match_event(e) + for line in render_match_lines(fd, i, mode="text"): + print(line) - total_vol = sum(float(m.get("volume", 0)) for m in e.get("markets", [])) - market_count = len(e.get("markets", [])) - - print(f"\n {i}. [{title}]({url})") - print(f" {start_time_wib} | {rel_time}") - print(f" Markets: {market_count} | Total Vol: ${total_vol:,.0f}") + # Non-match events + if show_non_matches and non_match_events: + print("\nNON-MATCH MARKETS") + for i, e in enumerate(non_match_events[:non_matches_max], 1): + fd = format_non_match_event(e) + for line in render_non_match_lines(fd, i, mode="text"): + print(line) def print_detail(e, detail): print(f"\n{detail['title']}") @@ -566,104 +699,89 @@ def send_to_telegram(match_events, non_match_events, category, matches_only=Fals utc7 = timezone(timedelta(hours=7)) now_utc7 = now_utc.astimezone(utc7) header_date = now_utc7.strftime("%b %d, %Y") - + # Determine sections to show show_matches = (not matches_only and not non_matches_only) or matches_only show_non_matches = (not matches_only and not non_matches_only) or non_matches_only - + def send(text): msg_id = send_telegram_message(bot_token, chat_id, text) print(f" Sent msg {msg_id}") - - # Build sections - lines = [f"{category.upper()} | {header_date}"] - lines.append("") - + + # Build lines + lines = [f"{category.upper()} | {header_date}", ""] + if show_matches: - lines.append("MATCH MARKETS") - lines.append("") + lines += ["MATCH MARKETS", ""] if not match_events: lines.append(" No match markets found.") else: for i, e in enumerate(match_events, 1): - ml = get_ml_market(e) - outcomes = json.loads(ml.get("outcomes", "[]")) if ml else [] - prices = json.loads(ml.get("outcomePrices", "[]")) if ml else [] - vol = get_ml_volume(e) - title = e.get("title", "?") - url = get_event_url(e) - td = _get_time_data(e) - start_time_wib = td["abs_time"] - rel_time = td["time_status"] - team_a = outcomes[0] if len(outcomes) > 0 else "?" - team_b = outcomes[1] if len(outcomes) > 1 else "?" - odds_a = format_odds(float(prices[0])) if len(prices) > 0 else "?" - odds_b = format_odds(float(prices[1])) if len(prices) > 1 else "?" - tournament = get_tournament(title) - title_clean = title.split(" - ")[0].strip() if " - " in title else title - lines.append(f"{i}. {escape_html(title_clean)}") - lines.append(f" {start_time_wib} | {rel_time}") - lines.append(f" Vol: ${vol:,.0f}") - if tournament: - lines.append(f" Tournament: {tournament}") - lines.append(f" Odds: {team_a} {odds_a} | {odds_b} {team_b}") + fd = format_match_event(e) + lines += render_match_lines(fd, i, mode="html") lines.append("") lines.append("") - + if show_non_matches: - lines.append("NON-MATCH MARKETS") - lines.append("") + lines += ["NON-MATCH MARKETS", ""] if not non_match_events: lines.append(" No non-match markets found.") else: for i, e in enumerate(non_match_events, 1): - title = e.get("title", "?") - url = get_event_url(e) - td = _get_time_data(e) - start_time_wib = td["abs_time"] - rel_time = td["time_status"] - total_vol = sum(float(m.get("volume", 0)) for m in e.get("markets", [])) - market_count = len(e.get("markets", [])) - lines.append(f"{i}. {escape_html(title)}") - lines.append(f" {start_time_wib} | {rel_time}") - lines.append(f" Markets: {market_count} | Total Vol: ${total_vol:,.0f}") + fd = format_non_match_event(e) + lines += render_non_match_lines(fd, i, mode="html") lines.append("") - - # Chunk by 10 items (events), respecting 4096 char Telegram limit - text = "\n".join(lines) + lines.append("") + + # Chunk and send + send_chunked(lines, send, category, header_date, show_matches, show_non_matches) + + +def send_chunked(all_lines, send_fn, category, header_date, show_matches, show_non_matches): + """ + Split already-built lines into Telegram-safe chunks and send them. + + Telegram messages are capped at 4096 chars. Chunks are grouped by + section header so no event is split across messages. + + Args: + all_lines: Full message lines list (built by caller). + send_fn: Closure that sends a single string and prints confirmation. + category: Category name for header. + header_date: Date string for header. + show_matches: Whether MATCH MARKETS section is present. + show_non_matches: Whether NON-MATCH MARKETS section is present. + """ + text = "\n".join(all_lines) if len(text) <= 4096: - send(text) + send_fn(text) return - - # Split into chunks of 10 events + + # Split into chunks of 10 events, respecting section headers all_items = [] in_match = True - for line in lines: + for line in all_lines: if line == "MATCH MARKETS": in_match = True elif line == "NON-MATCH MARKETS": in_match = False - elif line.startswith("") and ". " in line and "" in line: + elif line.startswith("") and "" in line: + # Event title line: 1. Title all_items.append((in_match, line)) - + chunk = [] - chunk_len = 0 - chunk_num = 1 - - # Header is always first header = f"{category.upper()} | {header_date}\n" if show_matches: header += "\nMATCH MARKETS\n\n" if show_non_matches: header += "\nNON-MATCH MARKETS\n\n" - + for is_match, item_line in all_items: test_chunk = chunk + [item_line, ""] test_text = header + "\n".join(chunk) + "\n".join(test_chunk) if len(test_text) > 4096 or len(chunk) >= 10: - # Send current chunk msg = header + "\n".join(chunk) - send(msg) + send_fn(msg) chunk = [item_line, ""] header = f"{category.upper()} (cont.) | {header_date}\n" if show_matches and is_match: @@ -672,10 +790,10 @@ def send_to_telegram(match_events, non_match_events, category, matches_only=Fals header += "\nNON-MATCH MARKETS\n\n" else: chunk.extend([item_line, ""]) - + if chunk: msg = header + "\n".join(chunk) - send(msg) + send_fn(msg) # ============================================================ diff --git a/skills/polymarket-browse/tests/test_browse.py b/skills/polymarket-browse/tests/test_browse.py index 9be1a1a..e909eba 100644 --- a/skills/polymarket-browse/tests/test_browse.py +++ b/skills/polymarket-browse/tests/test_browse.py @@ -320,5 +320,457 @@ class TestTimeFunctions(unittest.TestCase): self.assertEqual(td2["abs_time"], "Mar 25, 19:00 WIB") +class TestFormatMatchEvent(unittest.TestCase): + """Tests for format_match_event() canonical dict.""" + + def _frozen_dt(self, year, month, day, hour, minute): + return datetime(year, month, day, hour, minute, + tzinfo=timezone.utc) + + def _mock_datetime(self, frozen): + class MockDatetime: + @staticmethod + def now(tz=None): + if tz is None: + return frozen + return frozen.astimezone(tz) + fromisoformat = staticmethod(datetime.fromisoformat) + def __call__(self, *a, **k): + return datetime(*a, **k) + return MockDatetime + + def _make_event(self, title, ml_market=None, start_time="2026-03-25T18:00:00Z"): + import json as _json + e = { + "title": title, + "slug": "test-slug", + "startTime": start_time, + "markets": [], + } + if ml_market: + e["markets"].append(ml_market) + return e + + def _make_ml_market(self, outcomes, prices, vol=50000): + import json + return { + "sportsMarketType": "moneyline", + "outcomes": json.dumps(outcomes), + "outcomePrices": json.dumps(prices), + "bestBid": str(float(prices[0]) - 0.01) if prices else "0.49", + "bestAsk": str(float(prices[0]) + 0.01) if prices else "0.51", + "volume": str(vol), + "acceptingOrders": True, + "closed": False, + } + + def test_fields_present(self): + """All canonical fields are present and non-null.""" + frozen = self._frozen_dt(2026, 3, 25, 12, 0) + with patch('browse.datetime', self._mock_datetime(frozen)): + from browse import format_match_event + e = self._make_event( + "Counter Strike: Team A vs Team B - ESL Pro League", + self._make_ml_market(['"Team A"', '"Team B"'], [0.55, 0.45]), + ) + fd = format_match_event(e) + self.assertIn("title", fd) + self.assertIn("title_clean", fd) + self.assertIn("tournament", fd) + self.assertIn("url", fd) + self.assertIn("time_status", fd) + self.assertIn("time_urgency", fd) + self.assertIn("abs_time", fd) + self.assertIn("team_a", fd) + self.assertIn("team_b", fd) + self.assertIn("odds_a", fd) + self.assertIn("odds_b", fd) + self.assertIn("vol", fd) + + def test_title_clean_no_tournament(self): + """title_clean strips tournament suffix after ' - '.""" + frozen = self._frozen_dt(2026, 3, 25, 12, 0) + with patch('browse.datetime', self._mock_datetime(frozen)): + from browse import format_match_event + e = self._make_event( + "Counter Strike: Team A vs Team B - ESL Pro League", + self._make_ml_market(['"Team A"', '"Team B"'], [0.55, 0.45]), + ) + fd = format_match_event(e) + self.assertEqual(fd["title_clean"], "Counter Strike: Team A vs Team B") + self.assertEqual(fd["tournament"], "ESL Pro League") + + def test_title_clean_no_dash(self): + """title_clean is unchanged when no ' - ' separator.""" + frozen = self._frozen_dt(2026, 3, 25, 12, 0) + with patch('browse.datetime', self._mock_datetime(frozen)): + from browse import format_match_event + e = self._make_event( + "Counter Strike: Team A vs Team B", + self._make_ml_market(['"Team A"', '"Team B"'], [0.55, 0.45]), + ) + fd = format_match_event(e) + self.assertEqual(fd["title_clean"], "Counter Strike: Team A vs Team B") + self.assertEqual(fd["tournament"], "") + + def test_missing_ml(self): + """Returns valid dict with '?' fallbacks when no ML market.""" + frozen = self._frozen_dt(2026, 3, 25, 12, 0) + with patch('browse.datetime', self._mock_datetime(frozen)): + from browse import format_match_event + e = self._make_event("Team A vs Team B") + fd = format_match_event(e) + self.assertEqual(fd["team_a"], "?") + self.assertEqual(fd["team_b"], "?") + self.assertEqual(fd["odds_a"], "?") + self.assertEqual(fd["odds_b"], "?") + self.assertEqual(fd["vol"], 0) + + def test_missing_outcomes(self): + """Handles empty outcomes list gracefully.""" + frozen = self._frozen_dt(2026, 3, 25, 12, 0) + with patch('browse.datetime', self._mock_datetime(frozen)): + from browse import format_match_event + e = self._make_event( + "Team A vs Team B", + self._make_ml_market([], []), + ) + fd = format_match_event(e) + self.assertEqual(fd["team_a"], "?") + self.assertEqual(fd["team_b"], "?") + + def test_time_data_passed_through(self): + """Time fields come from _get_time_data.""" + frozen = self._frozen_dt(2026, 3, 25, 12, 0) + with patch('browse.datetime', self._mock_datetime(frozen)): + from browse import format_match_event + e = self._make_event( + "Team A vs Team B", + self._make_ml_market(['"Team A"', '"Team B"'], [0.55, 0.45]), + start_time="2026-03-25T18:00:00Z", # 6h in future + ) + fd = format_match_event(e) + self.assertEqual(fd["time_status"], "In 6h") + self.assertEqual(fd["time_urgency"], 2) + self.assertIn("WIB", fd["abs_time"]) + + +class TestFormatNonMatchEvent(unittest.TestCase): + """Tests for format_non_match_event() canonical dict.""" + + def _frozen_dt(self, year, month, day, hour, minute): + return datetime(year, month, day, hour, minute, + tzinfo=timezone.utc) + + def _mock_datetime(self, frozen): + class MockDatetime: + @staticmethod + def now(tz=None): + if tz is None: + return frozen + return frozen.astimezone(tz) + fromisoformat = staticmethod(datetime.fromisoformat) + def __call__(self, *a, **k): + return datetime(*a, **k) + return MockDatetime + + def test_fields_present(self): + """All canonical fields are present.""" + frozen = self._frozen_dt(2026, 3, 25, 12, 0) + with patch('browse.datetime', self._mock_datetime(frozen)): + from browse import format_non_match_event + e = { + "title": "Will it rain in Jakarta?", + "slug": "rain-jakarta", + "startTime": "2026-03-25T18:00:00Z", + "markets": [ + {"volume": "10000"}, + {"volume": "5000"}, + ], + } + fd = format_non_match_event(e) + self.assertIn("title", fd) + self.assertIn("url", fd) + self.assertIn("time_status", fd) + self.assertIn("time_urgency", fd) + self.assertIn("abs_time", fd) + self.assertIn("market_count", fd) + self.assertIn("total_vol", fd) + + def test_market_stats(self): + """market_count and total_vol computed correctly.""" + frozen = self._frozen_dt(2026, 3, 25, 12, 0) + with patch('browse.datetime', self._mock_datetime(frozen)): + from browse import format_non_match_event + e = { + "title": "Test", + "slug": "test", + "startTime": "2026-03-25T18:00:00Z", + "markets": [ + {"volume": "10000"}, + {"volume": "5000"}, + ], + } + fd = format_non_match_event(e) + self.assertEqual(fd["market_count"], 2) + self.assertEqual(fd["total_vol"], 15000) + + def test_time_passed_through(self): + """Time fields come from _get_time_data.""" + frozen = self._frozen_dt(2026, 3, 25, 12, 0) + with patch('browse.datetime', self._mock_datetime(frozen)): + from browse import format_non_match_event + e = { + "title": "Test", + "slug": "test", + "startTime": "2026-03-25T18:00:00Z", + "markets": [], + } + fd = format_non_match_event(e) + self.assertEqual(fd["time_status"], "In 6h") + + +class TestRenderMatchLines(unittest.TestCase): + """Tests for render_match_lines() text and HTML output.""" + + def test_text_mode_exact_lines(self): + """text mode produces expected plain text lines.""" + from browse import render_match_lines + fd = { + "title_clean": "Team A vs Team B", + "url": "https://polymarket.com/market/test", + "abs_time": "Mar 25, 19:00 WIB", + "time_status": "In 6h", + "vol": 50000, + "tournament": "ESL Pro League", + "team_a": "Team A", + "team_b": "Team B", + "odds_a": "55c", + "odds_b": "45c", + } + lines = render_match_lines(fd, 1, mode="text") + self.assertEqual(lines[0], "1. [Team A vs Team B](https://polymarket.com/market/test)") + self.assertEqual(lines[1], " Mar 25, 19:00 WIB | In 6h") + self.assertEqual(lines[2], " Vol: $50,000") + self.assertEqual(lines[3], " Tournament: ESL Pro League") + self.assertEqual(lines[4], " Odds: Team A 55c | 45c Team B") + + def test_text_mode_no_tournament(self): + """text mode omits Tournament line when tournament is empty.""" + from browse import render_match_lines + fd = { + "title_clean": "Team A vs Team B", + "url": "https://polymarket.com/market/test", + "abs_time": "Mar 25, 19:00 WIB", + "time_status": "In 6h", + "vol": 50000, + "tournament": "", + "team_a": "Team A", + "team_b": "Team B", + "odds_a": "55c", + "odds_b": "45c", + } + lines = render_match_lines(fd, 2, mode="text") + self.assertEqual(len(lines), 4) + self.assertEqual(lines[0], "2. [Team A vs Team B](https://polymarket.com/market/test)") + self.assertNotIn("Tournament", lines[3]) + + def test_html_mode_exact(self): + """html mode produces expected HTML lines with escape_html.""" + from browse import render_match_lines + fd = { + "title_clean": "Team A & Team B vs Team C", + "url": "https://polymarket.com/market/test", + "abs_time": "Mar 25, 19:00 WIB", + "time_status": "LIVE", + "vol": 50000, + "tournament": "ESL Pro League", + "team_a": "Team A & Team B", + "team_b": "Team C", + "odds_a": "55c", + "odds_b": "45c", + } + lines = render_match_lines(fd, 1, mode="html") + self.assertEqual(lines[0], "1. Team A & Team B vs Team C") + self.assertEqual(lines[1], " Mar 25, 19:00 WIB | LIVE") + self.assertEqual(lines[2], " Vol: $50,000") + self.assertEqual(lines[3], " Tournament: ESL Pro League") + self.assertEqual(lines[4], " Odds: Team A & Team B 55c | 45c Team C") + + def test_html_mode_xss_prevention(self): + """html mode escapes < and > to prevent XSS.""" + from browse import render_match_lines + fd = { + "title_clean": "", + "url": "https://polymarket.com/market/test", + "abs_time": "Mar 25, 19:00 WIB", + "time_status": "LIVE", + "vol": 1000, + "tournament": "", + "team_a": "Team A", + "team_b": "Team B", + "odds_a": "50c", + "odds_b": "50c", + } + lines = render_match_lines(fd, 1, mode="html") + self.assertIn("<script>", lines[0]) + self.assertNotIn("