Compare commits

..

4 Commits

Author SHA1 Message Date
shokollm
965efa122b feat: implement Simulate Engine for real-time signal detection
Implement simulate engine for real-time signal detection via REST polling.

Changes:
- SimulateEngine service with configurable check interval (default 60s for free tier)
- REST polling for current prices using AveCloudClient
- Condition matching for real-time data (price_drop, price_rise, volume_spike, price_level)
- Signal logging with user-initiated start/stop
- Simulation API endpoints:
  - POST /api/bots/{id}/simulate - Start simulation
  - GET /api/bots/{id}/simulate/{run_id} - Get status/signals
  - GET /api/bots/{id}/simulations - List all simulations
  - POST /api/bots/{id}/simulate/{run_id}/stop - Stop simulation
- Updated SimulationCreate schema with check_interval field
- Free tier limited to 60s minimum check interval
- Signals stored in database for simulation signal history

Depends on issue #7 (Backtest Engine) which was merged in PR #18
2026-04-08 11:25:51 +00:00
0fb16f06e4 Merge pull request '[Backend] Backtest Engine - Historical Data Replay' (#18) from fix/issue-7 into main 2026-04-08 13:21:56 +02:00
shokollm
a461005015 Implement Backtest Engine - Historical Data Replay
Implements issue #7 - Backtest Engine for historical strategy testing.

Changes:
- Created AveCloudClient for fetching klines from AVE Cloud Data API
- Implemented BacktestEngine with condition matching (price_drop, price_rise, volume_spike, price_level)
- Implemented signal generation and portfolio simulation
- Calculates metrics: total_return, win_rate, max_drawdown, sharpe_ratio, total_trades
- Implemented async/background backtest execution via FastAPI BackgroundTasks
- Stores results in backtests table and signals table
- All backtest API endpoints with JWT auth and ownership validation

API Endpoints:
- POST /api/bots/{id}/backtest - Start backtest
- GET /api/bots/{id}/backtest/{run_id} - Get status/results
- GET /api/bots/{id}/backtests - List all backtests
- POST /api/bots/{id}/backtest/{run_id}/stop - Stop running backtest
2026-04-08 09:39:07 +00:00
b0311bc96f Merge pull request '[Backend] Chat Interface + CrewAI Integration' (#17) from fix/issue-6 into main 2026-04-08 08:37:58 +02:00
6 changed files with 960 additions and 40 deletions

View File

@@ -1,36 +1,219 @@
from fastapi import APIRouter, Depends, HTTPException, status
import uuid
import asyncio
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks
from sqlalchemy.orm import Session
from typing import List
from typing import List, Dict, Any, Optional
from concurrent.futures import ThreadPoolExecutor
from .auth import get_current_user
from ..core.database import get_db
from ..core.config import get_settings
from ..db.schemas import BacktestCreate, BacktestResponse
from ..db.models import Bot, Backtest, Signal, User
router = APIRouter()
running_backtests: Dict[str, Any] = {}
executor = ThreadPoolExecutor(max_workers=4)
@router.post("/bots/{bot_id}/backtest", response_model=BacktestResponse)
def start_backtest(bot_id: str, config: BacktestCreate, db: Session = Depends(get_db)):
raise HTTPException(
status_code=status.HTTP_501_NOT_IMPLEMENTED, detail="Not implemented"
def run_backtest_sync(
backtest_id: str, db_url: str, bot_id: str, config: Dict[str, Any]
):
import asyncio
from ..services.backtest.engine import BacktestEngine
from ..core.database import SessionLocal
async def _run():
engine = BacktestEngine(config)
engine.run_id = backtest_id
running_backtests[backtest_id] = engine
try:
results = await engine.run()
db = SessionLocal()
try:
backtest = db.query(Backtest).filter(Backtest.id == backtest_id).first()
if backtest:
backtest.status = engine.status
backtest.ended_at = datetime.utcnow()
backtest.result = results
db.commit()
for signal in engine.signals:
db_signal = Signal(
id=signal["id"],
bot_id=signal["bot_id"],
run_id=signal["run_id"],
signal_type=signal["signal_type"],
token=signal["token"],
price=signal["price"],
confidence=signal.get("confidence"),
reasoning=signal.get("reasoning"),
executed=signal.get("executed", False),
created_at=signal["created_at"],
)
db.add(db_signal)
db.commit()
finally:
db.close()
finally:
if backtest_id in running_backtests:
del running_backtests[backtest_id]
asyncio.run(_run())
@router.post(
"/bots/{bot_id}/backtest",
response_model=BacktestResponse,
status_code=status.HTTP_201_CREATED,
)
async def start_backtest(
bot_id: str,
config: BacktestCreate,
background_tasks: BackgroundTasks,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
bot = db.query(Bot).filter(Bot.id == bot_id).first()
if not bot:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Bot not found"
)
if bot.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized"
)
settings = get_settings()
backtest_id = str(uuid.uuid4())
backtest_config = {
"bot_id": bot_id,
"token": config.token,
"chain": config.chain,
"timeframe": config.timeframe,
"start_date": config.start_date,
"end_date": config.end_date,
"strategy_config": bot.strategy_config,
"ave_api_key": settings.AVE_API_KEY,
"ave_api_plan": settings.AVE_API_PLAN,
"initial_balance": 10000.0,
}
backtest = Backtest(
id=backtest_id,
bot_id=bot_id,
started_at=datetime.utcnow(),
status="running",
config={
"token": config.token,
"chain": config.chain,
"timeframe": config.timeframe,
"start_date": config.start_date,
"end_date": config.end_date,
},
)
db.add(backtest)
db.commit()
db.refresh(backtest)
db_url = str(settings.DATABASE_URL)
background_tasks.add_task(
run_backtest_sync, backtest_id, db_url, bot_id, backtest_config
)
return backtest
@router.get("/bots/{bot_id}/backtest/{run_id}", response_model=BacktestResponse)
def get_backtest(bot_id: str, run_id: str, db: Session = Depends(get_db)):
raise HTTPException(
status_code=status.HTTP_501_NOT_IMPLEMENTED, detail="Not implemented"
def get_backtest(
bot_id: str,
run_id: str,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
bot = db.query(Bot).filter(Bot.id == bot_id).first()
if not bot:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Bot not found"
)
if bot.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized"
)
backtest = (
db.query(Backtest)
.filter(Backtest.id == run_id, Backtest.bot_id == bot_id)
.first()
)
if not backtest:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Backtest not found"
)
return backtest
@router.get("/bots/{bot_id}/backtests", response_model=List[BacktestResponse])
def list_backtests(bot_id: str, db: Session = Depends(get_db)):
raise HTTPException(
status_code=status.HTTP_501_NOT_IMPLEMENTED, detail="Not implemented"
def list_backtests(
bot_id: str,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
bot = db.query(Bot).filter(Bot.id == bot_id).first()
if not bot:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Bot not found"
)
if bot.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized"
)
backtests = (
db.query(Backtest)
.filter(Backtest.bot_id == bot_id)
.order_by(Backtest.started_at.desc())
.all()
)
return backtests
@router.post("/bots/{bot_id}/backtest/{run_id}/stop")
def stop_backtest(bot_id: str, run_id: str, db: Session = Depends(get_db)):
raise HTTPException(
status_code=status.HTTP_501_NOT_IMPLEMENTED, detail="Not implemented"
def stop_backtest(
bot_id: str,
run_id: str,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
bot = db.query(Bot).filter(Bot.id == bot_id).first()
if not bot:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Bot not found"
)
if bot.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized"
)
backtest = (
db.query(Backtest)
.filter(Backtest.id == run_id, Backtest.bot_id == bot_id)
.first()
)
if not backtest:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Backtest not found"
)
if run_id in running_backtests:
engine = running_backtests[run_id]
asyncio.create_task(engine.stop())
backtest.status = "stopped"
backtest.ended_at = datetime.utcnow()
db.commit()
return {"status": "stopping", "run_id": run_id}

View File

@@ -1,38 +1,233 @@
from fastapi import APIRouter, Depends, HTTPException, status
import uuid
import asyncio
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks
from sqlalchemy.orm import Session
from typing import List
from typing import List, Dict, Any, Optional
from concurrent.futures import ThreadPoolExecutor
from .auth import get_current_user
from ..core.database import get_db
from ..core.config import get_settings
from ..db.schemas import SimulationCreate, SimulationResponse
from ..db.models import Bot, Simulation, Signal, User
router = APIRouter()
running_simulations: Dict[str, Any] = {}
executor = ThreadPoolExecutor(max_workers=4)
@router.post("/bots/{bot_id}/simulate", response_model=SimulationResponse)
def start_simulation(
bot_id: str, config: SimulationCreate, db: Session = Depends(get_db)
def run_simulation_sync(
simulation_id: str, db_url: str, bot_id: str, config: Dict[str, Any]
):
raise HTTPException(
status_code=status.HTTP_501_NOT_IMPLEMENTED, detail="Not implemented"
import asyncio
from ..services.simulate.engine import SimulateEngine
from ..core.database import SessionLocal
async def _run():
engine = SimulateEngine(config)
engine.run_id = simulation_id
running_simulations[simulation_id] = engine
try:
results = await engine.run()
db = SessionLocal()
try:
simulation = (
db.query(Simulation).filter(Simulation.id == simulation_id).first()
)
if simulation:
simulation.status = engine.status
simulation.signals = engine.signals
db.commit()
for signal in engine.signals:
db_signal = Signal(
id=signal["id"],
bot_id=signal["bot_id"],
run_id=signal["run_id"],
signal_type=signal["signal_type"],
token=signal["token"],
price=signal["price"],
confidence=signal.get("confidence"),
reasoning=signal.get("reasoning"),
executed=signal.get("executed", False),
created_at=signal["created_at"],
)
db.add(db_signal)
db.commit()
finally:
db.close()
finally:
if simulation_id in running_simulations:
del running_simulations[simulation_id]
asyncio.run(_run())
@router.post(
"/bots/{bot_id}/simulate",
response_model=SimulationResponse,
status_code=status.HTTP_201_CREATED,
)
async def start_simulation(
bot_id: str,
config: SimulationCreate,
background_tasks: BackgroundTasks,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
bot = db.query(Bot).filter(Bot.id == bot_id).first()
if not bot:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Bot not found"
)
if bot.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized"
)
settings = get_settings()
simulation_id = str(uuid.uuid4())
check_interval = config.check_interval
if settings.AVE_API_PLAN != "pro" and check_interval < 60:
check_interval = 60
simulation_config = {
"bot_id": bot_id,
"token": config.token,
"chain": config.chain,
"duration_seconds": config.duration_seconds,
"check_interval": check_interval,
"auto_execute": config.auto_execute,
"strategy_config": bot.strategy_config,
"ave_api_key": settings.AVE_API_KEY,
"ave_api_plan": settings.AVE_API_PLAN,
}
simulation = Simulation(
id=simulation_id,
bot_id=bot_id,
started_at=datetime.utcnow(),
status="running",
config={
"token": config.token,
"chain": config.chain,
"duration_seconds": config.duration_seconds,
"check_interval": check_interval,
"auto_execute": config.auto_execute,
},
signals=[],
)
db.add(simulation)
db.commit()
db.refresh(simulation)
db_url = str(settings.DATABASE_URL)
background_tasks.add_task(
run_simulation_sync, simulation_id, db_url, bot_id, simulation_config
)
return simulation
@router.get("/bots/{bot_id}/simulate/{run_id}", response_model=SimulationResponse)
def get_simulation(bot_id: str, run_id: str, db: Session = Depends(get_db)):
raise HTTPException(
status_code=status.HTTP_501_NOT_IMPLEMENTED, detail="Not implemented"
def get_simulation(
bot_id: str,
run_id: str,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
bot = db.query(Bot).filter(Bot.id == bot_id).first()
if not bot:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Bot not found"
)
if bot.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized"
)
simulation = (
db.query(Simulation)
.filter(Simulation.id == run_id, Simulation.bot_id == bot_id)
.first()
)
if not simulation:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Simulation not found"
)
if run_id in running_simulations:
engine = running_simulations[run_id]
simulation.signals = engine.get_signals()
return simulation
@router.get("/bots/{bot_id}/simulations", response_model=List[SimulationResponse])
def list_simulations(bot_id: str, db: Session = Depends(get_db)):
raise HTTPException(
status_code=status.HTTP_501_NOT_IMPLEMENTED, detail="Not implemented"
def list_simulations(
bot_id: str,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
bot = db.query(Bot).filter(Bot.id == bot_id).first()
if not bot:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Bot not found"
)
if bot.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized"
)
simulations = (
db.query(Simulation)
.filter(Simulation.bot_id == bot_id)
.order_by(Simulation.started_at.desc())
.all()
)
for sim in simulations:
if sim.id in running_simulations:
engine = running_simulations[sim.id]
sim.signals = engine.get_signals()
return simulations
@router.post("/bots/{bot_id}/simulate/{run_id}/stop")
def stop_simulation(bot_id: str, run_id: str, db: Session = Depends(get_db)):
raise HTTPException(
status_code=status.HTTP_501_NOT_IMPLEMENTED, detail="Not implemented"
def stop_simulation(
bot_id: str,
run_id: str,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
bot = db.query(Bot).filter(Bot.id == bot_id).first()
if not bot:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Bot not found"
)
if bot.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized"
)
simulation = (
db.query(Simulation)
.filter(Simulation.id == run_id, Simulation.bot_id == bot_id)
.first()
)
if not simulation:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Simulation not found"
)
if run_id in running_simulations:
engine = running_simulations[run_id]
asyncio.create_task(engine.stop())
simulation.status = "stopped"
db.commit()
return {"status": "stopping", "run_id": run_id}

View File

@@ -87,6 +87,7 @@ class SimulationCreate(BaseModel):
token: str
chain: str
duration_seconds: int = 3600
check_interval: int = 60
auto_execute: bool = False

View File

@@ -0,0 +1,70 @@
import httpx
from typing import List, Dict, Any, Optional
from datetime import datetime
class AveCloudClient:
BASE_URL = "https://prod.ave-api.com"
def __init__(self, api_key: str, plan: str = "free"):
self.api_key = api_key
self.plan = plan
def _headers(self) -> Dict[str, str]:
return {"X-API-KEY": self.api_key}
async def get_klines(
self,
token_id: str,
interval: str = "1h",
limit: int = 100,
start_time: Optional[int] = None,
end_time: Optional[int] = None,
) -> List[Dict[str, Any]]:
url = f"{self.BASE_URL}/v2/klines/token/{token_id}"
params = {"interval": interval, "limit": limit}
if start_time:
params["start_time"] = start_time
if end_time:
params["end_time"] = end_time
async with httpx.AsyncClient() as client:
response = await client.get(
url, headers=self._headers(), params=params, timeout=30.0
)
response.raise_for_status()
data = response.json()
if data.get("status") == 200:
return data.get("data", [])
raise Exception(f"Failed to fetch klines: {data}")
async def get_token_price(self, token_id: str) -> Optional[Dict[str, Any]]:
url = f"{self.BASE_URL}/v2/tokens/price"
async with httpx.AsyncClient() as client:
response = await client.post(
url,
headers=self._headers(),
json={"token_ids": [token_id]},
timeout=30.0,
)
response.raise_for_status()
data = response.json()
if data.get("status") == 200:
prices = data.get("data", {})
return prices.get(token_id)
return None
async def get_batch_prices(self, token_ids: List[str]) -> Dict[str, Dict[str, Any]]:
url = f"{self.BASE_URL}/v2/tokens/price"
async with httpx.AsyncClient() as client:
response = await client.post(
url,
headers=self._headers(),
json={"token_ids": token_ids},
timeout=30.0,
)
response.raise_for_status()
data = response.json()
if data.get("status") == 200:
return data.get("data", {})
return {}

View File

@@ -1,15 +1,324 @@
from typing import Optional, Dict, Any
import uuid
import asyncio
from datetime import datetime
from typing import Dict, Any, List, Optional
from .ave_client import AveCloudClient
class BacktestEngine:
def __init__(self, config: Dict[str, Any]):
self.config = config
self.run_id = str(uuid.uuid4())
self.status = "pending"
self.results: Optional[Dict[str, Any]] = None
self.signals: List[Dict[str, Any]] = []
self.ave_client = AveCloudClient(
api_key=config.get("ave_api_key", ""),
plan=config.get("ave_api_plan", "free"),
)
self.bot_id = config.get("bot_id")
self.strategy_config = config.get("strategy_config", {})
self.conditions = self.strategy_config.get("conditions", [])
self.actions = self.strategy_config.get("actions", [])
self.initial_balance = config.get("initial_balance", 10000.0)
self.current_balance = self.initial_balance
self.position = 0.0
self.position_token = ""
self.trades: List[Dict[str, Any]] = []
self.running = False
async def run(self) -> Dict[str, Any]:
raise NotImplementedError("Backtest engine not yet implemented")
self.running = True
self.status = "running"
started_at = datetime.utcnow()
try:
token = self.config.get("token", "")
chain = self.config.get("chain", "bsc")
timeframe = self.config.get("timeframe", "1h")
start_date = self.config.get("start_date", "")
end_date = self.config.get("end_date", "")
token_id = (
f"{token}-{chain}"
if token and not token.endswith(f"-{chain}")
else token
)
if not token_id or token_id == f"-{chain}":
raise ValueError("Token ID is required")
start_ts = None
end_ts = None
if start_date:
start_ts = int(
datetime.fromisoformat(
start_date.replace("Z", "+00:00")
).timestamp()
* 1000
)
if end_date:
end_ts = int(
datetime.fromisoformat(end_date.replace("Z", "+00:00")).timestamp()
* 1000
)
klines = await self.ave_client.get_klines(
token_id=token_id,
interval=timeframe,
limit=1000,
start_time=start_ts,
end_time=end_ts,
)
if not klines:
self.status = "failed"
self.results = {"error": "No kline data available"}
return self.results
await self._process_klines(klines)
self._calculate_metrics()
self.status = "completed"
except Exception as e:
self.status = "failed"
self.results = {"error": str(e)}
ended_at = datetime.utcnow()
self.results = self.results or {}
self.results["started_at"] = started_at
self.results["ended_at"] = ended_at
self.results["duration_seconds"] = (ended_at - started_at).total_seconds()
return self.results
async def _process_klines(self, klines: List[Dict[str, Any]]):
for i, kline in enumerate(klines):
if not self.running:
break
price = float(kline.get("close", 0))
if price <= 0:
continue
timestamp = kline.get("timestamp", 0)
for condition in self.conditions:
if self._check_condition(condition, klines, i, price):
await self._execute_actions(price, timestamp, condition)
break
def _check_condition(
self,
condition: Dict[str, Any],
klines: List[Dict[str, Any]],
current_idx: int,
current_price: float,
) -> bool:
cond_type = condition.get("type", "")
threshold = condition.get("threshold", 0)
timeframe = condition.get("timeframe", "1h")
price_level = condition.get("price")
direction = condition.get("direction", "above")
if cond_type == "price_drop":
if current_idx == 0:
return False
prev_price = float(klines[current_idx - 1].get("close", 0))
if prev_price <= 0:
return False
drop_pct = ((prev_price - current_price) / prev_price) * 100
return drop_pct >= threshold
elif cond_type == "price_rise":
if current_idx == 0:
return False
prev_price = float(klines[current_idx - 1].get("close", 0))
if prev_price <= 0:
return False
rise_pct = ((current_price - prev_price) / prev_price) * 100
return rise_pct >= threshold
elif cond_type == "volume_spike":
if current_idx == 0:
return False
prev_volume = float(klines[current_idx - 1].get("volume", 0))
current_volume = float(kline.get("volume", 0))
if prev_volume <= 0:
return False
volume_increase = ((current_volume - prev_volume) / prev_volume) * 100
return volume_increase >= threshold
elif cond_type == "price_level":
if price_level is None:
return False
if direction == "above":
return current_price > price_level
else:
return current_price < price_level
return False
async def _execute_actions(
self, price: float, timestamp: int, matched_condition: Dict[str, Any]
):
token = matched_condition.get("token", self.config.get("token", ""))
for action in self.actions:
action_type = action.get("type", "")
amount_percent = action.get("amount_percent", 10)
amount = self.current_balance * (amount_percent / 100)
if action_type == "buy" and self.current_balance >= amount:
self.position += amount / price
self.current_balance -= amount
self.position_token = token
self.trades.append(
{
"type": "buy",
"token": token,
"price": price,
"amount": amount,
"quantity": amount / price,
"timestamp": timestamp,
}
)
self.signals.append(
{
"id": str(uuid.uuid4()),
"bot_id": self.bot_id,
"run_id": self.run_id,
"signal_type": "buy",
"token": token,
"price": price,
"confidence": 0.8,
"reasoning": f"Condition {matched_condition.get('type')} triggered buy",
"executed": False,
"created_at": datetime.utcnow(),
}
)
elif action_type == "sell" and self.position > 0:
sell_amount = self.position * price
self.current_balance += sell_amount
self.trades.append(
{
"type": "sell",
"token": self.position_token,
"price": price,
"amount": sell_amount,
"quantity": self.position,
"timestamp": timestamp,
}
)
self.position = 0
self.signals.append(
{
"id": str(uuid.uuid4()),
"bot_id": self.bot_id,
"run_id": self.run_id,
"signal_type": "sell",
"token": self.position_token,
"price": price,
"confidence": 0.8,
"reasoning": f"Condition {matched_condition.get('type')} triggered sell",
"executed": False,
"created_at": datetime.utcnow(),
}
)
def _calculate_metrics(self):
final_balance = self.current_balance + (
self.position * self.trades[-1]["price"]
if self.trades and self.position > 0
else 0
)
total_return = (
(final_balance - self.initial_balance) / self.initial_balance
) * 100
buy_trades = [t for t in self.trades if t["type"] == "buy"]
sell_trades = [t for t in self.trades if t["type"] == "sell"]
total_trades = len(buy_trades) + len(sell_trades)
winning_trades = 0
for i, trade in enumerate(sell_trades):
if i < len(buy_trades):
buy_price = buy_trades[i]["price"]
sell_price = trade["price"]
if sell_price > buy_price:
winning_trades += 1
win_rate = (winning_trades / len(sell_trades) * 100) if sell_trades else 0
portfolio_values = []
running_balance = self.initial_balance
running_position = 0.0
current_token = ""
last_price = 0.0
for trade in self.trades:
if trade["type"] == "buy":
running_position = trade["quantity"]
running_balance = trade["amount"]
current_token = trade["token"]
last_price = trade["price"]
else:
running_balance = trade["amount"]
running_position = 0
last_price = trade["price"]
portfolio_value = running_balance + (running_position * last_price)
portfolio_values.append(portfolio_value)
max_value = self.initial_balance
max_drawdown = 0.0
for value in portfolio_values:
if value > max_value:
max_value = value
drawdown = ((max_value - value) / max_value) * 100
if drawdown > max_drawdown:
max_drawdown = drawdown
sharpe_ratio = 0.0
if len(portfolio_values) > 1:
returns = []
for i in range(1, len(portfolio_values)):
ret = (
portfolio_values[i] - portfolio_values[i - 1]
) / portfolio_values[i - 1]
returns.append(ret)
if returns:
avg_return = sum(returns) / len(returns)
variance = sum((r - avg_return) ** 2 for r in returns) / len(returns)
std_dev = variance**0.5
if std_dev > 0:
sharpe_ratio = avg_return / std_dev
buy_signals = len(buy_trades)
sell_signals = len(sell_trades)
self.results = {
"total_return": round(total_return, 2),
"win_rate": round(win_rate, 2),
"total_trades": total_trades,
"buy_signals": buy_signals,
"sell_signals": sell_signals,
"max_drawdown": round(max_drawdown, 2),
"sharpe_ratio": round(sharpe_ratio, 2),
"final_balance": round(final_balance, 2),
"signals": self.signals,
}
async def stop(self):
raise NotImplementedError("Backtest stop not yet implemented")
self.running = False
self.status = "stopped"
self._calculate_metrics()
def get_results(self) -> Dict[str, Any]:
raise NotImplementedError("Backtest results not yet implemented")
return {
"id": self.run_id,
"status": self.status,
"results": self.results,
"signals": self.signals,
}

View File

@@ -1,15 +1,177 @@
from typing import Optional, Dict, Any, List
import uuid
import asyncio
from datetime import datetime
from typing import Dict, Any, List, Optional
from ..backtest.ave_client import AveCloudClient
class SimulateEngine:
def __init__(self, config: Dict[str, Any]):
self.config = config
self.run_id = str(uuid.uuid4())
self.status = "pending"
self.results: Optional[Dict[str, Any]] = None
self.signals: List[Dict[str, Any]] = []
self.ave_client = AveCloudClient(
api_key=config.get("ave_api_key", ""),
plan=config.get("ave_api_plan", "free"),
)
self.bot_id = config.get("bot_id")
self.strategy_config = config.get("strategy_config", {})
self.conditions = self.strategy_config.get("conditions", [])
self.actions = self.strategy_config.get("actions", [])
self.check_interval = config.get("check_interval", 60)
self.duration_seconds = config.get("duration_seconds", 3600)
self.auto_execute = config.get("auto_execute", False)
self.token = config.get("token", "")
self.chain = config.get("chain", "bsc")
self.running = False
self.started_at: Optional[datetime] = None
self.last_price: Optional[float] = None
self.last_volume: Optional[float] = None
async def run(self) -> List[Dict[str, Any]]:
raise NotImplementedError("Simulation engine not yet implemented")
async def run(self) -> Dict[str, Any]:
self.running = True
self.status = "running"
self.started_at = datetime.utcnow()
token_id = (
f"{self.token}-{self.chain}"
if self.token and not self.token.endswith(f"-{self.chain}")
else self.token
)
if not token_id or token_id == f"-{self.chain}":
self.status = "failed"
self.results = {"error": "Token ID is required"}
return self.results
end_time = datetime.utcnow().timestamp() + self.duration_seconds
try:
while self.running and datetime.utcnow().timestamp() < end_time:
try:
price_data = await self.ave_client.get_token_price(token_id)
if price_data:
current_price = float(price_data.get("price", 0))
current_volume = float(price_data.get("volume", 0))
if current_price > 0:
await self._check_conditions(
current_price, current_volume, price_data
)
self.last_price = current_price
self.last_volume = current_volume
except Exception as e:
pass
for _ in range(self.check_interval):
if not self.running:
break
await asyncio.sleep(1)
if self.running:
self.status = "completed"
else:
self.status = "stopped"
except Exception as e:
self.status = "failed"
self.results = {"error": str(e)}
self.results = self.results or {}
self.results["total_signals"] = len(self.signals)
self.results["signals"] = self.signals
self.results["started_at"] = self.started_at
self.results["ended_at"] = datetime.utcnow()
return self.results
async def _check_conditions(
self, current_price: float, current_volume: float, price_data: Dict[str, Any]
):
timestamp = int(datetime.utcnow().timestamp() * 1000)
for condition in self.conditions:
if self._check_condition(condition, current_price, current_volume):
await self._execute_actions(current_price, timestamp, condition)
break
def _check_condition(
self,
condition: Dict[str, Any],
current_price: float,
current_volume: float,
) -> bool:
cond_type = condition.get("type", "")
threshold = condition.get("threshold", 0)
price_level = condition.get("price")
direction = condition.get("direction", "above")
if cond_type == "price_drop":
if self.last_price is None or self.last_price <= 0:
return False
drop_pct = ((self.last_price - current_price) / self.last_price) * 100
return drop_pct >= threshold
elif cond_type == "price_rise":
if self.last_price is None or self.last_price <= 0:
return False
rise_pct = ((current_price - self.last_price) / self.last_price) * 100
return rise_pct >= threshold
elif cond_type == "volume_spike":
if self.last_volume is None or self.last_volume <= 0:
return False
volume_increase = (
(current_volume - self.last_volume) / self.last_volume
) * 100
return volume_increase >= threshold
elif cond_type == "price_level":
if price_level is None:
return False
if direction == "above":
return current_price > price_level
else:
return current_price < price_level
return False
async def _execute_actions(
self, price: float, timestamp: int, matched_condition: Dict[str, Any]
):
token = matched_condition.get("token", self.token)
reasoning = f"Condition {matched_condition.get('type')} triggered"
signal = {
"id": str(uuid.uuid4()),
"bot_id": self.bot_id,
"run_id": self.run_id,
"signal_type": "signal",
"token": token,
"price": price,
"confidence": 0.8,
"reasoning": reasoning,
"executed": self.auto_execute,
"created_at": datetime.utcnow(),
}
self.signals.append(signal)
async def stop(self):
raise NotImplementedError("Simulation stop not yet implemented")
self.running = False
self.status = "stopped"
def get_results(self) -> Dict[str, Any]:
return {
"id": self.run_id,
"status": self.status,
"results": self.results,
"signals": self.signals,
}
def get_signals(self) -> List[Dict[str, Any]]:
raise NotImplementedError("Simulation signals not yet implemented")
return self.signals