diff --git a/src/backend/app/api/simulate.py b/src/backend/app/api/simulate.py index 734b8fa..f4300f9 100644 --- a/src/backend/app/api/simulate.py +++ b/src/backend/app/api/simulate.py @@ -1,38 +1,233 @@ -from fastapi import APIRouter, Depends, HTTPException, status +import uuid +import asyncio +from datetime import datetime +from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks from sqlalchemy.orm import Session -from typing import List +from typing import List, Dict, Any, Optional +from concurrent.futures import ThreadPoolExecutor +from .auth import get_current_user from ..core.database import get_db +from ..core.config import get_settings from ..db.schemas import SimulationCreate, SimulationResponse +from ..db.models import Bot, Simulation, Signal, User router = APIRouter() +running_simulations: Dict[str, Any] = {} +executor = ThreadPoolExecutor(max_workers=4) -@router.post("/bots/{bot_id}/simulate", response_model=SimulationResponse) -def start_simulation( - bot_id: str, config: SimulationCreate, db: Session = Depends(get_db) + +def run_simulation_sync( + simulation_id: str, db_url: str, bot_id: str, config: Dict[str, Any] ): - raise HTTPException( - status_code=status.HTTP_501_NOT_IMPLEMENTED, detail="Not implemented" + import asyncio + from ..services.simulate.engine import SimulateEngine + from ..core.database import SessionLocal + + async def _run(): + engine = SimulateEngine(config) + engine.run_id = simulation_id + running_simulations[simulation_id] = engine + try: + results = await engine.run() + db = SessionLocal() + try: + simulation = ( + db.query(Simulation).filter(Simulation.id == simulation_id).first() + ) + if simulation: + simulation.status = engine.status + simulation.signals = engine.signals + db.commit() + + for signal in engine.signals: + db_signal = Signal( + id=signal["id"], + bot_id=signal["bot_id"], + run_id=signal["run_id"], + signal_type=signal["signal_type"], + token=signal["token"], + price=signal["price"], + confidence=signal.get("confidence"), + reasoning=signal.get("reasoning"), + executed=signal.get("executed", False), + created_at=signal["created_at"], + ) + db.add(db_signal) + db.commit() + finally: + db.close() + finally: + if simulation_id in running_simulations: + del running_simulations[simulation_id] + + asyncio.run(_run()) + + +@router.post( + "/bots/{bot_id}/simulate", + response_model=SimulationResponse, + status_code=status.HTTP_201_CREATED, +) +async def start_simulation( + bot_id: str, + config: SimulationCreate, + background_tasks: BackgroundTasks, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + bot = db.query(Bot).filter(Bot.id == bot_id).first() + if not bot: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="Bot not found" + ) + if bot.user_id != current_user.id: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized" + ) + + settings = get_settings() + simulation_id = str(uuid.uuid4()) + + check_interval = config.check_interval + if settings.AVE_API_PLAN != "pro" and check_interval < 60: + check_interval = 60 + + simulation_config = { + "bot_id": bot_id, + "token": config.token, + "chain": config.chain, + "duration_seconds": config.duration_seconds, + "check_interval": check_interval, + "auto_execute": config.auto_execute, + "strategy_config": bot.strategy_config, + "ave_api_key": settings.AVE_API_KEY, + "ave_api_plan": settings.AVE_API_PLAN, + } + + simulation = Simulation( + id=simulation_id, + bot_id=bot_id, + started_at=datetime.utcnow(), + status="running", + config={ + "token": config.token, + "chain": config.chain, + "duration_seconds": config.duration_seconds, + "check_interval": check_interval, + "auto_execute": config.auto_execute, + }, + signals=[], ) + db.add(simulation) + db.commit() + db.refresh(simulation) + + db_url = str(settings.DATABASE_URL) + background_tasks.add_task( + run_simulation_sync, simulation_id, db_url, bot_id, simulation_config + ) + + return simulation @router.get("/bots/{bot_id}/simulate/{run_id}", response_model=SimulationResponse) -def get_simulation(bot_id: str, run_id: str, db: Session = Depends(get_db)): - raise HTTPException( - status_code=status.HTTP_501_NOT_IMPLEMENTED, detail="Not implemented" +def get_simulation( + bot_id: str, + run_id: str, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + bot = db.query(Bot).filter(Bot.id == bot_id).first() + if not bot: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="Bot not found" + ) + if bot.user_id != current_user.id: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized" + ) + + simulation = ( + db.query(Simulation) + .filter(Simulation.id == run_id, Simulation.bot_id == bot_id) + .first() ) + if not simulation: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="Simulation not found" + ) + + if run_id in running_simulations: + engine = running_simulations[run_id] + simulation.signals = engine.get_signals() + + return simulation @router.get("/bots/{bot_id}/simulations", response_model=List[SimulationResponse]) -def list_simulations(bot_id: str, db: Session = Depends(get_db)): - raise HTTPException( - status_code=status.HTTP_501_NOT_IMPLEMENTED, detail="Not implemented" +def list_simulations( + bot_id: str, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + bot = db.query(Bot).filter(Bot.id == bot_id).first() + if not bot: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="Bot not found" + ) + if bot.user_id != current_user.id: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized" + ) + + simulations = ( + db.query(Simulation) + .filter(Simulation.bot_id == bot_id) + .order_by(Simulation.started_at.desc()) + .all() ) + for sim in simulations: + if sim.id in running_simulations: + engine = running_simulations[sim.id] + sim.signals = engine.get_signals() + + return simulations + @router.post("/bots/{bot_id}/simulate/{run_id}/stop") -def stop_simulation(bot_id: str, run_id: str, db: Session = Depends(get_db)): - raise HTTPException( - status_code=status.HTTP_501_NOT_IMPLEMENTED, detail="Not implemented" +def stop_simulation( + bot_id: str, + run_id: str, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + bot = db.query(Bot).filter(Bot.id == bot_id).first() + if not bot: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="Bot not found" + ) + if bot.user_id != current_user.id: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized" + ) + + simulation = ( + db.query(Simulation) + .filter(Simulation.id == run_id, Simulation.bot_id == bot_id) + .first() ) + if not simulation: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="Simulation not found" + ) + + if run_id in running_simulations: + engine = running_simulations[run_id] + asyncio.create_task(engine.stop()) + simulation.status = "stopped" + db.commit() + + return {"status": "stopping", "run_id": run_id} diff --git a/src/backend/app/db/schemas.py b/src/backend/app/db/schemas.py index 664d712..84fd298 100644 --- a/src/backend/app/db/schemas.py +++ b/src/backend/app/db/schemas.py @@ -87,6 +87,7 @@ class SimulationCreate(BaseModel): token: str chain: str duration_seconds: int = 3600 + check_interval: int = 60 auto_execute: bool = False diff --git a/src/backend/app/services/simulate/engine.py b/src/backend/app/services/simulate/engine.py index d8d7c2e..9cd32e7 100644 --- a/src/backend/app/services/simulate/engine.py +++ b/src/backend/app/services/simulate/engine.py @@ -1,15 +1,177 @@ -from typing import Optional, Dict, Any, List +import uuid +import asyncio +from datetime import datetime +from typing import Dict, Any, List, Optional +from ..backtest.ave_client import AveCloudClient class SimulateEngine: def __init__(self, config: Dict[str, Any]): self.config = config + self.run_id = str(uuid.uuid4()) + self.status = "pending" + self.results: Optional[Dict[str, Any]] = None + self.signals: List[Dict[str, Any]] = [] + self.ave_client = AveCloudClient( + api_key=config.get("ave_api_key", ""), + plan=config.get("ave_api_plan", "free"), + ) + self.bot_id = config.get("bot_id") + self.strategy_config = config.get("strategy_config", {}) + self.conditions = self.strategy_config.get("conditions", []) + self.actions = self.strategy_config.get("actions", []) + self.check_interval = config.get("check_interval", 60) + self.duration_seconds = config.get("duration_seconds", 3600) + self.auto_execute = config.get("auto_execute", False) + self.token = config.get("token", "") + self.chain = config.get("chain", "bsc") + self.running = False + self.started_at: Optional[datetime] = None + self.last_price: Optional[float] = None + self.last_volume: Optional[float] = None - async def run(self) -> List[Dict[str, Any]]: - raise NotImplementedError("Simulation engine not yet implemented") + async def run(self) -> Dict[str, Any]: + self.running = True + self.status = "running" + self.started_at = datetime.utcnow() + + token_id = ( + f"{self.token}-{self.chain}" + if self.token and not self.token.endswith(f"-{self.chain}") + else self.token + ) + + if not token_id or token_id == f"-{self.chain}": + self.status = "failed" + self.results = {"error": "Token ID is required"} + return self.results + + end_time = datetime.utcnow().timestamp() + self.duration_seconds + + try: + while self.running and datetime.utcnow().timestamp() < end_time: + try: + price_data = await self.ave_client.get_token_price(token_id) + if price_data: + current_price = float(price_data.get("price", 0)) + current_volume = float(price_data.get("volume", 0)) + + if current_price > 0: + await self._check_conditions( + current_price, current_volume, price_data + ) + + self.last_price = current_price + self.last_volume = current_volume + + except Exception as e: + pass + + for _ in range(self.check_interval): + if not self.running: + break + await asyncio.sleep(1) + + if self.running: + self.status = "completed" + else: + self.status = "stopped" + + except Exception as e: + self.status = "failed" + self.results = {"error": str(e)} + + self.results = self.results or {} + self.results["total_signals"] = len(self.signals) + self.results["signals"] = self.signals + self.results["started_at"] = self.started_at + self.results["ended_at"] = datetime.utcnow() + + return self.results + + async def _check_conditions( + self, current_price: float, current_volume: float, price_data: Dict[str, Any] + ): + timestamp = int(datetime.utcnow().timestamp() * 1000) + + for condition in self.conditions: + if self._check_condition(condition, current_price, current_volume): + await self._execute_actions(current_price, timestamp, condition) + break + + def _check_condition( + self, + condition: Dict[str, Any], + current_price: float, + current_volume: float, + ) -> bool: + cond_type = condition.get("type", "") + threshold = condition.get("threshold", 0) + price_level = condition.get("price") + direction = condition.get("direction", "above") + + if cond_type == "price_drop": + if self.last_price is None or self.last_price <= 0: + return False + drop_pct = ((self.last_price - current_price) / self.last_price) * 100 + return drop_pct >= threshold + + elif cond_type == "price_rise": + if self.last_price is None or self.last_price <= 0: + return False + rise_pct = ((current_price - self.last_price) / self.last_price) * 100 + return rise_pct >= threshold + + elif cond_type == "volume_spike": + if self.last_volume is None or self.last_volume <= 0: + return False + volume_increase = ( + (current_volume - self.last_volume) / self.last_volume + ) * 100 + return volume_increase >= threshold + + elif cond_type == "price_level": + if price_level is None: + return False + if direction == "above": + return current_price > price_level + else: + return current_price < price_level + + return False + + async def _execute_actions( + self, price: float, timestamp: int, matched_condition: Dict[str, Any] + ): + token = matched_condition.get("token", self.token) + reasoning = f"Condition {matched_condition.get('type')} triggered" + + signal = { + "id": str(uuid.uuid4()), + "bot_id": self.bot_id, + "run_id": self.run_id, + "signal_type": "signal", + "token": token, + "price": price, + "confidence": 0.8, + "reasoning": reasoning, + "executed": self.auto_execute, + "created_at": datetime.utcnow(), + } + + self.signals.append(signal) async def stop(self): - raise NotImplementedError("Simulation stop not yet implemented") + self.running = False + self.status = "stopped" + + def get_results(self) -> Dict[str, Any]: + return { + "id": self.run_id, + "status": self.status, + "results": self.results, + "signals": self.signals, + } def get_signals(self) -> List[Dict[str, Any]]: - raise NotImplementedError("Simulation signals not yet implemented") + return self.signals