Files
randebu/src/backend/app/services/ave/client.py

230 lines
7.7 KiB
Python

import httpx
from typing import List, Dict, Any, Optional
from datetime import datetime
class AveCloudClient:
DATA_API_URL = "https://prod.ave-api.com"
TRADING_API_URL = "https://bot-api.ave.ai"
def __init__(self, api_key: str, plan: str = "free"):
self.api_key = api_key
self.plan = plan
def _data_headers(self) -> Dict[str, str]:
return {"X-API-KEY": self.api_key}
def _trading_headers(self) -> Dict[str, str]:
return {"X-API-KEY": self.api_key, "Content-Type": "application/json"}
async def get_tokens(
self,
query: Optional[str] = None,
chain: Optional[str] = None,
limit: int = 20,
) -> List[Dict[str, Any]]:
url = f"{self.DATA_API_URL}/v2/tokens"
params = {"limit": limit}
if query:
params["query"] = query
if chain:
params["chain"] = chain
async with httpx.AsyncClient() as client:
response = await client.get(
url, headers=self._data_headers(), params=params, timeout=30.0
)
response.raise_for_status()
data = response.json()
if data.get("status") == 200:
return data.get("data", [])
raise Exception(f"Failed to fetch tokens: {data}")
async def get_batch_prices(self, token_ids: List[str]) -> Dict[str, Dict[str, Any]]:
url = f"{self.DATA_API_URL}/v2/tokens/price"
async with httpx.AsyncClient() as client:
response = await client.post(
url,
headers=self._data_headers(),
json={"token_ids": token_ids},
timeout=30.0,
)
response.raise_for_status()
data = response.json()
if data.get("status") == 200:
return data.get("data", {})
return {}
async def get_token_details(self, token_id: str) -> Optional[Dict[str, Any]]:
url = f"{self.DATA_API_URL}/v2/tokens/{token_id}"
async with httpx.AsyncClient() as client:
response = await client.get(url, headers=self._data_headers(), timeout=30.0)
response.raise_for_status()
data = response.json()
if data.get("status") == 200:
return data.get("data")
return None
async def get_klines(
self,
token_id: str,
interval: str = "1h",
limit: int = 100,
start_time: Optional[int] = None,
end_time: Optional[int] = None,
) -> List[Dict[str, Any]]:
url = f"{self.DATA_API_URL}/v2/klines/token/{token_id}"
params = {"interval": interval, "limit": limit}
if start_time:
params["start_time"] = start_time
if end_time:
params["end_time"] = end_time
async with httpx.AsyncClient() as client:
response = await client.get(
url, headers=self._data_headers(), params=params, timeout=30.0
)
response.raise_for_status()
data = response.json()
if data.get("status") == 200:
return data.get("data", [])
raise Exception(f"Failed to fetch klines: {data}")
async def get_token_price(self, token_id: str) -> Optional[Dict[str, Any]]:
url = f"{self.DATA_API_URL}/v2/tokens/price"
async with httpx.AsyncClient() as client:
response = await client.post(
url,
headers=self._data_headers(),
json={"token_ids": [token_id]},
timeout=30.0,
)
response.raise_for_status()
data = response.json()
if data.get("status") == 200:
prices = data.get("data", {})
return prices.get(token_id)
return None
async def get_trending_tokens(
self, chain: Optional[str] = None, limit: int = 20
) -> List[Dict[str, Any]]:
url = f"{self.DATA_API_URL}/v2/tokens/trending"
params = {"limit": limit}
if chain:
params["chain"] = chain
async with httpx.AsyncClient() as client:
response = await client.get(
url, headers=self._data_headers(), params=params, timeout=30.0
)
response.raise_for_status()
data = response.json()
if data.get("status") == 200:
return data.get("data", [])
raise Exception(f"Failed to fetch trending tokens: {data}")
async def get_token_risk(self, contract_id: str) -> Optional[Dict[str, Any]]:
url = f"{self.DATA_API_URL}/v2/contracts/{contract_id}"
async with httpx.AsyncClient() as client:
response = await client.get(url, headers=self._data_headers(), timeout=30.0)
response.raise_for_status()
data = response.json()
if data.get("status") == 200:
return data.get("data")
return None
async def get_chain_quote(
self,
chain: str,
from_token: str,
to_token: str,
amount: str,
slippage: float = 0.5,
) -> Optional[Dict[str, Any]]:
url = f"{self.TRADING_API_URL}/v1/chain/quote"
payload = {
"chain": chain,
"from_token": from_token,
"to_token": to_token,
"amount": amount,
"slippage": slippage,
}
async with httpx.AsyncClient() as client:
response = await client.post(
url, headers=self._trading_headers(), json=payload, timeout=30.0
)
response.raise_for_status()
data = response.json()
if data.get("status") == 200:
return data.get("data")
return None
async def get_chain_swap(
self,
chain: str,
from_token: str,
to_token: str,
amount: str,
slippage: float = 0.5,
wallet_address: Optional[str] = None,
) -> Optional[Dict[str, Any]]:
url = f"{self.TRADING_API_URL}/v1/chain/swap"
payload = {
"chain": chain,
"from_token": from_token,
"to_token": to_token,
"amount": amount,
"slippage": slippage,
}
if wallet_address:
payload["wallet_address"] = wallet_address
async with httpx.AsyncClient() as client:
response = await client.post(
url, headers=self._trading_headers(), json=payload, timeout=60.0
)
response.raise_for_status()
data = response.json()
if data.get("status") == 200:
return data.get("data")
return None
def check_tier_access(user_tier: str, feature: str) -> tuple[bool, Optional[str]]:
tier_access = {
"free": {
"data_rest": True,
"websocket": False,
"chain_wallet": True,
"proxy_wallet": False,
},
"normal": {
"data_rest": True,
"websocket": False,
"chain_wallet": True,
"proxy_wallet": True,
},
"pro": {
"data_rest": True,
"websocket": True,
"chain_wallet": True,
"proxy_wallet": True,
},
}
if user_tier not in tier_access:
user_tier = "free"
access = tier_access[user_tier]
if access.get(feature):
return True, None
upsell_messages = {
"websocket": "Upgrade to Pro plan to access WebSocket streaming data. Visit your account settings.",
"proxy_wallet": "Upgrade to Normal or Pro plan to access Proxy Wallet functionality. Visit your account settings.",
}
return False, upsell_messages.get(
feature, "Upgrade your plan to access this feature."
)