websocket_manager.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. import json
  2. import logging
  3. import threading
  4. from collections import defaultdict
  5. import websocket
  6. from hyperliquid.utils.types import Any, Callable, Dict, List, NamedTuple, Optional, Subscription, Tuple, WsMsg
  7. ActiveSubscription = NamedTuple("ActiveSubscription", [("callback", Callable[[Any], None]), ("subscription_id", int)])
  8. def subscription_to_identifier(subscription: Subscription) -> str:
  9. if subscription["type"] == "allMids":
  10. return "allMids"
  11. elif subscription["type"] == "l2Book":
  12. return f'l2Book:{subscription["coin"].lower()}'
  13. elif subscription["type"] == "trades":
  14. return f'trades:{subscription["coin"].lower()}'
  15. elif subscription["type"] == "userEvents":
  16. return "userEvents"
  17. elif subscription["type"] == "userFills":
  18. return f'userFills:{subscription["user"].lower()}'
  19. elif subscription["type"] == "candle":
  20. return f'candle:{subscription["coin"].lower()},{subscription["interval"]}'
  21. elif subscription["type"] == "orderUpdates":
  22. return "orderUpdates"
  23. elif subscription["type"] == "userFundings":
  24. return f'userFundings:{subscription["user"].lower()}'
  25. elif subscription["type"] == "userNonFundingLedgerUpdates":
  26. return f'userNonFundingLedgerUpdates:{subscription["user"].lower()}'
  27. elif subscription["type"] == "webData2":
  28. return f'webData2:{subscription["user"].lower()}'
  29. elif subscription["type"] == "bbo":
  30. return f'bbo:{subscription["coin"].lower()}'
  31. elif subscription["type"] == "activeAssetCtx":
  32. return f'activeAssetCtx:{subscription["coin"].lower()}'
  33. elif subscription["type"] == "activeAssetData":
  34. return f'activeAssetData:{subscription["coin"].lower()},{subscription["user"].lower()}'
  35. def ws_msg_to_identifier(ws_msg: WsMsg) -> Optional[str]:
  36. if ws_msg["channel"] == "pong":
  37. return "pong"
  38. elif ws_msg["channel"] == "allMids":
  39. return "allMids"
  40. elif ws_msg["channel"] == "l2Book":
  41. return f'l2Book:{ws_msg["data"]["coin"].lower()}'
  42. elif ws_msg["channel"] == "trades":
  43. trades = ws_msg["data"]
  44. if len(trades) == 0:
  45. return None
  46. else:
  47. return f'trades:{trades[0]["coin"].lower()}'
  48. elif ws_msg["channel"] == "user":
  49. return "userEvents"
  50. elif ws_msg["channel"] == "userFills":
  51. return f'userFills:{ws_msg["data"]["user"].lower()}'
  52. elif ws_msg["channel"] == "candle":
  53. return f'candle:{ws_msg["data"]["s"].lower()},{ws_msg["data"]["i"]}'
  54. elif ws_msg["channel"] == "orderUpdates":
  55. return "orderUpdates"
  56. elif ws_msg["channel"] == "userFundings":
  57. return f'userFundings:{ws_msg["data"]["user"].lower()}'
  58. elif ws_msg["channel"] == "userNonFundingLedgerUpdates":
  59. return f'userNonFundingLedgerUpdates:{ws_msg["data"]["user"].lower()}'
  60. elif ws_msg["channel"] == "webData2":
  61. return f'webData2:{ws_msg["data"]["user"].lower()}'
  62. elif ws_msg["channel"] == "bbo":
  63. return f'bbo:{ws_msg["data"]["coin"].lower()}'
  64. elif ws_msg["channel"] == "activeAssetCtx" or ws_msg["channel"] == "activeSpotAssetCtx":
  65. return f'activeAssetCtx:{ws_msg["data"]["coin"].lower()}'
  66. elif ws_msg["channel"] == "activeAssetData":
  67. return f'activeAssetData:{ws_msg["data"]["coin"].lower()},{ws_msg["data"]["user"].lower()}'
  68. class WebsocketManager(threading.Thread):
  69. def __init__(self, base_url):
  70. super().__init__()
  71. self.subscription_id_counter = 0
  72. self.ws_ready = False
  73. self.queued_subscriptions: List[Tuple[Subscription, ActiveSubscription]] = []
  74. self.active_subscriptions: Dict[str, List[ActiveSubscription]] = defaultdict(list)
  75. ws_url = "ws" + base_url[len("http") :] + "/ws"
  76. self.ws = websocket.WebSocketApp(ws_url, on_message=self.on_message, on_open=self.on_open)
  77. self.ping_sender = threading.Thread(target=self.send_ping)
  78. self.stop_event = threading.Event()
  79. def run(self):
  80. self.ping_sender.start()
  81. self.ws.run_forever()
  82. def send_ping(self):
  83. while not self.stop_event.wait(50):
  84. if not self.ws.keep_running:
  85. break
  86. logging.debug("Websocket sending ping")
  87. self.ws.send(json.dumps({"method": "ping"}))
  88. logging.debug("Websocket ping sender stopped")
  89. def stop(self):
  90. self.stop_event.set()
  91. self.ws.close()
  92. if self.ping_sender.is_alive():
  93. self.ping_sender.join()
  94. def on_message(self, _ws, message):
  95. if message == "Websocket connection established.":
  96. logging.debug(message)
  97. return
  98. logging.debug(f"on_message {message}")
  99. ws_msg: WsMsg = json.loads(message)
  100. identifier = ws_msg_to_identifier(ws_msg)
  101. if identifier == "pong":
  102. logging.debug("Websocket received pong")
  103. return
  104. if identifier is None:
  105. logging.debug("Websocket not handling empty message")
  106. return
  107. active_subscriptions = self.active_subscriptions[identifier]
  108. if len(active_subscriptions) == 0:
  109. print("Websocket message from an unexpected subscription:", message, identifier)
  110. else:
  111. for active_subscription in active_subscriptions:
  112. active_subscription.callback(ws_msg)
  113. def on_open(self, _ws):
  114. logging.debug("on_open")
  115. self.ws_ready = True
  116. for subscription, active_subscription in self.queued_subscriptions:
  117. self.subscribe(subscription, active_subscription.callback, active_subscription.subscription_id)
  118. def subscribe(
  119. self, subscription: Subscription, callback: Callable[[Any], None], subscription_id: Optional[int] = None
  120. ) -> int:
  121. if subscription_id is None:
  122. self.subscription_id_counter += 1
  123. subscription_id = self.subscription_id_counter
  124. if not self.ws_ready:
  125. logging.debug("enqueueing subscription")
  126. self.queued_subscriptions.append((subscription, ActiveSubscription(callback, subscription_id)))
  127. else:
  128. logging.debug("subscribing")
  129. identifier = subscription_to_identifier(subscription)
  130. if identifier == "userEvents" or identifier == "orderUpdates":
  131. # TODO: ideally the userEvent and orderUpdates messages would include the user so that we can multiplex
  132. if len(self.active_subscriptions[identifier]) != 0:
  133. raise NotImplementedError(f"Cannot subscribe to {identifier} multiple times")
  134. self.active_subscriptions[identifier].append(ActiveSubscription(callback, subscription_id))
  135. self.ws.send(json.dumps({"method": "subscribe", "subscription": subscription}))
  136. return subscription_id
  137. def unsubscribe(self, subscription: Subscription, subscription_id: int) -> bool:
  138. if not self.ws_ready:
  139. raise NotImplementedError("Can't unsubscribe before websocket connected")
  140. identifier = subscription_to_identifier(subscription)
  141. active_subscriptions = self.active_subscriptions[identifier]
  142. new_active_subscriptions = [x for x in active_subscriptions if x.subscription_id != subscription_id]
  143. if len(new_active_subscriptions) == 0:
  144. self.ws.send(json.dumps({"method": "unsubscribe", "subscription": subscription}))
  145. self.active_subscriptions[identifier] = new_active_subscriptions
  146. return len(active_subscriptions) != len(new_active_subscriptions)