diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..856f377 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "ave-cloud-skill"] + path = ave-cloud-skill + url = https://github.com/AveCloud/ave-cloud-skill.git diff --git a/ave-cloud-skill b/ave-cloud-skill new file mode 160000 index 0000000..5eaef99 --- /dev/null +++ b/ave-cloud-skill @@ -0,0 +1 @@ +Subproject commit 5eaef99e151aeb595416f50294152f09d2201556 diff --git a/src/backend/app/ave b/src/backend/app/ave new file mode 120000 index 0000000..926d4c6 --- /dev/null +++ b/src/backend/app/ave @@ -0,0 +1 @@ +../../ave-cloud-skill/scripts/ave \ No newline at end of file diff --git a/src/backend/app/services/ai_agent/conversational.py b/src/backend/app/services/ai_agent/conversational.py index bc1138e..cc6ac46 100644 --- a/src/backend/app/services/ai_agent/conversational.py +++ b/src/backend/app/services/ai_agent/conversational.py @@ -84,19 +84,84 @@ TOOLS = [ "type": "function", "function": { "name": "search_tokens", - "description": "Search for trending tokens on BSC blockchain. Use this when user asks for token recommendations, trending tokens, or wants to discover new tokens to trade. ALWAYS uses BSC chain.", + "description": "Search for tokens by keyword on BSC blockchain. Use this when user asks to search for a specific token or find tokens by name/symbol.", "parameters": { "type": "object", "properties": { + "keyword": { + "type": "string", + "description": "Token symbol or name to search for (e.g., 'PEPE', 'BTC')", + }, "limit": { "type": "integer", "description": "Number of tokens to return (default: 10)", - "default": 10 + "default": 10, + }, + }, + "required": ["keyword"], + }, + }, + }, + { + "type": "function", + "function": { + "name": "get_token", + "description": "Get detailed information about a specific token including price, market cap, and pairs. Use when user asks for token details or wants to find a specific token by contract address.", + "parameters": { + "type": "object", + "properties": { + "address": { + "type": "string", + "description": "Token contract address (e.g., '0x6982508145454Ce125dDE157d8d64a26D53f60a2')", + }, + "chain": { + "type": "string", + "description": "Blockchain chain (default: bsc)", + "default": "bsc", + }, + }, + "required": ["address"], + }, + }, + }, + { + "type": "function", + "function": { + "name": "get_price", + "description": "Get current price(s) for tokens. Use when user asks for token price or wants to compare prices of multiple tokens.", + "parameters": { + "type": "object", + "properties": { + "token_ids": { + "type": "string", + "description": "Comma-separated list of token IDs with chain suffix (e.g., 'PEPE-bsc,TRUMP-bsc')", } }, - "required": [] - } - } + "required": ["token_ids"], + }, + }, + }, + { + "type": "function", + "function": { + "name": "get_risk", + "description": "Get risk analysis for a token contract. Use when user asks about token risk, honeypot analysis, or safety assessment before trading.", + "parameters": { + "type": "object", + "properties": { + "address": { + "type": "string", + "description": "Token contract address (e.g., '0x6982508145454Ce125dDE157d8d64a26D53f60a2')", + }, + "chain": { + "type": "string", + "description": "Blockchain chain (default: bsc)", + "default": "bsc", + }, + }, + "required": ["address"], + }, + }, }, { "type": "function", @@ -108,25 +173,25 @@ TOOLS = [ "properties": { "token_address": { "type": "string", - "description": "The BSC contract address of the token to backtest (required)" + "description": "The BSC contract address of the token to backtest (required)", }, "timeframe": { "type": "string", "description": "Timeframe for klines: '1d' (1 day), '4h' (4 hours), '1h' (1 hour), '15m' (15 minutes)", - "default": "1d" + "default": "1d", }, "start_date": { "type": "string", - "description": "Start date for backtest in YYYY-MM-DD format (e.g., '2024-01-01')" + "description": "Start date for backtest in YYYY-MM-DD format (e.g., '2024-01-01')", }, "end_date": { "type": "string", - "description": "End date for backtest in YYYY-MM-DD format (e.g., '2024-12-01')" - } + "description": "End date for backtest in YYYY-MM-DD format (e.g., '2024-12-01')", + }, }, - "required": ["token_address"] - } - } + "required": ["token_address"], + }, + }, }, { "type": "function", @@ -139,28 +204,33 @@ TOOLS = [ "action": { "type": "string", "enum": ["start", "stop", "status", "results"], - "description": "Action to perform: 'start' (begin new simulation), 'stop' (stop running simulation), 'status' (check if simulation is running), 'results' (get results from current or latest simulation)" + "description": "Action to perform: 'start' (begin new simulation), 'stop' (stop running simulation), 'status' (check if simulation is running), 'results' (get results from current or latest simulation)", }, "token_address": { "type": "string", - "description": "Token contract address for simulation (required for 'start' action)" + "description": "Token contract address for simulation (required for 'start' action)", }, "kline_interval": { "type": "string", "description": "Kline interval: '1m', '5m', '15m', '1h' (default: '1m')", - "default": "1m" - } + "default": "1m", + }, }, - "required": ["action"] - } - } - } + "required": ["action"], + }, + }, + }, ] -SYSTEM_PROMPT_WITH_TOOLS = SYSTEM_PROMPT + """ +SYSTEM_PROMPT_WITH_TOOLS = ( + SYSTEM_PROMPT + + """ You have access to tools: -- search_tokens(chain, limit): Search for trending tokens on a blockchain. Use it when user asks for token recommendations or trending tokens. +- search_tokens(keyword, limit): Search for tokens by keyword. Use it when user asks to search for a token or find tokens by name/symbol. +- get_token(address, chain): Get detailed information about a specific token. Use when user asks for token details. +- get_price(token_ids): Get current price(s) for tokens. Use when user asks for token price. +- get_risk(address, chain): Get risk analysis for a token. Use when user asks about token safety or honeypot analysis. - run_backtest(token_address, timeframe, start_date, end_date): Run a backtest on historical data. Returns performance metrics. Use when user asks to backtest or check historical performance. - manage_simulation(action, token_address, kline_interval): Manage trading simulations. Actions: 'start' (begin new), 'stop' (stop running), 'status' (check if running), 'results' (get current/latest results). @@ -171,6 +241,7 @@ When you want to use a tool, respond with: "tool_call": {"name": "run_backtest", "arguments": {"token_address": "0x...", "timeframe": "1d", "start_date": "2024-01-01", "end_date": "2024-12-01"}} } """ +) class ConversationalAgent: @@ -178,55 +249,54 @@ class ConversationalAgent: self.api_key = api_key self.model = model self.bot_id = bot_id - + # Extended thinking endpoint self.thinking_endpoint = "https://api.minimax.io/v1/text/chatcompletion_v2" - - def chat(self, user_message: str, conversation_history: List[Dict] = None) -> Dict[str, Any]: + + def chat( + self, user_message: str, conversation_history: List[Dict] = None + ) -> Dict[str, Any]: """Process a user message and return a structured response. - + Args: user_message: The user's message conversation_history: Optional list of previous messages - + Returns: Dict with 'response', 'thinking', and 'strategy_updated' """ try: # Build messages array with system prompt and conversation history messages = [{"role": "system", "content": SYSTEM_PROMPT_WITH_TOOLS}] - + # Add conversation history (last 10 messages) if conversation_history: for msg in conversation_history[-10:]: role = "assistant" if msg.get("role") == "assistant" else "user" messages.append({"role": role, "content": msg.get("content", "")}) - + # Add current user message messages.append({"role": "user", "content": user_message}) - + # Make API call to extended thinking endpoint resp = requests.post( self.thinking_endpoint, headers={ "Authorization": f"Bearer {self.api_key}", - "Content-Type": "application/json" + "Content-Type": "application/json", }, json={ "model": self.model, "messages": messages, "temperature": 0.7, "max_tokens": 2000, - "thinking": { - "type": "human", - "budget_tokens": 1500 - }, - "tools": TOOLS - } + "thinking": {"type": "human", "budget_tokens": 1500}, + "tools": TOOLS, + }, ) - + result = resp.json() - + # Extract thinking from reasoning_content thinking = None if "choices" in result and len(result["choices"]) > 0: @@ -234,7 +304,7 @@ class ConversationalAgent: if "message" in choice: message = choice["message"] thinking = message.get("reasoning_content") - + # Check for native function calls (tool_calls) tool_calls = message.get("tool_calls", []) if tool_calls: @@ -242,158 +312,326 @@ class ConversationalAgent: func = tool_call.get("function", {}) func_name = func.get("name", "") args = json.loads(func.get("arguments", "{}")) - + if func_name == "search_tokens": - chain = "bsc" # Always BSC + keyword = args.get("keyword", "") limit = args.get("limit", 10) - - # Execute the tool - from ..ave.client import AveCloudClient - from ...core.config import get_settings - settings = get_settings() - ave_client = AveCloudClient( - api_key=settings.AVE_API_KEY, - plan=settings.AVE_API_PLAN + + code, output = self._call_ave_script( + "search", + [ + "--keyword", + keyword, + "--chain", + "bsc", + "--limit", + str(limit), + ], ) - import asyncio - tokens = asyncio.run(ave_client.get_tokens(chain=chain, limit=limit)) - - if tokens: - # Format tokens for response - token_list = "" - for t in tokens[:limit]: - addr = t.get("token", "") - symbol = t.get("symbol", "") - name = t.get("name", "") - price_change = t.get("token_price_change_24h", "N/A") - token_list += f"- **{symbol}** ({name}): `{addr}` - 24h change: {price_change}%\n" - - response_text = f"Here are the trending tokens on {chain.upper()}:\n\n{token_list}\nWould you like me to set up a strategy for any of these?" + if code == 0: + try: + data = json.loads(output) + tokens = data.get("data", {}).get("tokens", []) + if tokens: + token_list = "" + for t in tokens[:limit]: + addr = t.get("token", "") + symbol = t.get("symbol", "") + name = t.get("name", "") + price_change = t.get( + "token_price_change_24h", "N/A" + ) + mc = t.get("market_cap", "N/A") + token_list += f"- **{symbol}** ({name}): `{addr}` - MC: ${mc:,.0f} - 24h: {price_change}%\n" + response_text = f"Here are the search results for '{keyword}' on BSC:\n\n{token_list}\nWould you like me to set up a strategy for any of these?" + else: + response_text = f"No tokens found for '{keyword}'. Try a different keyword." + except json.JSONDecodeError: + response_text = ( + f"Failed to parse search results." + ) else: - response_text = f"I couldn't find any trending tokens on {chain.upper()}. Try again later." - - # Return the tool result directly + response_text = f"Failed to search tokens: {output}" + return { "response": response_text, "thinking": thinking, "strategy_updated": False, "strategy_needs_confirmation": False, - "success": True + "success": True, } - + + elif func_name == "get_token": + address = args.get("address", "") + chain = args.get("chain", "bsc") + + code, output = self._call_ave_script( + "token", ["--address", address, "--chain", chain] + ) + if code == 0: + try: + data = json.loads(output) + token_data = data.get("data", {}) + if token_data: + symbol = token_data.get("symbol", "N/A") + name = token_data.get("name", "N/A") + price = token_data.get("price", "N/A") + mc = token_data.get("market_cap", "N/A") + vol = token_data.get("volume_24h", "N/A") + pairs = token_data.get("top_pairs", []) + pairs_text = "" + if pairs: + pairs_text = "\n**Top Pairs:**\n" + for p in pairs[:3]: + pairs_text += f"- {p.get('pair', 'N/A')}: ${p.get('liquidity', 'N/A'):,.0f} liquidity\n" + response_text = f"**{symbol}** ({name})\n\nPrice: ${price}\nMarket Cap: ${mc:,.0f}\n24h Volume: ${vol:,.0f}{pairs_text}" + else: + response_text = ( + f"Token not found: {address}" + ) + except json.JSONDecodeError: + response_text = "Failed to parse token data." + else: + response_text = ( + f"Failed to get token details: {output}" + ) + + return { + "response": response_text, + "thinking": thinking, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + + elif func_name == "get_price": + token_ids = args.get("token_ids", "") + + tokens_list = token_ids.replace(",", " ").split() + if not tokens_list: + response_text = "No token IDs provided." + else: + code, output = self._call_ave_script( + "price", ["--tokens"] + tokens_list + ) + if code == 0: + try: + data = json.loads(output) + prices = data.get("data", {}) + if prices: + price_text = "**Token Prices:**\n" + for ( + token_id, + price_data, + ) in prices.items(): + price = price_data.get( + "price", "N/A" + ) + change_24h = price_data.get( + "token_price_change_24h", "N/A" + ) + price_text += f"- {token_id}: ${price} (24h: {change_24h}%)\n" + response_text = price_text + else: + response_text = ( + "No price data available." + ) + except json.JSONDecodeError: + response_text = ( + "Failed to parse price data." + ) + else: + response_text = ( + f"Failed to get prices: {output}" + ) + + return { + "response": response_text, + "thinking": thinking, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + + elif func_name == "get_risk": + address = args.get("address", "") + chain = args.get("chain", "bsc") + + code, output = self._call_ave_script( + "risk", ["--address", address, "--chain", chain] + ) + if code == 0: + try: + data = json.loads(output) + risk_data = data.get("data", {}) + if risk_data: + is_honeypot = risk_data.get( + "is_honeypot", "unknown" + ) + buy_tax = risk_data.get("buy_tax", "N/A") + sell_tax = risk_data.get("sell_tax", "N/A") + status = risk_data.get("status", "unknown") + risk_text = ( + f"**Risk Analysis for {address}**\n\n" + ) + risk_text += f"- Status: {status}\n" + risk_text += f"- Honeypot: {is_honeypot}\n" + risk_text += f"- Buy Tax: {buy_tax}%\n" + risk_text += f"- Sell Tax: {sell_tax}%\n" + if is_honeypot.lower() == "true": + risk_text += "\n⚠️ **Warning: This token appears to be a honeypot. Do not buy!**" + elif ( + float(buy_tax or 0) > 10 + or float(sell_tax or 0) > 10 + ): + risk_text += "\n⚠️ **Warning: High tax detected. Trade with caution!**" + else: + risk_text += "\n✅ This token appears safe to trade." + response_text = risk_text + else: + response_text = ( + f"No risk data available for {address}" + ) + except json.JSONDecodeError: + response_text = "Failed to parse risk data." + else: + response_text = f"Failed to get risk data: {output}" + + return { + "response": response_text, + "thinking": thinking, + "strategy_updated": False, + "strategy_needs_confirmation": False, + "success": True, + } + elif func_name == "run_backtest": token_address = args.get("token_address") timeframe = args.get("timeframe", "1d") start_date = args.get("start_date") end_date = args.get("end_date") - + # Execute backtest backtest_result = self._execute_backtest( token_address=token_address, timeframe=timeframe, start_date=start_date, - end_date=end_date + end_date=end_date, ) - + return { "response": backtest_result, "thinking": thinking, "strategy_updated": False, "strategy_needs_confirmation": False, - "success": True + "success": True, } - + elif func_name == "manage_simulation": action = args.get("action") token_address = args.get("token_address") kline_interval = args.get("kline_interval", "1m") - + # Execute simulation management sim_result = self._manage_simulation( action=action, token_address=token_address, - kline_interval=kline_interval + kline_interval=kline_interval, ) - + return { "response": sim_result, "thinking": thinking, "strategy_updated": False, "strategy_needs_confirmation": False, - "success": True + "success": True, } - + # Get the main response content - content = result.get("choices", [{}])[0].get("message", {}).get("content", "") - + content = ( + result.get("choices", [{}])[0].get("message", {}).get("content", "") + ) + # Parse JSON from the content thinking_field = None response_text = content strategy_update = None - + # Try to extract JSON from the content - json_match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', content, re.DOTALL) + json_match = re.search( + r"```(?:json)?\s*(\{.*?\})\s*```", content, re.DOTALL + ) if json_match: json_str = json_match.group(1) else: # Try to find JSON object directly - json_match = re.search(r'\{.*\}', content, re.DOTALL) + json_match = re.search(r"\{.*\}", content, re.DOTALL) if json_match: json_str = json_match.group(0) else: json_str = None - + if json_str: try: parsed = json.loads(json_str) thinking_field = parsed.get("thinking", "") response_text = parsed.get("response", content) strategy_update = parsed.get("strategy_update") - + # Handle tool call tool_call = parsed.get("tool_call") if tool_call and tool_call.get("name") == "search_tokens": args = tool_call.get("arguments", {}) - chain = args.get("chain", "bsc") + keyword = args.get("keyword", "") limit = args.get("limit", 10) - - # Execute the tool - from ..ave.client import AveCloudClient - from ...core.config import get_settings - settings = get_settings() - ave_client = AveCloudClient( - api_key=settings.AVE_API_KEY, - plan=settings.AVE_API_PLAN + + # Execute the tool using ave-cloud-skill CLI + code, output = self._call_ave_script( + "search", + [ + "--keyword", + keyword, + "--chain", + "bsc", + "--limit", + str(limit), + ], ) - import asyncio - tokens = asyncio.run(ave_client.get_tokens(chain=chain, limit=limit)) - - if tokens: - # Format tokens for response - token_list = "" - for t in tokens[:limit]: - addr = t.get("token", "") - symbol = t.get("symbol", "") - name = t.get("name", "") - price_change = t.get("token_price_change_24h", "N/A") - token_list += f"- **{symbol}** ({name}): `{addr}` - 24h change: {price_change}%\n" - - response_text = f"Here are the trending tokens on {chain.upper()}:\n\n{token_list}\nWould you like me to set up a strategy for any of these?" + if code == 0: + try: + data = json.loads(output) + tokens = data.get("data", {}).get("tokens", []) + if tokens: + token_list = "" + for t in tokens[:limit]: + addr = t.get("token", "") + symbol = t.get("symbol", "") + name = t.get("name", "") + price_change = t.get( + "token_price_change_24h", "N/A" + ) + mc = t.get("market_cap", "N/A") + token_list += f"- **{symbol}** ({name}): `{addr}` - MC: ${mc:,.0f} - 24h: {price_change}%\n" + response_text = f"Here are the search results for '{keyword}' on BSC:\n\n{token_list}\nWould you like me to set up a strategy for any of these?" + else: + response_text = f"No tokens found for '{keyword}'. Try a different keyword." + except json.JSONDecodeError: + response_text = "Failed to parse search results." else: - response_text = f"I couldn't find any trending tokens on {chain.upper()}. Try again later." - + response_text = f"Failed to search tokens: {output}" + strategy_update = None - + except json.JSONDecodeError: pass # Use defaults - + # Use the native thinking from API if available, otherwise use parsed thinking final_thinking = thinking or thinking_field - + # Check if token_address is missing in strategy_update strategy_needs_confirmation = False token_search_results = None - + if strategy_update: # Extract token name from conditions token_name = None @@ -402,33 +640,30 @@ class ConversationalAgent: token_name = cond.get("token") strategy_needs_confirmation = True break - + # Search for token if name is found if strategy_needs_confirmation and token_name: try: - from ..ave.client import AveCloudClient - from ...core.config import get_settings - settings = get_settings() - ave_client = AveCloudClient( - api_key=settings.AVE_API_KEY, - plan=settings.AVE_API_PLAN + code, output = self._call_ave_script( + "search", + ["--keyword", token_name, "--chain", "bsc", "--limit", "5"], ) - # Run async search in sync context - import asyncio - tokens = asyncio.run(ave_client.get_tokens(query=token_name, chain="bsc", limit=5)) - if tokens: - token_search_results = [ - { - "symbol": t.get("symbol", ""), - "name": t.get("name", ""), - "address": t.get("token", ""), # trending API uses "token" for contract address - "chain": t.get("chain", "bsc") - } - for t in tokens - ] + if code == 0: + data = json.loads(output) + tokens = data.get("data", {}).get("tokens", []) + if tokens: + token_search_results = [ + { + "symbol": t.get("symbol", ""), + "name": t.get("name", ""), + "address": t.get("token", ""), + "chain": t.get("chain", "bsc"), + } + for t in tokens + ] except Exception as e: print(f"Token search error: {e}") - + # Only update strategy if token_address is provided if strategy_update and strategy_needs_confirmation: # Don't auto-save - user needs to confirm token address @@ -440,35 +675,35 @@ class ConversationalAgent: "strategy_needs_confirmation": True, "strategy_data": strategy_update, "token_search_results": token_search_results, - "success": True + "success": True, } - + # Update strategy in database if provided if strategy_update and self.bot_id: self._update_strategy(strategy_update) - + return { "response": response_text, "thinking": final_thinking, "strategy_updated": strategy_update is not None, "strategy_needs_confirmation": False, - "success": True + "success": True, } - + except Exception as e: return { "response": f"I encountered an error: {str(e)}. Please try again.", "thinking": None, "strategy_updated": False, - "success": False + "success": False, } - + def _execute_backtest( self, token_address: str, timeframe: str = "1d", start_date: str = None, - end_date: str = None + end_date: str = None, ) -> str: """Execute a backtest using the bot's current strategy.""" try: @@ -479,21 +714,21 @@ class ConversationalAgent: from ...core.config import get_settings from datetime import datetime import uuid - + settings = get_settings() db = next(get_db()) - + # Get the bot bot = db.query(Bot).filter(Bot.id == self.bot_id).first() if not bot: return "I couldn't find the bot. Please try again." - + # Default dates if not provided (last 30 days) if not end_date: end_date = datetime.now().strftime("%Y-%m-%d") if not start_date: start_date = (datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d") - + # Create backtest engine backtest_config = { "bot_id": self.bot_id, @@ -507,27 +742,29 @@ class ConversationalAgent: "ave_api_plan": settings.AVE_API_PLAN, "initial_balance": 10000.0, } - + engine = BacktestEngine(backtest_config) results = asyncio.run(engine.run()) - + # Format results for display if "error" in results: return f"Backtest failed: {results['error']}" - + total_return = results.get("total_return", 0) win_rate = results.get("win_rate", 0) total_trades = results.get("total_trades", 0) max_drawdown = results.get("max_drawdown", 0) sharpe_ratio = results.get("sharpe_ratio", 0) final_balance = results.get("final_balance", 10000) - + # Format return with emoji indicators return_emoji = "📈" if total_return >= 0 else "📉" - return_str = f"+{total_return:.2f}%" if total_return >= 0 else f"{total_return:.2f}%" - + return_str = ( + f"+{total_return:.2f}%" if total_return >= 0 else f"{total_return:.2f}%" + ) + drawdown_emoji = "⚠️" if abs(max_drawdown) > 10 else "✅" - + response = f"""Here's the backtest result for {token_address}: **Performance Summary** @@ -543,17 +780,14 @@ class ConversationalAgent: **Period**: {start_date} to {end_date} ({timeframe}) Would you like me to adjust the strategy parameters based on these results?""" - + return response - + except Exception as e: return f"I encountered an error running the backtest: {str(e)}" - + def _manage_simulation( - self, - action: str, - token_address: str = None, - kline_interval: str = "1m" + self, action: str, token_address: str = None, kline_interval: str = "1m" ) -> str: """Manage trading simulations: start, stop, status, or results.""" try: @@ -564,30 +798,34 @@ Would you like me to adjust the strategy parameters based on these results?""" from ...services.simulate.engine import SimulateEngine from ...core.config import get_settings from datetime import datetime - + db = SessionLocal() settings = get_settings() - + try: # Get the bot bot = db.query(Bot).filter(Bot.id == self.bot_id).first() if not bot: return "I couldn't find the bot. Please try again." - + if action == "start": if not token_address: return "I need a token address to start a simulation. Which token would you like to simulate?" - + # Check if there's already a running simulation - running_sim = db.query(Simulation).filter( - Simulation.bot_id == self.bot_id, - Simulation.status == "running" - ).first() - + running_sim = ( + db.query(Simulation) + .filter( + Simulation.bot_id == self.bot_id, + Simulation.status == "running", + ) + .first() + ) + if running_sim: # Stop the existing one first self._stop_simulation_db(running_sim.id) - + # Create new simulation sim_id = str(uuid.uuid4()) simulation = Simulation( @@ -598,15 +836,15 @@ Would you like me to adjust the strategy parameters based on these results?""" config={ "token": token_address, "chain": "bsc", - "kline_interval": kline_interval + "kline_interval": kline_interval, }, signals=[], klines=[], - trade_log=[] + trade_log=[], ) db.add(simulation) db.commit() - + # Start the simulation in background sim_config = { "bot_id": self.bot_id, @@ -620,138 +858,177 @@ Would you like me to adjust the strategy parameters based on these results?""" "ave_api_plan": settings.AVE_API_PLAN, "initial_balance": 10000.0, } - + # Run simulation in background thread def run_sim(): - asyncio.run(self._run_simulation_sync(sim_id, settings.DATABASE_URL, sim_config)) - + asyncio.run( + self._run_simulation_sync( + sim_id, settings.DATABASE_URL, sim_config + ) + ) + thread = threading.Thread(target=run_sim) thread.daemon = True thread.start() - + return f"Started simulation on {token_address} using {kline_interval} klines. The simulation is running and will process up to 100 candles. Ask me for status or results anytime!" - + elif action == "stop": # Find running simulation - running_sim = db.query(Simulation).filter( - Simulation.bot_id == self.bot_id, - Simulation.status == "running" - ).first() - + running_sim = ( + db.query(Simulation) + .filter( + Simulation.bot_id == self.bot_id, + Simulation.status == "running", + ) + .first() + ) + if not running_sim: return "No simulation is currently running." - + self._stop_simulation_db(running_sim.id) - + # Get final results portfolio = running_sim.portfolio or {} current_balance = portfolio.get("current_balance", 10000) initial_balance = portfolio.get("initial_balance", 10000) pnl = current_balance - initial_balance - pnl_pct = (pnl / initial_balance) * 100 if initial_balance > 0 else 0 - + pnl_pct = ( + (pnl / initial_balance) * 100 if initial_balance > 0 else 0 + ) + return f"Simulation stopped!\n\nFinal Results:\n💰 Final Balance: ${current_balance:,.2f}\n📈 P&L: {'+' if pnl >= 0 else ''}${pnl:,.2f} ({'+' if pnl_pct >= 0 else ''}{pnl_pct:.2f}%)\n📊 Trades: {len(running_sim.trade_log or [])}" - + elif action == "status": # Find running simulation - running_sim = db.query(Simulation).filter( - Simulation.bot_id == self.bot_id, - Simulation.status == "running" - ).first() - + running_sim = ( + db.query(Simulation) + .filter( + Simulation.bot_id == self.bot_id, + Simulation.status == "running", + ) + .first() + ) + if not running_sim: return "No simulation is currently running." - + portfolio = running_sim.portfolio or {} klines_count = len(running_sim.klines or []) trade_count = len(running_sim.trade_log or []) - + status = f"**Simulation Status: Running**\n\n" status += f"📊 Candles processed: ~{klines_count}\n" status += f"📈 Trades executed: {trade_count}\n" - + if portfolio.get("position", 0) > 0: status += f"💰 Position: {portfolio['position']:.4f} {portfolio.get('position_token', 'TOKEN')}\n" - status += f"💰 Cash: ${portfolio.get('current_balance', 0):,.2f}\n" + status += ( + f"💰 Cash: ${portfolio.get('current_balance', 0):,.2f}\n" + ) else: status += f"💰 Cash: ${portfolio.get('current_balance', 10000):,.2f}\n" - + status += "\nAsk me to stop or get full results anytime!" return status - + elif action == "results": # Find running or most recent simulation - simulation = db.query(Simulation).filter( - Simulation.bot_id == self.bot_id - ).order_by(Simulation.started_at.desc()).first() - + simulation = ( + db.query(Simulation) + .filter(Simulation.bot_id == self.bot_id) + .order_by(Simulation.started_at.desc()) + .first() + ) + if not simulation: return "No simulation found. Start a simulation first!" - + portfolio = simulation.portfolio or {} current_balance = portfolio.get("current_balance", 10000) initial_balance = portfolio.get("initial_balance", 10000) pnl = current_balance - initial_balance - pnl_pct = (pnl / initial_balance) * 100 if initial_balance > 0 else 0 + pnl_pct = ( + (pnl / initial_balance) * 100 if initial_balance > 0 else 0 + ) trade_log = simulation.trade_log or [] - + status_emoji = "🟢" if simulation.status == "running" else "⚪" - status_text = "Running" if simulation.status == "running" else "Completed/Stopped" - - results = f"**Simulation Results** {status_emoji} ({status_text})\n\n" + status_text = ( + "Running" + if simulation.status == "running" + else "Completed/Stopped" + ) + + results = ( + f"**Simulation Results** {status_emoji} ({status_text})\n\n" + ) results += f"💰 Final Balance: ${current_balance:,.2f}\n" results += f"📈 P&L: {'+' if pnl >= 0 else ''}${pnl:,.2f} ({'+' if pnl_pct >= 0 else ''}{pnl_pct:.2f}%)\n" results += f"📊 Total Trades: {len(trade_log)}\n" - + if simulation.status == "running": - results += f"\n⏳ Simulation still running... (refresh for latest)" - + results += ( + f"\n⏳ Simulation still running... (refresh for latest)" + ) + return results - + else: return f"Unknown action: {action}. Use 'start', 'stop', 'status', or 'results'." - + finally: db.close() - + except Exception as e: return f"I encountered an error managing the simulation: {str(e)}" - + def _stop_simulation_db(self, simulation_id: str): """Stop a simulation in the database.""" from ...core.database import SessionLocal + db = SessionLocal() try: - simulation = db.query(Simulation).filter(Simulation.id == simulation_id).first() + simulation = ( + db.query(Simulation).filter(Simulation.id == simulation_id).first() + ) if simulation: simulation.status = "stopped" db.commit() finally: db.close() - + async def _run_simulation_sync(self, simulation_id: str, db_url: str, config: dict): """Run simulation synchronously in background.""" from ...services.simulate.engine import SimulateEngine from ...core.database import SessionLocal - + async def _run(): engine = SimulateEngine(config) engine.run_id = simulation_id - + def serialize_signal(s): created = s.get("created_at") if hasattr(created, "isoformat"): created = created.isoformat() return {**s, "created_at": created} - + def save_progress(): db = SessionLocal() try: - sim = db.query(Simulation).filter(Simulation.id == simulation_id).first() + sim = ( + db.query(Simulation) + .filter(Simulation.id == simulation_id) + .first() + ) if sim: sim.status = engine.status sim.signals = [serialize_signal(s) for s in engine.signals] - sim.klines = [{"time": k.get("time"), "close": k.get("close")} for k in engine.klines] + sim.klines = [ + {"time": k.get("time"), "close": k.get("close")} + for k in engine.klines + ] sim.trade_log = engine.trade_log sim.portfolio = { "initial_balance": config.get("initial_balance", 10000), @@ -764,19 +1041,19 @@ Would you like me to adjust the strategy parameters based on these results?""" db.commit() finally: db.close() - + try: await engine.run() finally: save_progress() - + asyncio.run(_run()) - + def _update_strategy(self, strategy_update: Dict) -> bool: """Update the bot's strategy in the database.""" try: from ...core.database import get_db - + db = next(get_db()) bot = db.query(Bot).filter(Bot.id == self.bot_id).first() if bot: @@ -787,8 +1064,44 @@ Would you like me to adjust the strategy parameters based on these results?""" print(f"Error updating strategy: {e}") return False + def _call_ave_script(self, command: str, args: list) -> tuple[int, str]: + """Call an ave-cloud-skill CLI script and return (status_code, stdout).""" + import json + import os + import subprocess + from ...core.config import get_settings -def get_conversational_agent(api_key: str = None, model: str = None, bot_id: str = None) -> ConversationalAgent: + settings = get_settings() + repo_root = os.path.dirname( + os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) + ) + ave_skill_path = os.path.join( + repo_root, "ave-cloud-skill", "scripts", "ave_data_rest.py" + ) + + env = os.environ.copy() + env["AVE_API_KEY"] = settings.AVE_API_KEY + env["API_PLAN"] = settings.AVE_API_PLAN + env["AVE_USE_DOCKER"] = "false" + + try: + result = subprocess.run( + ["python3", ave_skill_path, command] + args, + capture_output=True, + text=True, + env=env, + timeout=30, + ) + return result.returncode, result.stdout + except subprocess.TimeoutExpired: + return 1, "Error: Command timed out" + except Exception as e: + return 1, f"Error: {str(e)}" + + +def get_conversational_agent( + api_key: str = None, model: str = None, bot_id: str = None +) -> ConversationalAgent: """Get or create a ConversationalAgent instance.""" if api_key is None: settings = get_settings() @@ -796,5 +1109,5 @@ def get_conversational_agent(api_key: str = None, model: str = None, bot_id: str if model is None: settings = get_settings() model = settings.MINIMAX_MODEL - + return ConversationalAgent(api_key=api_key, model=model, bot_id=bot_id)