Merge pull request '[Backend] Simulate Engine - Real-time Signal Detection' (#19) from fix/issue-8 into main

This commit was merged in pull request #19.
This commit is contained in:
2026-04-08 14:57:28 +02:00
3 changed files with 379 additions and 21 deletions

View File

@@ -1,38 +1,233 @@
from fastapi import APIRouter, Depends, HTTPException, status
import uuid
import asyncio
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks
from sqlalchemy.orm import Session
from typing import List
from typing import List, Dict, Any, Optional
from concurrent.futures import ThreadPoolExecutor
from .auth import get_current_user
from ..core.database import get_db
from ..core.config import get_settings
from ..db.schemas import SimulationCreate, SimulationResponse
from ..db.models import Bot, Simulation, Signal, User
router = APIRouter()
running_simulations: Dict[str, Any] = {}
executor = ThreadPoolExecutor(max_workers=4)
@router.post("/bots/{bot_id}/simulate", response_model=SimulationResponse)
def start_simulation(
bot_id: str, config: SimulationCreate, db: Session = Depends(get_db)
def run_simulation_sync(
simulation_id: str, db_url: str, bot_id: str, config: Dict[str, Any]
):
raise HTTPException(
status_code=status.HTTP_501_NOT_IMPLEMENTED, detail="Not implemented"
import asyncio
from ..services.simulate.engine import SimulateEngine
from ..core.database import SessionLocal
async def _run():
engine = SimulateEngine(config)
engine.run_id = simulation_id
running_simulations[simulation_id] = engine
try:
results = await engine.run()
db = SessionLocal()
try:
simulation = (
db.query(Simulation).filter(Simulation.id == simulation_id).first()
)
if simulation:
simulation.status = engine.status
simulation.signals = engine.signals
db.commit()
for signal in engine.signals:
db_signal = Signal(
id=signal["id"],
bot_id=signal["bot_id"],
run_id=signal["run_id"],
signal_type=signal["signal_type"],
token=signal["token"],
price=signal["price"],
confidence=signal.get("confidence"),
reasoning=signal.get("reasoning"),
executed=signal.get("executed", False),
created_at=signal["created_at"],
)
db.add(db_signal)
db.commit()
finally:
db.close()
finally:
if simulation_id in running_simulations:
del running_simulations[simulation_id]
asyncio.run(_run())
@router.post(
"/bots/{bot_id}/simulate",
response_model=SimulationResponse,
status_code=status.HTTP_201_CREATED,
)
async def start_simulation(
bot_id: str,
config: SimulationCreate,
background_tasks: BackgroundTasks,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
bot = db.query(Bot).filter(Bot.id == bot_id).first()
if not bot:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Bot not found"
)
if bot.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized"
)
settings = get_settings()
simulation_id = str(uuid.uuid4())
check_interval = config.check_interval
if settings.AVE_API_PLAN != "pro" and check_interval < 60:
check_interval = 60
simulation_config = {
"bot_id": bot_id,
"token": config.token,
"chain": config.chain,
"duration_seconds": config.duration_seconds,
"check_interval": check_interval,
"auto_execute": config.auto_execute,
"strategy_config": bot.strategy_config,
"ave_api_key": settings.AVE_API_KEY,
"ave_api_plan": settings.AVE_API_PLAN,
}
simulation = Simulation(
id=simulation_id,
bot_id=bot_id,
started_at=datetime.utcnow(),
status="running",
config={
"token": config.token,
"chain": config.chain,
"duration_seconds": config.duration_seconds,
"check_interval": check_interval,
"auto_execute": config.auto_execute,
},
signals=[],
)
db.add(simulation)
db.commit()
db.refresh(simulation)
db_url = str(settings.DATABASE_URL)
background_tasks.add_task(
run_simulation_sync, simulation_id, db_url, bot_id, simulation_config
)
return simulation
@router.get("/bots/{bot_id}/simulate/{run_id}", response_model=SimulationResponse)
def get_simulation(bot_id: str, run_id: str, db: Session = Depends(get_db)):
raise HTTPException(
status_code=status.HTTP_501_NOT_IMPLEMENTED, detail="Not implemented"
def get_simulation(
bot_id: str,
run_id: str,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
bot = db.query(Bot).filter(Bot.id == bot_id).first()
if not bot:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Bot not found"
)
if bot.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized"
)
simulation = (
db.query(Simulation)
.filter(Simulation.id == run_id, Simulation.bot_id == bot_id)
.first()
)
if not simulation:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Simulation not found"
)
if run_id in running_simulations:
engine = running_simulations[run_id]
simulation.signals = engine.get_signals()
return simulation
@router.get("/bots/{bot_id}/simulations", response_model=List[SimulationResponse])
def list_simulations(bot_id: str, db: Session = Depends(get_db)):
raise HTTPException(
status_code=status.HTTP_501_NOT_IMPLEMENTED, detail="Not implemented"
def list_simulations(
bot_id: str,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
bot = db.query(Bot).filter(Bot.id == bot_id).first()
if not bot:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Bot not found"
)
if bot.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized"
)
simulations = (
db.query(Simulation)
.filter(Simulation.bot_id == bot_id)
.order_by(Simulation.started_at.desc())
.all()
)
for sim in simulations:
if sim.id in running_simulations:
engine = running_simulations[sim.id]
sim.signals = engine.get_signals()
return simulations
@router.post("/bots/{bot_id}/simulate/{run_id}/stop")
def stop_simulation(bot_id: str, run_id: str, db: Session = Depends(get_db)):
raise HTTPException(
status_code=status.HTTP_501_NOT_IMPLEMENTED, detail="Not implemented"
def stop_simulation(
bot_id: str,
run_id: str,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
bot = db.query(Bot).filter(Bot.id == bot_id).first()
if not bot:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Bot not found"
)
if bot.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized"
)
simulation = (
db.query(Simulation)
.filter(Simulation.id == run_id, Simulation.bot_id == bot_id)
.first()
)
if not simulation:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Simulation not found"
)
if run_id in running_simulations:
engine = running_simulations[run_id]
asyncio.create_task(engine.stop())
simulation.status = "stopped"
db.commit()
return {"status": "stopping", "run_id": run_id}

View File

@@ -87,6 +87,7 @@ class SimulationCreate(BaseModel):
token: str
chain: str
duration_seconds: int = 3600
check_interval: int = 60
auto_execute: bool = False

View File

@@ -1,15 +1,177 @@
from typing import Optional, Dict, Any, List
import uuid
import asyncio
from datetime import datetime
from typing import Dict, Any, List, Optional
from ..backtest.ave_client import AveCloudClient
class SimulateEngine:
def __init__(self, config: Dict[str, Any]):
self.config = config
self.run_id = str(uuid.uuid4())
self.status = "pending"
self.results: Optional[Dict[str, Any]] = None
self.signals: List[Dict[str, Any]] = []
self.ave_client = AveCloudClient(
api_key=config.get("ave_api_key", ""),
plan=config.get("ave_api_plan", "free"),
)
self.bot_id = config.get("bot_id")
self.strategy_config = config.get("strategy_config", {})
self.conditions = self.strategy_config.get("conditions", [])
self.actions = self.strategy_config.get("actions", [])
self.check_interval = config.get("check_interval", 60)
self.duration_seconds = config.get("duration_seconds", 3600)
self.auto_execute = config.get("auto_execute", False)
self.token = config.get("token", "")
self.chain = config.get("chain", "bsc")
self.running = False
self.started_at: Optional[datetime] = None
self.last_price: Optional[float] = None
self.last_volume: Optional[float] = None
async def run(self) -> List[Dict[str, Any]]:
raise NotImplementedError("Simulation engine not yet implemented")
async def run(self) -> Dict[str, Any]:
self.running = True
self.status = "running"
self.started_at = datetime.utcnow()
token_id = (
f"{self.token}-{self.chain}"
if self.token and not self.token.endswith(f"-{self.chain}")
else self.token
)
if not token_id or token_id == f"-{self.chain}":
self.status = "failed"
self.results = {"error": "Token ID is required"}
return self.results
end_time = datetime.utcnow().timestamp() + self.duration_seconds
try:
while self.running and datetime.utcnow().timestamp() < end_time:
try:
price_data = await self.ave_client.get_token_price(token_id)
if price_data:
current_price = float(price_data.get("price", 0))
current_volume = float(price_data.get("volume", 0))
if current_price > 0:
await self._check_conditions(
current_price, current_volume, price_data
)
self.last_price = current_price
self.last_volume = current_volume
except Exception as e:
pass
for _ in range(self.check_interval):
if not self.running:
break
await asyncio.sleep(1)
if self.running:
self.status = "completed"
else:
self.status = "stopped"
except Exception as e:
self.status = "failed"
self.results = {"error": str(e)}
self.results = self.results or {}
self.results["total_signals"] = len(self.signals)
self.results["signals"] = self.signals
self.results["started_at"] = self.started_at
self.results["ended_at"] = datetime.utcnow()
return self.results
async def _check_conditions(
self, current_price: float, current_volume: float, price_data: Dict[str, Any]
):
timestamp = int(datetime.utcnow().timestamp() * 1000)
for condition in self.conditions:
if self._check_condition(condition, current_price, current_volume):
await self._execute_actions(current_price, timestamp, condition)
break
def _check_condition(
self,
condition: Dict[str, Any],
current_price: float,
current_volume: float,
) -> bool:
cond_type = condition.get("type", "")
threshold = condition.get("threshold", 0)
price_level = condition.get("price")
direction = condition.get("direction", "above")
if cond_type == "price_drop":
if self.last_price is None or self.last_price <= 0:
return False
drop_pct = ((self.last_price - current_price) / self.last_price) * 100
return drop_pct >= threshold
elif cond_type == "price_rise":
if self.last_price is None or self.last_price <= 0:
return False
rise_pct = ((current_price - self.last_price) / self.last_price) * 100
return rise_pct >= threshold
elif cond_type == "volume_spike":
if self.last_volume is None or self.last_volume <= 0:
return False
volume_increase = (
(current_volume - self.last_volume) / self.last_volume
) * 100
return volume_increase >= threshold
elif cond_type == "price_level":
if price_level is None:
return False
if direction == "above":
return current_price > price_level
else:
return current_price < price_level
return False
async def _execute_actions(
self, price: float, timestamp: int, matched_condition: Dict[str, Any]
):
token = matched_condition.get("token", self.token)
reasoning = f"Condition {matched_condition.get('type')} triggered"
signal = {
"id": str(uuid.uuid4()),
"bot_id": self.bot_id,
"run_id": self.run_id,
"signal_type": "signal",
"token": token,
"price": price,
"confidence": 0.8,
"reasoning": reasoning,
"executed": self.auto_execute,
"created_at": datetime.utcnow(),
}
self.signals.append(signal)
async def stop(self):
raise NotImplementedError("Simulation stop not yet implemented")
self.running = False
self.status = "stopped"
def get_results(self) -> Dict[str, Any]:
return {
"id": self.run_id,
"status": self.status,
"results": self.results,
"signals": self.signals,
}
def get_signals(self) -> List[Dict[str, Any]]:
raise NotImplementedError("Simulation signals not yet implemented")
return self.signals