fix: Execute trending command directly when /trending is called
- Added _execute_trending method that runs the trending CLI command - Returns formatted list of trending tokens on BSC - Shows error message if no tokens found or command fails
This commit is contained in:
@@ -508,6 +508,9 @@ class ConversationalAgent:
|
||||
# Special handling for /strategy - fetch current strategy from DB
|
||||
if tool_name == "strategy" and not has_args:
|
||||
return self._get_strategy_response()
|
||||
# Special handling for /trending - execute trending directly
|
||||
if tool_name == "trending" and not has_args:
|
||||
return self._execute_trending()
|
||||
# If no additional arguments, return skill acknowledgment
|
||||
# If has arguments, return None to let AI handle it
|
||||
if not has_args:
|
||||
@@ -643,6 +646,75 @@ class ConversationalAgent:
|
||||
"success": True,
|
||||
}
|
||||
|
||||
def _execute_trending(self) -> Dict[str, Any]:
|
||||
"""Execute the trending tokens command and return results."""
|
||||
try:
|
||||
code, output = self._call_ave_script(
|
||||
"trending",
|
||||
["--chain", "bsc", "--page-size", "10"],
|
||||
)
|
||||
if code == 0:
|
||||
try:
|
||||
data = json.loads(output)
|
||||
# Handle both dict with 'tokens' key and direct list
|
||||
data_field = data.get("data", [])
|
||||
if isinstance(data_field, list):
|
||||
tokens = data_field
|
||||
else:
|
||||
tokens = data_field.get("tokens", [])
|
||||
if tokens:
|
||||
token_list = ""
|
||||
for t in tokens[:10]:
|
||||
addr = t.get("token", "")
|
||||
symbol = t.get("symbol", "")
|
||||
name = t.get("name", "")
|
||||
price_change = t.get("token_price_change_24h", "N/A")
|
||||
mc = t.get("market_cap", "N/A")
|
||||
try:
|
||||
mc_str = f"${float(mc):,.0f}"
|
||||
except (ValueError, TypeError):
|
||||
mc_str = str(mc)
|
||||
token_list += f"- **{symbol}** ({name}): `{addr}` - MC: {mc_str} - 24h: {price_change}%\n"
|
||||
return {
|
||||
"response": f"📈 **Trending Tokens on BSC:**\n\n{token_list}\nWould you like me to set up a strategy for any of these?",
|
||||
"thinking": None,
|
||||
"strategy_updated": False,
|
||||
"strategy_needs_confirmation": False,
|
||||
"success": True,
|
||||
}
|
||||
else:
|
||||
return {
|
||||
"response": "No trending tokens found on BSC right now. Try again later!",
|
||||
"thinking": None,
|
||||
"strategy_updated": False,
|
||||
"strategy_needs_confirmation": False,
|
||||
"success": True,
|
||||
}
|
||||
except json.JSONDecodeError:
|
||||
return {
|
||||
"response": f"Failed to parse trending data: {output[:200]}",
|
||||
"thinking": None,
|
||||
"strategy_updated": False,
|
||||
"strategy_needs_confirmation": False,
|
||||
"success": True,
|
||||
}
|
||||
else:
|
||||
return {
|
||||
"response": f"Failed to get trending tokens: {output}",
|
||||
"thinking": None,
|
||||
"strategy_updated": False,
|
||||
"strategy_needs_confirmation": False,
|
||||
"success": True,
|
||||
}
|
||||
except Exception as e:
|
||||
return {
|
||||
"response": f"Error getting trending tokens: {str(e)}",
|
||||
"thinking": None,
|
||||
"strategy_updated": False,
|
||||
"strategy_needs_confirmation": False,
|
||||
"success": True,
|
||||
}
|
||||
|
||||
def chat(
|
||||
self, user_message: str, conversation_history: List[Dict] = None
|
||||
) -> Dict[str, Any]:
|
||||
|
||||
Reference in New Issue
Block a user