| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408 |
- @cython.no_gc_clear
- @cython.freelist(DEFAULT_FREELIST_SIZE)
- cdef class _UDPSendContext:
- # used to hold additional write request information for uv_write
- cdef:
- uv.uv_udp_send_t req
- uv.uv_buf_t uv_buf
- Py_buffer py_buf
- UDPTransport udp
- bint closed
- cdef close(self):
- if self.closed:
- return
- self.closed = 1
- PyBuffer_Release(&self.py_buf) # void
- self.req.data = NULL
- self.uv_buf.base = NULL
- Py_DECREF(self)
- self.udp = None
- @staticmethod
- cdef _UDPSendContext new(UDPTransport udp, object data):
- cdef _UDPSendContext ctx
- ctx = _UDPSendContext.__new__(_UDPSendContext)
- ctx.udp = None
- ctx.closed = 1
- ctx.req.data = <void*> ctx
- Py_INCREF(ctx)
- PyObject_GetBuffer(data, &ctx.py_buf, PyBUF_SIMPLE)
- ctx.uv_buf.base = <char*>ctx.py_buf.buf
- ctx.uv_buf.len = ctx.py_buf.len
- ctx.udp = udp
- ctx.closed = 0
- return ctx
- def __dealloc__(self):
- if UVLOOP_DEBUG:
- if not self.closed:
- raise RuntimeError(
- 'open _UDPSendContext is being deallocated')
- self.udp = None
- @cython.no_gc_clear
- cdef class UDPTransport(UVBaseTransport):
- def __cinit__(self):
- self._family = uv.AF_UNSPEC
- self.__receiving = 0
- self._address = None
- self.context = Context_CopyCurrent()
- cdef _init(self, Loop loop, unsigned int family):
- cdef int err
- self._start_init(loop)
- self._handle = <uv.uv_handle_t*>PyMem_RawMalloc(sizeof(uv.uv_udp_t))
- if self._handle is NULL:
- self._abort_init()
- raise MemoryError()
- err = uv.uv_udp_init_ex(loop.uvloop,
- <uv.uv_udp_t*>self._handle,
- family)
- if err < 0:
- self._abort_init()
- raise convert_error(err)
- if family in (uv.AF_INET, uv.AF_INET6):
- self._family = family
- self._finish_init()
- cdef _set_address(self, system.addrinfo *addr):
- self._address = __convert_sockaddr_to_pyaddr(addr.ai_addr)
- cdef _connect(self, system.sockaddr* addr, size_t addr_len):
- cdef int err
- err = uv.uv_udp_connect(<uv.uv_udp_t*>self._handle, addr)
- if err < 0:
- exc = convert_error(err)
- raise exc
- cdef open(self, int family, int sockfd):
- if family in (uv.AF_INET, uv.AF_INET6, uv.AF_UNIX):
- self._family = family
- else:
- raise ValueError(
- 'cannot open a UDP handle, invalid family {}'.format(family))
- cdef int err
- err = uv.uv_udp_open(<uv.uv_udp_t*>self._handle,
- <uv.uv_os_sock_t>sockfd)
- if err < 0:
- exc = convert_error(err)
- raise exc
- cdef _bind(self, system.sockaddr* addr):
- cdef:
- int err
- int flags = 0
- self._ensure_alive()
- err = uv.uv_udp_bind(<uv.uv_udp_t*>self._handle, addr, flags)
- if err < 0:
- exc = convert_error(err)
- raise exc
- cdef _set_broadcast(self, bint on):
- cdef int err
- self._ensure_alive()
- err = uv.uv_udp_set_broadcast(<uv.uv_udp_t*>self._handle, on)
- if err < 0:
- exc = convert_error(err)
- raise exc
- cdef size_t _get_write_buffer_size(self):
- if self._handle is NULL:
- return 0
- return (<uv.uv_udp_t*>self._handle).send_queue_size
- cdef bint _is_reading(self):
- return self.__receiving
- cdef _start_reading(self):
- cdef int err
- if self.__receiving:
- return
- self._ensure_alive()
- err = uv.uv_udp_recv_start(<uv.uv_udp_t*>self._handle,
- __loop_alloc_buffer,
- __uv_udp_on_receive)
- if err < 0:
- exc = convert_error(err)
- self._fatal_error(exc, True)
- return
- else:
- # UDPTransport must live until the read callback is called
- self.__receiving_started()
- cdef _stop_reading(self):
- cdef int err
- if not self.__receiving:
- return
- self._ensure_alive()
- err = uv.uv_udp_recv_stop(<uv.uv_udp_t*>self._handle)
- if err < 0:
- exc = convert_error(err)
- self._fatal_error(exc, True)
- return
- else:
- self.__receiving_stopped()
- cdef inline __receiving_started(self):
- if self.__receiving:
- return
- self.__receiving = 1
- Py_INCREF(self)
- cdef inline __receiving_stopped(self):
- if not self.__receiving:
- return
- self.__receiving = 0
- Py_DECREF(self)
- cdef _new_socket(self):
- if self._family not in (uv.AF_INET, uv.AF_INET6, uv.AF_UNIX):
- raise RuntimeError(
- 'UDPTransport.family is undefined; '
- 'cannot create python socket')
- fileno = self._fileno()
- return PseudoSocket(self._family, uv.SOCK_DGRAM, 0, fileno)
- cdef _send(self, object data, object addr):
- cdef:
- _UDPSendContext ctx
- system.sockaddr_storage saddr_st
- system.sockaddr *saddr
- Py_buffer try_pybuf
- uv.uv_buf_t try_uvbuf
- self._ensure_alive()
- if self._family not in (uv.AF_INET, uv.AF_INET6, uv.AF_UNIX):
- raise RuntimeError('UDPTransport.family is undefined; cannot send')
- if addr is None:
- saddr = NULL
- else:
- try:
- __convert_pyaddr_to_sockaddr(self._family, addr,
- <system.sockaddr*>&saddr_st)
- except (ValueError, TypeError):
- raise
- except Exception:
- raise ValueError(
- f'{addr!r}: socket family mismatch or '
- f'a DNS lookup is required')
- saddr = <system.sockaddr*>(&saddr_st)
- if self._get_write_buffer_size() == 0:
- PyObject_GetBuffer(data, &try_pybuf, PyBUF_SIMPLE)
- try_uvbuf.base = <char*>try_pybuf.buf
- try_uvbuf.len = try_pybuf.len
- err = uv.uv_udp_try_send(<uv.uv_udp_t*>self._handle,
- &try_uvbuf,
- 1,
- saddr)
- PyBuffer_Release(&try_pybuf)
- else:
- err = uv.UV_EAGAIN
- if err == uv.UV_EAGAIN:
- ctx = _UDPSendContext.new(self, data)
- err = uv.uv_udp_send(&ctx.req,
- <uv.uv_udp_t*>self._handle,
- &ctx.uv_buf,
- 1,
- saddr,
- __uv_udp_on_send)
- if err < 0:
- ctx.close()
- exc = convert_error(err)
- if isinstance(exc, OSError):
- run_in_context1(self.context.copy(), self._protocol.error_received, exc)
- else:
- self._fatal_error(exc, True)
- else:
- self._maybe_pause_protocol()
- else:
- self._on_sent(convert_error(err) if err < 0 else None, self.context.copy())
- cdef _on_receive(self, bytes data, object exc, object addr):
- if exc is None:
- run_in_context2(
- self.context, self._protocol.datagram_received, data, addr,
- )
- else:
- run_in_context1(self.context, self._protocol.error_received, exc)
- cdef _on_sent(self, object exc, object context=None):
- if exc is not None:
- if isinstance(exc, OSError):
- if context is None:
- context = self.context
- run_in_context1(context, self._protocol.error_received, exc)
- else:
- self._fatal_error(
- exc, False, 'Fatal write error on datagram transport')
- self._maybe_resume_protocol()
- if not self._get_write_buffer_size():
- if self._closing:
- self._schedule_call_connection_lost(None)
- # === Public API ===
- def sendto(self, data, addr=None):
- if not data:
- # Replicating asyncio logic here.
- return
- if self._address:
- if addr not in (None, self._address):
- # Replicating asyncio logic here.
- raise ValueError(
- 'Invalid address: must be None or %s' % (self._address,))
- # Instead of setting addr to self._address below like what asyncio
- # does, we depend on previous uv_udp_connect() to set the address
- addr = None
- if self._conn_lost:
- # Replicating asyncio logic here.
- if self._conn_lost >= LOG_THRESHOLD_FOR_CONNLOST_WRITES:
- aio_logger.warning('socket.send() raised exception.')
- self._conn_lost += 1
- return
- self._send(data, addr)
- cdef void __uv_udp_on_receive(
- uv.uv_udp_t* handle,
- ssize_t nread,
- const uv.uv_buf_t* buf,
- const system.sockaddr* addr,
- unsigned flags
- ) noexcept with gil:
- if __ensure_handle_data(<uv.uv_handle_t*>handle,
- "UDPTransport receive callback") == 0:
- return
- cdef:
- UDPTransport udp = <UDPTransport>handle.data
- Loop loop = udp._loop
- bytes data
- object pyaddr
- # It's OK to free the buffer early, since nothing will
- # be able to touch it until this method is done.
- __loop_free_buffer(loop)
- if udp._closed:
- # The handle was closed, there is no reason to
- # do any work now.
- udp.__receiving_stopped() # Just in case.
- return
- if addr is NULL and nread == 0:
- # From libuv docs:
- # addr: struct sockaddr* containing the address
- # of the sender. Can be NULL. Valid for the duration
- # of the callback only.
- # [...]
- # The receive callback will be called with
- # nread == 0 and addr == NULL when there is
- # nothing to read, and with nread == 0 and
- # addr != NULL when an empty UDP packet is
- # received.
- return
- if addr is NULL:
- pyaddr = None
- elif addr.sa_family == uv.AF_UNSPEC:
- # https://github.com/MagicStack/uvloop/issues/304
- if system.PLATFORM_IS_LINUX:
- pyaddr = None
- else:
- pyaddr = ''
- else:
- try:
- pyaddr = __convert_sockaddr_to_pyaddr(addr)
- except BaseException as exc:
- udp._error(exc, False)
- return
- if nread < 0:
- exc = convert_error(nread)
- udp._on_receive(None, exc, pyaddr)
- return
- if nread == 0:
- data = b''
- else:
- data = loop._recv_buffer[:nread]
- try:
- udp._on_receive(data, None, pyaddr)
- except BaseException as exc:
- udp._error(exc, False)
- cdef void __uv_udp_on_send(
- uv.uv_udp_send_t* req,
- int status,
- ) noexcept with gil:
- if req.data is NULL:
- # Shouldn't happen as:
- # - _UDPSendContext does an extra INCREF in its 'init()'
- # - _UDPSendContext holds a ref to the relevant UDPTransport
- aio_logger.error(
- 'UVStream.write callback called with NULL req.data, status=%r',
- status)
- return
- cdef:
- _UDPSendContext ctx = <_UDPSendContext> req.data
- UDPTransport udp = <UDPTransport>ctx.udp
- ctx.close()
- if status < 0:
- exc = convert_error(status)
- print(exc)
- else:
- exc = None
- try:
- udp._on_sent(exc)
- except BaseException as exc:
- udp._error(exc, False)
|