From e92506a7870d5148f5fe6c9ad3782fc3828500bb Mon Sep 17 00:00:00 2001 From: shokollm <270575765+shokollm@users.noreply.github.com> Date: Mon, 13 Apr 2026 23:23:01 +0000 Subject: [PATCH] feat: Two-step command execution flow Commands now execute in two steps: 1. User sends /search -> acknowledge and wait for param 2. User sends 'pepe' -> auto-execute search with 'pepe' Commands without params (/backtest, /simulate, /trending, /strategy) execute directly. Pending commands tracked via self.pending_command state. --- .../app/services/ai_agent/conversational.py | 322 +++++++++++++++++- 1 file changed, 320 insertions(+), 2 deletions(-) diff --git a/src/backend/app/services/ai_agent/conversational.py b/src/backend/app/services/ai_agent/conversational.py index 99acd57..77db17a 100644 --- a/src/backend/app/services/ai_agent/conversational.py +++ b/src/backend/app/services/ai_agent/conversational.py @@ -469,6 +469,9 @@ class ConversationalAgent: # Extended thinking endpoint self.thinking_endpoint = "https://api.minimax.io/v1/text/chatcompletion_v2" + # Track pending command after acknowledgment + self.pending_command = None + def _handle_slash_command(self, user_message: str) -> Dict[str, Any]: """Handle slash command help requests. @@ -511,9 +514,11 @@ class ConversationalAgent: # Special handling for /trending - execute trending directly if tool_name == "trending" and not has_args: return self._execute_trending() - # If no additional arguments, return skill acknowledgment - # If has arguments, return None to let AI handle it + # For commands that need params (/search, /risk, /token, /price) + # execute immediately if args provided, otherwise set pending if not has_args: + # Set pending command for next message + self.pending_command = tool_name return { "response": format_skill_acknowledgment( tool["name"], tool["description"] @@ -715,6 +720,304 @@ class ConversationalAgent: "success": True, } + def _execute_search(self, keyword: str) -> Dict[str, Any]: + """Execute search with the given keyword.""" + try: + code, output = self._call_ave_script( + "search", + ["--keyword", keyword.strip(), "--chain", "bsc", "--limit", "10"], + ) + if code == 0: + try: + data = json.loads(output) + # Handle both dict with 'tokens' key and direct list + data_field = data.get("data", []) + if isinstance(data_field, list): + tokens = data_field + else: + tokens = data_field.get("tokens", []) + if tokens: + token_list = "" + for t in tokens[:10]: + addr = t.get("token", "") + symbol = t.get("symbol", "") + name = t.get("name", "") + price_change = t.get("token_price_change_24h", "N/A") + mc = t.get("market_cap", "N/A") + try: + mc_str = f"${float(mc):,.0f}" + except (ValueError, TypeError): + mc_str = str(mc) + token_list += f"- **{symbol}** ({name}): `{addr}` - MC: {mc_str} - 24h: {price_change}%\n" + return { + "response": f"🔍 **Search Results for '{keyword}':**\n\n{token_list}\nWould you like me to set up a strategy for any of these?", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + else: + return { + "response": f"No tokens found for '{keyword}'. Try a different keyword.", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + except json.JSONDecodeError: + return { + "response": f"Failed to parse search results: {output[:200]}", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + else: + return { + "response": f"Failed to search tokens: {output}", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + except Exception as e: + return { + "response": f"Error searching tokens: {str(e)}", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + + def _execute_risk(self, address: str) -> Dict[str, Any]: + """Execute risk analysis for the given token address.""" + try: + code, output = self._call_ave_script( + "risk", + ["--address", address.strip(), "--chain", "bsc"], + ) + if code == 0: + try: + data = json.loads(output) + data_field = data.get("data") + risk_data = data_field if isinstance(data_field, dict) else {} + if risk_data: + is_honeypot = risk_data.get("is_honeypot", "unknown") + buy_tax = risk_data.get("buy_tax", 0) + sell_tax = risk_data.get("sell_tax", 0) + status = risk_data.get("status", "unknown") + # Convert is_honeypot to string + if isinstance(is_honeypot, bool): + is_honeypot_str = str(is_honeypot).lower() + elif isinstance(is_honeypot, int): + is_honeypot_str = "true" if is_honeypot == 1 else "false" if is_honeypot == 0 else "unknown" + else: + is_honeypot_str = str(is_honeypot).lower() if is_honeypot else "unknown" + # Convert tax values + try: + buy_tax_val = float(buy_tax) if buy_tax not in (None, "N/A") else 0 + except (ValueError, TypeError): + buy_tax_val = 0 + try: + sell_tax_val = float(sell_tax) if sell_tax not in (None, "N/A") else 0 + except (ValueError, TypeError): + sell_tax_val = 0 + risk_text = f"🛡️ **Risk Analysis for `{address}`**\n\n" + risk_text += f"- Status: {status}\n" + risk_text += f"- Honeypot: {is_honeypot_str}\n" + risk_text += f"- Buy Tax: {buy_tax}%\n" + risk_text += f"- Sell Tax: {sell_tax}%\n" + if is_honeypot_str == "true": + risk_text += "\n⚠️ **Warning: This token appears to be a honeypot. Do not buy!**" + elif buy_tax_val > 10 or sell_tax_val > 10: + risk_text += "\n⚠️ **Warning: High tax detected. Trade with caution!**" + else: + risk_text += "\n✅ This token appears safe to trade." + return { + "response": risk_text, + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + else: + return { + "response": f"No risk data available for `{address}`", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + except json.JSONDecodeError: + return { + "response": f"Failed to parse risk data: {output[:200]}", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + else: + return { + "response": f"Failed to get risk data: {output}", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + except Exception as e: + return { + "response": f"Error getting risk data: {str(e)}", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + + def _execute_token(self, address: str) -> Dict[str, Any]: + """Execute token details for the given address.""" + try: + code, output = self._call_ave_script( + "token", + ["--address", address.strip(), "--chain", "bsc"], + ) + if code == 0: + try: + data = json.loads(output) + data_field = data.get("data") + token_data = data_field if isinstance(data_field, dict) else {} + token_info = token_data.get("token", token_data) + symbol = token_info.get("symbol") or token_data.get("symbol") + name = token_info.get("name") or token_data.get("name") + if not symbol or symbol == "N/A" or not name or name == "N/A": + return { + "response": f"Token not found for `{address}`.", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + price = token_info.get("current_price_usd") or token_info.get("price_usd") or token_info.get("price") or token_data.get("price") or "N/A" + mc = token_info.get("market_cap") or token_info.get("fdv") or token_data.get("market_cap") or "N/A" + vol = token_info.get("tx_volume_u_24h") or token_info.get("volume_24h") or token_data.get("volume_24h") or "N/A" + pairs = token_info.get("top_pairs") or token_data.get("top_pairs") or [] + pairs_text = "" + if pairs: + pairs_text = "\n**Top Pairs:**\n" + for p in pairs[:3]: + liq = p.get('liquidity', 'N/A') + try: + liq_str = f"${float(liq):,.0f}" if liq and liq != "N/A" else liq + except (ValueError, TypeError): + liq_str = str(liq) + pairs_text += f"- {p.get('pair', 'N/A')}: {liq_str} liquidity\n" + try: + mc_str = f"${float(mc):,.0f}" if mc != "N/A" else mc + except (ValueError, TypeError): + mc_str = str(mc) + try: + vol_str = f"${float(vol):,.0f}" if vol != "N/A" else vol + except (ValueError, TypeError): + vol_str = str(vol) + return { + "response": f"🪙 **{symbol}** ({name})\n\nPrice: ${price}\nMarket Cap: {mc_str}\n24h Volume: {vol_str}{pairs_text}", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + except json.JSONDecodeError: + return { + "response": f"Failed to parse token data: {output[:200]}", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + else: + return { + "response": f"Failed to get token details: {output}", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + except Exception as e: + return { + "response": f"Error getting token details: {str(e)}", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + + def _execute_price(self, token_ids: str) -> Dict[str, Any]: + """Execute price lookup for the given token IDs.""" + try: + tokens_list = token_ids.replace(",", " ").split() + if not tokens_list: + return { + "response": "No token IDs provided. Please provide token IDs like 'PEPE-bsc TRUMP-bsc'", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + code, output = self._call_ave_script( + "price", + ["--tokens"] + tokens_list, + ) + if code == 0: + try: + data = json.loads(output) + prices = data.get("data", {}) + if not isinstance(prices, dict): + prices = {} + if prices: + price_text = "💰 **Token Prices:**\n" + for token_id, price_data in prices.items(): + price = price_data.get("price", "N/A") if isinstance(price_data, dict) else "N/A" + change_24h = price_data.get("token_price_change_24h", "N/A") if isinstance(price_data, dict) else "N/A" + price_text += f"- {token_id}: ${price} (24h: {change_24h}%)\n" + return { + "response": price_text, + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + else: + return { + "response": "No price data available.", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + except json.JSONDecodeError: + return { + "response": f"Failed to parse price data: {output[:200]}", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + else: + return { + "response": f"Failed to get prices: {output}", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + except Exception as e: + return { + "response": f"Error getting prices: {str(e)}", + "thinking": None, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + def chat( self, user_message: str, conversation_history: List[Dict] = None ) -> Dict[str, Any]: @@ -735,6 +1038,21 @@ class ConversationalAgent: if result is not None: return result + # Check if there's a pending command from previous message + if self.pending_command: + pending = self.pending_command + self.pending_command = None # Clear pending + + # Auto-execute the pending command with user's message as param + if pending == "search": + return self._execute_search(user_message) + elif pending == "risk": + return self._execute_risk(user_message) + elif pending == "token": + return self._execute_token(user_message) + elif pending == "price": + return self._execute_price(user_message) + # Build messages array with system prompt and conversation history messages = [{"role": "system", "content": SYSTEM_PROMPT_WITH_TOOLS}]