| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201 |
import asyncio
import json
import logging
import os
from typing import List, Dict, Any

from dotenv import load_dotenv
from eth_account import Account
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi import Body
from fastapi.middleware.cors import CORSMiddleware
from hyperliquid.exchange import Exchange
from hyperliquid.info import Info
from hyperliquid.utils import constants
from hyperliquid.utils.signing import get_timestamp_ms
# Load environment variables from the .env file.
load_dotenv()

# Testnet configuration.
TESTNET_API_URL = constants.TESTNET_API_URL

# Module-level SDK clients; they stay None until the startup hook assigns them.
info = None
exchange = None

app = FastAPI(title="Hyperliquid DEX Backend")

# CORS middleware so the browser frontend is allowed to call this API.
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "http://localhost:5173",
        "http://101.32.51.95:5173",
    ],  # frontend URLs; adjust as needed
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Startup hook: build the SDK clients once the application boots.
@app.on_event("startup")
async def startup_event():
    """Initialise the module-level Hyperliquid ``info`` and ``exchange`` clients.

    Reads the signing key from the ``PRIVATE_KEY`` environment variable.

    Raises:
        ValueError: If ``PRIVATE_KEY`` is not set.
    """
    global info, exchange

    # Validate configuration first so a missing key fails fast, before the
    # Info client is created with its websocket enabled (skip_ws=False).
    private_key = os.getenv("PRIVATE_KEY")
    if not private_key:
        raise ValueError("PRIVATE_KEY 未设置,请检查 .env 文件")

    account = Account.from_key(private_key)
    info = Info(TESTNET_API_URL, skip_ws=False)
    # The SDK's Exchange constructor takes the signing wallet first and the
    # base URL second; it has no `account=` or `skip_ws=` keyword arguments.
    exchange = Exchange(account, TESTNET_API_URL)
# WebSocket connection manager.
class ConnectionManager:
    """Track accepted WebSocket connections and fan messages out to them."""

    def __init__(self):
        # Connections that were accepted and have not been removed yet.
        self.active_connections: List["WebSocket"] = []

    async def connect(self, websocket: "WebSocket"):
        """Accept the handshake and start tracking ``websocket``."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: "WebSocket"):
        """Stop tracking ``websocket``; does nothing if it is not tracked."""
        # A socket may already have been dropped by broadcast(), so removing
        # it a second time must not raise.
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def broadcast(self, data: Dict[str, Any]):
        """Send ``data`` as JSON to every tracked connection.

        A connection whose send fails is dropped from the manager and the
        remaining connections still receive the message.
        """
        # Iterate over a snapshot: the list can change while a send is awaited.
        for connection in list(self.active_connections):
            try:
                await connection.send_json(data)
            except Exception:
                # One broken client must not block delivery to the others.
                self.disconnect(connection)


manager = ConnectionManager()
# Root path.
@app.get("/")
def read_root():
    """Return a static readiness message."""
    payload = {"message": "Hyperliquid DEX Backend 就绪!"}
    return payload
# Health check.
@app.get("/health")
def health_check():
    """Report an OK status plus the builder settings read from the environment."""
    builder_address = os.getenv("BUILDER_ADDRESS")
    max_fee_rate = os.getenv("MAX_FEE_RATE")
    return {
        "status": "OK",
        "builder_address": builder_address,
        "max_fee_rate": max_fee_rate,
    }
# Price lookup.
@app.get("/prices")
def get_prices():
    """Return all mid prices from the SDK, or an error payload if it is not ready."""
    # Guard clause: the Info client is only available after startup.
    if not info:
        return {"error": "SDK 未初始化"}
    return {"prices": info.all_mids()}
# Builder-fee approval API.
@app.post("/approve-builder")
async def approve_builder(request: Dict[str, Any] = Body(...)):
    """Approve a maximum builder fee for the configured builder address.

    Body example: {"user_address": "0xUserAddress", "max_fee_rate": 50}

    ``max_fee_rate`` is optional and falls back to the ``MAX_FEE_RATE``
    environment variable (default 50). It is treated as tenths of a basis
    point, so 50 means 0.05%.

    Returns:
        On success a dict with ``status``, ``result`` (the raw SDK response),
        ``fee_rate`` and ``builder_address``; otherwise ``{"error": <message>}``.
    """
    user_address = request.get("user_address")
    if not user_address:
        return {"error": "user_address 必填"}

    builder_address = os.getenv("BUILDER_ADDRESS")
    if not builder_address:
        return {"error": "BUILDER_ADDRESS 未设置"}
    if exchange is None:
        return {"error": "SDK 未初始化"}

    try:
        max_fee_rate = int(request.get("max_fee_rate", os.getenv("MAX_FEE_RATE", 50)))
    except (TypeError, ValueError):
        return {"error": "max_fee_rate 必须是整数"}
    if max_fee_rate < 0:
        return {"error": "max_fee_rate 必须是整数"}

    # The SDK expects the cap as a percent string such as "0.05%".
    # One tenth of a basis point is 0.001%, so divide by 1000 using integer
    # arithmetic to avoid float formatting artefacts.
    whole, thousandths = divmod(max_fee_rate, 1000)
    percent = f"{whole}.{thousandths:03d}".rstrip("0").rstrip(".")

    # NOTE(review): the SDK signs with the wallet this server was started
    # with, so the approval presumably applies to that wallet's account;
    # user_address is only checked for presence. In production the user
    # should sign in the frontend - confirm the intended account model.
    try:
        # approve_builder_fee performs a blocking HTTP request; run it in the
        # default executor so the event loop stays responsive.
        result = await asyncio.get_running_loop().run_in_executor(
            None, exchange.approve_builder_fee, builder_address, f"{percent}%"
        )
    except Exception as e:
        logging.getLogger(__name__).exception("approve_builder failed")
        return {"error": str(e)}

    return {
        "status": "批准成功",
        "result": result,
        "fee_rate": max_fee_rate,
        "builder_address": builder_address,
    }
# Order placement API.
@app.post("/place-order")
async def place_order(request: Dict[str, Any] = Body(...)):
    """Place an order that carries the configured builder fee.

    Body example: {
        "asset": "BTC",
        "is_buy": true,
        "sz": 0.01,
        "limit_px": 60000,
        "user_address": "0xUserAddress"
    }
    Optional keys: "order_type" (an SDK order-type dict; defaults to a
    good-til-cancelled limit order) and "reduce_only" (defaults to false).

    The raw SDK response is also broadcast to all connected WebSocket
    clients under the ``order_result`` key.

    Returns:
        ``{"status": ..., "result": <SDK response>}`` when the request was
        submitted, otherwise ``{"error": <message>}``.
    """
    user_address = request.get("user_address")
    if not user_address:
        return {"error": "user_address 必填"}

    required = ("asset", "is_buy", "sz", "limit_px")
    missing = [key for key in required if request.get(key) is None]
    if missing:
        return {"error": f"{', '.join(missing)} 必填"}

    builder_address = os.getenv("BUILDER_ADDRESS")
    if not builder_address:
        return {"error": "BUILDER_ADDRESS 未设置"}
    if exchange is None:
        return {"error": "SDK 未初始化"}

    try:
        # Builder info in the SDK's shape: "b" is the builder address and
        # "f" the fee (assumed to be tenths of a basis point - TODO confirm).
        builder = {"b": builder_address, "f": int(os.getenv("MAX_FEE_RATE", 50))}
        order_type = request.get("order_type") or {"limit": {"tif": "Gtc"}}
        reduce_only = bool(request.get("reduce_only", False))

        # NOTE(review): the SDK signs with the wallet this server was started
        # with, so the order is presumably placed for that wallet's account;
        # user_address is only checked for presence - confirm.
        def submit():
            return exchange.order(
                request["asset"],
                request["is_buy"],
                request["sz"],
                request["limit_px"],
                order_type,
                reduce_only=reduce_only,
                builder=builder,
            )

        # The SDK call is a blocking HTTP request; keep it off the event loop.
        result = await asyncio.get_running_loop().run_in_executor(None, submit)

        # NOTE(review): the SDK response may itself report a rejection;
        # it is returned and broadcast unchanged, as before.
        await manager.broadcast({"order_result": result})
        return {"status": "下单成功", "result": result}
    except Exception as e:
        logging.getLogger(__name__).exception("place_order failed")
        return {"error": str(e)}
# Builder earnings lookup.
@app.get("/builder-fees")
def get_builder_fees():
    """Return the configured builder fee rate and the builder account state.

    Simplified: this only fetches the user state for ``BUILDER_ADDRESS``;
    a real earnings figure would need a fills query.
    """
    try:
        # Example only: query the account state (to be extended to fills).
        builder_address = os.getenv("BUILDER_ADDRESS")
        state = info.user_state(builder_address)
    except Exception as exc:
        return {"error": str(exc)}

    return {
        "max_fee_rate": os.getenv("MAX_FEE_RATE"),
        "user_state": state,
        "estimated_earnings": "使用 Hyperliquid Stats 下载 fills CSV 查看实际收益",
    }
# WebSocket endpoint: streams the order book and mid prices to the client.
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Push a BTC L2 book and mid-price snapshot to this client every 5 seconds.

    The connection is registered with ``manager`` for the lifetime of the
    handler so it also receives broadcasts, and it is always unregistered
    when the handler exits.
    """
    await manager.connect(websocket)
    loop = asyncio.get_running_loop()
    try:
        while True:
            if info:
                # SDK calls are blocking HTTP requests; run them in the
                # default executor so other connections are not stalled.
                orderbook = await loop.run_in_executor(None, info.l2_snapshot, "BTC")
                prices = await loop.run_in_executor(None, info.all_mids)
                # Send only to this client: every connection runs its own
                # loop, so broadcasting here would deliver one copy per
                # connected client to everyone.
                await websocket.send_json(
                    {
                        "orderbook": orderbook,  # example: BTC L2 order book
                        "prices": prices,
                        "timestamp": get_timestamp_ms(),
                    }
                )
            await asyncio.sleep(5)  # push once every 5 seconds
    except WebSocketDisconnect:
        # Normal client disconnect; cleanup happens in `finally`.
        pass
    except Exception:
        logging.getLogger(__name__).exception("WebSocket 错误")
    finally:
        # Unregister on every exit path so a failed socket is not left in
        # the manager.
        manager.disconnect(websocket)
if __name__ == "__main__":
    import uvicorn

    # uvicorn only supports reload=True when the application is given as an
    # import string ("module:app"); with an app object it refuses to start.
    # Run without auto-reload here and use `uvicorn <module>:app --reload`
    # from the command line during development.
    uvicorn.run(app, host="0.0.0.0", port=8000)
|