From f425ae08d77c3b842d2bae5d3c09bf36875f313b Mon Sep 17 00:00:00 2001 From: shokollm <270575765+shokollm@users.noreply.github.com> Date: Sun, 12 Apr 2026 01:34:20 +0000 Subject: [PATCH] feat: klines-based simulation instead of polling - Fetch historical klines once from AVE API (10 CU per request) - Process each candle as a time step - Limit to 500 candles max per simulation - No continuous polling - processes all data in seconds - Frontend now selects kline interval (1m, 5m, 15m, 1h) - Much more efficient API usage --- src/backend/app/api/simulate.py | 4 +- src/backend/app/db/schemas.py | 2 +- src/backend/app/services/simulate/engine.py | 178 ++++++++++++------ src/frontend/src/lib/api/client.ts | 2 +- .../src/routes/bot/[id]/simulate/+page.svelte | 15 +- 5 files changed, 135 insertions(+), 66 deletions(-) diff --git a/src/backend/app/api/simulate.py b/src/backend/app/api/simulate.py index 61ab7de..c8286e3 100644 --- a/src/backend/app/api/simulate.py +++ b/src/backend/app/api/simulate.py @@ -145,7 +145,7 @@ async def start_simulation( "bot_id": bot_id, "token": config.token, "chain": config.chain, - "check_interval": config.check_interval, + "kline_interval": config.kline_interval, "auto_execute": False, # Always paper trade "strategy_config": bot.strategy_config, "ave_api_key": settings.AVE_API_KEY, @@ -160,7 +160,7 @@ async def start_simulation( config={ "token": config.token, "chain": config.chain, - "check_interval": config.check_interval, + "kline_interval": config.kline_interval, }, signals=[], ) diff --git a/src/backend/app/db/schemas.py b/src/backend/app/db/schemas.py index 9f3f443..d2077be 100644 --- a/src/backend/app/db/schemas.py +++ b/src/backend/app/db/schemas.py @@ -100,7 +100,7 @@ class BacktestResponse(BaseModel): class SimulationCreate(BaseModel): token: str chain: str - check_interval: int = 60 + kline_interval: str = "1m" @field_validator("chain") @classmethod diff --git a/src/backend/app/services/simulate/engine.py b/src/backend/app/services/simulate/engine.py index 0660f45..13a8221 100644 --- a/src/backend/app/services/simulate/engine.py +++ b/src/backend/app/services/simulate/engine.py @@ -3,6 +3,7 @@ import asyncio import logging from datetime import datetime from typing import Dict, Any, List, Optional + from ..ave.client import AveCloudClient logger = logging.getLogger(__name__) @@ -26,22 +27,37 @@ class SimulateEngine: self.risk_management = self.strategy_config.get("risk_management", {}) self.stop_loss_percent = self.risk_management.get("stop_loss_percent") self.take_profit_percent = self.risk_management.get("take_profit_percent") - self.check_interval = config.get("check_interval", 60) - self.duration_seconds = config.get("duration_seconds", 3600) + + # Kline-based settings + self.kline_interval = config.get("kline_interval", "1m") + self.max_candles = config.get("max_candles", 500) # Limit candles to process + self.auto_execute = config.get("auto_execute", False) self.token = config.get("token", "") self.chain = config.get("chain", "bsc") self.running = False self.started_at: Optional[datetime] = None - self.last_price: Optional[float] = None + + # Price tracking (for conditions) + self.last_close: Optional[float] = None self.last_volume: Optional[float] = None + + # Position tracking (for risk management) self.position: float = 0.0 self.position_token: str = "" self.entry_price: Optional[float] = None self.entry_time: Optional[int] = None + + # Portfolio self.current_balance: float = config.get("initial_balance", 10000.0) self.trades: List[Dict[str, Any]] = [] + + # Error tracking self.errors: List[str] = [] + + # Kline data + self.klines: List[Dict[str, Any]] = [] + self.last_processed_time: Optional[int] = None async def run(self) -> Dict[str, Any]: self.running = True @@ -59,71 +75,113 @@ class SimulateEngine: self.results = {"error": "Token ID is required"} return self.results - # Run forever until stopped (no end_time limit) try: - while self.running: - try: - price_data = await self.ave_client.get_token_price(token_id) - if price_data: - current_price = float(price_data.get("price", 0)) - current_volume = float(price_data.get("volume", 0)) - - if current_price > 0: - # Only check conditions if we have a previous price to compare - if self.last_price is not None: - await self._check_conditions( - current_price, current_volume, price_data - ) - # Update last price AFTER checking (so next iteration has comparison data) - self.last_price = current_price - self.last_volume = current_volume - - except Exception as e: - logger.warning(f"Failed to get price for {token_id}: {e}") - self.errors.append(f"Price fetch failed for {token_id}: {str(e)}") - continue - - for _ in range(self.check_interval): - if not self.running: - break - await asyncio.sleep(1) - - # Simulation was stopped - self.status = "stopped" + # Step 1: Fetch klines (only once for simulation) + self.klines = await self._fetch_klines(token_id) + + if not self.klines: + self.status = "failed" + self.results = {"error": "No kline data available"} + return self.results + + logger.info(f"Fetched {len(self.klines)} klines for {token_id}") + + # Step 2: Process candles (with limit) + candles_processed = 0 + for candle in self.klines: + if not self.running: + break + if candles_processed >= self.max_candles: + logger.info(f"Reached max candles limit ({self.max_candles})") + break + + candle_time = int(candle.get("time", 0)) + + # Get OHLCV data from candle + close_price = float(candle.get("close", 0)) + volume = float(candle.get("volume", 0)) + + if close_price > 0: + # Process candle + await self._process_candle(close_price, volume, candle_time) + + # Update last close for next iteration + self.last_close = close_price + self.last_volume = volume + + # Track last processed time + self.last_processed_time = candle_time + + candles_processed += 1 + + self.status = "completed" except Exception as e: + logger.error(f"Simulation error: {e}") self.status = "failed" self.results = {"error": str(e)} + self.errors.append(str(e)) self.results = self.results or {} self.results["total_signals"] = len(self.signals) + self.results["total_trades"] = len(self.trades) self.results["total_errors"] = len(self.errors) self.results["errors"] = self.errors self.results["signals"] = self.signals + self.results["candles_processed"] = candles_processed if self.running else 0 self.results["started_at"] = self.started_at self.results["ended_at"] = datetime.utcnow() return self.results - async def _check_conditions( - self, current_price: float, current_volume: float, price_data: Dict[str, Any] + async def _fetch_klines( + self, + token_id: str, + limit: int = 500 + ) -> List[Dict[str, Any]]: + """Fetch klines from AVE API.""" + try: + klines = await self.ave_client.get_klines( + token_id, + interval=self.kline_interval, + limit=limit + ) + + # Sort by time ascending (oldest first) + klines = sorted(klines, key=lambda x: x.get("time", 0)) + return klines + + except Exception as e: + logger.warning(f"Failed to fetch klines for {token_id}: {e}") + self.errors.append(f"Kline fetch failed: {str(e)}") + return [] + + async def _process_candle( + self, + close_price: float, + volume: float, + timestamp: int ): - timestamp = int(datetime.utcnow().timestamp() * 1000) - + """Process a single candle - check conditions and risk management.""" + + # Check risk management first (for open positions) if self.position > 0 and self.entry_price is not None: - exit_info = self._check_risk_management(current_price, timestamp) + exit_info = self._check_risk_management(close_price, timestamp) if exit_info: - await self._execute_risk_exit(current_price, timestamp, exit_info) - return + await self._execute_risk_exit(close_price, timestamp, exit_info) + return # Skip condition check if we just exited - for condition in self.conditions: - if self._check_condition(condition, current_price, current_volume): - await self._execute_actions(current_price, timestamp, condition) - break + # Check conditions (only if no open position) + if self.position == 0: + for condition in self.conditions: + if self._check_condition(condition, close_price, volume): + await self._execute_actions(close_price, timestamp, condition) + break def _check_risk_management( self, current_price: float, timestamp: int ) -> Optional[Dict[str, Any]]: + """Check if stop loss or take profit is triggered.""" if self.position <= 0 or self.entry_price is None: return None @@ -142,6 +200,7 @@ class SimulateEngine: async def _execute_risk_exit( self, price: float, timestamp: int, exit_info: Dict[str, Any] ): + """Execute stop loss or take profit.""" if self.position <= 0: return @@ -180,32 +239,34 @@ class SimulateEngine: current_price: float, current_volume: float, ) -> bool: + """Check if a condition is met based on price movement.""" cond_type = condition.get("type", "") threshold = condition.get("threshold", 0) - price_level = condition.get("price") - direction = condition.get("direction", "above") if cond_type == "price_drop": - if self.last_price is None or self.last_price <= 0: + # Price dropped by threshold % from last close + if self.last_close is None or self.last_close <= 0: return False - drop_pct = ((self.last_price - current_price) / self.last_price) * 100 + drop_pct = ((self.last_close - current_price) / self.last_close) * 100 return drop_pct >= threshold elif cond_type == "price_rise": - if self.last_price is None or self.last_price <= 0: + # Price rose by threshold % from last close + if self.last_close is None or self.last_close <= 0: return False - rise_pct = ((current_price - self.last_price) / self.last_price) * 100 + rise_pct = ((current_price - self.last_close) / self.last_close) * 100 return rise_pct >= threshold elif cond_type == "volume_spike": + # Volume increased significantly if self.last_volume is None or self.last_volume <= 0: return False - volume_increase = ( - (current_volume - self.last_volume) / self.last_volume - ) * 100 + volume_increase = ((current_volume - self.last_volume) / self.last_volume) * 100 return volume_increase >= threshold elif cond_type == "price_level": + price_level = condition.get("price") + direction = condition.get("direction", "above") if price_level is None: return False if direction == "above": @@ -218,6 +279,7 @@ class SimulateEngine: async def _execute_actions( self, price: float, timestamp: int, matched_condition: Dict[str, Any] ): + """Execute buy/sell actions based on matched condition.""" token = matched_condition.get("token", self.token) reasoning = f"Condition {matched_condition.get('type')} triggered" @@ -226,18 +288,21 @@ class SimulateEngine: if action_type == "buy": amount_percent = action.get("amount_percent", 10) amount = self.current_balance * (amount_percent / 100) - self.position += amount / price + quantity = amount / price + + self.position += quantity self.position_token = token self.entry_price = price self.entry_time = timestamp self.current_balance -= amount + self.trades.append( { "type": "buy", "token": token, "price": price, "amount": amount, - "quantity": amount / price, + "quantity": quantity, "timestamp": timestamp, } ) @@ -258,10 +323,12 @@ class SimulateEngine: self.signals.append(signal) def stop(self): + """Stop the simulation.""" self.running = False self.status = "stopped" def get_results(self) -> Dict[str, Any]: + """Get simulation results.""" return { "id": self.run_id, "status": self.status, @@ -270,4 +337,5 @@ class SimulateEngine: } def get_signals(self) -> List[Dict[str, Any]]: + """Get current signals.""" return self.signals diff --git a/src/frontend/src/lib/api/client.ts b/src/frontend/src/lib/api/client.ts index ae22314..ea13104 100644 --- a/src/frontend/src/lib/api/client.ts +++ b/src/frontend/src/lib/api/client.ts @@ -189,7 +189,7 @@ export const api = { }, simulate: { - async start(botId: string, config: { token: string; chain?: string; check_interval: number }): Promise { + async start(botId: string, config: { token: string; chain?: string; kline_interval: string }): Promise { const response = await fetch(`${API_URL}/bots/${botId}/simulate`, { method: 'POST', headers: getAuthHeaders(), diff --git a/src/frontend/src/routes/bot/[id]/simulate/+page.svelte b/src/frontend/src/routes/bot/[id]/simulate/+page.svelte index 31e8b29..f6fa04b 100644 --- a/src/frontend/src/routes/bot/[id]/simulate/+page.svelte +++ b/src/frontend/src/routes/bot/[id]/simulate/+page.svelte @@ -9,7 +9,7 @@ let botId = $derived($page.params.id); let tokenName = $state(''); let tokenAddress = $state(''); - let intervalSeconds = $state(60); + let klineInterval = $state('1m'); let isRunning = $state(false); onMount(async () => { @@ -70,7 +70,7 @@ const simulation = await api.simulate.start(botId, { token: tokenAddress, chain: 'bsc', - check_interval: intervalSeconds + kline_interval: klineInterval }); setCurrentSimulation(simulation); clearSignals(); @@ -132,11 +132,12 @@
- - + + + +