feat: klines-based simulation instead of polling

- Fetch historical klines once from AVE API (10 CU per request)
- Process each candle as a time step
- Limit to 500 candles max per simulation
- No continuous polling - processes all data in seconds
- Frontend now selects kline interval (1m, 5m, 15m, 1h)
- Much more efficient API usage
This commit is contained in:
shokollm
2026-04-12 01:34:20 +00:00
parent d4400f5dcd
commit f425ae08d7
5 changed files with 135 additions and 66 deletions

View File

@@ -145,7 +145,7 @@ async def start_simulation(
"bot_id": bot_id, "bot_id": bot_id,
"token": config.token, "token": config.token,
"chain": config.chain, "chain": config.chain,
"check_interval": config.check_interval, "kline_interval": config.kline_interval,
"auto_execute": False, # Always paper trade "auto_execute": False, # Always paper trade
"strategy_config": bot.strategy_config, "strategy_config": bot.strategy_config,
"ave_api_key": settings.AVE_API_KEY, "ave_api_key": settings.AVE_API_KEY,
@@ -160,7 +160,7 @@ async def start_simulation(
config={ config={
"token": config.token, "token": config.token,
"chain": config.chain, "chain": config.chain,
"check_interval": config.check_interval, "kline_interval": config.kline_interval,
}, },
signals=[], signals=[],
) )

View File

@@ -100,7 +100,7 @@ class BacktestResponse(BaseModel):
class SimulationCreate(BaseModel): class SimulationCreate(BaseModel):
token: str token: str
chain: str chain: str
check_interval: int = 60 kline_interval: str = "1m"
@field_validator("chain") @field_validator("chain")
@classmethod @classmethod

View File

@@ -3,6 +3,7 @@ import asyncio
import logging import logging
from datetime import datetime from datetime import datetime
from typing import Dict, Any, List, Optional from typing import Dict, Any, List, Optional
from ..ave.client import AveCloudClient from ..ave.client import AveCloudClient
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -26,23 +27,38 @@ class SimulateEngine:
self.risk_management = self.strategy_config.get("risk_management", {}) self.risk_management = self.strategy_config.get("risk_management", {})
self.stop_loss_percent = self.risk_management.get("stop_loss_percent") self.stop_loss_percent = self.risk_management.get("stop_loss_percent")
self.take_profit_percent = self.risk_management.get("take_profit_percent") self.take_profit_percent = self.risk_management.get("take_profit_percent")
self.check_interval = config.get("check_interval", 60)
self.duration_seconds = config.get("duration_seconds", 3600) # Kline-based settings
self.kline_interval = config.get("kline_interval", "1m")
self.max_candles = config.get("max_candles", 500) # Limit candles to process
self.auto_execute = config.get("auto_execute", False) self.auto_execute = config.get("auto_execute", False)
self.token = config.get("token", "") self.token = config.get("token", "")
self.chain = config.get("chain", "bsc") self.chain = config.get("chain", "bsc")
self.running = False self.running = False
self.started_at: Optional[datetime] = None self.started_at: Optional[datetime] = None
self.last_price: Optional[float] = None
# Price tracking (for conditions)
self.last_close: Optional[float] = None
self.last_volume: Optional[float] = None self.last_volume: Optional[float] = None
# Position tracking (for risk management)
self.position: float = 0.0 self.position: float = 0.0
self.position_token: str = "" self.position_token: str = ""
self.entry_price: Optional[float] = None self.entry_price: Optional[float] = None
self.entry_time: Optional[int] = None self.entry_time: Optional[int] = None
# Portfolio
self.current_balance: float = config.get("initial_balance", 10000.0) self.current_balance: float = config.get("initial_balance", 10000.0)
self.trades: List[Dict[str, Any]] = [] self.trades: List[Dict[str, Any]] = []
# Error tracking
self.errors: List[str] = [] self.errors: List[str] = []
# Kline data
self.klines: List[Dict[str, Any]] = []
self.last_processed_time: Optional[int] = None
async def run(self) -> Dict[str, Any]: async def run(self) -> Dict[str, Any]:
self.running = True self.running = True
self.status = "running" self.status = "running"
@@ -59,71 +75,113 @@ class SimulateEngine:
self.results = {"error": "Token ID is required"} self.results = {"error": "Token ID is required"}
return self.results return self.results
# Run forever until stopped (no end_time limit)
try: try:
while self.running: # Step 1: Fetch klines (only once for simulation)
try: self.klines = await self._fetch_klines(token_id)
price_data = await self.ave_client.get_token_price(token_id)
if price_data:
current_price = float(price_data.get("price", 0))
current_volume = float(price_data.get("volume", 0))
if current_price > 0: if not self.klines:
# Only check conditions if we have a previous price to compare self.status = "failed"
if self.last_price is not None: self.results = {"error": "No kline data available"}
await self._check_conditions( return self.results
current_price, current_volume, price_data
)
# Update last price AFTER checking (so next iteration has comparison data)
self.last_price = current_price
self.last_volume = current_volume
except Exception as e: logger.info(f"Fetched {len(self.klines)} klines for {token_id}")
logger.warning(f"Failed to get price for {token_id}: {e}")
self.errors.append(f"Price fetch failed for {token_id}: {str(e)}")
continue
for _ in range(self.check_interval): # Step 2: Process candles (with limit)
candles_processed = 0
for candle in self.klines:
if not self.running: if not self.running:
break break
await asyncio.sleep(1) if candles_processed >= self.max_candles:
logger.info(f"Reached max candles limit ({self.max_candles})")
break
# Simulation was stopped candle_time = int(candle.get("time", 0))
self.status = "stopped"
# Get OHLCV data from candle
close_price = float(candle.get("close", 0))
volume = float(candle.get("volume", 0))
if close_price > 0:
# Process candle
await self._process_candle(close_price, volume, candle_time)
# Update last close for next iteration
self.last_close = close_price
self.last_volume = volume
# Track last processed time
self.last_processed_time = candle_time
candles_processed += 1
self.status = "completed"
except Exception as e: except Exception as e:
logger.error(f"Simulation error: {e}")
self.status = "failed" self.status = "failed"
self.results = {"error": str(e)} self.results = {"error": str(e)}
self.errors.append(str(e))
self.results = self.results or {} self.results = self.results or {}
self.results["total_signals"] = len(self.signals) self.results["total_signals"] = len(self.signals)
self.results["total_trades"] = len(self.trades)
self.results["total_errors"] = len(self.errors) self.results["total_errors"] = len(self.errors)
self.results["errors"] = self.errors self.results["errors"] = self.errors
self.results["signals"] = self.signals self.results["signals"] = self.signals
self.results["candles_processed"] = candles_processed if self.running else 0
self.results["started_at"] = self.started_at self.results["started_at"] = self.started_at
self.results["ended_at"] = datetime.utcnow() self.results["ended_at"] = datetime.utcnow()
return self.results return self.results
async def _check_conditions( async def _fetch_klines(
self, current_price: float, current_volume: float, price_data: Dict[str, Any] self,
token_id: str,
limit: int = 500
) -> List[Dict[str, Any]]:
"""Fetch klines from AVE API."""
try:
klines = await self.ave_client.get_klines(
token_id,
interval=self.kline_interval,
limit=limit
)
# Sort by time ascending (oldest first)
klines = sorted(klines, key=lambda x: x.get("time", 0))
return klines
except Exception as e:
logger.warning(f"Failed to fetch klines for {token_id}: {e}")
self.errors.append(f"Kline fetch failed: {str(e)}")
return []
async def _process_candle(
self,
close_price: float,
volume: float,
timestamp: int
): ):
timestamp = int(datetime.utcnow().timestamp() * 1000) """Process a single candle - check conditions and risk management."""
# Check risk management first (for open positions)
if self.position > 0 and self.entry_price is not None: if self.position > 0 and self.entry_price is not None:
exit_info = self._check_risk_management(current_price, timestamp) exit_info = self._check_risk_management(close_price, timestamp)
if exit_info: if exit_info:
await self._execute_risk_exit(current_price, timestamp, exit_info) await self._execute_risk_exit(close_price, timestamp, exit_info)
return return # Skip condition check if we just exited
# Check conditions (only if no open position)
if self.position == 0:
for condition in self.conditions: for condition in self.conditions:
if self._check_condition(condition, current_price, current_volume): if self._check_condition(condition, close_price, volume):
await self._execute_actions(current_price, timestamp, condition) await self._execute_actions(close_price, timestamp, condition)
break break
def _check_risk_management( def _check_risk_management(
self, current_price: float, timestamp: int self, current_price: float, timestamp: int
) -> Optional[Dict[str, Any]]: ) -> Optional[Dict[str, Any]]:
"""Check if stop loss or take profit is triggered."""
if self.position <= 0 or self.entry_price is None: if self.position <= 0 or self.entry_price is None:
return None return None
@@ -142,6 +200,7 @@ class SimulateEngine:
async def _execute_risk_exit( async def _execute_risk_exit(
self, price: float, timestamp: int, exit_info: Dict[str, Any] self, price: float, timestamp: int, exit_info: Dict[str, Any]
): ):
"""Execute stop loss or take profit."""
if self.position <= 0: if self.position <= 0:
return return
@@ -180,32 +239,34 @@ class SimulateEngine:
current_price: float, current_price: float,
current_volume: float, current_volume: float,
) -> bool: ) -> bool:
"""Check if a condition is met based on price movement."""
cond_type = condition.get("type", "") cond_type = condition.get("type", "")
threshold = condition.get("threshold", 0) threshold = condition.get("threshold", 0)
price_level = condition.get("price")
direction = condition.get("direction", "above")
if cond_type == "price_drop": if cond_type == "price_drop":
if self.last_price is None or self.last_price <= 0: # Price dropped by threshold % from last close
if self.last_close is None or self.last_close <= 0:
return False return False
drop_pct = ((self.last_price - current_price) / self.last_price) * 100 drop_pct = ((self.last_close - current_price) / self.last_close) * 100
return drop_pct >= threshold return drop_pct >= threshold
elif cond_type == "price_rise": elif cond_type == "price_rise":
if self.last_price is None or self.last_price <= 0: # Price rose by threshold % from last close
if self.last_close is None or self.last_close <= 0:
return False return False
rise_pct = ((current_price - self.last_price) / self.last_price) * 100 rise_pct = ((current_price - self.last_close) / self.last_close) * 100
return rise_pct >= threshold return rise_pct >= threshold
elif cond_type == "volume_spike": elif cond_type == "volume_spike":
# Volume increased significantly
if self.last_volume is None or self.last_volume <= 0: if self.last_volume is None or self.last_volume <= 0:
return False return False
volume_increase = ( volume_increase = ((current_volume - self.last_volume) / self.last_volume) * 100
(current_volume - self.last_volume) / self.last_volume
) * 100
return volume_increase >= threshold return volume_increase >= threshold
elif cond_type == "price_level": elif cond_type == "price_level":
price_level = condition.get("price")
direction = condition.get("direction", "above")
if price_level is None: if price_level is None:
return False return False
if direction == "above": if direction == "above":
@@ -218,6 +279,7 @@ class SimulateEngine:
async def _execute_actions( async def _execute_actions(
self, price: float, timestamp: int, matched_condition: Dict[str, Any] self, price: float, timestamp: int, matched_condition: Dict[str, Any]
): ):
"""Execute buy/sell actions based on matched condition."""
token = matched_condition.get("token", self.token) token = matched_condition.get("token", self.token)
reasoning = f"Condition {matched_condition.get('type')} triggered" reasoning = f"Condition {matched_condition.get('type')} triggered"
@@ -226,18 +288,21 @@ class SimulateEngine:
if action_type == "buy": if action_type == "buy":
amount_percent = action.get("amount_percent", 10) amount_percent = action.get("amount_percent", 10)
amount = self.current_balance * (amount_percent / 100) amount = self.current_balance * (amount_percent / 100)
self.position += amount / price quantity = amount / price
self.position += quantity
self.position_token = token self.position_token = token
self.entry_price = price self.entry_price = price
self.entry_time = timestamp self.entry_time = timestamp
self.current_balance -= amount self.current_balance -= amount
self.trades.append( self.trades.append(
{ {
"type": "buy", "type": "buy",
"token": token, "token": token,
"price": price, "price": price,
"amount": amount, "amount": amount,
"quantity": amount / price, "quantity": quantity,
"timestamp": timestamp, "timestamp": timestamp,
} }
) )
@@ -258,10 +323,12 @@ class SimulateEngine:
self.signals.append(signal) self.signals.append(signal)
def stop(self): def stop(self):
"""Stop the simulation."""
self.running = False self.running = False
self.status = "stopped" self.status = "stopped"
def get_results(self) -> Dict[str, Any]: def get_results(self) -> Dict[str, Any]:
"""Get simulation results."""
return { return {
"id": self.run_id, "id": self.run_id,
"status": self.status, "status": self.status,
@@ -270,4 +337,5 @@ class SimulateEngine:
} }
def get_signals(self) -> List[Dict[str, Any]]: def get_signals(self) -> List[Dict[str, Any]]:
"""Get current signals."""
return self.signals return self.signals

View File

@@ -189,7 +189,7 @@ export const api = {
}, },
simulate: { simulate: {
async start(botId: string, config: { token: string; chain?: string; check_interval: number }): Promise<Simulation> { async start(botId: string, config: { token: string; chain?: string; kline_interval: string }): Promise<Simulation> {
const response = await fetch(`${API_URL}/bots/${botId}/simulate`, { const response = await fetch(`${API_URL}/bots/${botId}/simulate`, {
method: 'POST', method: 'POST',
headers: getAuthHeaders(), headers: getAuthHeaders(),

View File

@@ -9,7 +9,7 @@
let botId = $derived($page.params.id); let botId = $derived($page.params.id);
let tokenName = $state(''); let tokenName = $state('');
let tokenAddress = $state(''); let tokenAddress = $state('');
let intervalSeconds = $state(60); let klineInterval = $state('1m');
let isRunning = $state(false); let isRunning = $state(false);
onMount(async () => { onMount(async () => {
@@ -70,7 +70,7 @@
const simulation = await api.simulate.start(botId, { const simulation = await api.simulate.start(botId, {
token: tokenAddress, token: tokenAddress,
chain: 'bsc', chain: 'bsc',
check_interval: intervalSeconds kline_interval: klineInterval
}); });
setCurrentSimulation(simulation); setCurrentSimulation(simulation);
clearSignals(); clearSignals();
@@ -132,11 +132,12 @@
</div> </div>
</div> </div>
<div class="field"> <div class="field">
<label for="interval">Check Interval</label> <label for="klineInterval">Kline Interval</label>
<select id="interval" bind:value={intervalSeconds} disabled={isRunning}> <select id="klineInterval" bind:value={klineInterval} disabled={isRunning}>
<option value={10}>Every 10 seconds</option> <option value="1m">1 minute</option>
<option value={30}>Every 30 seconds</option> <option value="5m">5 minutes</option>
<option value={60}>Every minute</option> <option value="15m">15 minutes</option>
<option value="1h">1 hour</option>
</select> </select>
</div> </div>
</div> </div>