feat: add simulation management as agent skill

Agent can now manage simulations via conversation:
- 'start' (run simulation on token)
- 'stop' (stop running simulation)
- 'status' (check if running, show progress)
- 'results' (get current/latest simulation results)

Example conversations:
- 'Run a simulation on GRIAN' → starts simulation
- 'How's the simulation going?' → shows status
- 'Stop the simulation' → stops and shows results
- 'Show me the results' → shows simulation results
This commit is contained in:
shokollm
2026-04-12 14:21:32 +00:00
parent 5ae1165ad9
commit 632e1bf524

View File

@@ -127,6 +127,33 @@ TOOLS = [
"required": ["token_address"] "required": ["token_address"]
} }
} }
},
{
"type": "function",
"function": {
"name": "manage_simulation",
"description": "Manage trading simulations: start, stop, or check status. Simulations run on real-time klines and show live portfolio updates. Use when user asks to run simulation, check simulation status, or stop simulation.",
"parameters": {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["start", "stop", "status", "results"],
"description": "Action to perform: 'start' (begin new simulation), 'stop' (stop running simulation), 'status' (check if simulation is running), 'results' (get results from current or latest simulation)"
},
"token_address": {
"type": "string",
"description": "Token contract address for simulation (required for 'start' action)"
},
"kline_interval": {
"type": "string",
"description": "Kline interval: '1m', '5m', '15m', '1h' (default: '1m')",
"default": "1m"
}
},
"required": ["action"]
}
}
} }
] ]
@@ -135,6 +162,7 @@ SYSTEM_PROMPT_WITH_TOOLS = SYSTEM_PROMPT + """
You have access to tools: You have access to tools:
- search_tokens(chain, limit): Search for trending tokens on a blockchain. Use it when user asks for token recommendations or trending tokens. - search_tokens(chain, limit): Search for trending tokens on a blockchain. Use it when user asks for token recommendations or trending tokens.
- run_backtest(token_address, timeframe, start_date, end_date): Run a backtest on historical data. Returns performance metrics. Use when user asks to backtest or check historical performance. - run_backtest(token_address, timeframe, start_date, end_date): Run a backtest on historical data. Returns performance metrics. Use when user asks to backtest or check historical performance.
- manage_simulation(action, token_address, kline_interval): Manage trading simulations. Actions: 'start' (begin new), 'stop' (stop running), 'status' (check if running), 'results' (get current/latest results).
When you want to use a tool, respond with: When you want to use a tool, respond with:
{ {
@@ -274,6 +302,26 @@ class ConversationalAgent:
"strategy_needs_confirmation": False, "strategy_needs_confirmation": False,
"success": True "success": True
} }
elif func_name == "manage_simulation":
action = args.get("action")
token_address = args.get("token_address")
kline_interval = args.get("kline_interval", "1m")
# Execute simulation management
sim_result = self._manage_simulation(
action=action,
token_address=token_address,
kline_interval=kline_interval
)
return {
"response": sim_result,
"thinking": thinking,
"strategy_updated": False,
"strategy_needs_confirmation": False,
"success": True
}
# Get the main response content # Get the main response content
content = result.get("choices", [{}])[0].get("message", {}).get("content", "") content = result.get("choices", [{}])[0].get("message", {}).get("content", "")
@@ -501,6 +549,230 @@ Would you like me to adjust the strategy parameters based on these results?"""
except Exception as e: except Exception as e:
return f"I encountered an error running the backtest: {str(e)}" return f"I encountered an error running the backtest: {str(e)}"
def _manage_simulation(
self,
action: str,
token_address: str = None,
kline_interval: str = "1m"
) -> str:
"""Manage trading simulations: start, stop, status, or results."""
try:
from ...core.database import SessionLocal
from ...db.models import Simulation
from ...services.simulate.engine import SimulateEngine
from ...core.config import get_settings
from datetime import datetime
import asyncio
import uuid
import threading
db = SessionLocal()
settings = get_settings()
try:
# Get the bot
bot = db.query(Bot).filter(Bot.id == self.bot_id).first()
if not bot:
return "I couldn't find the bot. Please try again."
if action == "start":
if not token_address:
return "I need a token address to start a simulation. Which token would you like to simulate?"
# Check if there's already a running simulation
running_sim = db.query(Simulation).filter(
Simulation.bot_id == self.bot_id,
Simulation.status == "running"
).first()
if running_sim:
# Stop the existing one first
self._stop_simulation_db(running_sim.id)
# Create new simulation
sim_id = str(uuid.uuid4())
simulation = Simulation(
id=sim_id,
bot_id=self.bot_id,
started_at=datetime.utcnow(),
status="running",
config={
"token": token_address,
"chain": "bsc",
"kline_interval": kline_interval
},
signals=[],
klines=[],
trade_log=[]
)
db.add(simulation)
db.commit()
# Start the simulation in background
sim_config = {
"bot_id": self.bot_id,
"token": token_address,
"chain": "bsc",
"kline_interval": kline_interval,
"max_candles": 100,
"candle_delay": 30 if kline_interval == "1m" else 60,
"strategy_config": bot.strategy_config,
"ave_api_key": settings.AVE_API_KEY,
"ave_api_plan": settings.AVE_API_PLAN,
"initial_balance": 10000.0,
}
# Run simulation in background thread
def run_sim():
asyncio.run(self._run_simulation_sync(sim_id, settings.DATABASE_URL, sim_config))
thread = threading.Thread(target=run_sim)
thread.daemon = True
thread.start()
return f"Started simulation on {token_address} using {kline_interval} klines. The simulation is running and will process up to 100 candles. Ask me for status or results anytime!"
elif action == "stop":
# Find running simulation
running_sim = db.query(Simulation).filter(
Simulation.bot_id == self.bot_id,
Simulation.status == "running"
).first()
if not running_sim:
return "No simulation is currently running."
self._stop_simulation_db(running_sim.id)
# Get final results
portfolio = running_sim.portfolio or {}
current_balance = portfolio.get("current_balance", 10000)
initial_balance = portfolio.get("initial_balance", 10000)
pnl = current_balance - initial_balance
pnl_pct = (pnl / initial_balance) * 100 if initial_balance > 0 else 0
return f"Simulation stopped!\n\nFinal Results:\n💰 Final Balance: ${current_balance:,.2f}\n📈 P&L: {'+' if pnl >= 0 else ''}${pnl:,.2f} ({'+' if pnl_pct >= 0 else ''}{pnl_pct:.2f}%)\n📊 Trades: {len(running_sim.trade_log or [])}"
elif action == "status":
# Find running simulation
running_sim = db.query(Simulation).filter(
Simulation.bot_id == self.bot_id,
Simulation.status == "running"
).first()
if not running_sim:
return "No simulation is currently running."
portfolio = running_sim.portfolio or {}
klines_count = len(running_sim.klines or [])
trade_count = len(running_sim.trade_log or [])
status = f"**Simulation Status: Running**\n\n"
status += f"📊 Candles processed: ~{klines_count}\n"
status += f"📈 Trades executed: {trade_count}\n"
if portfolio.get("position", 0) > 0:
status += f"💰 Position: {portfolio['position']:.4f} {portfolio.get('position_token', 'TOKEN')}\n"
status += f"💰 Cash: ${portfolio.get('current_balance', 0):,.2f}\n"
else:
status += f"💰 Cash: ${portfolio.get('current_balance', 10000):,.2f}\n"
status += "\nAsk me to stop or get full results anytime!"
return status
elif action == "results":
# Find running or most recent simulation
simulation = db.query(Simulation).filter(
Simulation.bot_id == self.bot_id
).order_by(Simulation.started_at.desc()).first()
if not simulation:
return "No simulation found. Start a simulation first!"
portfolio = simulation.portfolio or {}
current_balance = portfolio.get("current_balance", 10000)
initial_balance = portfolio.get("initial_balance", 10000)
pnl = current_balance - initial_balance
pnl_pct = (pnl / initial_balance) * 100 if initial_balance > 0 else 0
trade_log = simulation.trade_log or []
status_emoji = "🟢" if simulation.status == "running" else ""
status_text = "Running" if simulation.status == "running" else "Completed/Stopped"
results = f"**Simulation Results** {status_emoji} ({status_text})\n\n"
results += f"💰 Final Balance: ${current_balance:,.2f}\n"
results += f"📈 P&L: {'+' if pnl >= 0 else ''}${pnl:,.2f} ({'+' if pnl_pct >= 0 else ''}{pnl_pct:.2f}%)\n"
results += f"📊 Total Trades: {len(trade_log)}\n"
if simulation.status == "running":
results += f"\n⏳ Simulation still running... (refresh for latest)"
return results
else:
return f"Unknown action: {action}. Use 'start', 'stop', 'status', or 'results'."
finally:
db.close()
except Exception as e:
return f"I encountered an error managing the simulation: {str(e)}"
def _stop_simulation_db(self, simulation_id: str):
"""Stop a simulation in the database."""
from ...core.database import SessionLocal
db = SessionLocal()
try:
simulation = db.query(Simulation).filter(Simulation.id == simulation_id).first()
if simulation:
simulation.status = "stopped"
db.commit()
finally:
db.close()
async def _run_simulation_sync(self, simulation_id: str, db_url: str, config: dict):
"""Run simulation synchronously in background."""
from ...services.simulate.engine import SimulateEngine
from ...core.database import SessionLocal
async def _run():
engine = SimulateEngine(config)
engine.run_id = simulation_id
def serialize_signal(s):
created = s.get("created_at")
if hasattr(created, "isoformat"):
created = created.isoformat()
return {**s, "created_at": created}
def save_progress():
db = SessionLocal()
try:
sim = db.query(Simulation).filter(Simulation.id == simulation_id).first()
if sim:
sim.status = engine.status
sim.signals = [serialize_signal(s) for s in engine.signals]
sim.klines = [{"time": k.get("time"), "close": k.get("close")} for k in engine.klines]
sim.trade_log = engine.trade_log
sim.portfolio = {
"initial_balance": config.get("initial_balance", 10000),
"current_balance": engine.current_balance,
"position": engine.position,
"position_token": engine.position_token,
"entry_price": engine.entry_price,
"current_price": engine.last_close,
}
db.commit()
finally:
db.close()
try:
await engine.run()
finally:
save_progress()
asyncio.run(_run())
def _update_strategy(self, strategy_update: Dict) -> bool: def _update_strategy(self, strategy_update: Dict) -> bool:
"""Update the bot's strategy in the database.""" """Update the bot's strategy in the database."""
try: try: