test_socket.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357
  1. # -*- coding: utf-8 -*-
  2. import errno
  3. import socket
  4. import unittest
  5. from unittest.mock import Mock, patch, MagicMock
  6. import time
  7. from websocket._socket import recv, recv_line, send, DEFAULT_SOCKET_OPTION
  8. from websocket._ssl_compat import (
  9. SSLError,
  10. SSLEOFError,
  11. SSLWantWriteError,
  12. SSLWantReadError,
  13. )
  14. from websocket._exceptions import (
  15. WebSocketTimeoutException,
  16. WebSocketConnectionClosedException,
  17. )
  18. """
  19. test_socket.py
  20. websocket - WebSocket client library for Python
  21. Copyright 2025 engn33r
  22. Licensed under the Apache License, Version 2.0 (the "License");
  23. you may not use this file except in compliance with the License.
  24. You may obtain a copy of the License at
  25. http://www.apache.org/licenses/LICENSE-2.0
  26. Unless required by applicable law or agreed to in writing, software
  27. distributed under the License is distributed on an "AS IS" BASIS,
  28. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  29. See the License for the specific language governing permissions and
  30. limitations under the License.
  31. """
  32. class SocketTest(unittest.TestCase):
  33. def test_default_socket_option(self):
  34. """Test DEFAULT_SOCKET_OPTION contains expected options"""
  35. self.assertIsInstance(DEFAULT_SOCKET_OPTION, list)
  36. self.assertGreater(len(DEFAULT_SOCKET_OPTION), 0)
  37. # Should contain TCP_NODELAY option
  38. tcp_nodelay_found = any(
  39. opt[1] == socket.TCP_NODELAY for opt in DEFAULT_SOCKET_OPTION
  40. )
  41. self.assertTrue(tcp_nodelay_found)
  42. def test_recv_normal(self):
  43. """Test normal recv operation"""
  44. mock_sock = Mock()
  45. mock_sock.recv.return_value = b"test data"
  46. result = recv(mock_sock, 9)
  47. self.assertEqual(result, b"test data")
  48. mock_sock.recv.assert_called_once_with(9)
  49. def test_recv_timeout_error(self):
  50. """Test recv with TimeoutError"""
  51. mock_sock = Mock()
  52. mock_sock.recv.side_effect = TimeoutError("Connection timed out")
  53. with self.assertRaises(WebSocketTimeoutException) as cm:
  54. recv(mock_sock, 9)
  55. self.assertEqual(str(cm.exception), "Connection timed out")
  56. def test_recv_socket_timeout(self):
  57. """Test recv with socket.timeout"""
  58. mock_sock = Mock()
  59. timeout_exc = socket.timeout("Socket timed out")
  60. timeout_exc.args = ("Socket timed out",)
  61. mock_sock.recv.side_effect = timeout_exc
  62. mock_sock.gettimeout.return_value = 30.0
  63. with self.assertRaises(WebSocketTimeoutException) as cm:
  64. recv(mock_sock, 9)
  65. # In Python 3.10+, socket.timeout is a subclass of TimeoutError
  66. # so it's caught by the TimeoutError handler with hardcoded message
  67. # In Python 3.9, socket.timeout is caught by socket.timeout handler
  68. # which preserves the original message
  69. import sys
  70. if sys.version_info >= (3, 10):
  71. self.assertEqual(str(cm.exception), "Connection timed out")
  72. else:
  73. self.assertEqual(str(cm.exception), "Socket timed out")
  74. def test_recv_ssl_timeout(self):
  75. """Test recv with SSL timeout error"""
  76. mock_sock = Mock()
  77. ssl_exc = SSLError("The operation timed out")
  78. ssl_exc.args = ("The operation timed out",)
  79. mock_sock.recv.side_effect = ssl_exc
  80. with self.assertRaises(WebSocketTimeoutException) as cm:
  81. recv(mock_sock, 9)
  82. self.assertEqual(str(cm.exception), "The operation timed out")
  83. def test_recv_ssl_non_timeout_error(self):
  84. """Test recv with SSL non-timeout error"""
  85. mock_sock = Mock()
  86. ssl_exc = SSLError("SSL certificate error")
  87. ssl_exc.args = ("SSL certificate error",)
  88. mock_sock.recv.side_effect = ssl_exc
  89. # Should re-raise the original SSL error
  90. with self.assertRaises(SSLError):
  91. recv(mock_sock, 9)
  92. def test_recv_empty_response(self):
  93. """Test recv with empty response (connection closed)"""
  94. mock_sock = Mock()
  95. mock_sock.recv.return_value = b""
  96. with self.assertRaises(WebSocketConnectionClosedException) as cm:
  97. recv(mock_sock, 9)
  98. self.assertEqual(str(cm.exception), "Connection to remote host was lost.")
  99. def test_recv_ssl_want_read_error(self):
  100. """Test recv with SSLWantReadError (should retry)"""
  101. mock_sock = Mock()
  102. # First call raises SSLWantReadError, second call succeeds
  103. mock_sock.recv.side_effect = [SSLWantReadError(), b"data after retry"]
  104. with patch("selectors.DefaultSelector") as mock_selector_class:
  105. mock_selector = Mock()
  106. mock_selector_class.return_value = mock_selector
  107. mock_selector.select.return_value = [True] # Ready to read
  108. result = recv(mock_sock, 100)
  109. self.assertEqual(result, b"data after retry")
  110. mock_selector.register.assert_called()
  111. mock_selector.close.assert_called()
  112. def test_recv_ssl_want_read_timeout(self):
  113. """Test recv with SSLWantReadError that times out"""
  114. mock_sock = Mock()
  115. mock_sock.recv.side_effect = SSLWantReadError()
  116. mock_sock.gettimeout.return_value = 1.0
  117. with patch("selectors.DefaultSelector") as mock_selector_class:
  118. mock_selector = Mock()
  119. mock_selector_class.return_value = mock_selector
  120. mock_selector.select.return_value = [] # Timeout
  121. with self.assertRaises(WebSocketTimeoutException):
  122. recv(mock_sock, 100)
  123. def test_recv_line(self):
  124. """Test recv_line functionality"""
  125. mock_sock = Mock()
  126. # Mock recv to return one character at a time
  127. recv_calls = [b"H", b"e", b"l", b"l", b"o", b"\n"]
  128. with patch("websocket._socket.recv", side_effect=recv_calls) as mock_recv:
  129. result = recv_line(mock_sock)
  130. self.assertEqual(result, b"Hello\n")
  131. self.assertEqual(mock_recv.call_count, 6)
  132. def test_send_normal(self):
  133. """Test normal send operation"""
  134. mock_sock = Mock()
  135. mock_sock.send.return_value = 9
  136. mock_sock.gettimeout.return_value = 30.0
  137. result = send(mock_sock, b"test data")
  138. self.assertEqual(result, 9)
  139. mock_sock.send.assert_called_with(b"test data")
  140. def test_send_zero_timeout(self):
  141. """Test send with zero timeout (non-blocking)"""
  142. mock_sock = Mock()
  143. mock_sock.send.return_value = 9
  144. mock_sock.gettimeout.return_value = 0
  145. result = send(mock_sock, b"test data")
  146. self.assertEqual(result, 9)
  147. mock_sock.send.assert_called_once_with(b"test data")
  148. def test_send_ssl_eof_error(self):
  149. """Test send with SSLEOFError"""
  150. mock_sock = Mock()
  151. mock_sock.gettimeout.return_value = 30.0
  152. mock_sock.send.side_effect = SSLEOFError("Connection closed")
  153. with self.assertRaises(WebSocketConnectionClosedException) as cm:
  154. send(mock_sock, b"test data")
  155. self.assertEqual(str(cm.exception), "socket is already closed.")
  156. def test_send_ssl_want_write_error(self):
  157. """Test send with SSLWantWriteError (should retry)"""
  158. mock_sock = Mock()
  159. mock_sock.gettimeout.return_value = 30.0
  160. # First call raises SSLWantWriteError, second call succeeds
  161. mock_sock.send.side_effect = [SSLWantWriteError(), 9]
  162. with patch("selectors.DefaultSelector") as mock_selector_class:
  163. mock_selector = Mock()
  164. mock_selector_class.return_value = mock_selector
  165. mock_selector.select.return_value = [True] # Ready to write
  166. result = send(mock_sock, b"test data")
  167. self.assertEqual(result, 9)
  168. mock_selector.register.assert_called()
  169. mock_selector.close.assert_called()
  170. def test_send_socket_eagain_error(self):
  171. """Test send with EAGAIN error (should retry)"""
  172. mock_sock = Mock()
  173. mock_sock.gettimeout.return_value = 30.0
  174. # Create socket error with EAGAIN
  175. eagain_error = socket.error("Resource temporarily unavailable")
  176. eagain_error.errno = errno.EAGAIN
  177. eagain_error.args = (errno.EAGAIN, "Resource temporarily unavailable")
  178. # First call raises EAGAIN, second call succeeds
  179. mock_sock.send.side_effect = [eagain_error, 9]
  180. with patch("selectors.DefaultSelector") as mock_selector_class:
  181. mock_selector = Mock()
  182. mock_selector_class.return_value = mock_selector
  183. mock_selector.select.return_value = [True] # Ready to write
  184. result = send(mock_sock, b"test data")
  185. self.assertEqual(result, 9)
  186. def test_send_socket_ewouldblock_error(self):
  187. """Test send with EWOULDBLOCK error (should retry)"""
  188. mock_sock = Mock()
  189. mock_sock.gettimeout.return_value = 30.0
  190. # Create socket error with EWOULDBLOCK
  191. ewouldblock_error = socket.error("Operation would block")
  192. ewouldblock_error.errno = errno.EWOULDBLOCK
  193. ewouldblock_error.args = (errno.EWOULDBLOCK, "Operation would block")
  194. # First call raises EWOULDBLOCK, second call succeeds
  195. mock_sock.send.side_effect = [ewouldblock_error, 9]
  196. with patch("selectors.DefaultSelector") as mock_selector_class:
  197. mock_selector = Mock()
  198. mock_selector_class.return_value = mock_selector
  199. mock_selector.select.return_value = [True] # Ready to write
  200. result = send(mock_sock, b"test data")
  201. self.assertEqual(result, 9)
  202. def test_send_socket_other_error(self):
  203. """Test send with other socket error (should raise)"""
  204. mock_sock = Mock()
  205. mock_sock.gettimeout.return_value = 30.0
  206. # Create socket error with different errno
  207. other_error = socket.error("Connection reset by peer")
  208. other_error.errno = errno.ECONNRESET
  209. other_error.args = (errno.ECONNRESET, "Connection reset by peer")
  210. mock_sock.send.side_effect = other_error
  211. with self.assertRaises(socket.error):
  212. send(mock_sock, b"test data")
  213. def test_send_socket_error_no_errno(self):
  214. """Test send with socket error that has no errno"""
  215. mock_sock = Mock()
  216. mock_sock.gettimeout.return_value = 30.0
  217. # Create socket error without errno attribute
  218. no_errno_error = socket.error("Generic socket error")
  219. no_errno_error.args = ("Generic socket error",)
  220. mock_sock.send.side_effect = no_errno_error
  221. with self.assertRaises(socket.error):
  222. send(mock_sock, b"test data")
  223. def test_send_write_timeout(self):
  224. """Test send write operation timeout"""
  225. mock_sock = Mock()
  226. mock_sock.gettimeout.return_value = 30.0
  227. # First call raises EAGAIN
  228. eagain_error = socket.error("Resource temporarily unavailable")
  229. eagain_error.errno = errno.EAGAIN
  230. eagain_error.args = (errno.EAGAIN, "Resource temporarily unavailable")
  231. mock_sock.send.side_effect = eagain_error
  232. with patch("selectors.DefaultSelector") as mock_selector_class:
  233. mock_selector = Mock()
  234. mock_selector_class.return_value = mock_selector
  235. mock_selector.select.return_value = [] # Timeout - nothing ready
  236. result = send(mock_sock, b"test data")
  237. # Should return 0 when write times out
  238. self.assertEqual(result, 0)
  239. def test_send_string_data(self):
  240. """Test send with string data (should be encoded)"""
  241. mock_sock = Mock()
  242. mock_sock.send.return_value = 9
  243. mock_sock.gettimeout.return_value = 30.0
  244. result = send(mock_sock, "test data")
  245. self.assertEqual(result, 9)
  246. mock_sock.send.assert_called_with(b"test data")
  247. def test_send_partial_send_retry(self):
  248. """Test send retry mechanism"""
  249. mock_sock = Mock()
  250. mock_sock.gettimeout.return_value = 30.0
  251. # Create a scenario where send succeeds after selector retry
  252. eagain_error = socket.error("Resource temporarily unavailable")
  253. eagain_error.errno = errno.EAGAIN
  254. eagain_error.args = (errno.EAGAIN, "Resource temporarily unavailable")
  255. # Mock the internal _send function behavior
  256. mock_sock.send.side_effect = [eagain_error, 9]
  257. with patch("selectors.DefaultSelector") as mock_selector_class:
  258. mock_selector = Mock()
  259. mock_selector_class.return_value = mock_selector
  260. mock_selector.select.return_value = [True] # Socket ready for writing
  261. result = send(mock_sock, b"test data")
  262. self.assertEqual(result, 9)
  263. # Verify selector was used for retry mechanism
  264. mock_selector.register.assert_called()
  265. mock_selector.select.assert_called()
  266. mock_selector.close.assert_called()
  267. if __name__ == "__main__":
  268. unittest.main()