diff --git a/src/backend/app/services/ai_agent/conversational.py b/src/backend/app/services/ai_agent/conversational.py index cc6ac46..d893bc5 100644 --- a/src/backend/app/services/ai_agent/conversational.py +++ b/src/backend/app/services/ai_agent/conversational.py @@ -18,6 +18,199 @@ from ...core.config import get_settings from ...db.models import Bot, Simulation +TOOL_REGISTRY = { + "randebu": [ + { + "name": "backtest", + "description": "Run strategy backtest", + "category": "Randebu Built-in", + "command": "/backtest", + "details": { + "description": "Run a backtest to evaluate how the current trading strategy would have performed historically.", + "usage": "/backtest [token_address] [--timeframe 1d|4h|1h|15m] [--start YYYY-MM-DD] [--end YYYY-MM-DD]", + "example": "Run a backtest on PEPE for the last 30 days", + }, + }, + { + "name": "simulate", + "description": "Start/stop simulation", + "category": "Randebu Built-in", + "command": "/simulate", + "details": { + "description": "Start or stop trading simulations that run on real-time klines.", + "usage": "/simulate start|stop|status|results [token_address]", + "example": "Start a simulation on PEPE", + }, + }, + { + "name": "strategy", + "description": "View/update strategy", + "category": "Randebu Built-in", + "command": "/strategy", + "details": { + "description": "View your current trading strategy or update it with new parameters.", + "usage": "Describe your strategy in plain English, e.g., 'Buy PEPE when price drops 5%'", + "example": "Buy PEPE when it drops 10% within 1 hour", + }, + }, + ], + "ave": [ + { + "name": "search", + "description": "Token search", + "category": "AVE Cloud Skills", + "command": "/search", + "details": { + "description": "Find tokens by keyword, symbol, or contract address on BSC.", + "usage": "search [--chain bsc] [--limit 20]", + "example": "search PEPE\nsearch 0x1234... --chain bsc", + }, + }, + { + "name": "trending", + "description": "Popular tokens", + "category": "AVE Cloud Skills", + "command": "/trending", + "details": { + "description": "Get list of trending/popular tokens on BSC.", + "usage": "trending [--chain bsc] [--limit 20]", + "example": "trending --chain bsc\ntrending --limit 10", + }, + }, + { + "name": "risk", + "description": "Honeypot detection", + "category": "AVE Cloud Skills", + "command": "/risk", + "details": { + "description": "Get risk analysis for a token contract including honeypot assessment.", + "usage": "risk [--chain bsc]", + "example": "risk 0x6982508145454Ce125dDE157d8d64a26D53f60a2", + }, + }, + { + "name": "token", + "description": "Token details", + "category": "AVE Cloud Skills", + "command": "/token", + "details": { + "description": "Get detailed information about a specific token including price, market cap, and pairs.", + "usage": "token
[--chain bsc]", + "example": "token 0x6982508145454Ce125dDE157d8d64a26D53f60a2", + }, + }, + { + "name": "price", + "description": "Batch prices", + "category": "AVE Cloud Skills", + "command": "/price", + "details": { + "description": "Get current price(s) for multiple tokens.", + "usage": "price ,,... (e.g., PEPE-bsc,TRUMP-bsc)", + "example": "price PEPE-bsc,TRUMP-bsc", + }, + }, + ], +} + + +# Skill emojis mapping +SKILL_EMOJIS = { + "backtest": "šŸ“Š", + "simulate": "šŸŽ®", + "strategy": "šŸ“", + "search": "šŸ”", + "trending": "šŸ“ˆ", + "risk": "šŸ“‰", + "token": "šŸŖ™", + "price": "šŸ’°", +} + + +def get_tool_registry() -> Dict[str, Any]: + """Return the tool registry for slash command help.""" + return TOOL_REGISTRY + + +def format_tools_list() -> str: + """Format the tool registry as a help message.""" + message = "šŸ“‹ Available Tools\n\n" + + for category in ["randebu", "ave"]: + tools = TOOL_REGISTRY.get(category, []) + if category == "randedu": + message += "šŸ¤– Randebu Built-in:\n" + else: + message += "ā˜ļø AVE Cloud Skills:\n" + + for tool in tools: + message += f" • {tool['command']} - {tool['description']}\n" + message += "\n" + + message = ( + message.rstrip() + "\n\nType / for detailed help on a specific tool." + ) + return message + + +def format_skill_acknowledgment(tool_name: str, description: str) -> str: + """Format a brief acknowledgment when a skill is activated.""" + emoji = SKILL_EMOJIS.get(tool_name.lower(), "✨") + return f"{emoji} **{tool_name}** loaded. Ready for *{description}*, ask me away!" + + +def format_tool_help(tool_name: str) -> str: + """Format detailed help for a specific tool.""" + tool_name = tool_name.lstrip("/") + + for category in ["randebu", "ave"]: + for tool in TOOL_REGISTRY.get(category, []): + if tool["name"].lower() == tool_name.lower(): + cat_label = ( + "Randebu Built-in" if category == "randebu" else "AVE Cloud Skill" + ) + details = tool["details"] + message = ( + f"šŸ” {tool['command']} - {details['description']} ({cat_label})\n\n" + ) + message += f"**Description:** {details['description']}\n" + message += f"**Commands:**\n {details['usage']}\n\n" + message += f"**Example:**\n```\n{details['example']}\n```" + return message + + return f"Tool '{tool_name}' not found. Type / to see all available tools." + + +def format_general_help() -> str: + """Format general help about Randebu.""" + return """šŸ¤– **Randebu - AI Trading Assistant** + +Randebu is your AI trading assistant that helps you manage your trading bots on BSC (Binance Smart Chain). + +**Getting Started:** +1. Create a bot on the dashboard +2. Describe your trading strategy in plain English +3. Run backtests to validate your strategy +4. Start simulations to see live trading + +**Example Strategies:** +- "Buy PEPE when it drops 5%" +- "Sell if price rises 10% within 1 hour" +- "Buy when volume spikes by 200%" + +**Slash Commands:** +- `/` - Show all available tools +- `/help` - Show this help message +- `/` - Get help on a specific tool + +**Natural Language:** +You can also just describe what you want in natural language. For example: +- "What's the price of PEPE?" +- "Run a backtest on 0x... token" +- "Start a simulation on TRUMP" +""" + + SYSTEM_PROMPT = """You are a helpful AI trading assistant named Randebu. You help users manage their trading bots. IMPORTANT CHAIN LIMITATION: @@ -35,7 +228,7 @@ Your response must be valid JSON with exactly this structure: "actions": [{"type": "buy" | "sell" | "hold", "amount_percent": number, ...}], "risk_management": {"stop_loss_percent": number, "take_profit_percent": number} } -} +}" Guidelines: - "thinking" should be detailed reasoning about the user's request @@ -163,6 +356,28 @@ TOOLS = [ }, }, }, + { + "type": "function", + "function": { + "name": "get_trending", + "description": "Get trending tokens on a blockchain. Use when user asks what's trending, top tokens, or popular tokens right now.", + "parameters": { + "type": "object", + "properties": { + "chain": { + "type": "string", + "description": "Blockchain chain (default: bsc)", + "default": "bsc", + }, + "limit": { + "type": "integer", + "description": "Number of trending tokens to return (default: 10, max: 50)", + "default": 10, + }, + }, + }, + }, + }, { "type": "function", "function": { @@ -231,6 +446,7 @@ You have access to tools: - get_token(address, chain): Get detailed information about a specific token. Use when user asks for token details. - get_price(token_ids): Get current price(s) for tokens. Use when user asks for token price. - get_risk(address, chain): Get risk analysis for a token. Use when user asks about token safety or honeypot analysis. +- get_trending(chain, limit): Get trending tokens on a blockchain. Use when user asks what's trending, top tokens, or popular tokens. - run_backtest(token_address, timeframe, start_date, end_date): Run a backtest on historical data. Returns performance metrics. Use when user asks to backtest or check historical performance. - manage_simulation(action, token_address, kline_interval): Manage trading simulations. Actions: 'start' (begin new), 'stop' (stop running), 'status' (check if running), 'results' (get current/latest results). @@ -253,6 +469,795 @@ class ConversationalAgent: # Extended thinking endpoint self.thinking_endpoint = "https://api.minimax.io/v1/text/chatcompletion_v2" + # Track pending command after acknowledgment + self.pending_command = None + + # Track recent search results for context + self.recent_search_results = [] # List of {symbol, name, address} + + def _is_error_output(self, code: int, output: str) -> bool: + """Check if the command output contains an error.""" + if code != 0: + return True + # Check for common error patterns in output + if output.startswith("Error:") or "API error" in output or "api key invalid" in output.lower(): + return True + return False + + def _handle_slash_command(self, user_message: str) -> Dict[str, Any]: + """Handle slash command help requests. + + Args: + user_message: The slash command message (e.g., '/', '/help', '/search') + + Returns: + Dict with 'response', 'thinking', and other fields + """ + cmd = user_message.strip().lower() + + if cmd == "/": + return { + "response": format_tools_list(), + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + elif cmd == "/help": + return { + "response": format_general_help(), + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + elif cmd.startswith("/"): + # Check if it's a known skill (only look at first word after /) + parts = cmd[1:].split() + tool_name = parts[0].lower() if parts else "" + has_args = len(parts) > 1 + + for category in ["randebu", "ave"]: + for tool in TOOL_REGISTRY.get(category, []): + if tool["name"].lower() == tool_name: + # Special handling for /strategy - fetch current strategy from DB + if tool_name == "strategy" and not has_args: + return self._get_strategy_response() + # Special handling for /trending - execute trending directly + if tool_name == "trending" and not has_args: + return self._execute_trending() + # Special handling for /backtest - execute directly + if tool_name == "backtest": + return self._execute_backtest_direct(user_message if has_args else "") + # Special handling for /simulate - execute directly + if tool_name == "simulate": + return self._execute_simulate_direct(user_message if has_args else "") + # For commands that need params (/search, /risk, /token, /price) + # execute immediately if args provided, otherwise set pending + if not has_args: + # Set pending command for next message + self.pending_command = tool_name + return { + "response": format_skill_acknowledgment( + tool["name"], tool["description"] + ), + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + return None # Has args - let AI handle it + + # Unknown skill + return { + "response": f"Unknown command '{tool_name}'. Type / to see available tools.", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + + return { + "response": "Unknown command. Type / for available tools or /help for general help.", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + + def _get_strategy_response(self) -> Dict[str, Any]: + """Fetch and format the current strategy from the database.""" + if not self.bot_id: + return { + "response": "No bot selected. Please select a bot first.", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + + try: + from ...core.database import get_db + db = next(get_db()) + try: + bot = db.query(Bot).filter(Bot.id == self.bot_id).first() + if not bot: + return { + "response": "Bot not found.", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + + strategy_config = bot.strategy_config + if not strategy_config: + return { + "response": "šŸ“ **Your Current Strategy**\n\nNo strategy has been configured yet. Tell me what trading strategy you'd like to use, and I'll set it up for you!\n\nExample: \"Buy PEPE when it drops 5%\"", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + + # Format the strategy nicely + conditions = strategy_config.get("conditions", []) + actions = strategy_config.get("actions", []) + risk = strategy_config.get("risk_management", {}) + + response = "šŸ“ **Your Current Strategy**\n\n" + + # Format conditions + if conditions: + response += "**Conditions:**\n" + for cond in conditions: + cond_type = cond.get("type", "unknown") + token = cond.get("token", "") + threshold = cond.get("threshold", 0) + timeframe = cond.get("timeframe", "") + if cond_type == "price_drop": + response += f"- Buy when {token} drops {threshold}%" + elif cond_type == "price_rise": + response += f"- Sell when {token} rises {threshold}%" + elif cond_type == "volume_spike": + response += f"- Buy when volume spikes {threshold}%" + elif cond_type == "price_level": + response += f"- Buy/sell at price level {threshold}" + else: + response += f"- {cond_type}: {token} {threshold}" + if timeframe: + response += f" within {timeframe}" + response += "\n" + response += "\n" + + # Format actions + if actions: + response += "**Actions:**\n" + for action in actions: + action_type = action.get("type", "unknown") + amount = action.get("amount_percent", 0) + response += f"- {action_type.capitalize()} {amount}% of balance\n" + response += "\n" + + # Format risk management + if risk: + response += "**Risk Management:**\n" + stop_loss = risk.get("stop_loss_percent", 0) + take_profit = risk.get("take_profit_percent", 0) + if stop_loss: + response += f"- Stop loss: {stop_loss}%\n" + if take_profit: + response += f"- Take profit: {take_profit}%\n" + + response += "\nWould you like to modify this strategy?" + + return { + "response": response, + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + finally: + db.close() + except Exception as e: + return { + "response": f"Error fetching strategy: {str(e)}", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + + def _execute_trending(self) -> Dict[str, Any]: + """Execute the trending tokens command and return results.""" + try: + code, output = self._call_ave_script( + "trending", + ["--chain", "bsc", "--page-size", "10"], + ) + if self._is_error_output(code, output): + return { + "response": f"Failed to get trending tokens: {output}", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + try: + data = json.loads(output) + # Handle both dict with 'tokens' key and direct list + data_field = data.get("data", []) + if isinstance(data_field, list): + tokens = data_field + else: + tokens = data_field.get("tokens", []) + if tokens: + token_list = "" + for t in tokens[:10]: + addr = t.get("token", "") + symbol = t.get("symbol", "") + name = t.get("name", "") + price_change = t.get("token_price_change_24h", "N/A") + mc = t.get("market_cap", "N/A") + try: + mc_str = f"${float(mc):,.0f}" + except (ValueError, TypeError): + mc_str = str(mc) + token_list += f"- **{symbol}** ({name}): `{addr}` - MC: {mc_str} - 24h: {price_change}%\n" + return { + "response": f"šŸ“ˆ **Trending Tokens on BSC:**\n\n{token_list}\nWould you like me to set up a strategy for any of these?", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + else: + return { + "response": "No trending tokens found on BSC right now. Try again later!", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + except json.JSONDecodeError: + return { + "response": f"Failed to parse trending data: {output[:200]}", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + except Exception as e: + return { + "response": f"Error getting trending tokens: {str(e)}", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + + def _execute_search(self, keyword: str) -> Dict[str, Any]: + """Execute search with the given keyword.""" + try: + code, output = self._call_ave_script( + "search", + ["--keyword", keyword.strip(), "--chain", "bsc", "--limit", "10"], + ) + if self._is_error_output(code, output): + return { + "response": f"Failed to search tokens: {output}", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + try: + data = json.loads(output) + # Handle both dict with 'tokens' key and direct list + data_field = data.get("data", []) + if isinstance(data_field, list): + tokens = data_field + else: + tokens = data_field.get("tokens", []) + if tokens: + # Store search results for context + self.recent_search_results = [] + token_list = "" + for t in tokens[:10]: + addr = t.get("token", "") + symbol = t.get("symbol", "") + name = t.get("name", "") + # Search API uses price_change_24h, trending uses token_price_change_24h + price_change = t.get("price_change_24h") or t.get("token_price_change_24h") or "N/A" + mc = t.get("market_cap", "N/A") + # Store for context + if addr and symbol: + self.recent_search_results.append({ + "symbol": symbol, + "name": name, + "address": addr, + "chain": "bsc" + }) + try: + mc_str = f"${float(mc):,.0f}" + except (ValueError, TypeError): + mc_str = str(mc) + token_list += f"- **{symbol}** ({name}): `{addr}` - MC: {mc_str} - 24h: {price_change}%\n" + return { + "response": f"šŸ” **Search Results for '{keyword}':**\n\n{token_list}\nWould you like me to set up a strategy for any of these?", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + else: + self.recent_search_results = [] + return { + "response": f"No tokens found for '{keyword}'. Try a different keyword.", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + except json.JSONDecodeError: + return { + "response": f"Failed to parse search results: {output[:200]}", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + except Exception as e: + return { + "response": f"Error searching tokens: {str(e)}", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + + def _execute_risk(self, address: str) -> Dict[str, Any]: + """Execute risk analysis for the given token address.""" + try: + code, output = self._call_ave_script( + "risk", + ["--address", address.strip(), "--chain", "bsc"], + ) + if self._is_error_output(code, output): + return { + "response": f"Failed to get risk data: {output}", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + try: + data = json.loads(output) + data_field = data.get("data") + risk_data = data_field if isinstance(data_field, dict) else {} + if risk_data: + is_honeypot = risk_data.get("is_honeypot", "unknown") + buy_tax = risk_data.get("buy_tax", 0) + sell_tax = risk_data.get("sell_tax", 0) + risk_level = risk_data.get("risk_level", 0) + risk_score = risk_data.get("risk_score", "N/A") + token_symbol = risk_data.get("token_symbol", "") + token_name = risk_data.get("token_name", "") + + # Format token label + if token_symbol: + token_label = f"**{token_symbol}** ({token_name}) - `{address}`" + else: + token_label = f"`{address}`" + + # Convert is_honeypot to string + # -1 = unknown/could not determine, 0 = false, 1 = true + if isinstance(is_honeypot, bool): + is_honeypot_str = str(is_honeypot).lower() + elif isinstance(is_honeypot, int): + if is_honeypot == 1: + is_honeypot_str = "true" + elif is_honeypot == 0: + is_honeypot_str = "false" + else: + is_honeypot_str = "Unknown (could not determine)" + else: + is_honeypot_str = str(is_honeypot).lower() if is_honeypot else "Unknown (could not determine)" + + # Convert tax values + try: + buy_tax_val = float(buy_tax) if buy_tax not in (None, "N/A") else 0 + except (ValueError, TypeError): + buy_tax_val = 0 + try: + sell_tax_val = float(sell_tax) if sell_tax not in (None, "N/A") else 0 + except (ValueError, TypeError): + sell_tax_val = 0 + + # Determine risk level label + risk_level_str = "Low" if risk_level == 0 else "Medium" if risk_level == 1 else "High" if risk_level == 2 else "Unknown" + + risk_text = f"šŸ›”ļø **Risk Analysis for {token_label}**\n\n" + risk_text += f"- Risk Level: {risk_level_str} (Score: {risk_score})\n" + risk_text += f"- Honeypot: {is_honeypot_str}\n" + risk_text += f"- Buy Tax: {buy_tax}%\n" + risk_text += f"- Sell Tax: {sell_tax}%\n" + + if is_honeypot_str == "true": + risk_text += "\nāš ļø **Warning: This token appears to be a honeypot. Do not buy!**" + elif buy_tax_val > 10 or sell_tax_val > 10: + risk_text += "\nāš ļø **Warning: High tax detected. Trade with caution!**" + else: + risk_text += "\nāœ… This token appears safe to trade." + return { + "response": risk_text, + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + else: + return { + "response": f"No risk data available for `{address}`", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + except json.JSONDecodeError: + return { + "response": f"Failed to parse risk data: {output[:200]}", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + except Exception as e: + return { + "response": f"Error getting risk data: {str(e)}", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + + def _get_token_info(self, address: str) -> Dict[str, str]: + """Get basic token info (symbol, name) without formatting for display.""" + try: + code, output = self._call_ave_script( + "token", + ["--address", address.strip(), "--chain", "bsc"], + ) + if code == 0: + try: + data = json.loads(output) + data_field = data.get("data") + token_data = data_field if isinstance(data_field, dict) else {} + token_info = token_data.get("token", token_data) + symbol = token_info.get("symbol") or token_data.get("symbol") + name = token_info.get("name") or token_data.get("name") + return {"symbol": symbol or "", "name": name or ""} + except (json.JSONDecodeError, AttributeError): + return {"symbol": "", "name": ""} + return {"symbol": "", "name": ""} + except Exception: + return {"symbol": "", "name": ""} + + def _execute_token(self, address: str) -> Dict[str, Any]: + """Execute token details for the given address.""" + try: + code, output = self._call_ave_script( + "token", + ["--address", address.strip(), "--chain", "bsc"], + ) + if code == 0: + try: + data = json.loads(output) + data_field = data.get("data") + token_data = data_field if isinstance(data_field, dict) else {} + token_info = token_data.get("token", token_data) + symbol = token_info.get("symbol") or token_data.get("symbol") + name = token_info.get("name") or token_data.get("name") + if not symbol or symbol == "N/A" or not name or name == "N/A": + return { + "response": f"Token not found for `{address}`.", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + price = token_info.get("current_price_usd") or token_info.get("price_usd") or token_info.get("price") or token_data.get("price") or "N/A" + mc = token_info.get("market_cap") or token_info.get("fdv") or token_data.get("market_cap") or "N/A" + vol = token_info.get("tx_volume_u_24h") or token_info.get("volume_24h") or token_data.get("volume_24h") or "N/A" + pairs = token_info.get("top_pairs") or token_data.get("top_pairs") or [] + pairs_text = "" + if pairs: + pairs_text = "\n**Top Pairs:**\n" + for p in pairs[:3]: + liq = p.get('liquidity', 'N/A') + try: + liq_str = f"${float(liq):,.0f}" if liq and liq != "N/A" else liq + except (ValueError, TypeError): + liq_str = str(liq) + pairs_text += f"- {p.get('pair', 'N/A')}: {liq_str} liquidity\n" + try: + mc_str = f"${float(mc):,.0f}" if mc != "N/A" else mc + except (ValueError, TypeError): + mc_str = str(mc) + try: + vol_str = f"${float(vol):,.0f}" if vol != "N/A" else vol + except (ValueError, TypeError): + vol_str = str(vol) + return { + "response": f"šŸŖ™ **{symbol}** ({name})\n\nPrice: ${price}\nMarket Cap: {mc_str}\n24h Volume: {vol_str}{pairs_text}", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + except json.JSONDecodeError: + return { + "response": f"Failed to parse token data: {output[:200]}", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + else: + return { + "response": f"Failed to get token details: {output}", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + except Exception as e: + return { + "response": f"Error getting token details: {str(e)}", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + + def _execute_price(self, token_ids: str) -> Dict[str, Any]: + """Execute price lookup for the given token IDs.""" + try: + tokens_list = token_ids.replace(",", " ").split() + if not tokens_list: + return { + "response": "No token provided. Please provide a token address (e.g., '0x...-bsc') or use /search to find a token first.", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + + # Check if input matches recent search results + token_input = tokens_list[0].lower() + matched_address = None + for result in self.recent_search_results: + if (result["symbol"].lower() == token_input or + result["name"].lower() == token_input or + result["address"].lower() == token_input): + matched_address = f"{result['address']}-{result['chain']}" + break + + # Use matched address or original input + price_tokens = [matched_address] if matched_address else tokens_list + + code, output = self._call_ave_script( + "price", + ["--tokens"] + price_tokens, + ) + if self._is_error_output(code, output): + return { + "response": f"Failed to get prices: {output}", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + try: + data = json.loads(output) + prices = data.get("data", {}) + if not isinstance(prices, dict): + prices = {} + if prices: + price_text = "šŸ’° **Token Prices:**\n" + for token_id, price_data in prices.items(): + price = price_data.get("price", "N/A") if isinstance(price_data, dict) else "N/A" + change_24h = price_data.get("token_price_change_24h", "N/A") if isinstance(price_data, dict) else "N/A" + mc = price_data.get("market_cap", "N/A") if isinstance(price_data, dict) else "N/A" + try: + price_str = f"${float(price):,.6f}" if price and price != "N/A" else price + except (ValueError, TypeError): + price_str = str(price) if price else "N/A" + try: + mc_str = f"${float(mc):,.0f}" if mc and mc != "N/A" else mc + except (ValueError, TypeError): + mc_str = str(mc) if mc else "N/A" + price_text += f"- **{token_id}**: {price_str} (MC: {mc_str})\n" + return { + "response": price_text, + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + else: + if matched_address: + return { + "response": f"No price data available for {matched_address}. Try using /search to find the token first.", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + return { + "response": "No price data available. The /price tool requires a token contract address (e.g., '0x6982508145454Ce125dDE157d8d64a26D53f60a2-bsc'). Use /search to find a token first, then use its contract address with /price.", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + except json.JSONDecodeError: + return { + "response": f"Failed to parse price data: {output[:200]}", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + except Exception as e: + return { + "response": f"Error getting prices: {str(e)}", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + + def _execute_backtest_direct(self, message: str) -> Dict[str, Any]: + """Execute backtest directly using token from strategy or message.""" + # Extract token address from message if provided + parts = message.split() + token_address = None + timeframe = "1d" + start_date = None + end_date = None + + # Parse arguments from message + for i, part in enumerate(parts[1:], 1): # Skip /backtest + if part.startswith("0x") and len(part) > 20: + token_address = part + elif part in ["1d", "4h", "1h", "15m"]: + timeframe = part + elif part.startswith("20") and len(part) == 10: + if not start_date: + start_date = part + else: + end_date = part + + # If no token address in message, try to get from strategy + if not token_address and self.bot_id: + try: + from ...core.database import get_db + db = next(get_db()) + try: + bot = db.query(Bot).filter(Bot.id == self.bot_id).first() + if bot and bot.strategy_config: + conditions = bot.strategy_config.get("conditions", []) + for cond in conditions: + addr = cond.get("token_address") + if addr: + token_address = addr + break + finally: + db.close() + except Exception: + pass + + if not token_address: + return { + "response": "šŸ“Š **Backtest**\n\nI need a token address to run a backtest. Please provide:\n- Token contract address (e.g., `0x...`)\n- Timeframe (1d, 4h, 1h, 15m) - default is 1d\n- Start and end dates (YYYY-MM-DD) - optional, defaults to last 30 days", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + + # Look up token details first + token_info = self._get_token_info(token_address) + token_label = f"`{token_address}`" + if token_info.get("symbol"): + token_label = f"**{token_info['symbol']}** ({token_info.get('name', 'Unknown')}) - `{token_address}`" + + # Execute backtest + result = self._execute_backtest( + token_address=token_address, + timeframe=timeframe, + start_date=start_date, + end_date=end_date, + ) + # Prepend token info to backtest result + return { + "response": f"šŸ“Š **Backtest for {token_label}**\n\n{result}", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + + def _execute_simulate_direct(self, message: str) -> Dict[str, Any]: + """Execute simulate directly using token from strategy or message.""" + # Extract parameters from message + parts = message.split() + action = None + token_address = None + kline_interval = "1m" + + # Parse arguments from message + for i, part in enumerate(parts[1:], 1): # Skip /simulate + if part in ["start", "stop", "status", "results"]: + action = part + elif part.startswith("0x") and len(part) > 20: + token_address = part + elif part in ["1m", "5m", "15m", "1h", "4h"]: + kline_interval = part + + # If no token in message and action is start, try to get from strategy + if not token_address and self.bot_id and action == "start": + try: + from ...core.database import get_db + db = next(get_db()) + try: + bot = db.query(Bot).filter(Bot.id == self.bot_id).first() + if bot and bot.strategy_config: + conditions = bot.strategy_config.get("conditions", []) + for cond in conditions: + addr = cond.get("token_address") + if addr: + token_address = addr + break + finally: + db.close() + except Exception: + pass + + if action == "start" and not token_address: + return { + "response": "šŸŽ® **Simulation**\n\nI need a token address to start a simulation. Please provide:\n- Token contract address (e.g., `0x...`)\n- Kline interval (1m, 5m, 15m, 1h, 4h) - default is 1m", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + + if not action: + return { + "response": "šŸŽ® **Simulation**\n\nPlease specify an action:\n- `/simulate start [token_address]` - Start new simulation\n- `/simulate stop` - Stop running simulation\n- `/simulate status` - Check simulation status\n- `/simulate results` - Get simulation results", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + + # Execute simulation + result = self._manage_simulation( + action=action, + token_address=token_address, + kline_interval=kline_interval, + ) + return { + "response": result, + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + def chat( self, user_message: str, conversation_history: List[Dict] = None ) -> Dict[str, Any]: @@ -266,6 +1271,34 @@ class ConversationalAgent: Dict with 'response', 'thinking', and 'strategy_updated' """ try: + # Handle slash commands + if user_message.startswith("/"): + result = self._handle_slash_command(user_message) + # If None returned, it means a skill was recognized but has args - let AI handle it + if result is not None: + return result + + # Check if there's a pending command from previous message + if self.pending_command: + pending = self.pending_command + self.pending_command = None # Clear pending + + # Auto-execute the pending command with user's message as param + if pending == "search": + return self._execute_search(user_message) + elif pending == "risk": + return self._execute_risk(user_message) + elif pending == "token": + return self._execute_token(user_message) + elif pending == "price": + return self._execute_price(user_message) + + # Check for backtest/simulate with args in message + if user_message.startswith("/backtest"): + return self._execute_backtest_direct(user_message) + elif user_message.startswith("/simulate"): + return self._execute_simulate_direct(user_message) + # Build messages array with system prompt and conversation history messages = [{"role": "system", "content": SYSTEM_PROMPT_WITH_TOOLS}] @@ -295,11 +1328,11 @@ class ConversationalAgent: }, ) - result = resp.json() + result = resp.json() or {} # Extract thinking from reasoning_content thinking = None - if "choices" in result and len(result["choices"]) > 0: + if result.get("choices") and len(result.get("choices", [])) > 0: choice = result["choices"][0] if "message" in choice: message = choice["message"] @@ -331,18 +1364,26 @@ class ConversationalAgent: if code == 0: try: data = json.loads(output) - tokens = data.get("data", {}).get("tokens", []) + # Handle both dict with 'tokens' key and direct list + data_field = data.get("data", []) + if isinstance(data_field, list): + tokens = data_field + else: + tokens = data_field.get("tokens", []) if tokens: token_list = "" for t in tokens[:limit]: addr = t.get("token", "") symbol = t.get("symbol", "") name = t.get("name", "") - price_change = t.get( - "token_price_change_24h", "N/A" - ) + # Search API uses price_change_24h, trending uses token_price_change_24h + price_change = t.get("price_change_24h") or t.get("token_price_change_24h") or "N/A" mc = t.get("market_cap", "N/A") - token_list += f"- **{symbol}** ({name}): `{addr}` - MC: ${mc:,.0f} - 24h: {price_change}%\n" + try: + mc_str = f"${float(mc):,.0f}" + except (ValueError, TypeError): + mc_str = str(mc) + token_list += f"- **{symbol}** ({name}): `{addr}` - MC: {mc_str} - 24h: {price_change}%\n" response_text = f"Here are the search results for '{keyword}' on BSC:\n\n{token_list}\nWould you like me to set up a strategy for any of these?" else: response_text = f"No tokens found for '{keyword}'. Try a different keyword." @@ -371,24 +1412,51 @@ class ConversationalAgent: if code == 0: try: data = json.loads(output) - token_data = data.get("data", {}) - if token_data: - symbol = token_data.get("symbol", "N/A") - name = token_data.get("name", "N/A") - price = token_data.get("price", "N/A") - mc = token_data.get("market_cap", "N/A") - vol = token_data.get("volume_24h", "N/A") - pairs = token_data.get("top_pairs", []) + data_field = data.get("data") + # Handle both dict and list responses + token_data = data_field if isinstance(data_field, dict) else {} + # Token details may be nested in 'token' key + token_info = token_data.get("token", token_data) + # Check if token has valid symbol/name (not None, not 'N/A') + symbol = token_info.get("symbol") or token_data.get("symbol") + name = token_info.get("name") or token_data.get("name") + if not symbol or symbol == "N/A" or not name or name == "N/A": + response_text = f"Token not found for {address}. Raw response: {output[:500]}" + else: + # Try different price field names + price = (token_info.get("current_price_usd") + or token_info.get("price_usd") + or token_info.get("price") + or token_data.get("price") + or "N/A") + mc = (token_info.get("market_cap") + or token_info.get("fdv") + or token_data.get("market_cap") + or "N/A") + vol = (token_info.get("tx_volume_u_24h") + or token_info.get("volume_24h") + or token_data.get("volume_24h") + or "N/A") + pairs = token_info.get("top_pairs") or token_data.get("top_pairs") or [] pairs_text = "" if pairs: pairs_text = "\n**Top Pairs:**\n" for p in pairs[:3]: - pairs_text += f"- {p.get('pair', 'N/A')}: ${p.get('liquidity', 'N/A'):,.0f} liquidity\n" - response_text = f"**{symbol}** ({name})\n\nPrice: ${price}\nMarket Cap: ${mc:,.0f}\n24h Volume: ${vol:,.0f}{pairs_text}" - else: - response_text = ( - f"Token not found: {address}" - ) + liq = p.get('liquidity', 'N/A') + try: + liq_str = f"${float(liq):,.0f}" + except (ValueError, TypeError): + liq_str = str(liq) + pairs_text += f"- {p.get('pair', 'N/A')}: {liq_str} liquidity\n" + try: + mc_str = f"${float(mc):,.0f}" if mc != "N/A" else "N/A" + except (ValueError, TypeError): + mc_str = str(mc) + try: + vol_str = f"${float(vol):,.0f}" if vol != "N/A" else "N/A" + except (ValueError, TypeError): + vol_str = str(vol) + response_text = f"**{symbol}** ({name})\n\nPrice: ${price}\nMarket Cap: {mc_str}\n24h Volume: {vol_str}{pairs_text}" except json.JSONDecodeError: response_text = "Failed to parse token data." else: @@ -418,6 +1486,9 @@ class ConversationalAgent: try: data = json.loads(output) prices = data.get("data", {}) + # Ensure prices is a dict + if not isinstance(prices, dict): + prices = {} if prices: price_text = "**Token Prices:**\n" for ( @@ -463,27 +1534,53 @@ class ConversationalAgent: if code == 0: try: data = json.loads(output) - risk_data = data.get("data", {}) + data_field = data.get("data") + # Handle both dict and list responses + risk_data = data_field if isinstance(data_field, dict) else {} if risk_data: is_honeypot = risk_data.get( "is_honeypot", "unknown" ) - buy_tax = risk_data.get("buy_tax", "N/A") - sell_tax = risk_data.get("sell_tax", "N/A") + buy_tax = risk_data.get("buy_tax", 0) + sell_tax = risk_data.get("sell_tax", 0) status = risk_data.get("status", "unknown") + # Convert is_honeypot to string for comparison + if isinstance(is_honeypot, bool): + is_honeypot_str = str(is_honeypot).lower() + elif isinstance(is_honeypot, int): + if is_honeypot == 1: + is_honeypot_str = "true" + elif is_honeypot == 0: + is_honeypot_str = "false" + else: + is_honeypot_str = "unknown" # -1 or other means couldn't determine + else: + is_honeypot_str = str(is_honeypot).lower() if is_honeypot else "unknown" + + # Format honeypot display value + if is_honeypot_str == "unknown": + honeypot_display = "Unknown (could not determine)" + else: + honeypot_display = is_honeypot_str + # Convert tax values to float for comparison + try: + buy_tax_val = float(buy_tax) if buy_tax not in (None, "N/A") else 0 + except (ValueError, TypeError): + buy_tax_val = 0 + try: + sell_tax_val = float(sell_tax) if sell_tax not in (None, "N/A") else 0 + except (ValueError, TypeError): + sell_tax_val = 0 risk_text = ( f"**Risk Analysis for {address}**\n\n" ) risk_text += f"- Status: {status}\n" - risk_text += f"- Honeypot: {is_honeypot}\n" + risk_text += f"- Honeypot: {honeypot_display}\n" risk_text += f"- Buy Tax: {buy_tax}%\n" risk_text += f"- Sell Tax: {sell_tax}%\n" - if is_honeypot.lower() == "true": + if is_honeypot_str == "true": risk_text += "\nāš ļø **Warning: This token appears to be a honeypot. Do not buy!**" - elif ( - float(buy_tax or 0) > 10 - or float(sell_tax or 0) > 10 - ): + elif buy_tax_val > 10 or sell_tax_val > 10: risk_text += "\nāš ļø **Warning: High tax detected. Trade with caution!**" else: risk_text += "\nāœ… This token appears safe to trade." @@ -505,6 +1602,49 @@ class ConversationalAgent: "success": True, } + elif func_name == "get_trending": + chain = args.get("chain", "bsc") + limit = args.get("limit", 10) + + code, output = self._call_ave_script( + "trending", + ["--chain", chain, "--page-size", str(min(limit, 50))], + ) + if code == 0: + try: + data = json.loads(output) + data_field = data.get("data") + # Handle both dict and list responses + tokens = data_field if isinstance(data_field, list) else data_field.get("tokens", []) + if tokens: + token_list = "" + for t in tokens[:limit]: + addr = t.get("token", "") + symbol = t.get("symbol", "") + name = t.get("name", "") + price_change = t.get("token_price_change_24h", "N/A") + mc = t.get("market_cap", "N/A") + try: + mc_str = f"${float(mc):,.0f}" + except (ValueError, TypeError): + mc_str = str(mc) + token_list += f"- **{symbol}** ({name}): `{addr}` - MC: {mc_str} - 24h: {price_change}%\n" + response_text = f"šŸ”„ Trending tokens on {chain.upper()}:\n\n{token_list}\nWould you like me to set up a strategy for any of these?" + else: + response_text = f"No trending tokens found on {chain.upper()}." + except json.JSONDecodeError: + response_text = "Failed to parse trending data." + else: + response_text = f"Failed to get trending tokens: {output}" + + return { + "response": response_text, + "thinking": thinking, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + elif func_name == "run_backtest": token_address = args.get("token_address") timeframe = args.get("timeframe", "1d") @@ -600,7 +1740,12 @@ class ConversationalAgent: if code == 0: try: data = json.loads(output) - tokens = data.get("data", {}).get("tokens", []) + # Handle both dict with 'tokens' key and direct list + data_field = data.get("data", []) + if isinstance(data_field, list): + tokens = data_field + else: + tokens = data_field.get("tokens", []) if tokens: token_list = "" for t in tokens[:limit]: @@ -611,7 +1756,11 @@ class ConversationalAgent: "token_price_change_24h", "N/A" ) mc = t.get("market_cap", "N/A") - token_list += f"- **{symbol}** ({name}): `{addr}` - MC: ${mc:,.0f} - 24h: {price_change}%\n" + try: + mc_str = f"${float(mc):,.0f}" + except (ValueError, TypeError): + mc_str = str(mc) + token_list += f"- **{symbol}** ({name}): `{addr}` - MC: {mc_str} - 24h: {price_change}%\n" response_text = f"Here are the search results for '{keyword}' on BSC:\n\n{token_list}\nWould you like me to set up a strategy for any of these?" else: response_text = f"No tokens found for '{keyword}'. Try a different keyword." @@ -635,7 +1784,7 @@ class ConversationalAgent: if strategy_update: # Extract token name from conditions token_name = None - for cond in strategy_update.get("conditions", []): + for cond in (strategy_update.get("conditions") or []): if not cond.get("token_address") and cond.get("token"): token_name = cond.get("token") strategy_needs_confirmation = True @@ -650,7 +1799,12 @@ class ConversationalAgent: ) if code == 0: data = json.loads(output) - tokens = data.get("data", {}).get("tokens", []) + # Handle both dict with 'tokens' key and direct list + data_field = data.get("data", []) + if isinstance(data_field, list): + tokens = data_field + else: + tokens = data_field.get("tokens", []) if tokens: token_search_results = [ { @@ -1073,7 +2227,7 @@ Would you like me to adjust the strategy parameters based on these results?""" settings = get_settings() repo_root = os.path.dirname( - os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) + os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))) ) ave_skill_path = os.path.join( repo_root, "ave-cloud-skill", "scripts", "ave_data_rest.py" @@ -1092,7 +2246,11 @@ Would you like me to adjust the strategy parameters based on these results?""" env=env, timeout=30, ) - return result.returncode, result.stdout + # Include stderr in output for debugging + output = result.stdout + if result.returncode != 0 and result.stderr: + output = f"{output}\n{result.stderr}".strip() + return result.returncode, output except subprocess.TimeoutExpired: return 1, "Error: Command timed out" except Exception as e: diff --git a/src/frontend/src/lib/components/ChatInterface.svelte b/src/frontend/src/lib/components/ChatInterface.svelte index c97ce88..16ba05f 100644 --- a/src/frontend/src/lib/components/ChatInterface.svelte +++ b/src/frontend/src/lib/components/ChatInterface.svelte @@ -3,6 +3,35 @@ import type { ChatMessage } from '$lib/stores/chatStore'; import { parseMarkdown, parseInlineElements, type InlineSegment } from '$lib/utils/markdown'; + interface ToolItem { + name: string; + description: string; + command: string; + } + + const TOOLS: { category: string; label: string; tools: ToolItem[] }[] = [ + { + category: 'randebu', + label: 'šŸ¤– Randebu Built-in', + tools: [ + { name: 'backtest', description: 'Run strategy backtest', command: '/backtest' }, + { name: 'simulate', description: 'Start/stop simulation', command: '/simulate' }, + { name: 'strategy', description: 'View/update strategy', command: '/strategy' }, + ] + }, + { + category: 'ave', + label: 'ā˜ļø AVE Cloud Skills', + tools: [ + { name: 'search', description: 'Token search', command: '/search' }, + { name: 'trending', description: 'Popular tokens', command: '/trending' }, + { name: 'risk', description: 'Honeypot detection', command: '/risk' }, + { name: 'token', description: 'Token details', command: '/token' }, + { name: 'price', description: 'Batch prices', command: '/price' }, + ] + } + ]; + interface Props { bot: Bot | null; messages: ChatMessage[]; @@ -26,9 +55,16 @@ let messageInput = $state(''); let chatContainer: HTMLDivElement; let expandedThinking: Record = $state({}); + let showSlashMenu = $state(false); + let slashMenuPosition = $state({ top: 0, left: 0 }); + let selectedIndex = $state(0); + + // Use $derived for filteredTools + let filteredTools = $derived(messageInput.startsWith('/') ? TOOLS.flatMap(t => t.tools).filter(tool => tool.name.toLowerCase().startsWith(messageInput.slice(1).toLowerCase()) || tool.command.toLowerCase().startsWith(messageInput.slice(1).toLowerCase())) : []); function handleSend() { if (!messageInput.trim()) return; + showSlashMenu = false; onSendMessage(messageInput); messageInput = ''; } @@ -36,7 +72,54 @@ function handleKeydown(e: KeyboardEvent) { if (e.key === 'Enter' && !e.shiftKey) { e.preventDefault(); - handleSend(); + if (showSlashMenu && filteredTools.length > 0) { + selectTool(filteredTools[selectedIndex]); + } else { + handleSend(); + } + } else if (e.key === 'ArrowDown' && showSlashMenu) { + e.preventDefault(); + selectedIndex = Math.min(selectedIndex + 1, filteredTools.length - 1); + } else if (e.key === 'ArrowUp' && showSlashMenu) { + e.preventDefault(); + selectedIndex = Math.max(selectedIndex - 1, 0); + } else if (e.key === 'Escape' && showSlashMenu) { + showSlashMenu = false; + } else if (e.key === 'Tab' && showSlashMenu && filteredTools.length > 0) { + e.preventDefault(); + selectTool(filteredTools[selectedIndex]); + } + } + + function handleInput(e: Event) { + const target = e.target as HTMLTextAreaElement; + const value = target.value; + messageInput = value; + + if (value.startsWith('/')) { + selectedIndex = 0; + showSlashMenu = filteredTools.length > 0; + + if (showSlashMenu) { + // Position menu above the textarea + const rect = target.getBoundingClientRect(); + const menuHeight = 300; + slashMenuPosition = { + top: Math.max(10, rect.top - menuHeight), + left: rect.left + }; + } + } else { + showSlashMenu = false; + } + } + + function selectTool(tool: ToolItem) { + messageInput = tool.command + ' '; + showSlashMenu = false; + const textarea = document.querySelector('.input-container textarea') as HTMLTextAreaElement; + if (textarea) { + textarea.focus(); } } @@ -74,8 +157,17 @@ } }).join(''); } + + function handleClickOutside(e: MouseEvent) { + const target = e.target as HTMLElement; + if (!target.closest('.slash-menu') && !target.closest('.input-container textarea')) { + showSlashMenu = false; + } + } + +
{#if showBotSelector && availableBots.length > 0}
@@ -215,10 +307,32 @@ {#if bot}
+ {#if showSlashMenu && filteredTools.length > 0} +
+
Available Commands
+ {#each TOOLS as group} + {#if group.tools.some(t => filteredTools.includes(t))} +
{group.label}
+ {#each group.tools.filter(t => filteredTools.includes(t)) as tool, i} + + {/each} + {/if} + {/each} +
Press Tab to select, Enter to send
+
+ {/if}