"""FastAPI backend that fronts the Hyperliquid testnet SDK."""

import asyncio
import os
from typing import Any, Dict, List, Optional

from dotenv import load_dotenv
from eth_account import Account  # Account is used to build the signing wallet
from fastapi import Body, FastAPI, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from hyperliquid.exchange import Exchange
from hyperliquid.info import Info
from hyperliquid.utils import constants
from hyperliquid.utils.signing import get_timestamp_ms
from pydantic import BaseModel

# Load settings from the .env file.
load_dotenv()

# Testnet configuration.
TESTNET_API_URL = constants.TESTNET_API_URL

# SDK clients; both stay None until the startup hook has run.
info = None
exchange = None

app = FastAPI(title="Hyperliquid DEX Backend")


# Request body accepted by the builder-fee approval endpoints.
class BuilderApproveRequest(BaseModel):
    user_address: str
    max_fee_rate: Optional[int] = 50  # defaults to 50


# CORS middleware so the front end is allowed to call this API.
app.add_middleware(
    CORSMiddleware,
    # Front-end origins; adjust as needed.
    allow_origins=["http://localhost:5174", "http://101.32.51.95:5174"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# Startup hook: initialise the SDK clients.
@app.on_event("startup")
async def startup_event():
    """Create the global ``info`` and ``exchange`` SDK clients.

    Raises:
        ValueError: if the ``PRIVATE_KEY`` environment variable is not set.
    """
    global info, exchange

    # Validate configuration first so a misconfigured process fails before
    # any SDK client is constructed.
    private_key = os.getenv("PRIVATE_KEY")
    if not private_key:
        raise ValueError("PRIVATE_KEY 未设置,请检查 .env 文件")

    info = Info(TESTNET_API_URL, skip_ws=False)

    wallet = Account.from_key(private_key)  # LocalAccount used for signing
    exchange = Exchange(wallet, TESTNET_API_URL, account_address=wallet.address)
    print(f"Exchange 初始化成功,地址: {wallet.address}")


# WebSocket connection manager.
class ConnectionManager:
    """Track accepted WebSocket connections and fan messages out to them."""

    def __init__(self) -> None:
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket) -> None:
        """Accept ``websocket`` and start tracking it."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket) -> None:
        """Stop tracking ``websocket``; a no-op if it is not tracked."""
        # broadcast() may already have dropped the socket, so a second removal
        # must not raise.
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def broadcast(self, data: Dict[str, Any]) -> None:
        """Send ``data`` as JSON to every tracked connection.

        A connection whose send fails is logged and dropped; the remaining
        connections still receive the message and no exception reaches the
        caller.
        """
        # Iterate over a snapshot because failed connections are removed from
        # the list while looping.
        for connection in list(self.active_connections):
            try:
                await connection.send_json(data)
            except Exception as exc:
                print(f"WebSocket 发送失败,已移除连接: {exc}")
                self.disconnect(connection)


manager = ConnectionManager()


# Root path.
@app.get("/")
def read_root():
    """Return a fixed readiness message."""
    return {"message": "Hyperliquid DEX Backend 就绪!"}
# Health check
@app.get("/health")
def health_check():
    """Return a status flag plus the builder settings read from the environment."""
    return {
        "status": "OK",
        "builder_address": os.getenv("BUILDER_ADDRESS"),
        "max_fee_rate": os.getenv("MAX_FEE_RATE"),
    }


# Price lookup
@app.get("/prices")
def get_prices():
    """Return the SDK's ``all_mids()`` result, or an error before initialisation."""
    if not info:
        return {"error": "SDK 未初始化"}
    return {"prices": info.all_mids()}


def _run_builder_fee_approval(request: BuilderApproveRequest, include_builder: bool) -> Dict[str, Any]:
    """Shared implementation of the builder-fee approval routes.

    Args:
        request: parsed request body.
        include_builder: whether the success payload carries ``builder_address``.

    Returns:
        The success payload, or ``{"error": <message>}`` on any failure.
    """
    from decimal import Decimal  # local import: only this helper needs it

    try:
        # NOTE(review): user_address is required but is not passed to the SDK;
        # the approval is signed by the server's own `exchange` wallet.
        # Confirm this is the intended flow.
        if not request.user_address:
            return {"error": "user_address 必填"}

        builder_address = os.getenv("BUILDER_ADDRESS")
        if not builder_address:
            return {"error": "BUILDER_ADDRESS 未设置"}

        max_fee_rate = request.max_fee_rate
        if max_fee_rate is None:
            # An explicit JSON null falls back to the configured default.
            max_fee_rate = int(os.getenv("MAX_FEE_RATE", 50))

        # The SDK's approve_builder_fee takes the maximum rate as a percent
        # string such as "0.001%", not an integer.
        # NOTE(review): assumes max_fee_rate is in basis points (50 -> "0.5%"),
        # matching the `rate * 10` tenths-of-a-basis-point value this file
        # attaches to orders — confirm the intended unit.
        max_fee_percent = f"{(Decimal(max_fee_rate) / Decimal(100)).normalize():f}%"
        result = exchange.approve_builder_fee(builder_address, max_fee_percent)

        payload = {
            "status": "批准成功",
            "result": result,
            "fee_rate": max_fee_rate,
        }
        if include_builder:
            payload["builder_address"] = builder_address
        return payload
    except Exception as e:
        return {"error": str(e)}


# NOTE: the approval handlers below intentionally keep the same function name;
# each route is registered by its decorator at definition time, so every path
# stays reachable even though the module-level name is rebound.
@app.post("/approve-builder")
async def approve_builder(request: BuilderApproveRequest):
    """Approve the builder fee.

    Body example: {"user_address": "0xUserAddress"}
    """
    return _run_builder_fee_approval(request, include_builder=False)


@app.post("/approve-builder33")
async def approve_builder(request: BuilderApproveRequest):
    """Approve the builder fee and echo the builder address.

    Body example: {"user_address": "0xUserAddress"}
    """
    return _run_builder_fee_approval(request, include_builder=True)


@app.post("/approve-builder22")
async def approve_builder(request: BuilderApproveRequest):
    """Approve the builder fee and echo the builder address.

    Body example: {"user_address": "0xUserAddress", "max_fee_rate": 50}
    """
    return _run_builder_fee_approval(request, include_builder=True)


# Builder fee approval API
@app.post("/approve-builder111")
async def approve_builder(request: Dict[str, Any] = Body(...)):
    """Approve the maximum builder fee rate for the configured builder address.

    Body example: {"user_address": "0xUserAddress", "max_fee_rate": 50}

    Returns the success payload, or ``{"error": <message>}`` on any failure.
    """
    from decimal import Decimal  # local import: only this handler needs it

    try:
        # NOTE(review): user_address is required but is not passed to the SDK;
        # the approval is signed by the server's own `exchange` wallet.
        user_address = request.get("user_address")
        if not user_address:
            return {"error": "user_address 必填"}

        builder_address = os.getenv("BUILDER_ADDRESS")
        if not builder_address:
            return {"error": "BUILDER_ADDRESS 未设置"}

        max_fee_rate = request.get("max_fee_rate", int(os.getenv("MAX_FEE_RATE", 50)))

        # The SDK's approve_builder_fee takes the maximum rate as a percent
        # string such as "0.001%", not an integer.
        # NOTE(review): assumes max_fee_rate is in basis points (50 -> "0.5%"),
        # matching the `rate * 10` tenths-of-a-basis-point fee attached to
        # orders in place_order — confirm the intended unit.
        max_fee_percent = f"{(Decimal(str(max_fee_rate)) / Decimal(100)).normalize():f}%"
        result = exchange.approve_builder_fee(builder_address, max_fee_percent)

        return {
            "status": "批准成功",
            "result": result,
            "fee_rate": max_fee_rate,
            "builder_address": builder_address,
        }
    except Exception as e:
        return {"error": str(e)}


# Order placement API
@app.post("/place-order")
async def place_order(request: Dict[str, Any] = Body(...)):
    """Place an order with the builder fee attached.

    Body example:
    {
        "asset": "BTC",
        "is_buy": true,
        "sz": "0.01",
        "limit_px": 60000,
        "order_type": "Limit",
        "reduce_only": false,
        "user_address": "0xUserAddress"
    }

    ``order_type`` may be the string "Limit" (optionally with a ``tif`` field,
    default "Gtc") or an order-type dict that is handed to the SDK unchanged.

    Returns ``{"status": ..., "result": ...}`` on success, or
    ``{"error": <message>}`` on any failure.
    """
    try:
        order_data = request
        if not order_data.get("user_address"):
            return {"error": "user_address 必填"}

        # Report missing fields by name instead of a bare KeyError message.
        missing = [key for key in ("asset", "is_buy", "sz", "limit_px") if key not in order_data]
        if missing:
            return {"error": f"缺少必填字段: {', '.join(missing)}"}

        builder_address = os.getenv("BUILDER_ADDRESS")
        if not builder_address:
            return {"error": "BUILDER_ADDRESS 未设置"}

        # The SDK expects the order type as a dict such as
        # {"limit": {"tif": "Gtc"}}; a bare string is rejected.
        order_type = order_data.get("order_type", "Limit")
        if isinstance(order_type, str):
            if order_type.lower() != "limit":
                return {"error": f"不支持的 order_type: {order_type}"}
            order_type = {"limit": {"tif": order_data.get("tif", "Gtc")}}

        # The SDK's builder info uses the keys "b" (builder address) and "f"
        # (fee in tenths of a basis point).
        # NOTE(review): the `* 10` scaling is kept from the original code;
        # confirm MAX_FEE_RATE is meant to be in basis points.
        builder = {
            "b": builder_address,
            "f": int(os.getenv("MAX_FEE_RATE", 50)) * 10,
        }

        # The coin name is passed positionally because the SDK has no `asset`
        # keyword. Size and price are coerced to float because the body may
        # carry them as strings.
        result = exchange.order(
            order_data["asset"],
            order_data["is_buy"],
            float(order_data["sz"]),
            float(order_data["limit_px"]),
            order_type,
            reduce_only=order_data.get("reduce_only", False),
            builder=builder,
        )
    except Exception as e:
        return {"error": str(e)}

    # Broadcasting is best effort: the order has already been submitted, so a
    # push failure must not be reported to the caller as an order failure.
    try:
        await manager.broadcast({"order_result": result})
    except Exception as e:
        print(f"WebSocket 错误: {e}")

    return {"status": "下单成功", "result": result}


# Builder earnings lookup
@app.get("/builder-fees")
def get_builder_fees():
    """Return the configured fee rate and the builder address's user state.

    Earnings are not computed here; the payload points at the fills CSV.
    """
    try:
        # Query the builder account's state through Info.
        user_state = info.user_state(os.getenv("BUILDER_ADDRESS"))
        return {
            "max_fee_rate": os.getenv("MAX_FEE_RATE"),
            "user_state": user_state,
            "estimated_earnings": "下载 fills CSV 查看实际费 (https://stats-data.hyperliquid-testnet.xyz/Testnet/builder_fills/)",
        }
    except Exception as e:
        return {"error": str(e)}


# WebSocket endpoint: pushes the order book and prices
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Push a BTC order-book and price snapshot to this client every 5 seconds.

    The connection is removed from ``manager`` however the loop ends.
    """
    await manager.connect(websocket)
    loop = asyncio.get_running_loop()
    try:
        while True:
            if info:
                # The SDK calls are synchronous; run them in the default
                # executor so they do not stall the event loop.
                orderbook = await loop.run_in_executor(None, info.l2_book, "BTC")  # BTC L2 order book
                prices = await loop.run_in_executor(None, info.all_mids)
                # Send to this client only. Every connection runs its own
                # loop, so broadcasting here would deliver one copy per
                # connected client to each client.
                await websocket.send_json(
                    {
                        "orderbook": orderbook,
                        "prices": prices,
                        "timestamp": get_timestamp_ms(),
                    }
                )
            await asyncio.sleep(5)  # push every 5 seconds
    except WebSocketDisconnect:
        pass
    except Exception as e:
        print(f"WebSocket 错误: {e}")
    finally:
        # Guarded because the connection may already have been removed.
        if websocket in manager.active_connections:
            manager.disconnect(websocket)


if __name__ == "__main__":
    import uvicorn

    # reload=True only works when the app is given as an import string;
    # passed together with an app object it makes uvicorn refuse to start.
    uvicorn.run(app, host="0.0.0.0", port=8000)