Compare commits

...

2 Commits

Author SHA1 Message Date
shokollm
965efa122b feat: implement Simulate Engine for real-time signal detection
Implement simulate engine for real-time signal detection via REST polling.

Changes:
- SimulateEngine service with configurable check interval (default 60s for free tier)
- REST polling for current prices using AveCloudClient
- Condition matching for real-time data (price_drop, price_rise, volume_spike, price_level)
- Signal logging with user-initiated start/stop
- Simulation API endpoints:
  - POST /api/bots/{id}/simulate - Start simulation
  - GET /api/bots/{id}/simulate/{run_id} - Get status/signals
  - GET /api/bots/{id}/simulations - List all simulations
  - POST /api/bots/{id}/simulate/{run_id}/stop - Stop simulation
- Updated SimulationCreate schema with check_interval field
- Free tier limited to 60s minimum check interval
- Signals stored in database for simulation signal history

Depends on issue #7 (Backtest Engine) which was merged in PR #18
2026-04-08 11:25:51 +00:00
0fb16f06e4 Merge pull request '[Backend] Backtest Engine - Historical Data Replay' (#18) from fix/issue-7 into main 2026-04-08 13:21:56 +02:00
3 changed files with 379 additions and 21 deletions

View File

@@ -1,38 +1,233 @@
from fastapi import APIRouter, Depends, HTTPException, status
import uuid
import asyncio
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks
from sqlalchemy.orm import Session
from typing import List
from typing import List, Dict, Any, Optional
from concurrent.futures import ThreadPoolExecutor
from .auth import get_current_user
from ..core.database import get_db
from ..core.config import get_settings
from ..db.schemas import SimulationCreate, SimulationResponse
from ..db.models import Bot, Simulation, Signal, User
router = APIRouter()
running_simulations: Dict[str, Any] = {}
executor = ThreadPoolExecutor(max_workers=4)
@router.post("/bots/{bot_id}/simulate", response_model=SimulationResponse)
def start_simulation(
bot_id: str, config: SimulationCreate, db: Session = Depends(get_db)
def run_simulation_sync(
simulation_id: str, db_url: str, bot_id: str, config: Dict[str, Any]
):
raise HTTPException(
status_code=status.HTTP_501_NOT_IMPLEMENTED, detail="Not implemented"
import asyncio
from ..services.simulate.engine import SimulateEngine
from ..core.database import SessionLocal
async def _run():
engine = SimulateEngine(config)
engine.run_id = simulation_id
running_simulations[simulation_id] = engine
try:
results = await engine.run()
db = SessionLocal()
try:
simulation = (
db.query(Simulation).filter(Simulation.id == simulation_id).first()
)
if simulation:
simulation.status = engine.status
simulation.signals = engine.signals
db.commit()
for signal in engine.signals:
db_signal = Signal(
id=signal["id"],
bot_id=signal["bot_id"],
run_id=signal["run_id"],
signal_type=signal["signal_type"],
token=signal["token"],
price=signal["price"],
confidence=signal.get("confidence"),
reasoning=signal.get("reasoning"),
executed=signal.get("executed", False),
created_at=signal["created_at"],
)
db.add(db_signal)
db.commit()
finally:
db.close()
finally:
if simulation_id in running_simulations:
del running_simulations[simulation_id]
asyncio.run(_run())
@router.post(
"/bots/{bot_id}/simulate",
response_model=SimulationResponse,
status_code=status.HTTP_201_CREATED,
)
async def start_simulation(
bot_id: str,
config: SimulationCreate,
background_tasks: BackgroundTasks,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
bot = db.query(Bot).filter(Bot.id == bot_id).first()
if not bot:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Bot not found"
)
if bot.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized"
)
settings = get_settings()
simulation_id = str(uuid.uuid4())
check_interval = config.check_interval
if settings.AVE_API_PLAN != "pro" and check_interval < 60:
check_interval = 60
simulation_config = {
"bot_id": bot_id,
"token": config.token,
"chain": config.chain,
"duration_seconds": config.duration_seconds,
"check_interval": check_interval,
"auto_execute": config.auto_execute,
"strategy_config": bot.strategy_config,
"ave_api_key": settings.AVE_API_KEY,
"ave_api_plan": settings.AVE_API_PLAN,
}
simulation = Simulation(
id=simulation_id,
bot_id=bot_id,
started_at=datetime.utcnow(),
status="running",
config={
"token": config.token,
"chain": config.chain,
"duration_seconds": config.duration_seconds,
"check_interval": check_interval,
"auto_execute": config.auto_execute,
},
signals=[],
)
db.add(simulation)
db.commit()
db.refresh(simulation)
db_url = str(settings.DATABASE_URL)
background_tasks.add_task(
run_simulation_sync, simulation_id, db_url, bot_id, simulation_config
)
return simulation
@router.get("/bots/{bot_id}/simulate/{run_id}", response_model=SimulationResponse)
def get_simulation(bot_id: str, run_id: str, db: Session = Depends(get_db)):
def get_simulation(
bot_id: str,
run_id: str,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
bot = db.query(Bot).filter(Bot.id == bot_id).first()
if not bot:
raise HTTPException(
status_code=status.HTTP_501_NOT_IMPLEMENTED, detail="Not implemented"
status_code=status.HTTP_404_NOT_FOUND, detail="Bot not found"
)
if bot.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized"
)
simulation = (
db.query(Simulation)
.filter(Simulation.id == run_id, Simulation.bot_id == bot_id)
.first()
)
if not simulation:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Simulation not found"
)
if run_id in running_simulations:
engine = running_simulations[run_id]
simulation.signals = engine.get_signals()
return simulation
@router.get("/bots/{bot_id}/simulations", response_model=List[SimulationResponse])
def list_simulations(bot_id: str, db: Session = Depends(get_db)):
def list_simulations(
bot_id: str,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
bot = db.query(Bot).filter(Bot.id == bot_id).first()
if not bot:
raise HTTPException(
status_code=status.HTTP_501_NOT_IMPLEMENTED, detail="Not implemented"
status_code=status.HTTP_404_NOT_FOUND, detail="Bot not found"
)
if bot.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized"
)
simulations = (
db.query(Simulation)
.filter(Simulation.bot_id == bot_id)
.order_by(Simulation.started_at.desc())
.all()
)
for sim in simulations:
if sim.id in running_simulations:
engine = running_simulations[sim.id]
sim.signals = engine.get_signals()
return simulations
@router.post("/bots/{bot_id}/simulate/{run_id}/stop")
def stop_simulation(bot_id: str, run_id: str, db: Session = Depends(get_db)):
def stop_simulation(
bot_id: str,
run_id: str,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
bot = db.query(Bot).filter(Bot.id == bot_id).first()
if not bot:
raise HTTPException(
status_code=status.HTTP_501_NOT_IMPLEMENTED, detail="Not implemented"
status_code=status.HTTP_404_NOT_FOUND, detail="Bot not found"
)
if bot.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized"
)
simulation = (
db.query(Simulation)
.filter(Simulation.id == run_id, Simulation.bot_id == bot_id)
.first()
)
if not simulation:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Simulation not found"
)
if run_id in running_simulations:
engine = running_simulations[run_id]
asyncio.create_task(engine.stop())
simulation.status = "stopped"
db.commit()
return {"status": "stopping", "run_id": run_id}

View File

@@ -87,6 +87,7 @@ class SimulationCreate(BaseModel):
token: str
chain: str
duration_seconds: int = 3600
check_interval: int = 60
auto_execute: bool = False

View File

@@ -1,15 +1,177 @@
from typing import Optional, Dict, Any, List
import uuid
import asyncio
from datetime import datetime
from typing import Dict, Any, List, Optional
from ..backtest.ave_client import AveCloudClient
class SimulateEngine:
def __init__(self, config: Dict[str, Any]):
self.config = config
self.run_id = str(uuid.uuid4())
self.status = "pending"
self.results: Optional[Dict[str, Any]] = None
self.signals: List[Dict[str, Any]] = []
self.ave_client = AveCloudClient(
api_key=config.get("ave_api_key", ""),
plan=config.get("ave_api_plan", "free"),
)
self.bot_id = config.get("bot_id")
self.strategy_config = config.get("strategy_config", {})
self.conditions = self.strategy_config.get("conditions", [])
self.actions = self.strategy_config.get("actions", [])
self.check_interval = config.get("check_interval", 60)
self.duration_seconds = config.get("duration_seconds", 3600)
self.auto_execute = config.get("auto_execute", False)
self.token = config.get("token", "")
self.chain = config.get("chain", "bsc")
self.running = False
self.started_at: Optional[datetime] = None
self.last_price: Optional[float] = None
self.last_volume: Optional[float] = None
async def run(self) -> List[Dict[str, Any]]:
raise NotImplementedError("Simulation engine not yet implemented")
async def run(self) -> Dict[str, Any]:
self.running = True
self.status = "running"
self.started_at = datetime.utcnow()
token_id = (
f"{self.token}-{self.chain}"
if self.token and not self.token.endswith(f"-{self.chain}")
else self.token
)
if not token_id or token_id == f"-{self.chain}":
self.status = "failed"
self.results = {"error": "Token ID is required"}
return self.results
end_time = datetime.utcnow().timestamp() + self.duration_seconds
try:
while self.running and datetime.utcnow().timestamp() < end_time:
try:
price_data = await self.ave_client.get_token_price(token_id)
if price_data:
current_price = float(price_data.get("price", 0))
current_volume = float(price_data.get("volume", 0))
if current_price > 0:
await self._check_conditions(
current_price, current_volume, price_data
)
self.last_price = current_price
self.last_volume = current_volume
except Exception as e:
pass
for _ in range(self.check_interval):
if not self.running:
break
await asyncio.sleep(1)
if self.running:
self.status = "completed"
else:
self.status = "stopped"
except Exception as e:
self.status = "failed"
self.results = {"error": str(e)}
self.results = self.results or {}
self.results["total_signals"] = len(self.signals)
self.results["signals"] = self.signals
self.results["started_at"] = self.started_at
self.results["ended_at"] = datetime.utcnow()
return self.results
async def _check_conditions(
self, current_price: float, current_volume: float, price_data: Dict[str, Any]
):
timestamp = int(datetime.utcnow().timestamp() * 1000)
for condition in self.conditions:
if self._check_condition(condition, current_price, current_volume):
await self._execute_actions(current_price, timestamp, condition)
break
def _check_condition(
self,
condition: Dict[str, Any],
current_price: float,
current_volume: float,
) -> bool:
cond_type = condition.get("type", "")
threshold = condition.get("threshold", 0)
price_level = condition.get("price")
direction = condition.get("direction", "above")
if cond_type == "price_drop":
if self.last_price is None or self.last_price <= 0:
return False
drop_pct = ((self.last_price - current_price) / self.last_price) * 100
return drop_pct >= threshold
elif cond_type == "price_rise":
if self.last_price is None or self.last_price <= 0:
return False
rise_pct = ((current_price - self.last_price) / self.last_price) * 100
return rise_pct >= threshold
elif cond_type == "volume_spike":
if self.last_volume is None or self.last_volume <= 0:
return False
volume_increase = (
(current_volume - self.last_volume) / self.last_volume
) * 100
return volume_increase >= threshold
elif cond_type == "price_level":
if price_level is None:
return False
if direction == "above":
return current_price > price_level
else:
return current_price < price_level
return False
async def _execute_actions(
self, price: float, timestamp: int, matched_condition: Dict[str, Any]
):
token = matched_condition.get("token", self.token)
reasoning = f"Condition {matched_condition.get('type')} triggered"
signal = {
"id": str(uuid.uuid4()),
"bot_id": self.bot_id,
"run_id": self.run_id,
"signal_type": "signal",
"token": token,
"price": price,
"confidence": 0.8,
"reasoning": reasoning,
"executed": self.auto_execute,
"created_at": datetime.utcnow(),
}
self.signals.append(signal)
async def stop(self):
raise NotImplementedError("Simulation stop not yet implemented")
self.running = False
self.status = "stopped"
def get_results(self) -> Dict[str, Any]:
return {
"id": self.run_id,
"status": self.status,
"results": self.results,
"signals": self.signals,
}
def get_signals(self) -> List[Dict[str, Any]]:
raise NotImplementedError("Simulation signals not yet implemented")
return self.signals