diff --git a/src/backend/app/services/ai_agent/conversational.py b/src/backend/app/services/ai_agent/conversational.py index d92346f..cc6ac46 100644 --- a/src/backend/app/services/ai_agent/conversational.py +++ b/src/backend/app/services/ai_agent/conversational.py @@ -317,48 +317,41 @@ class ConversationalAgent: keyword = args.get("keyword", "") limit = args.get("limit", 10) - # Execute the tool using ave-cloud-skill - import os - import asyncio - from ...core.config import get_settings - - settings = get_settings() - os.environ["AVE_API_KEY"] = settings.AVE_API_KEY - os.environ["API_PLAN"] = settings.AVE_API_PLAN - - from ave.http import api_get - - resp = asyncio.run( - api_get( - "/tokens", - { - "keyword": keyword, - "limit": limit, - "chain": "bsc", - }, - ) + code, output = self._call_ave_script( + "search", + [ + "--keyword", + keyword, + "--chain", + "bsc", + "--limit", + str(limit), + ], ) - if resp.status_code == 200: - data = resp.json() - tokens = data.get("data", {}).get("tokens", []) - if tokens: - token_list = "" - for t in tokens[:limit]: - addr = t.get("token", "") - symbol = t.get("symbol", "") - name = t.get("name", "") - price_change = t.get( - "token_price_change_24h", "N/A" - ) - mc = t.get("market_cap", "N/A") - token_list += f"- **{symbol}** ({name}): `{addr}` - MC: ${mc:,.0f} - 24h: {price_change}%\n" - response_text = f"Here are the search results for '{keyword}' on BSC:\n\n{token_list}\nWould you like me to set up a strategy for any of these?" - else: - response_text = f"No tokens found for '{keyword}'. Try a different keyword." + if code == 0: + try: + data = json.loads(output) + tokens = data.get("data", {}).get("tokens", []) + if tokens: + token_list = "" + for t in tokens[:limit]: + addr = t.get("token", "") + symbol = t.get("symbol", "") + name = t.get("name", "") + price_change = t.get( + "token_price_change_24h", "N/A" + ) + mc = t.get("market_cap", "N/A") + token_list += f"- **{symbol}** ({name}): `{addr}` - MC: ${mc:,.0f} - 24h: {price_change}%\n" + response_text = f"Here are the search results for '{keyword}' on BSC:\n\n{token_list}\nWould you like me to set up a strategy for any of these?" + else: + response_text = f"No tokens found for '{keyword}'. Try a different keyword." + except json.JSONDecodeError: + response_text = ( + f"Failed to parse search results." + ) else: - response_text = ( - f"Failed to search tokens: {resp.status_code}" - ) + response_text = f"Failed to search tokens: {output}" return { "response": response_text, @@ -372,39 +365,36 @@ class ConversationalAgent: address = args.get("address", "") chain = args.get("chain", "bsc") - import os - import asyncio - from ...core.config import get_settings - - settings = get_settings() - os.environ["AVE_API_KEY"] = settings.AVE_API_KEY - os.environ["API_PLAN"] = settings.AVE_API_PLAN - - from ave.http import api_get - - resp = asyncio.run( - api_get(f"/tokens/{address}-{chain}") + code, output = self._call_ave_script( + "token", ["--address", address, "--chain", chain] ) - if resp.status_code == 200: - data = resp.json() - token_data = data.get("data", {}) - if token_data: - symbol = token_data.get("symbol", "N/A") - name = token_data.get("name", "N/A") - price = token_data.get("price", "N/A") - mc = token_data.get("market_cap", "N/A") - vol = token_data.get("volume_24h", "N/A") - pairs = token_data.get("top_pairs", []) - pairs_text = "" - if pairs: - pairs_text = "\n**Top Pairs:**\n" - for p in pairs[:3]: - pairs_text += f"- {p.get('pair', 'N/A')}: ${p.get('liquidity', 'N/A'):,.0f} liquidity\n" - response_text = f"**{symbol}** ({name})\n\nPrice: ${price}\nMarket Cap: ${mc:,.0f}\n24h Volume: ${vol:,.0f}{pairs_text}" - else: - response_text = f"Token not found: {address}" + if code == 0: + try: + data = json.loads(output) + token_data = data.get("data", {}) + if token_data: + symbol = token_data.get("symbol", "N/A") + name = token_data.get("name", "N/A") + price = token_data.get("price", "N/A") + mc = token_data.get("market_cap", "N/A") + vol = token_data.get("volume_24h", "N/A") + pairs = token_data.get("top_pairs", []) + pairs_text = "" + if pairs: + pairs_text = "\n**Top Pairs:**\n" + for p in pairs[:3]: + pairs_text += f"- {p.get('pair', 'N/A')}: ${p.get('liquidity', 'N/A'):,.0f} liquidity\n" + response_text = f"**{symbol}** ({name})\n\nPrice: ${price}\nMarket Cap: ${mc:,.0f}\n24h Volume: ${vol:,.0f}{pairs_text}" + else: + response_text = ( + f"Token not found: {address}" + ) + except json.JSONDecodeError: + response_text = "Failed to parse token data." else: - response_text = f"Failed to get token details: {resp.status_code}" + response_text = ( + f"Failed to get token details: {output}" + ) return { "response": response_text, @@ -417,40 +407,43 @@ class ConversationalAgent: elif func_name == "get_price": token_ids = args.get("token_ids", "") - import os - import asyncio - from ...core.config import get_settings - - settings = get_settings() - os.environ["AVE_API_KEY"] = settings.AVE_API_KEY - os.environ["API_PLAN"] = settings.AVE_API_PLAN - - from ave.http import api_post - - tokens_list = [t.strip() for t in token_ids.split(",")] - resp = asyncio.run( - api_post( - "/tokens/price", {"token_ids": tokens_list} - ) - ) - if resp.status_code == 200: - data = resp.json() - prices = data.get("data", {}) - if prices: - price_text = "**Token Prices:**\n" - for token_id, price_data in prices.items(): - price = price_data.get("price", "N/A") - change_24h = price_data.get( - "token_price_change_24h", "N/A" - ) - price_text += f"- {token_id}: ${price} (24h: {change_24h}%)\n" - response_text = price_text - else: - response_text = "No price data available." + tokens_list = token_ids.replace(",", " ").split() + if not tokens_list: + response_text = "No token IDs provided." else: - response_text = ( - f"Failed to get prices: {resp.status_code}" + code, output = self._call_ave_script( + "price", ["--tokens"] + tokens_list ) + if code == 0: + try: + data = json.loads(output) + prices = data.get("data", {}) + if prices: + price_text = "**Token Prices:**\n" + for ( + token_id, + price_data, + ) in prices.items(): + price = price_data.get( + "price", "N/A" + ) + change_24h = price_data.get( + "token_price_change_24h", "N/A" + ) + price_text += f"- {token_id}: ${price} (24h: {change_24h}%)\n" + response_text = price_text + else: + response_text = ( + "No price data available." + ) + except json.JSONDecodeError: + response_text = ( + "Failed to parse price data." + ) + else: + response_text = ( + f"Failed to get prices: {output}" + ) return { "response": response_text, @@ -464,56 +457,45 @@ class ConversationalAgent: address = args.get("address", "") chain = args.get("chain", "bsc") - import os - import asyncio - from ...core.config import get_settings - - settings = get_settings() - os.environ["AVE_API_KEY"] = settings.AVE_API_KEY - os.environ["API_PLAN"] = settings.AVE_API_PLAN - - from ave.http import api_get - - resp = asyncio.run( - api_get(f"/contracts/{address}-{chain}") + code, output = self._call_ave_script( + "risk", ["--address", address, "--chain", chain] ) - if resp.status_code == 200: - data = resp.json() - risk_data = data.get("data", {}) - if risk_data: - is_honeypot = risk_data.get( - "is_honeypot", "unknown" - ) - buy_tax = risk_data.get("buy_tax", "N/A") - sell_tax = risk_data.get("sell_tax", "N/A") - status = risk_data.get("status", "unknown") - risk_text = ( - f"**Risk Analysis for {address}**\n\n" - ) - risk_text += f"- Status: {status}\n" - risk_text += f"- Honeypot: {is_honeypot}\n" - risk_text += f"- Buy Tax: {buy_tax}%\n" - risk_text += f"- Sell Tax: {sell_tax}%\n" - if is_honeypot.lower() == "true": - risk_text += "\n⚠️ **Warning: This token appears to be a honeypot. Do not buy!**" - elif ( - float(buy_tax or 0) > 10 - or float(sell_tax or 0) > 10 - ): - risk_text += "\n⚠️ **Warning: High tax detected. Trade with caution!**" - else: - risk_text += ( - "\n✅ This token appears safe to trade." + if code == 0: + try: + data = json.loads(output) + risk_data = data.get("data", {}) + if risk_data: + is_honeypot = risk_data.get( + "is_honeypot", "unknown" ) - response_text = risk_text - else: - response_text = ( - f"No risk data available for {address}" - ) + buy_tax = risk_data.get("buy_tax", "N/A") + sell_tax = risk_data.get("sell_tax", "N/A") + status = risk_data.get("status", "unknown") + risk_text = ( + f"**Risk Analysis for {address}**\n\n" + ) + risk_text += f"- Status: {status}\n" + risk_text += f"- Honeypot: {is_honeypot}\n" + risk_text += f"- Buy Tax: {buy_tax}%\n" + risk_text += f"- Sell Tax: {sell_tax}%\n" + if is_honeypot.lower() == "true": + risk_text += "\n⚠️ **Warning: This token appears to be a honeypot. Do not buy!**" + elif ( + float(buy_tax or 0) > 10 + or float(sell_tax or 0) > 10 + ): + risk_text += "\n⚠️ **Warning: High tax detected. Trade with caution!**" + else: + risk_text += "\n✅ This token appears safe to trade." + response_text = risk_text + else: + response_text = ( + f"No risk data available for {address}" + ) + except json.JSONDecodeError: + response_text = "Failed to parse risk data." else: - response_text = ( - f"Failed to get risk data: {resp.status_code}" - ) + response_text = f"Failed to get risk data: {output}" return { "response": response_text, @@ -603,44 +585,40 @@ class ConversationalAgent: keyword = args.get("keyword", "") limit = args.get("limit", 10) - # Execute the tool using ave-cloud-skill - import os - import asyncio - from ...core.config import get_settings - - settings = get_settings() - os.environ["AVE_API_KEY"] = settings.AVE_API_KEY - os.environ["API_PLAN"] = settings.AVE_API_PLAN - - from ave.http import api_get - - resp = asyncio.run( - api_get( - "/tokens", - {"keyword": keyword, "limit": limit, "chain": "bsc"}, - ) + # Execute the tool using ave-cloud-skill CLI + code, output = self._call_ave_script( + "search", + [ + "--keyword", + keyword, + "--chain", + "bsc", + "--limit", + str(limit), + ], ) - if resp.status_code == 200: - data = resp.json() - tokens = data.get("data", {}).get("tokens", []) - if tokens: - token_list = "" - for t in tokens[:limit]: - addr = t.get("token", "") - symbol = t.get("symbol", "") - name = t.get("name", "") - price_change = t.get( - "token_price_change_24h", "N/A" - ) - mc = t.get("market_cap", "N/A") - token_list += f"- **{symbol}** ({name}): `{addr}` - MC: ${mc:,.0f} - 24h: {price_change}%\n" - response_text = f"Here are the search results for '{keyword}' on BSC:\n\n{token_list}\nWould you like me to set up a strategy for any of these?" - else: - response_text = f"No tokens found for '{keyword}'. Try a different keyword." + if code == 0: + try: + data = json.loads(output) + tokens = data.get("data", {}).get("tokens", []) + if tokens: + token_list = "" + for t in tokens[:limit]: + addr = t.get("token", "") + symbol = t.get("symbol", "") + name = t.get("name", "") + price_change = t.get( + "token_price_change_24h", "N/A" + ) + mc = t.get("market_cap", "N/A") + token_list += f"- **{symbol}** ({name}): `{addr}` - MC: ${mc:,.0f} - 24h: {price_change}%\n" + response_text = f"Here are the search results for '{keyword}' on BSC:\n\n{token_list}\nWould you like me to set up a strategy for any of these?" + else: + response_text = f"No tokens found for '{keyword}'. Try a different keyword." + except json.JSONDecodeError: + response_text = "Failed to parse search results." else: - response_text = ( - f"Failed to search tokens: {resp.status_code}" - ) + response_text = f"Failed to search tokens: {output}" strategy_update = None @@ -666,33 +644,23 @@ class ConversationalAgent: # Search for token if name is found if strategy_needs_confirmation and token_name: try: - from ..ave.client import AveCloudClient - from ...core.config import get_settings - - settings = get_settings() - ave_client = AveCloudClient( - api_key=settings.AVE_API_KEY, plan=settings.AVE_API_PLAN + code, output = self._call_ave_script( + "search", + ["--keyword", token_name, "--chain", "bsc", "--limit", "5"], ) - # Run async search in sync context - import asyncio - - tokens = asyncio.run( - ave_client.get_tokens( - query=token_name, chain="bsc", limit=5 - ) - ) - if tokens: - token_search_results = [ - { - "symbol": t.get("symbol", ""), - "name": t.get("name", ""), - "address": t.get( - "token", "" - ), # trending API uses "token" for contract address - "chain": t.get("chain", "bsc"), - } - for t in tokens - ] + if code == 0: + data = json.loads(output) + tokens = data.get("data", {}).get("tokens", []) + if tokens: + token_search_results = [ + { + "symbol": t.get("symbol", ""), + "name": t.get("name", ""), + "address": t.get("token", ""), + "chain": t.get("chain", "bsc"), + } + for t in tokens + ] except Exception as e: print(f"Token search error: {e}") @@ -1096,6 +1064,40 @@ Would you like me to adjust the strategy parameters based on these results?""" print(f"Error updating strategy: {e}") return False + def _call_ave_script(self, command: str, args: list) -> tuple[int, str]: + """Call an ave-cloud-skill CLI script and return (status_code, stdout).""" + import json + import os + import subprocess + from ...core.config import get_settings + + settings = get_settings() + repo_root = os.path.dirname( + os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) + ) + ave_skill_path = os.path.join( + repo_root, "ave-cloud-skill", "scripts", "ave_data_rest.py" + ) + + env = os.environ.copy() + env["AVE_API_KEY"] = settings.AVE_API_KEY + env["API_PLAN"] = settings.AVE_API_PLAN + env["AVE_USE_DOCKER"] = "false" + + try: + result = subprocess.run( + ["python3", ave_skill_path, command] + args, + capture_output=True, + text=True, + env=env, + timeout=30, + ) + return result.returncode, result.stdout + except subprocess.TimeoutExpired: + return 1, "Error: Command timed out" + except Exception as e: + return 1, f"Error: {str(e)}" + def get_conversational_agent( api_key: str = None, model: str = None, bot_id: str = None