import json import logging import threading from collections import defaultdict import websocket from hyperliquid.utils.types import Any, Callable, Dict, List, NamedTuple, Optional, Subscription, Tuple, WsMsg ActiveSubscription = NamedTuple("ActiveSubscription", [("callback", Callable[[Any], None]), ("subscription_id", int)]) def subscription_to_identifier(subscription: Subscription) -> str: if subscription["type"] == "allMids": return "allMids" elif subscription["type"] == "l2Book": return f'l2Book:{subscription["coin"].lower()}' elif subscription["type"] == "trades": return f'trades:{subscription["coin"].lower()}' elif subscription["type"] == "userEvents": return "userEvents" elif subscription["type"] == "userFills": return f'userFills:{subscription["user"].lower()}' elif subscription["type"] == "candle": return f'candle:{subscription["coin"].lower()},{subscription["interval"]}' elif subscription["type"] == "orderUpdates": return "orderUpdates" elif subscription["type"] == "userFundings": return f'userFundings:{subscription["user"].lower()}' elif subscription["type"] == "userNonFundingLedgerUpdates": return f'userNonFundingLedgerUpdates:{subscription["user"].lower()}' elif subscription["type"] == "webData2": return f'webData2:{subscription["user"].lower()}' elif subscription["type"] == "bbo": return f'bbo:{subscription["coin"].lower()}' elif subscription["type"] == "activeAssetCtx": return f'activeAssetCtx:{subscription["coin"].lower()}' elif subscription["type"] == "activeAssetData": return f'activeAssetData:{subscription["coin"].lower()},{subscription["user"].lower()}' def ws_msg_to_identifier(ws_msg: WsMsg) -> Optional[str]: if ws_msg["channel"] == "pong": return "pong" elif ws_msg["channel"] == "allMids": return "allMids" elif ws_msg["channel"] == "l2Book": return f'l2Book:{ws_msg["data"]["coin"].lower()}' elif ws_msg["channel"] == "trades": trades = ws_msg["data"] if len(trades) == 0: return None else: return f'trades:{trades[0]["coin"].lower()}' elif ws_msg["channel"] == "user": return "userEvents" elif ws_msg["channel"] == "userFills": return f'userFills:{ws_msg["data"]["user"].lower()}' elif ws_msg["channel"] == "candle": return f'candle:{ws_msg["data"]["s"].lower()},{ws_msg["data"]["i"]}' elif ws_msg["channel"] == "orderUpdates": return "orderUpdates" elif ws_msg["channel"] == "userFundings": return f'userFundings:{ws_msg["data"]["user"].lower()}' elif ws_msg["channel"] == "userNonFundingLedgerUpdates": return f'userNonFundingLedgerUpdates:{ws_msg["data"]["user"].lower()}' elif ws_msg["channel"] == "webData2": return f'webData2:{ws_msg["data"]["user"].lower()}' elif ws_msg["channel"] == "bbo": return f'bbo:{ws_msg["data"]["coin"].lower()}' elif ws_msg["channel"] == "activeAssetCtx" or ws_msg["channel"] == "activeSpotAssetCtx": return f'activeAssetCtx:{ws_msg["data"]["coin"].lower()}' elif ws_msg["channel"] == "activeAssetData": return f'activeAssetData:{ws_msg["data"]["coin"].lower()},{ws_msg["data"]["user"].lower()}' class WebsocketManager(threading.Thread): def __init__(self, base_url): super().__init__() self.subscription_id_counter = 0 self.ws_ready = False self.queued_subscriptions: List[Tuple[Subscription, ActiveSubscription]] = [] self.active_subscriptions: Dict[str, List[ActiveSubscription]] = defaultdict(list) ws_url = "ws" + base_url[len("http") :] + "/ws" self.ws = websocket.WebSocketApp(ws_url, on_message=self.on_message, on_open=self.on_open) self.ping_sender = threading.Thread(target=self.send_ping) self.stop_event = threading.Event() def run(self): self.ping_sender.start() self.ws.run_forever() def send_ping(self): while not self.stop_event.wait(50): if not self.ws.keep_running: break logging.debug("Websocket sending ping") self.ws.send(json.dumps({"method": "ping"})) logging.debug("Websocket ping sender stopped") def stop(self): self.stop_event.set() self.ws.close() if self.ping_sender.is_alive(): self.ping_sender.join() def on_message(self, _ws, message): if message == "Websocket connection established.": logging.debug(message) return logging.debug(f"on_message {message}") ws_msg: WsMsg = json.loads(message) identifier = ws_msg_to_identifier(ws_msg) if identifier == "pong": logging.debug("Websocket received pong") return if identifier is None: logging.debug("Websocket not handling empty message") return active_subscriptions = self.active_subscriptions[identifier] if len(active_subscriptions) == 0: print("Websocket message from an unexpected subscription:", message, identifier) else: for active_subscription in active_subscriptions: active_subscription.callback(ws_msg) def on_open(self, _ws): logging.debug("on_open") self.ws_ready = True for subscription, active_subscription in self.queued_subscriptions: self.subscribe(subscription, active_subscription.callback, active_subscription.subscription_id) def subscribe( self, subscription: Subscription, callback: Callable[[Any], None], subscription_id: Optional[int] = None ) -> int: if subscription_id is None: self.subscription_id_counter += 1 subscription_id = self.subscription_id_counter if not self.ws_ready: logging.debug("enqueueing subscription") self.queued_subscriptions.append((subscription, ActiveSubscription(callback, subscription_id))) else: logging.debug("subscribing") identifier = subscription_to_identifier(subscription) if identifier == "userEvents" or identifier == "orderUpdates": # TODO: ideally the userEvent and orderUpdates messages would include the user so that we can multiplex if len(self.active_subscriptions[identifier]) != 0: raise NotImplementedError(f"Cannot subscribe to {identifier} multiple times") self.active_subscriptions[identifier].append(ActiveSubscription(callback, subscription_id)) self.ws.send(json.dumps({"method": "subscribe", "subscription": subscription})) return subscription_id def unsubscribe(self, subscription: Subscription, subscription_id: int) -> bool: if not self.ws_ready: raise NotImplementedError("Can't unsubscribe before websocket connected") identifier = subscription_to_identifier(subscription) active_subscriptions = self.active_subscriptions[identifier] new_active_subscriptions = [x for x in active_subscriptions if x.subscription_id != subscription_id] if len(new_active_subscriptions) == 0: self.ws.send(json.dumps({"method": "unsubscribe", "subscription": subscription})) self.active_subscriptions[identifier] = new_active_subscriptions return len(active_subscriptions) != len(new_active_subscriptions)