Compare commits

...

1 Commits

Author SHA1 Message Date
shokollm
a280217254 feat: implement chat interface with CrewAI integration
- Create MiniMax LLM connector for CrewAI integration
- Implement TradingCrew with trading_designer, strategy_validator, strategy_explainer
- Add strategy parsing from natural language to strategy_config JSON
- Update chat endpoint with CrewAI integration and conversation context
- Add strategy validation logic
- Add explanation generation for user-friendly responses
- Add BotChatRequest/BotChatResponse schemas

Fixes #6
2026-04-08 06:29:05 +00:00
4 changed files with 428 additions and 26 deletions

View File

@@ -4,14 +4,18 @@ from typing import List, Annotated
from .auth import get_current_user
from ..core.database import get_db
from ..core.config import get_settings
from ..db.schemas import (
BotCreate,
BotUpdate,
BotResponse,
BotConversationCreate,
BotConversationResponse,
BotChatRequest,
BotChatResponse,
)
from ..db.models import Bot, BotConversation, User
from ..services.ai_agent.crew import get_trading_crew
router = APIRouter()
MAX_BOTS_PER_USER = 3
@@ -154,10 +158,10 @@ def delete_bot(
db.commit()
@router.post("/{bot_id}/chat", response_model=BotConversationResponse)
@router.post("/{bot_id}/chat", response_model=BotChatResponse)
def chat(
bot_id: str,
message: BotConversationCreate,
request: BotChatRequest,
current_user: Annotated[User, Depends(get_current_user)],
db: Session = Depends(get_db),
):
@@ -173,15 +177,75 @@ def chat(
detail="Not authorized to chat with this bot",
)
db_conversation = BotConversation(
bot_id=bot_id,
role=message.role,
content=message.content,
conversation_history = (
db.query(BotConversation)
.filter(BotConversation.bot_id == bot_id)
.order_by(BotConversation.created_at)
.all()
)
db.add(db_conversation)
db.commit()
db.refresh(db_conversation)
return db_conversation
history_for_crew = [
{"role": conv.role, "content": conv.content}
for conv in conversation_history[-10:]
]
user_message = request.message
if request.strategy_config:
crew = get_trading_crew()
result = crew.chat(user_message, history_for_crew)
assistant_content = result.get("response", "I couldn't process your request.")
if result.get("success") and result.get("strategy_config"):
bot.strategy_config = result["strategy_config"]
db.commit()
db_conversation = BotConversation(
bot_id=bot_id,
role="user",
content=user_message,
)
db.add(db_conversation)
db_assistant = BotConversation(
bot_id=bot_id,
role="assistant",
content=assistant_content,
)
db.add(db_assistant)
db.commit()
db.refresh(db_assistant)
return BotChatResponse(
response=assistant_content,
strategy_config=result.get("strategy_config"),
success=result.get("success", False),
)
else:
crew = get_trading_crew()
result = crew.chat(user_message, history_for_crew)
assistant_content = result.get("response", "I couldn't process your request.")
db_conversation = BotConversation(
bot_id=bot_id,
role="user",
content=user_message,
)
db.add(db_conversation)
db_assistant = BotConversation(
bot_id=bot_id,
role="assistant",
content=assistant_content,
)
db.add(db_assistant)
db.commit()
db.refresh(db_assistant)
return BotChatResponse(
response=assistant_content,
strategy_config=result.get("strategy_config"),
success=result.get("success", False),
)
@router.get("/{bot_id}/history", response_model=List[BotConversationResponse])

View File

@@ -118,6 +118,17 @@ class BotConversationResponse(BaseModel):
from_attributes = True
class BotChatRequest(BaseModel):
message: str
strategy_config: Optional[bool] = False
class BotChatResponse(BaseModel):
response: str
strategy_config: Optional[dict] = None
success: bool = False
class SignalResponse(BaseModel):
id: str
bot_id: str

View File

@@ -1,15 +1,247 @@
from typing import List, Optional
from typing import List, Optional, Dict, Any
from crewai import Agent, Task, Crew
from .llm_connector import MiniMaxConnector, MiniMaxLLM
from ..core.config import get_settings
class CrewAgent:
def __init__(self, role: str, goal: str, backstory: str):
self.role = role
self.goal = goal
self.backstory = backstory
class StrategyValidator:
SUPPORTED_CONDITIONS = ["price_drop", "price_rise", "volume_spike", "price_level"]
SUPPORTED_ACTIONS = ["buy", "sell", "notify"]
def execute_task(self, task: str) -> str:
raise NotImplementedError("CrewAI agent not yet implemented")
def validate(self, strategy_config: dict) -> tuple[bool, list[str]]:
errors = []
if "conditions" not in strategy_config:
errors.append("Missing 'conditions' in strategy config")
return False, errors
if not isinstance(strategy_config["conditions"], list):
errors.append("'conditions' must be a list")
return False, errors
if len(strategy_config["conditions"]) == 0:
errors.append("At least one condition is required")
return False, errors
for i, condition in enumerate(strategy_config["conditions"]):
if "type" not in condition:
errors.append(f"Condition {i}: missing 'type'")
continue
cond_type = condition.get("type")
if cond_type not in self.SUPPORTED_CONDITIONS:
errors.append(f"Condition {i}: unsupported type '{cond_type}'")
continue
params = condition.get("params", {})
if cond_type in ["price_drop", "price_rise", "volume_spike"]:
if "token" not in params:
errors.append(f"Condition {i}: missing 'token'")
if "threshold_percent" not in params:
errors.append(f"Condition {i}: missing 'threshold_percent'")
elif not isinstance(params["threshold_percent"], (int, float)):
errors.append(
f"Condition {i}: 'threshold_percent' must be a number"
)
elif params["threshold_percent"] <= 0:
errors.append(
f"Condition {i}: 'threshold_percent' must be positive"
)
elif cond_type == "price_level":
if "token" not in params:
errors.append(f"Condition {i}: missing 'token'")
if "price" not in params:
errors.append(f"Condition {i}: missing 'price'")
if "direction" not in params:
errors.append(f"Condition {i}: missing 'direction'")
elif params["direction"] not in ["above", "below"]:
errors.append(
f"Condition {i}: direction must be 'above' or 'below'"
)
if "actions" in strategy_config:
if not isinstance(strategy_config["actions"], list):
errors.append("'actions' must be a list")
else:
for i, action in enumerate(strategy_config["actions"]):
if "type" not in action:
errors.append(f"Action {i}: missing 'type'")
elif action["type"] not in self.SUPPORTED_ACTIONS:
errors.append(
f"Action {i}: unsupported type '{action['type']}'"
)
return len(errors) == 0, errors
def get_trading_crew():
raise NotImplementedError("Trading crew not yet implemented")
class StrategyExplainer:
def explain(self, strategy_config: dict) -> str:
explanations = []
if "conditions" in strategy_config:
cond_list = strategy_config["conditions"]
if cond_list:
explanations.append("This strategy will trigger when:")
for cond in cond_list:
cond_type = cond.get("type")
params = cond.get("params", {})
token = params.get("token", "the token")
if cond_type == "price_drop":
pct = params.get("threshold_percent", 0)
explanations.append(f" - {token} price drops by {pct}%")
elif cond_type == "price_rise":
pct = params.get("threshold_percent", 0)
explanations.append(f" - {token} price rises by {pct}%")
elif cond_type == "volume_spike":
pct = params.get("threshold_percent", 0)
explanations.append(
f" - {token} trading volume increases by {pct}%"
)
elif cond_type == "price_level":
price = params.get("price", 0)
direction = params.get("direction", "unknown")
explanations.append(
f" - {token} price crosses {direction} ${price}"
)
if "actions" in strategy_config:
actions = strategy_config.get("actions", [])
if actions:
explanations.append("\nWhen triggered, the strategy will:")
for action in actions:
action_type = action.get("type")
if action_type == "buy":
explanations.append(" - Buy the token")
elif action_type == "sell":
explanations.append(" - Sell the token")
elif action_type == "notify":
explanations.append(" - Send a notification")
if not explanations:
explanations.append("Strategy configuration is empty or invalid.")
return "\n".join(explanations)
def create_trading_designer_agent(
api_key: str, model: str = "MiniMax-Text-01"
) -> Agent:
connector = MiniMaxConnector(api_key=api_key, model=model)
system_prompt = """You are a Trading Strategy Designer AI. Your role is to parse user requests
for trading strategies into structured JSON configuration.
Supported conditions (MVP):
- price_drop: Triggers when a token's price drops by a specified percentage
- price_rise: Triggers when a token's price rises by a specified percentage
- volume_spike: Triggers when trading volume increases by a specified percentage
- price_level: Triggers when price crosses above or below a specified level
Always ask clarifying questions if the user's request is ambiguous.
Output strategy_config in valid JSON format only when you have all required information.
"""
return Agent(
role="Trading Strategy Designer",
goal="Convert natural language trading requests into precise strategy configurations",
backstory=system_prompt,
llm=MiniMaxLLM(api_key=api_key, model=model),
verbose=True,
)
def create_strategy_validator_agent(
api_key: str, model: str = "MiniMax-Text-01"
) -> Agent:
return Agent(
role="Strategy Validator",
goal="Validate trading strategy configurations for feasibility and identify potential issues",
backstory="""You are a meticulous strategy validator with expertise in trading systems.
You check that all required parameters are present, values are reasonable, and the
strategy makes logical sense. You never approve strategies with missing or invalid data.""",
llm=MiniMaxLLM(api_key=api_key, model=model),
verbose=True,
)
def create_strategy_explainer_agent(
api_key: str, model: str = "MiniMax-Text-01"
) -> Agent:
return Agent(
role="Strategy Explainer",
goal="Generate clear, user-friendly explanations of trading strategies",
backstory="""You are a patient trading strategy explainer. You translate complex
strategy configurations into easy-to-understand language. You help users understand
exactly what their strategies will do when triggered.""",
llm=MiniMaxLLM(api_key=api_key, model=model),
verbose=True,
)
class TradingCrew:
def __init__(self, api_key: str, model: str = "MiniMax-Text-01"):
self.api_key = api_key
self.model = model
self.validator = StrategyValidator()
self.explainer = StrategyExplainer()
self.connector = MiniMaxConnector(api_key=api_key, model=model)
def parse_strategy(
self, user_message: str, conversation_history: list[dict] = None
) -> dict:
strategy_config = self.connector.parse_strategy(
user_message, conversation_history
)
if "error" in strategy_config:
return strategy_config
is_valid, errors = self.validator.validate(strategy_config)
if not is_valid:
return {
"error": "Strategy validation failed",
"validation_errors": errors,
"partial_config": strategy_config,
}
return strategy_config
def explain_strategy(self, strategy_config: dict) -> str:
return self.explainer.explain(strategy_config)
def chat(self, user_message: str, conversation_history: list[dict] = None) -> dict:
strategy_config = self.parse_strategy(user_message, conversation_history)
if "error" in strategy_config:
explanation = f"I had trouble understanding your strategy: {strategy_config.get('error', 'Unknown error')}"
if "validation_errors" in strategy_config:
explanation += "\n\nValidation issues:"
for err in strategy_config["validation_errors"]:
explanation += f"\n - {err}"
return {
"response": explanation,
"strategy_config": strategy_config.get("partial_config"),
"success": False,
}
explanation = self.explain_strategy(strategy_config)
return {
"response": f"I've configured your strategy:\n\n{explanation}",
"strategy_config": strategy_config,
"success": True,
}
def get_trading_crew(
api_key: Optional[str] = None, model: Optional[str] = None
) -> TradingCrew:
if api_key is None:
settings = get_settings()
api_key = settings.MINIMAX_API_KEY
if model is None:
settings = get_settings()
model = settings.MINIMAX_MODEL
return TradingCrew(api_key=api_key, model=model)

View File

@@ -1,13 +1,108 @@
from typing import Optional
from typing import Optional, List, Dict, Any
import httpx
from crewai import LLM
class LLMConnector:
class MiniMaxLLM(LLM):
def __init__(self, api_key: str, model: str = "MiniMax-Text-01", **kwargs):
super().__init__(**kwargs)
self.api_key = api_key
self.model = model
self.base_url = "https://api.minimax.chat/v1"
def _call(self, messages: List[Dict[str, str]], **kwargs) -> str:
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}
payload = {
"model": self.model,
"messages": messages,
"temperature": kwargs.get("temperature", 0.7),
"max_tokens": kwargs.get("max_tokens", 2048),
}
with httpx.Client(timeout=60.0) as client:
response = client.post(
f"{self.base_url}/chat/completions",
headers=headers,
json=payload,
)
response.raise_for_status()
return response.json()["choices"][0]["message"]["content"]
def call(self, messages: List[Dict[str, str]], **kwargs) -> str:
return self._call(messages, **kwargs)
class MiniMaxConnector:
def __init__(self, api_key: str, model: str = "MiniMax-Text-01"):
self.api_key = api_key
self.model = model
def chat(self, messages: list[dict], **kwargs):
raise NotImplementedError("LLM integration not yet implemented")
def chat(self, messages: list[dict], **kwargs) -> str:
formatted_messages = []
for msg in messages:
if isinstance(msg, dict):
formatted_messages.append(
{
"role": msg.get("role", "user"),
"content": msg.get("content", str(msg)),
}
)
else:
formatted_messages.append({"role": "user", "content": str(msg)})
def parse_strategy(self, user_message: str) -> dict:
raise NotImplementedError("Strategy parsing not yet implemented")
llm = MiniMaxLLM(api_key=self.api_key, model=self.model)
return llm.call(formatted_messages, **kwargs)
def parse_strategy(
self, user_message: str, conversation_history: list[dict] = None
) -> dict:
system_prompt = """You are a trading strategy designer. Parse the user's natural language request into a JSON strategy_config object.
Supported conditions (MVP):
- price_drop: Token price drops by X% (requires: token, threshold_percent)
- price_rise: Token price rises by X% (requires: token, threshold_percent)
- volume_spike: Trading volume increases X% (requires: token, threshold_percent)
- price_level: Price crosses above/below X (requires: token, price, direction)
Output ONLY valid JSON with this schema:
{
"conditions": [
{
"type": "price_drop|price_rise|volume_spike|price_level",
"params": {
"token": "TOKEN_SYMBOL",
"threshold_percent": number, // for price_drop, price_rise, volume_spike
"price": number, // for price_level
"direction": "above|below" // for price_level
}
}
],
"actions": [
{
"type": "buy|sell|notify",
"params": {}
}
]
}
If the user wants a condition not in the supported list, ask for clarification.
"""
messages = [{"role": "system", "content": system_prompt}]
if conversation_history:
for msg in conversation_history:
messages.append(
{"role": msg.get("role", "user"), "content": msg.get("content", "")}
)
messages.append({"role": "user", "content": user_message})
response = self.chat(messages)
try:
import json
result = json.loads(response)
return result
except json.JSONDecodeError:
return {"error": "Failed to parse strategy", "raw_response": response}