feat: use direct LLM with structured JSON for thinking/response separation

This commit is contained in:
shokollm
2026-04-10 09:31:07 +00:00
parent ad4a1e89d5
commit 44453877b3

View File

@@ -5,100 +5,57 @@ This agent can:
1. Have normal conversations with users
2. Update trading strategies when user provides specific instructions
Uses CrewAI's tool-calling capabilities for structured updates.
Uses direct LLM API calls for structured JSON output with separated thinking and response.
"""
import json
import re
from typing import List, Optional, Dict, Any
from crewai import Agent, LLM
from crewai.tools import tool
from sqlalchemy.orm import Session
from openai import OpenAI
from ...core.config import get_settings
from ...db.models import Bot
# Tool definitions
@tool
def get_current_strategy(bot_id: str) -> str:
"""Get the current trading strategy configuration for a bot.
Use this tool to check the current strategy before making changes.
Args:
bot_id: The ID of the bot to get strategy for
Returns:
JSON string with current strategy configuration
"""
from ...core.database import get_db
from ...db.models import Bot
db = next(get_db())
bot = db.query(Bot).filter(Bot.id == bot_id).first()
if not bot:
return '{"error": "Bot not found"}'
return str(bot.strategy_config)
SYSTEM_PROMPT = """You are a helpful AI trading assistant named Randebu. You help users manage their trading bots.
Your response must be valid JSON with exactly this structure:
{
"thinking": "Your internal reasoning and analysis (what you're thinking about)",
"response": "Your actual response to the user (be concise and helpful)",
"strategy_update": null or {
"conditions": [{"type": "price_drop" | "price_rise" | "volume_spike" | "price_level", "token": "TOKEN", "threshold": number, ...}],
"actions": [{"type": "buy" | "sell" | "hold", "amount_percent": number, ...}],
"risk_management": {"stop_loss_percent": number, "take_profit_percent": number}
}
}
@tool
def update_trading_strategy(
bot_id: str,
conditions: List[Dict],
actions: List[Dict],
risk_management: Optional[Dict] = None
) -> str:
"""Update the trading strategy configuration for a bot.
Call this tool when the user provides specific trading parameters like:
- Buy/sell conditions (price drops, price rises, etc.)
- Take profit percentages
- Stop loss percentages
Args:
bot_id: The ID of the bot to update
conditions: List of trigger conditions (e.g., [{"type": "price_drop", "token": "PEPE", "threshold": 5}])
actions: List of actions to take (e.g., [{"type": "buy", "amount_percent": 50}])
risk_management: Optional risk settings (e.g., {"stop_loss_percent": 10, "take_profit_percent": 50})
Returns:
Confirmation message with updated strategy
"""
from ...core.database import get_db
from ...db.models import Bot
db = next(get_db())
bot = db.query(Bot).filter(Bot.id == bot_id).first()
if not bot:
return '{"error": "Bot not found"}'
new_config = {
"conditions": conditions,
"actions": actions,
}
if risk_management:
new_config["risk_management"] = risk_management
bot.strategy_config = new_config
db.commit()
return f'Successfully updated trading strategy. New config: {new_config}'
Guidelines:
- "thinking" should be detailed reasoning about the user's request
- "response" should be conversational and clear
- "strategy_update" should be populated ONLY when the user provides specific trading parameters (percentages, tokens, conditions, etc.)
- If no strategy parameters are provided, set "strategy_update" to null
- Be friendly, concise, and helpful in your response
Example 1 (no strategy update):
User: "What can this bot do?"
{
"thinking": "The user is asking about the bot's capabilities. I should explain the main features.",
"response": "Randebu is your AI trading assistant! It can monitor cryptocurrency prices and execute trades based on your configured strategies. Tell me your trading parameters and I'll set them up for you.",
"strategy_update": null
}
SYSTEM_PROMPT = """You are a helpful AI trading assistant. You can:
1. Have normal conversations - answer questions about trading, tokens, strategies, etc.
2. Help users configure their trading bots when they provide specific parameters
When a user asks general questions, just answer conversationally.
When a user provides specific trading parameters (like percentages, tokens, conditions),
use the update_trading_strategy tool to save their configuration.
Example conversations:
- User: "What is this?" → Answer conversationally about the trading bot platform
- User: "I want take profit at 200%" → Use update_trading_strategy with that parameter
- User: "Alert me when PEPE drops 5%" → Use update_trading_strategy with that condition
Be friendly, helpful, and clear in your responses."""
Example 2 (with strategy update):
User: "I want to buy PEPE when it drops 10% and take profit at 50%"
{
"thinking": "User wants to buy PEPE when it drops 10%, with 50% take profit. I should parse these into conditions and actions.",
"response": "Got it! I've configured your strategy to buy PEPE when it drops 10%, with a 50% take profit target.",
"strategy_update": {
"conditions": [{"type": "price_drop", "token": "PEPE", "threshold": 10}],
"actions": [{"type": "buy", "amount_percent": 100}],
"risk_management": {"take_profit_percent": 50}
}
}"""
class ConversationalAgent:
@@ -106,57 +63,84 @@ class ConversationalAgent:
self.api_key = api_key
self.model = model
self.bot_id = bot_id
self.llm = LLM(
model=model,
api_key=api_key,
api_base="https://api.minimax.io/v1"
)
# Create agent with tools
self.agent = Agent(
role="Trading Assistant",
goal="Help users with trading strategies and general questions",
backstory=SYSTEM_PROMPT,
tools=[get_current_strategy, update_trading_strategy],
llm=self.llm,
verbose=True,
allow_delegation=False,
# Create OpenAI-compatible client for MiniMax
self.client = OpenAI(
api_key=api_key,
base_url="https://api.minimax.io/v1"
)
def chat(self, user_message: str, conversation_history: List[Dict] = None) -> Dict[str, Any]:
"""Process a user message and return a response.
"""Process a user message and return a structured response.
Args:
user_message: The user's message
conversation_history: Optional list of previous messages
Returns:
Dict with 'response' (the assistant's reply), 'thinking' (reasoning), and 'strategy_updated' (bool)
Dict with 'response', 'thinking', and 'strategy_updated'
"""
# Execute agent using kickoff
try:
result = self.agent.kickoff(user_message)
# Build messages array with system prompt and conversation history
messages = [{"role": "system", "content": SYSTEM_PROMPT}]
# Try to extract thinking from result if available
thinking = None
if hasattr(result, 'thinking') and result.thinking:
thinking = result.thinking
elif isinstance(result, dict) and 'thinking' in result:
thinking = result.get('thinking')
# Add conversation history (last 10 messages)
if conversation_history:
for msg in conversation_history[-10:]:
role = "assistant" if msg.get("role") == "assistant" else "user"
messages.append({"role": role, "content": msg.get("content", "")})
# The actual response
result_str = str(result) if not isinstance(result, str) else result
# Add current user message
messages.append({"role": "user", "content": user_message})
# Check if strategy was updated
strategy_updated = "update_trading_strategy" in result_str or \
"Successfully updated" in result_str
# Make API call
response = self.client.chat.completions.create(
model=self.model,
messages=messages,
temperature=0.7,
max_tokens=2000
)
# Parse JSON response
content = response.choices[0].message.content.strip()
# Extract JSON from response (handle markdown code blocks)
json_match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', content, re.DOTALL)
if json_match:
json_str = json_match.group(1)
else:
# Try to find JSON object directly
json_match = re.search(r'\{.*\}', content, re.DOTALL)
if json_match:
json_str = json_match.group(0)
else:
json_str = content
result = json.loads(json_str)
# Extract fields
thinking = result.get("thinking", "")
response_text = result.get("response", "")
strategy_update = result.get("strategy_update")
# Update strategy in database if provided
if strategy_update and self.bot_id:
self._update_strategy(strategy_update)
return {
"response": result_str,
"response": response_text,
"thinking": thinking,
"strategy_updated": strategy_updated,
"strategy_updated": strategy_update is not None,
"success": True
}
except json.JSONDecodeError as e:
return {
"response": "I had trouble formatting my response. Please try again.",
"thinking": f"JSON parsing error: {str(e)}. Raw content: {content if 'content' in dir() else 'N/A'}",
"strategy_updated": False,
"success": False
}
except Exception as e:
return {
"response": f"I encountered an error: {str(e)}. Please try again.",
@@ -164,6 +148,21 @@ class ConversationalAgent:
"strategy_updated": False,
"success": False
}
def _update_strategy(self, strategy_update: Dict) -> bool:
"""Update the bot's strategy in the database."""
try:
from ...core.database import get_db
db = next(get_db())
bot = db.query(Bot).filter(Bot.id == self.bot_id).first()
if bot:
bot.strategy_config = strategy_update
db.commit()
return True
except Exception as e:
print(f"Error updating strategy: {e}")
return False
def get_conversational_agent(api_key: str = None, model: str = None, bot_id: str = None) -> ConversationalAgent: