| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162 |
- import json
- import logging
- import threading
- from collections import defaultdict
- import websocket
- from hyperliquid.utils.types import Any, Callable, Dict, List, NamedTuple, Optional, Subscription, Tuple, WsMsg
class ActiveSubscription(NamedTuple):
    """A subscriber callback paired with the numeric id that identifies it."""

    # Invoked with the decoded websocket message.
    callback: Callable[[Any], None]
    # Id used to tell subscribers of the same stream apart.
    subscription_id: int
def subscription_to_identifier(subscription: Subscription) -> str:
    """Build the string key under which callbacks for ``subscription`` are stored.

    The key is the subscription type, optionally followed by ``:`` and the
    lower-cased coin and/or user (and, for candles, the interval as given).

    Args:
        subscription: Mapping with a ``"type"`` entry plus whichever of
            ``"coin"``, ``"user"`` and ``"interval"`` that type requires.

    Returns:
        The identifier string. Note that an unrecognised ``"type"`` falls
        through and yields ``None`` despite the ``str`` annotation.

    Raises:
        KeyError: If ``"type"`` or a field required by that type is missing.
    """
    kind = subscription["type"]

    # Streams that carry no distinguishing parameter in their identifier.
    for name in ("allMids", "userEvents", "orderUpdates"):
        if kind == name:
            return name

    # Streams keyed by coin only.
    for name in ("l2Book", "trades", "bbo", "activeAssetCtx"):
        if kind == name:
            coin = subscription["coin"].lower()
            return f"{name}:{coin}"

    # Streams keyed by user only.
    for name in ("userFills", "userFundings", "userNonFundingLedgerUpdates", "webData2"):
        if kind == name:
            user = subscription["user"].lower()
            return f"{name}:{user}"

    if kind == "candle":
        coin = subscription["coin"].lower()
        interval = subscription["interval"]
        return f"candle:{coin},{interval}"

    if kind == "activeAssetData":
        coin = subscription["coin"].lower()
        user = subscription["user"].lower()
        return f"activeAssetData:{coin},{user}"

    # Unknown subscription type: same result as falling off the end of the function.
    return None
def ws_msg_to_identifier(ws_msg: WsMsg) -> Optional[str]:
    """Derive the subscription identifier that an incoming message belongs to.

    The result uses the same ``type[:key]`` format that
    ``subscription_to_identifier`` produces, so it can be used to look up the
    callbacks registered for the message.

    Args:
        ws_msg: Decoded message with a ``"channel"`` entry and, for most
            channels, a ``"data"`` payload.

    Returns:
        The identifier string, ``"pong"`` for pong messages, or ``None`` when
        the channel is not recognised or a ``trades`` message has no trades.

    Raises:
        KeyError: If ``"channel"`` or a payload field required by that
            channel is missing.
    """
    channel = ws_msg["channel"]

    # Channels whose identifier is simply the channel name.
    for name in ("pong", "allMids", "orderUpdates"):
        if channel == name:
            return name

    # The "user" channel answers "userEvents" subscriptions.
    if channel == "user":
        return "userEvents"

    if channel == "trades":
        trades = ws_msg["data"]
        # The coin is read from the first trade, so an empty batch cannot be routed.
        if len(trades) > 0:
            coin = trades[0]["coin"].lower()
            return f"trades:{coin}"
        return None

    # Channels keyed by the coin in the payload.
    for name in ("l2Book", "bbo"):
        if channel == name:
            coin = ws_msg["data"]["coin"].lower()
            return f"{name}:{coin}"

    # Both asset-context channels map onto the "activeAssetCtx" identifier.
    for name in ("activeAssetCtx", "activeSpotAssetCtx"):
        if channel == name:
            coin = ws_msg["data"]["coin"].lower()
            return f"activeAssetCtx:{coin}"

    # Channels keyed by the user in the payload.
    for name in ("userFills", "userFundings", "userNonFundingLedgerUpdates", "webData2"):
        if channel == name:
            user = ws_msg["data"]["user"].lower()
            return f"{name}:{user}"

    if channel == "candle":
        symbol = ws_msg["data"]["s"].lower()
        interval = ws_msg["data"]["i"]
        return f"candle:{symbol},{interval}"

    if channel == "activeAssetData":
        coin = ws_msg["data"]["coin"].lower()
        user = ws_msg["data"]["user"].lower()
        return f"activeAssetData:{coin},{user}"

    # Unrecognised channel: same result as falling off the end of the function.
    return None
class WebsocketManager(threading.Thread):
    """Thread that owns a websocket connection and dispatches its messages to subscriber callbacks.

    Subscriptions requested before the socket is open are queued and sent from
    ``on_open``. Incoming messages are mapped to an identifier with
    ``ws_msg_to_identifier`` and delivered to every callback registered under
    that identifier. A helper thread sends application-level pings.
    """

    def __init__(self, base_url):
        """Prepare the websocket app and the ping thread; nothing is started here.

        Args:
            base_url: HTTP(S) base URL. Its first four characters are replaced
                by ``ws`` and ``/ws`` is appended (so ``https://`` becomes
                ``wss://``); it is assumed to start with ``http``.
        """
        super().__init__()
        # Last id handed out by subscribe(); ids start at 1.
        self.subscription_id_counter = 0
        # Set to True by on_open(); until then subscribe() only queues.
        self.ws_ready = False
        # Subscriptions requested before the socket was open.
        self.queued_subscriptions: List[Tuple[Subscription, ActiveSubscription]] = []
        # identifier -> callbacks currently registered for it.
        self.active_subscriptions: Dict[str, List[ActiveSubscription]] = defaultdict(list)
        ws_url = "ws" + base_url[len("http") :] + "/ws"
        self.ws = websocket.WebSocketApp(ws_url, on_message=self.on_message, on_open=self.on_open)
        self.ping_sender = threading.Thread(target=self.send_ping)
        self.stop_event = threading.Event()

    def run(self):
        """Thread entry point: start the ping thread, then run the websocket loop."""
        self.ping_sender.start()
        self.ws.run_forever()

    def send_ping(self):
        """Send ``{"method": "ping"}`` every 50 seconds until stopped.

        The loop ends when ``stop_event`` is set or when the websocket app is
        no longer marked as running.
        """
        # Event.wait() returns True as soon as stop() sets the event, ending the loop promptly.
        while not self.stop_event.wait(50):
            if not self.ws.keep_running:
                break
            logging.debug("Websocket sending ping")
            self.ws.send(json.dumps({"method": "ping"}))
        logging.debug("Websocket ping sender stopped")

    def stop(self):
        """Signal the ping thread to stop, close the websocket and wait for the ping thread."""
        self.stop_event.set()
        self.ws.close()
        if self.ping_sender.is_alive():
            self.ping_sender.join()

    def on_message(self, _ws, message):
        """Decode an incoming frame and invoke the callbacks registered for it.

        Args:
            _ws: Websocket app passed by the library; unused.
            message: Raw text frame. The literal greeting
                ``"Websocket connection established."`` is only logged; every
                other frame is parsed as JSON.
        """
        if message == "Websocket connection established.":
            logging.debug(message)
            return
        logging.debug(f"on_message {message}")
        ws_msg: WsMsg = json.loads(message)
        identifier = ws_msg_to_identifier(ws_msg)
        if identifier == "pong":
            logging.debug("Websocket received pong")
            return
        if identifier is None:
            logging.debug("Websocket not handling empty message")
            return
        # Use .get() rather than indexing: indexing the defaultdict would insert an
        # empty list for every identifier nobody subscribed to.
        active_subscriptions = self.active_subscriptions.get(identifier)
        if not active_subscriptions:
            # Reported through logging, like every other diagnostic in this module.
            logging.warning("Websocket message from an unexpected subscription: %s %s", message, identifier)
            return
        for active_subscription in active_subscriptions:
            active_subscription.callback(ws_msg)

    def on_open(self, _ws):
        """Mark the socket as ready and send every subscription queued so far.

        Args:
            _ws: Websocket app passed by the library; unused.
        """
        logging.debug("on_open")
        # Must be set before replaying, otherwise subscribe() would queue again.
        self.ws_ready = True
        # Queued entries keep the id that was already returned to the caller.
        for subscription, active_subscription in self.queued_subscriptions:
            self.subscribe(subscription, active_subscription.callback, active_subscription.subscription_id)

    def subscribe(
        self, subscription: Subscription, callback: Callable[[Any], None], subscription_id: Optional[int] = None
    ) -> int:
        """Register ``callback`` for ``subscription`` and request the stream from the server.

        If the socket is not open yet the request is queued and sent from
        ``on_open``.

        Args:
            subscription: Subscription description sent to the server.
            callback: Called with each decoded message for this subscription.
            subscription_id: Id to reuse; a new one is allocated when ``None``.

        Returns:
            The id identifying this registration, for use with ``unsubscribe``.

        Raises:
            NotImplementedError: If ``userEvents`` or ``orderUpdates`` already
                has a subscriber when the subscription is activated.
        """
        if subscription_id is None:
            self.subscription_id_counter += 1
            subscription_id = self.subscription_id_counter
        if not self.ws_ready:
            logging.debug("enqueueing subscription")
            self.queued_subscriptions.append((subscription, ActiveSubscription(callback, subscription_id)))
        else:
            logging.debug("subscribing")
            identifier = subscription_to_identifier(subscription)
            if identifier == "userEvents" or identifier == "orderUpdates":
                # TODO: ideally the userEvent and orderUpdates messages would include the user so that we can multiplex
                if len(self.active_subscriptions[identifier]) != 0:
                    raise NotImplementedError(f"Cannot subscribe to {identifier} multiple times")
            self.active_subscriptions[identifier].append(ActiveSubscription(callback, subscription_id))
            self.ws.send(json.dumps({"method": "subscribe", "subscription": subscription}))
        return subscription_id

    def unsubscribe(self, subscription: Subscription, subscription_id: int) -> bool:
        """Remove the registration ``subscription_id`` from ``subscription``.

        The server is asked to stop the stream only when this call removes the
        last remaining subscriber for it.

        Args:
            subscription: The subscription description used when subscribing.
            subscription_id: Id returned by ``subscribe``.

        Returns:
            ``True`` if a registration was removed, ``False`` if the id was not
            registered for this subscription.

        Raises:
            NotImplementedError: If the websocket is not connected yet.
        """
        if not self.ws_ready:
            raise NotImplementedError("Can't unsubscribe before websocket connected")
        identifier = subscription_to_identifier(subscription)
        # .get() avoids creating an empty defaultdict entry for an unknown identifier.
        active_subscriptions = self.active_subscriptions.get(identifier, [])
        new_active_subscriptions = [x for x in active_subscriptions if x.subscription_id != subscription_id]
        if len(new_active_subscriptions) == len(active_subscriptions):
            # Nothing matched: leave local state alone and do not send a spurious
            # unsubscribe for a stream that was never (or is no longer) subscribed.
            return False
        if len(new_active_subscriptions) == 0:
            self.ws.send(json.dumps({"method": "unsubscribe", "subscription": subscription}))
        self.active_subscriptions[identifier] = new_active_subscriptions
        return True
|