text.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. from __future__ import annotations
  2. import codecs
  3. import sys
  4. from collections.abc import Callable, Mapping
  5. from dataclasses import InitVar, dataclass, field
  6. from typing import Any
  7. from ..abc import (
  8. AnyByteReceiveStream,
  9. AnyByteSendStream,
  10. AnyByteStream,
  11. AnyByteStreamConnectable,
  12. ObjectReceiveStream,
  13. ObjectSendStream,
  14. ObjectStream,
  15. ObjectStreamConnectable,
  16. )
  17. if sys.version_info >= (3, 12):
  18. from typing import override
  19. else:
  20. from typing_extensions import override
  21. @dataclass(eq=False)
  22. class TextReceiveStream(ObjectReceiveStream[str]):
  23. """
  24. Stream wrapper that decodes bytes to strings using the given encoding.
  25. Decoding is done using :class:`~codecs.IncrementalDecoder` which returns any
  26. completely received unicode characters as soon as they come in.
  27. :param transport_stream: any bytes-based receive stream
  28. :param encoding: character encoding to use for decoding bytes to strings (defaults
  29. to ``utf-8``)
  30. :param errors: handling scheme for decoding errors (defaults to ``strict``; see the
  31. `codecs module documentation`_ for a comprehensive list of options)
  32. .. _codecs module documentation:
  33. https://docs.python.org/3/library/codecs.html#codec-objects
  34. """
  35. transport_stream: AnyByteReceiveStream
  36. encoding: InitVar[str] = "utf-8"
  37. errors: InitVar[str] = "strict"
  38. _decoder: codecs.IncrementalDecoder = field(init=False)
  39. def __post_init__(self, encoding: str, errors: str) -> None:
  40. decoder_class = codecs.getincrementaldecoder(encoding)
  41. self._decoder = decoder_class(errors=errors)
  42. async def receive(self) -> str:
  43. while True:
  44. chunk = await self.transport_stream.receive()
  45. decoded = self._decoder.decode(chunk)
  46. if decoded:
  47. return decoded
  48. async def aclose(self) -> None:
  49. await self.transport_stream.aclose()
  50. self._decoder.reset()
  51. @property
  52. def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
  53. return self.transport_stream.extra_attributes
  54. @dataclass(eq=False)
  55. class TextSendStream(ObjectSendStream[str]):
  56. """
  57. Sends strings to the wrapped stream as bytes using the given encoding.
  58. :param AnyByteSendStream transport_stream: any bytes-based send stream
  59. :param str encoding: character encoding to use for encoding strings to bytes
  60. (defaults to ``utf-8``)
  61. :param str errors: handling scheme for encoding errors (defaults to ``strict``; see
  62. the `codecs module documentation`_ for a comprehensive list of options)
  63. .. _codecs module documentation:
  64. https://docs.python.org/3/library/codecs.html#codec-objects
  65. """
  66. transport_stream: AnyByteSendStream
  67. encoding: InitVar[str] = "utf-8"
  68. errors: str = "strict"
  69. _encoder: Callable[..., tuple[bytes, int]] = field(init=False)
  70. def __post_init__(self, encoding: str) -> None:
  71. self._encoder = codecs.getencoder(encoding)
  72. async def send(self, item: str) -> None:
  73. encoded = self._encoder(item, self.errors)[0]
  74. await self.transport_stream.send(encoded)
  75. async def aclose(self) -> None:
  76. await self.transport_stream.aclose()
  77. @property
  78. def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
  79. return self.transport_stream.extra_attributes
  80. @dataclass(eq=False)
  81. class TextStream(ObjectStream[str]):
  82. """
  83. A bidirectional stream that decodes bytes to strings on receive and encodes strings
  84. to bytes on send.
  85. Extra attributes will be provided from both streams, with the receive stream
  86. providing the values in case of a conflict.
  87. :param AnyByteStream transport_stream: any bytes-based stream
  88. :param str encoding: character encoding to use for encoding/decoding strings to/from
  89. bytes (defaults to ``utf-8``)
  90. :param str errors: handling scheme for encoding errors (defaults to ``strict``; see
  91. the `codecs module documentation`_ for a comprehensive list of options)
  92. .. _codecs module documentation:
  93. https://docs.python.org/3/library/codecs.html#codec-objects
  94. """
  95. transport_stream: AnyByteStream
  96. encoding: InitVar[str] = "utf-8"
  97. errors: InitVar[str] = "strict"
  98. _receive_stream: TextReceiveStream = field(init=False)
  99. _send_stream: TextSendStream = field(init=False)
  100. def __post_init__(self, encoding: str, errors: str) -> None:
  101. self._receive_stream = TextReceiveStream(
  102. self.transport_stream, encoding=encoding, errors=errors
  103. )
  104. self._send_stream = TextSendStream(
  105. self.transport_stream, encoding=encoding, errors=errors
  106. )
  107. async def receive(self) -> str:
  108. return await self._receive_stream.receive()
  109. async def send(self, item: str) -> None:
  110. await self._send_stream.send(item)
  111. async def send_eof(self) -> None:
  112. await self.transport_stream.send_eof()
  113. async def aclose(self) -> None:
  114. await self._send_stream.aclose()
  115. await self._receive_stream.aclose()
  116. @property
  117. def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
  118. return {
  119. **self._send_stream.extra_attributes,
  120. **self._receive_stream.extra_attributes,
  121. }
  122. class TextConnectable(ObjectStreamConnectable[str]):
  123. def __init__(self, connectable: AnyByteStreamConnectable):
  124. """
  125. :param connectable: the bytestream endpoint to wrap
  126. """
  127. self.connectable = connectable
  128. @override
  129. async def connect(self) -> TextStream:
  130. stream = await self.connectable.connect()
  131. return TextStream(stream)