_dispatcher.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. import time
  2. import socket
  3. import inspect
  4. import selectors
  5. from typing import TYPE_CHECKING, Callable, Optional, Union
  6. if TYPE_CHECKING:
  7. from ._app import WebSocketApp
  8. from . import _logging
  9. from ._socket import send
  10. """
  11. _dispatcher.py
  12. websocket - WebSocket client library for Python
  13. Copyright 2025 engn33r
  14. Licensed under the Apache License, Version 2.0 (the "License");
  15. you may not use this file except in compliance with the License.
  16. You may obtain a copy of the License at
  17. http://www.apache.org/licenses/LICENSE-2.0
  18. Unless required by applicable law or agreed to in writing, software
  19. distributed under the License is distributed on an "AS IS" BASIS,
  20. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  21. See the License for the specific language governing permissions and
  22. limitations under the License.
  23. """
  24. class DispatcherBase:
  25. """
  26. DispatcherBase
  27. """
  28. def __init__(
  29. self, app: "WebSocketApp", ping_timeout: Optional[Union[float, int]]
  30. ) -> None:
  31. self.app = app
  32. self.ping_timeout = ping_timeout
  33. def timeout(self, seconds: Optional[Union[float, int]], callback: Callable) -> None:
  34. if seconds is not None:
  35. time.sleep(seconds)
  36. callback()
  37. def reconnect(self, seconds: int, reconnector: Callable) -> None:
  38. try:
  39. _logging.info(
  40. f"reconnect() - retrying in {seconds} seconds [{len(inspect.stack())} frames in stack]"
  41. )
  42. time.sleep(seconds)
  43. reconnector(reconnecting=True)
  44. except KeyboardInterrupt as e:
  45. _logging.info(f"User exited {e}")
  46. raise e
  47. def send(self, sock: socket.socket, data: Union[str, bytes]) -> int:
  48. return send(sock, data)
  49. class Dispatcher(DispatcherBase):
  50. """
  51. Dispatcher
  52. """
  53. def read(
  54. self,
  55. sock: socket.socket,
  56. read_callback: Callable,
  57. check_callback: Callable,
  58. ) -> None:
  59. if self.app.sock is None or self.app.sock.sock is None:
  60. return
  61. sel = selectors.DefaultSelector()
  62. sel.register(self.app.sock.sock, selectors.EVENT_READ)
  63. try:
  64. while self.app.keep_running:
  65. if sel.select(self.ping_timeout):
  66. if not read_callback():
  67. break
  68. check_callback()
  69. finally:
  70. sel.close()
  71. class SSLDispatcher(DispatcherBase):
  72. """
  73. SSLDispatcher
  74. """
  75. def read(
  76. self,
  77. sock: socket.socket,
  78. read_callback: Callable,
  79. check_callback: Callable,
  80. ) -> None:
  81. if self.app.sock is None or self.app.sock.sock is None:
  82. return
  83. sock = self.app.sock.sock
  84. sel = selectors.DefaultSelector()
  85. sel.register(sock, selectors.EVENT_READ)
  86. try:
  87. while self.app.keep_running:
  88. if self.select(sock, sel):
  89. if not read_callback():
  90. break
  91. check_callback()
  92. finally:
  93. sel.close()
  94. def select(self, sock, sel: selectors.DefaultSelector):
  95. if self.app.sock is None:
  96. return None
  97. sock = self.app.sock.sock
  98. if sock.pending():
  99. return [
  100. sock,
  101. ]
  102. r = sel.select(self.ping_timeout)
  103. if len(r) > 0:
  104. return r[0][0]
  105. return None
  106. class WrappedDispatcher:
  107. """
  108. WrappedDispatcher
  109. """
  110. def __init__(
  111. self,
  112. app: "WebSocketApp",
  113. ping_timeout: Optional[Union[float, int]],
  114. dispatcher,
  115. handleDisconnect,
  116. ) -> None:
  117. self.app = app
  118. self.ping_timeout = ping_timeout
  119. self.dispatcher = dispatcher
  120. self.handleDisconnect = handleDisconnect
  121. dispatcher.signal(2, dispatcher.abort) # keyboard interrupt
  122. def read(
  123. self,
  124. sock: socket.socket,
  125. read_callback: Callable,
  126. check_callback: Callable,
  127. ) -> None:
  128. self.dispatcher.read(sock, read_callback)
  129. if self.ping_timeout:
  130. self.timeout(self.ping_timeout, check_callback)
  131. def send(self, sock: socket.socket, data: Union[str, bytes]) -> int:
  132. self.dispatcher.buffwrite(sock, data, send, self.handleDisconnect)
  133. return len(data)
  134. def timeout(self, seconds: float, callback: Callable, *args) -> None:
  135. self.dispatcher.timeout(seconds, callback, *args)
  136. def reconnect(self, seconds: int, reconnector: Callable) -> None:
  137. self.timeout(seconds, reconnector, True)