_sync.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. # Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
  2. import selectors
  3. import socket
  4. import ssl
  5. import struct
  6. import threading
  7. import time
  8. import aioquic.h3.connection # type: ignore
  9. import aioquic.h3.events # type: ignore
  10. import aioquic.quic.configuration # type: ignore
  11. import aioquic.quic.connection # type: ignore
  12. import aioquic.quic.events # type: ignore
  13. import dns.exception
  14. import dns.inet
  15. from dns.quic._common import (
  16. QUIC_MAX_DATAGRAM,
  17. BaseQuicConnection,
  18. BaseQuicManager,
  19. BaseQuicStream,
  20. UnexpectedEOF,
  21. )
# Callable used to create the UDP socket for each connection. It is invoked as
# socket_factory(address_family, socket.SOCK_DGRAM, 0). It can be overridden if
# needed in special situations.
socket_factory = socket.socket
  25. class SyncQuicStream(BaseQuicStream):
  26. def __init__(self, connection, stream_id):
  27. super().__init__(connection, stream_id)
  28. self._wake_up = threading.Condition()
  29. self._lock = threading.Lock()
  30. def wait_for(self, amount, expiration):
  31. while True:
  32. timeout = self._timeout_from_expiration(expiration)
  33. with self._lock:
  34. if self._buffer.have(amount):
  35. return
  36. self._expecting = amount
  37. with self._wake_up:
  38. if not self._wake_up.wait(timeout):
  39. raise dns.exception.Timeout
  40. self._expecting = 0
  41. def wait_for_end(self, expiration):
  42. while True:
  43. timeout = self._timeout_from_expiration(expiration)
  44. with self._lock:
  45. if self._buffer.seen_end():
  46. return
  47. with self._wake_up:
  48. if not self._wake_up.wait(timeout):
  49. raise dns.exception.Timeout
  50. def receive(self, timeout=None):
  51. expiration = self._expiration_from_timeout(timeout)
  52. if self._connection.is_h3():
  53. self.wait_for_end(expiration)
  54. with self._lock:
  55. return self._buffer.get_all()
  56. else:
  57. self.wait_for(2, expiration)
  58. with self._lock:
  59. (size,) = struct.unpack("!H", self._buffer.get(2))
  60. self.wait_for(size, expiration)
  61. with self._lock:
  62. return self._buffer.get(size)
  63. def send(self, datagram, is_end=False):
  64. data = self._encapsulate(datagram)
  65. self._connection.write(self._stream_id, data, is_end)
  66. def _add_input(self, data, is_end):
  67. if self._common_add_input(data, is_end):
  68. with self._wake_up:
  69. self._wake_up.notify()
  70. def close(self):
  71. with self._lock:
  72. self._close()
  73. def __enter__(self):
  74. return self
  75. def __exit__(self, exc_type, exc_val, exc_tb):
  76. self.close()
  77. with self._wake_up:
  78. self._wake_up.notify()
  79. return False
  80. class SyncQuicConnection(BaseQuicConnection):
  81. def __init__(self, connection, address, port, source, source_port, manager):
  82. super().__init__(connection, address, port, source, source_port, manager)
  83. self._socket = socket_factory(self._af, socket.SOCK_DGRAM, 0)
  84. if self._source is not None:
  85. try:
  86. self._socket.bind(
  87. dns.inet.low_level_address_tuple(self._source, self._af)
  88. )
  89. except Exception:
  90. self._socket.close()
  91. raise
  92. self._socket.connect(self._peer)
  93. (self._send_wakeup, self._receive_wakeup) = socket.socketpair()
  94. self._receive_wakeup.setblocking(False)
  95. self._socket.setblocking(False)
  96. self._handshake_complete = threading.Event()
  97. self._worker_thread = None
  98. self._lock = threading.Lock()
  99. def _read(self):
  100. count = 0
  101. while count < 10:
  102. count += 1
  103. try:
  104. datagram = self._socket.recv(QUIC_MAX_DATAGRAM)
  105. except BlockingIOError:
  106. return
  107. with self._lock:
  108. self._connection.receive_datagram(datagram, self._peer, time.time())
  109. def _drain_wakeup(self):
  110. while True:
  111. try:
  112. self._receive_wakeup.recv(32)
  113. except BlockingIOError:
  114. return
  115. def _worker(self):
  116. try:
  117. with selectors.DefaultSelector() as sel:
  118. sel.register(self._socket, selectors.EVENT_READ, self._read)
  119. sel.register(
  120. self._receive_wakeup, selectors.EVENT_READ, self._drain_wakeup
  121. )
  122. while not self._done:
  123. (expiration, interval) = self._get_timer_values(False)
  124. items = sel.select(interval)
  125. for key, _ in items:
  126. key.data()
  127. with self._lock:
  128. self._handle_timer(expiration)
  129. self._handle_events()
  130. with self._lock:
  131. datagrams = self._connection.datagrams_to_send(time.time())
  132. for datagram, _ in datagrams:
  133. try:
  134. self._socket.send(datagram)
  135. except BlockingIOError:
  136. # we let QUIC handle any lossage
  137. pass
  138. except Exception:
  139. # Eat all exceptions as we have no way to pass them back to the
  140. # caller currently. It might be nice to fix this in the future.
  141. pass
  142. finally:
  143. with self._lock:
  144. self._done = True
  145. self._socket.close()
  146. # Ensure anyone waiting for this gets woken up.
  147. self._handshake_complete.set()
  148. def _handle_events(self):
  149. while True:
  150. with self._lock:
  151. event = self._connection.next_event()
  152. if event is None:
  153. return
  154. if isinstance(event, aioquic.quic.events.StreamDataReceived):
  155. if self.is_h3():
  156. assert self._h3_conn is not None
  157. h3_events = self._h3_conn.handle_event(event)
  158. for h3_event in h3_events:
  159. if isinstance(h3_event, aioquic.h3.events.HeadersReceived):
  160. with self._lock:
  161. stream = self._streams.get(event.stream_id)
  162. if stream:
  163. if stream._headers is None:
  164. stream._headers = h3_event.headers
  165. elif stream._trailers is None:
  166. stream._trailers = h3_event.headers
  167. if h3_event.stream_ended:
  168. stream._add_input(b"", True)
  169. elif isinstance(h3_event, aioquic.h3.events.DataReceived):
  170. with self._lock:
  171. stream = self._streams.get(event.stream_id)
  172. if stream:
  173. stream._add_input(h3_event.data, h3_event.stream_ended)
  174. else:
  175. with self._lock:
  176. stream = self._streams.get(event.stream_id)
  177. if stream:
  178. stream._add_input(event.data, event.end_stream)
  179. elif isinstance(event, aioquic.quic.events.HandshakeCompleted):
  180. self._handshake_complete.set()
  181. elif isinstance(event, aioquic.quic.events.ConnectionTerminated):
  182. with self._lock:
  183. self._done = True
  184. elif isinstance(event, aioquic.quic.events.StreamReset):
  185. with self._lock:
  186. stream = self._streams.get(event.stream_id)
  187. if stream:
  188. stream._add_input(b"", True)
  189. def write(self, stream, data, is_end=False):
  190. with self._lock:
  191. self._connection.send_stream_data(stream, data, is_end)
  192. self._send_wakeup.send(b"\x01")
  193. def send_headers(self, stream_id, headers, is_end=False):
  194. with self._lock:
  195. super().send_headers(stream_id, headers, is_end)
  196. if is_end:
  197. self._send_wakeup.send(b"\x01")
  198. def send_data(self, stream_id, data, is_end=False):
  199. with self._lock:
  200. super().send_data(stream_id, data, is_end)
  201. if is_end:
  202. self._send_wakeup.send(b"\x01")
  203. def run(self):
  204. if self._closed:
  205. return
  206. self._worker_thread = threading.Thread(target=self._worker)
  207. self._worker_thread.start()
  208. def make_stream(self, timeout=None):
  209. if not self._handshake_complete.wait(timeout):
  210. raise dns.exception.Timeout
  211. with self._lock:
  212. if self._done:
  213. raise UnexpectedEOF
  214. stream_id = self._connection.get_next_available_stream_id(False)
  215. stream = SyncQuicStream(self, stream_id)
  216. self._streams[stream_id] = stream
  217. return stream
  218. def close_stream(self, stream_id):
  219. with self._lock:
  220. super().close_stream(stream_id)
  221. def close(self):
  222. with self._lock:
  223. if self._closed:
  224. return
  225. if self._manager is not None:
  226. self._manager.closed(self._peer[0], self._peer[1])
  227. self._closed = True
  228. self._connection.close()
  229. self._send_wakeup.send(b"\x01")
  230. if self._worker_thread is not None:
  231. self._worker_thread.join()
  232. class SyncQuicManager(BaseQuicManager):
  233. def __init__(
  234. self, conf=None, verify_mode=ssl.CERT_REQUIRED, server_name=None, h3=False
  235. ):
  236. super().__init__(conf, verify_mode, SyncQuicConnection, server_name, h3)
  237. self._lock = threading.Lock()
  238. def connect(
  239. self,
  240. address,
  241. port=853,
  242. source=None,
  243. source_port=0,
  244. want_session_ticket=True,
  245. want_token=True,
  246. ):
  247. with self._lock:
  248. (connection, start) = self._connect(
  249. address, port, source, source_port, want_session_ticket, want_token
  250. )
  251. if start:
  252. connection.run()
  253. return connection
  254. def closed(self, address, port):
  255. with self._lock:
  256. super().closed(address, port)
  257. def save_session_ticket(self, address, port, ticket):
  258. with self._lock:
  259. super().save_session_ticket(address, port, ticket)
  260. def save_token(self, address, port, token):
  261. with self._lock:
  262. super().save_token(address, port, token)
  263. def __enter__(self):
  264. return self
  265. def __exit__(self, exc_type, exc_val, exc_tb):
  266. # Copy the iterator into a list as exiting things will mutate the connections
  267. # table.
  268. connections = list(self._connections.values())
  269. for connection in connections:
  270. connection.close()
  271. return False