basetransport.pyx 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. cdef class UVBaseTransport(UVSocketHandle):
  2. def __cinit__(self):
  3. # Flow control
  4. self._high_water = FLOW_CONTROL_HIGH_WATER * 1024
  5. self._low_water = FLOW_CONTROL_HIGH_WATER // 4
  6. self._protocol = None
  7. self._protocol_connected = 0
  8. self._protocol_paused = 0
  9. self._protocol_data_received = None
  10. self._server = None
  11. self._waiter = None
  12. self._extra_info = None
  13. self._conn_lost = 0
  14. self._closing = 0
  15. cdef size_t _get_write_buffer_size(self):
  16. return 0
  17. cdef inline _schedule_call_connection_made(self):
  18. self._loop._call_soon_handle(
  19. new_MethodHandle(self._loop,
  20. "UVTransport._call_connection_made",
  21. <method_t>self._call_connection_made,
  22. self.context,
  23. self))
  24. cdef inline _schedule_call_connection_lost(self, exc):
  25. self._loop._call_soon_handle(
  26. new_MethodHandle1(self._loop,
  27. "UVTransport._call_connection_lost",
  28. <method1_t>self._call_connection_lost,
  29. self.context,
  30. self, exc))
  31. cdef _fatal_error(self, exc, throw, reason=None):
  32. # Overload UVHandle._fatal_error
  33. self._force_close(exc)
  34. if not isinstance(exc, OSError):
  35. if throw or self._loop is None:
  36. raise exc
  37. msg = f'Fatal error on transport {self.__class__.__name__}'
  38. if reason is not None:
  39. msg = f'{msg} ({reason})'
  40. self._loop.call_exception_handler({
  41. 'message': msg,
  42. 'exception': exc,
  43. 'transport': self,
  44. 'protocol': self._protocol,
  45. })
  46. cdef inline _maybe_pause_protocol(self):
  47. cdef:
  48. size_t size = self._get_write_buffer_size()
  49. if size <= self._high_water:
  50. return
  51. if not self._protocol_paused:
  52. self._protocol_paused = 1
  53. try:
  54. # _maybe_pause_protocol() is always triggered from user-calls,
  55. # so we must copy the context to avoid entering context twice
  56. run_in_context(
  57. self.context.copy(), self._protocol.pause_writing,
  58. )
  59. except (KeyboardInterrupt, SystemExit):
  60. raise
  61. except BaseException as exc:
  62. self._loop.call_exception_handler({
  63. 'message': 'protocol.pause_writing() failed',
  64. 'exception': exc,
  65. 'transport': self,
  66. 'protocol': self._protocol,
  67. })
  68. cdef inline _maybe_resume_protocol(self):
  69. cdef:
  70. size_t size = self._get_write_buffer_size()
  71. if self._protocol_paused and size <= self._low_water:
  72. self._protocol_paused = 0
  73. try:
  74. # We're copying the context to avoid entering context twice,
  75. # even though it's not always necessary to copy - it's easier
  76. # to copy here than passing down a copied context.
  77. run_in_context(
  78. self.context.copy(), self._protocol.resume_writing,
  79. )
  80. except (KeyboardInterrupt, SystemExit):
  81. raise
  82. except BaseException as exc:
  83. self._loop.call_exception_handler({
  84. 'message': 'protocol.resume_writing() failed',
  85. 'exception': exc,
  86. 'transport': self,
  87. 'protocol': self._protocol,
  88. })
  89. cdef _wakeup_waiter(self):
  90. if self._waiter is not None:
  91. if not self._waiter.cancelled():
  92. if not self._is_alive():
  93. self._waiter.set_exception(
  94. RuntimeError(
  95. 'closed Transport handle and unset waiter'))
  96. else:
  97. self._waiter.set_result(True)
  98. self._waiter = None
  99. cdef _call_connection_made(self):
  100. if self._protocol is None:
  101. raise RuntimeError(
  102. 'protocol is not set, cannot call connection_made()')
  103. # We use `_is_alive()` and not `_closing`, because we call
  104. # `transport._close()` in `loop.create_connection()` if an
  105. # exception happens during `await waiter`.
  106. if not self._is_alive():
  107. # A connection waiter can be cancelled between
  108. # 'await loop.create_connection()' and
  109. # `_schedule_call_connection_made` and
  110. # the actual `_call_connection_made`.
  111. self._wakeup_waiter()
  112. return
  113. # Set _protocol_connected to 1 before calling "connection_made":
  114. # if transport is aborted or closed, "connection_lost" will
  115. # still be scheduled.
  116. self._protocol_connected = 1
  117. try:
  118. self._protocol.connection_made(self)
  119. except BaseException:
  120. self._wakeup_waiter()
  121. raise
  122. if not self._is_alive():
  123. # This might happen when "transport.abort()" is called
  124. # from "Protocol.connection_made".
  125. self._wakeup_waiter()
  126. return
  127. self._start_reading()
  128. self._wakeup_waiter()
  129. cdef _call_connection_lost(self, exc):
  130. if self._waiter is not None:
  131. if not self._waiter.done():
  132. self._waiter.set_exception(exc)
  133. self._waiter = None
  134. if self._closed:
  135. # The handle is closed -- likely, _call_connection_lost
  136. # was already called before.
  137. return
  138. try:
  139. if self._protocol_connected:
  140. self._protocol.connection_lost(exc)
  141. finally:
  142. self._clear_protocol()
  143. self._close()
  144. server = self._server
  145. if server is not None:
  146. (<Server>server)._detach()
  147. self._server = None
  148. cdef inline _set_server(self, Server server):
  149. self._server = server
  150. (<Server>server)._attach()
  151. cdef inline _set_waiter(self, object waiter):
  152. if waiter is not None and not isfuture(waiter):
  153. raise TypeError(
  154. f'invalid waiter object {waiter!r}, expected asyncio.Future')
  155. self._waiter = waiter
  156. cdef _set_protocol(self, object protocol):
  157. self._protocol = protocol
  158. # Store a reference to the bound method directly
  159. try:
  160. self._protocol_data_received = protocol.data_received
  161. except AttributeError:
  162. pass
  163. cdef _clear_protocol(self):
  164. self._protocol = None
  165. self._protocol_data_received = None
  166. cdef inline _init_protocol(self):
  167. self._loop._track_transport(self)
  168. if self._protocol is None:
  169. raise RuntimeError('invalid _init_protocol call')
  170. self._schedule_call_connection_made()
  171. cdef inline _add_extra_info(self, str name, object obj):
  172. if self._extra_info is None:
  173. self._extra_info = {}
  174. self._extra_info[name] = obj
  175. cdef bint _is_reading(self):
  176. raise NotImplementedError
  177. cdef _start_reading(self):
  178. raise NotImplementedError
  179. cdef _stop_reading(self):
  180. raise NotImplementedError
  181. # === Public API ===
  182. property _paused:
  183. # Used by SSLProto. Might be removed in the future.
  184. def __get__(self):
  185. return bool(not self._is_reading())
  186. def get_protocol(self):
  187. return self._protocol
  188. def set_protocol(self, protocol):
  189. self._set_protocol(protocol)
  190. if self._is_reading():
  191. self._stop_reading()
  192. self._start_reading()
  193. def _force_close(self, exc):
  194. # Used by SSLProto. Might be removed in the future.
  195. if self._conn_lost or self._closed:
  196. return
  197. if not self._closing:
  198. self._closing = 1
  199. self._stop_reading()
  200. self._conn_lost += 1
  201. self._schedule_call_connection_lost(exc)
  202. def abort(self):
  203. self._force_close(None)
  204. def close(self):
  205. if self._closing or self._closed:
  206. return
  207. self._closing = 1
  208. self._stop_reading()
  209. if not self._get_write_buffer_size():
  210. # The write buffer is empty
  211. self._conn_lost += 1
  212. self._schedule_call_connection_lost(None)
  213. def is_closing(self):
  214. return self._closing
  215. def get_write_buffer_size(self):
  216. return self._get_write_buffer_size()
  217. def set_write_buffer_limits(self, high=None, low=None):
  218. self._ensure_alive()
  219. self._high_water, self._low_water = add_flowcontrol_defaults(
  220. high, low, FLOW_CONTROL_HIGH_WATER)
  221. self._maybe_pause_protocol()
  222. def get_write_buffer_limits(self):
  223. return (self._low_water, self._high_water)
  224. def get_extra_info(self, name, default=None):
  225. if self._extra_info is not None and name in self._extra_info:
  226. return self._extra_info[name]
  227. if name == 'socket':
  228. return self._get_socket()
  229. if name == 'sockname':
  230. return self._get_socket().getsockname()
  231. if name == 'peername':
  232. try:
  233. return self._get_socket().getpeername()
  234. except socket_error:
  235. return default
  236. return default