"""FastAPI backend exposing Hyperliquid testnet data and builder-fee order flow.

Endpoints:
    GET  /                 liveness message
    GET  /health           status plus builder configuration from the environment
    GET  /prices           mid prices from the SDK
    POST /approve-builder  submit a builder-fee approval
    POST /place-order      place an order with the builder fee injected
    GET  /builder-fees     builder account state
    WS   /ws               periodic BTC order book and mid prices

Configuration is read from environment variables (loaded from ``.env``):
``PRIVATE_KEY``, ``BUILDER_ADDRESS`` and ``MAX_FEE_RATE``.
"""

import asyncio
import json
import logging
import os
from typing import Any, Dict, List, Optional

from dotenv import load_dotenv
from eth_account import Account
from fastapi import Body
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from hyperliquid.exchange import Exchange
from hyperliquid.info import Info
from hyperliquid.utils import constants
from hyperliquid.utils.signing import get_timestamp_ms

logger = logging.getLogger(__name__)

# Load variables from the .env file into the process environment.
load_dotenv()

# Testnet configuration.
TESTNET_API_URL = constants.TESTNET_API_URL

# SDK clients; populated by the startup hook.
info: Optional[Info] = None
exchange: Optional[Exchange] = None

app = FastAPI(title="Hyperliquid DEX Backend")

# CORS middleware so the front end can call this API.
app.add_middleware(
    CORSMiddleware,
    # Front-end origins; adjust as needed.
    allow_origins=["http://localhost:5173", "http://101.32.51.95:5173"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.on_event("startup")
async def startup_event():
    """Initialise the SDK clients when the application starts.

    Raises:
        ValueError: if ``PRIVATE_KEY`` is not set in the environment.
    """
    global info, exchange

    # Fail fast on missing configuration, before Info opens any connection.
    private_key = os.getenv("PRIVATE_KEY")
    if not private_key:
        raise ValueError("PRIVATE_KEY 未设置,请检查 .env 文件")

    info = Info(TESTNET_API_URL, skip_ws=False)

    # Exchange client built from the agent private key.
    # NOTE(review): confirm this argument order / these keyword names against
    # the installed hyperliquid SDK's Exchange constructor.
    account = Account.from_key(private_key)
    exchange = Exchange(TESTNET_API_URL, account=account, skip_ws=True)


class ConnectionManager:
    """Track accepted WebSocket clients and fan messages out to them."""

    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept the handshake and register the socket."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Unregister the socket; a socket that is already gone is ignored."""
        try:
            self.active_connections.remove(websocket)
        except ValueError:
            # Already removed (e.g. pruned by broadcast); nothing to do.
            pass

    async def broadcast(self, data: Dict[str, Any]):
        """Send ``data`` as JSON to every registered socket.

        A socket whose send fails is logged and unregistered, so one dead
        client neither blocks the remaining clients nor raises to the caller.
        """
        stale: List[WebSocket] = []
        # Iterate over a snapshot: the list can change while we await.
        for connection in list(self.active_connections):
            try:
                await connection.send_json(data)
            except Exception:
                logger.warning("dropping WebSocket after failed send", exc_info=True)
                stale.append(connection)
        for connection in stale:
            self.disconnect(connection)


manager = ConnectionManager()


@app.get("/")
def read_root():
    """Return a static readiness message."""
    return {"message": "Hyperliquid DEX Backend 就绪!"}


@app.get("/health")
def health_check():
    """Return a status flag plus the builder settings read from the environment."""
    return {
        "status": "OK",
        "builder_address": os.getenv("BUILDER_ADDRESS"),
        "max_fee_rate": os.getenv("MAX_FEE_RATE"),
    }


@app.get("/prices")
def get_prices():
    """Return the result of ``info.all_mids()``, or an error if the SDK is not ready."""
    if not info:
        return {"error": "SDK 未初始化"}
    return {"prices": info.all_mids()}


@app.post("/approve-builder")
async def approve_builder(request: Dict[str, Any] = Body(...)):
    """Submit a builder-fee approval for ``BUILDER_ADDRESS``.

    Body example: ``{"user_address": "0xUserAddress", "max_fee_rate": 50}``.
    ``max_fee_rate`` defaults to the ``MAX_FEE_RATE`` environment variable
    (50 if unset).

    Returns:
        A dict describing the approval on success, or ``{"error": ...}`` on
        validation failure or any exception.
    """
    try:
        user_address = request.get("user_address")
        max_fee_rate = request.get(
            "max_fee_rate", int(os.getenv("MAX_FEE_RATE", 50))
        )

        if not user_address:
            return {"error": "user_address 必填"}

        # Reject non-numeric rates: a str such as "50" would otherwise be
        # silently repeated ten times by the multiplication below.
        if isinstance(max_fee_rate, bool) or not isinstance(
            max_fee_rate, (int, float)
        ):
            return {"error": "max_fee_rate 必须为数字"}

        # Build the approval action.
        timestamp = get_timestamp_ms()
        action = {
            "type": "BuilderApprove",
            "builder": os.getenv("BUILDER_ADDRESS"),
            # NOTE(review): the original comment claimed
            # "0.05% = 50 * 10 = 500 bps"; confirm the unit the API expects.
            "maxBuilderFeeBps": max_fee_rate * 10,
            "nonce": timestamp,
        }

        # Simplified flow: the agent account signs here. In production the
        # user should sign on the front end and the backend only relay.
        # NOTE(review): verify that sign_action / builder_approve exist with
        # these signatures on the installed SDK's Exchange class.
        signed_action = exchange.sign_action(action, user_address)
        result = exchange.builder_approve(signed_action)

        return {
            "status": "批准成功",
            "result": result,
            "fee_rate": max_fee_rate,
            "builder_address": os.getenv("BUILDER_ADDRESS"),
        }
    except Exception as e:
        return {"error": str(e)}


@app.post("/place-order")
async def place_order(request: Dict[str, Any] = Body(...)):
    """Place an order with the builder fee injected, then broadcast the result.

    Body example::

        {
            "asset": "BTC",
            "is_buy": true,
            "sz": 0.01,
            "limit_px": 60000,
            "user_address": "0xUserAddress"
        }

    Returns:
        ``{"status": ..., "result": ...}`` on success, or ``{"error": ...}``
        on validation failure or any exception.
    """
    try:
        order_data = request.copy()
        user_address = order_data.get("user_address")

        if not user_address:
            return {"error": "user_address 必填"}

        # Inject the builder address and fee.
        order_data["builder"] = {
            "address": os.getenv("BUILDER_ADDRESS"),
            "fee": int(os.getenv("MAX_FEE_RATE", 50)),
        }

        # Simplified flow: in production the user signs on the front end.
        # NOTE(review): verify that sign_order / place_order exist with these
        # signatures on the installed SDK's Exchange class.
        signed_order = exchange.sign_order(order_data, user_address)
        result = exchange.place_order(signed_order)

        # Push the result to every connected WebSocket client.
        await manager.broadcast({"order_result": result})

        return {"status": "下单成功", "result": result}
    except Exception as e:
        return {"error": str(e)}


@app.get("/builder-fees")
def get_builder_fees():
    """Return the configured fee rate and the builder account's user state.

    Simplified: actual earnings require querying fills, which is not done here.
    """
    if not info:
        return {"error": "SDK 未初始化"}
    try:
        # Example only: user state of the builder address (extend to fills).
        user_state = info.user_state(os.getenv("BUILDER_ADDRESS"))
        return {
            "max_fee_rate": os.getenv("MAX_FEE_RATE"),
            "user_state": user_state,
            "estimated_earnings": "使用 Hyperliquid Stats 下载 fills CSV 查看实际收益",
        }
    except Exception as e:
        return {"error": str(e)}


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Push the BTC L2 order book and mid prices to this client every 5 seconds.

    Each connection runs its own loop and sends only to its own socket, so a
    client receives one message per tick regardless of how many are connected.
    """
    await manager.connect(websocket)
    loop = asyncio.get_running_loop()
    try:
        while True:
            if info:
                # The SDK calls are synchronous; run them in the default
                # executor so they do not stall the event loop.
                orderbook = await loop.run_in_executor(None, info.l2_book, "BTC")
                prices = await loop.run_in_executor(None, info.all_mids)
                await websocket.send_json(
                    {
                        "orderbook": orderbook,  # example: BTC L2 order book
                        "prices": prices,
                        "timestamp": get_timestamp_ms(),
                    }
                )
            await asyncio.sleep(5)  # push interval: 5 seconds
    except WebSocketDisconnect:
        pass
    except Exception as e:
        logger.exception("WebSocket 错误: %s", e)
    finally:
        # Always unregister, whichever way the loop ended.
        manager.disconnect(websocket)


if __name__ == "__main__":
    import uvicorn

    # reload=True is only honoured when the app is given as an import string
    # ("module:app"); with an app object uvicorn refuses to start.
    uvicorn.run(app, host="0.0.0.0", port=8000)