| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406 |
- """
- Custom transports, with nicely configured defaults.
- The following additional keyword arguments are currently supported by httpcore...
- * uds: str
- * local_address: str
- * retries: int
- Example usages...
- # Disable HTTP/2 on a single specific domain.
- mounts = {
- "all://": httpx.HTTPTransport(http2=True),
- "all://*example.org": httpx.HTTPTransport()
- }
- # Using advanced httpcore configuration, with connection retries.
- transport = httpx.HTTPTransport(retries=1)
- client = httpx.Client(transport=transport)
- # Using advanced httpcore configuration, with unix domain sockets.
- transport = httpx.HTTPTransport(uds="socket.uds")
- client = httpx.Client(transport=transport)
- """
- from __future__ import annotations
- import contextlib
- import typing
- from types import TracebackType
- if typing.TYPE_CHECKING:
- import ssl # pragma: no cover
- import httpx # pragma: no cover
- from .._config import DEFAULT_LIMITS, Limits, Proxy, create_ssl_context
- from .._exceptions import (
- ConnectError,
- ConnectTimeout,
- LocalProtocolError,
- NetworkError,
- PoolTimeout,
- ProtocolError,
- ProxyError,
- ReadError,
- ReadTimeout,
- RemoteProtocolError,
- TimeoutException,
- UnsupportedProtocol,
- WriteError,
- WriteTimeout,
- )
- from .._models import Request, Response
- from .._types import AsyncByteStream, CertTypes, ProxyTypes, SyncByteStream
- from .._urls import URL
- from .base import AsyncBaseTransport, BaseTransport
# Names exported by this module.
__all__ = ["AsyncHTTPTransport", "HTTPTransport"]

# Self-type variables: they let `__enter__` / `__aenter__` be annotated as
# returning the (sub)class they were called on.
T = typing.TypeVar("T", bound="HTTPTransport")
A = typing.TypeVar("A", bound="AsyncHTTPTransport")

# Type of a single entry in the `socket_options` constructor argument.
SOCKET_OPTION = typing.Union[
    typing.Tuple[int, int, int],
    typing.Tuple[int, int, typing.Union[bytes, bytearray]],
    typing.Tuple[int, int, None, int],
]

# httpcore -> httpx exception table. It starts out empty and is filled in by
# `map_httpcore_exceptions()` the first time that context manager is entered.
HTTPCORE_EXC_MAP: dict[type[Exception], type[httpx.HTTPError]] = {}
def _load_httpcore_exceptions() -> dict[type[Exception], type[httpx.HTTPError]]:
    """Return the table that translates httpcore exceptions into httpx ones.

    `httpcore` is imported inside the function, so the import only happens
    when the table is actually requested.

    Returns:
        A new dict keyed by httpcore exception class, whose values are the
        httpx exception classes they should be re-raised as.
    """
    import httpcore

    # (httpcore class, httpx class) pairs; `dict()` keeps this order.
    translation_pairs = (
        (httpcore.TimeoutException, TimeoutException),
        (httpcore.ConnectTimeout, ConnectTimeout),
        (httpcore.ReadTimeout, ReadTimeout),
        (httpcore.WriteTimeout, WriteTimeout),
        (httpcore.PoolTimeout, PoolTimeout),
        (httpcore.NetworkError, NetworkError),
        (httpcore.ConnectError, ConnectError),
        (httpcore.ReadError, ReadError),
        (httpcore.WriteError, WriteError),
        (httpcore.ProxyError, ProxyError),
        (httpcore.UnsupportedProtocol, UnsupportedProtocol),
        (httpcore.ProtocolError, ProtocolError),
        (httpcore.LocalProtocolError, LocalProtocolError),
        (httpcore.RemoteProtocolError, RemoteProtocolError),
    )
    return dict(translation_pairs)
@contextlib.contextmanager
def map_httpcore_exceptions() -> typing.Iterator[None]:
    """Re-raise httpcore exceptions from the managed block as httpx exceptions.

    On first use the module-level `HTTPCORE_EXC_MAP` is filled from
    `_load_httpcore_exceptions()`. Any `Exception` escaping the `with` body is
    matched against that table; when several entries match, the most derived
    httpx class wins. The new exception carries `str(original)` as its message
    and chains the original as `__cause__`.

    Raises:
        The mapped httpx exception, or the original exception unchanged when
        no table entry matches.
    """
    global HTTPCORE_EXC_MAP
    if not HTTPCORE_EXC_MAP:
        HTTPCORE_EXC_MAP = _load_httpcore_exceptions()

    try:
        yield
    except Exception as exc:
        # Every httpx type whose httpcore counterpart matches `exc`.
        candidates = (
            httpx_type
            for httpcore_type, httpx_type in HTTPCORE_EXC_MAP.items()
            if isinstance(exc, httpcore_type)
        )

        # Keep narrowing to the most specific match. E.g. an
        # `httpcore.ReadTimeout` should surface as `httpx.ReadTimeout`,
        # not merely as `httpx.TimeoutException`.
        target = None
        for candidate in candidates:
            if target is None or issubclass(candidate, target):
                target = candidate

        if target is None:  # pragma: no cover
            raise

        raise target(str(exc)) from exc
class ResponseStream(SyncByteStream):
    """Synchronous byte stream backed by an httpcore response stream.

    Iteration runs inside `map_httpcore_exceptions()`, so httpcore errors
    raised while reading the body surface as httpx exceptions.
    """

    def __init__(self, httpcore_stream: typing.Iterable[bytes]) -> None:
        """Store the underlying iterable of body chunks."""
        self._httpcore_stream = httpcore_stream

    def __iter__(self) -> typing.Iterator[bytes]:
        """Yield each chunk of the wrapped stream, translating exceptions."""
        with map_httpcore_exceptions():
            for chunk in self._httpcore_stream:
                yield chunk

    def close(self) -> None:
        """Close the wrapped stream when it provides a `close` attribute."""
        wrapped = self._httpcore_stream
        if not hasattr(wrapped, "close"):
            # Plain iterables have nothing to release.
            return
        wrapped.close()
class HTTPTransport(BaseTransport):
    """Synchronous transport that sends requests through an httpcore pool.

    Depending on the `proxy` argument the underlying pool is an
    `httpcore.ConnectionPool`, an `httpcore.HTTPProxy` or an
    `httpcore.SOCKSProxy`.
    """

    def __init__(
        self,
        verify: ssl.SSLContext | str | bool = True,
        cert: CertTypes | None = None,
        trust_env: bool = True,
        http1: bool = True,
        http2: bool = False,
        limits: Limits = DEFAULT_LIMITS,
        proxy: ProxyTypes | None = None,
        uds: str | None = None,
        local_address: str | None = None,
        retries: int = 0,
        socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
    ) -> None:
        """Create the httpcore pool that backs this transport.

        `verify`, `cert` and `trust_env` are handed to `create_ssl_context`.
        `http1`, `http2` and the `limits` fields are forwarded to every pool
        type. `uds`, `local_address` and `retries` are only forwarded when no
        proxy is used; `socket_options` is forwarded for the direct pool and
        for HTTP(S) proxies, but not for SOCKS proxies. A `str` or `URL`
        passed as `proxy` is wrapped in `Proxy` first.

        Raises:
            ImportError: a SOCKS proxy was requested but `socksio` cannot be
                imported.
            ValueError: the proxy URL scheme is not one of `http`, `https`,
                `socks5` or `socks5h`.
        """
        import httpcore

        if isinstance(proxy, (str, URL)):
            proxy = Proxy(url=proxy)
        ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env)

        # Keyword arguments that every pool flavour receives.
        pool_options: dict[str, typing.Any] = {
            "ssl_context": ssl_context,
            "max_connections": limits.max_connections,
            "max_keepalive_connections": limits.max_keepalive_connections,
            "keepalive_expiry": limits.keepalive_expiry,
            "http1": http1,
            "http2": http2,
        }

        if proxy is None:
            self._pool = httpcore.ConnectionPool(
                uds=uds,
                local_address=local_address,
                retries=retries,
                socket_options=socket_options,
                **pool_options,
            )
            return

        scheme = proxy.url.scheme

        if scheme in ("http", "https"):
            self._pool = httpcore.HTTPProxy(
                proxy_url=self._httpcore_url(proxy.url),
                proxy_auth=proxy.raw_auth,
                proxy_headers=proxy.headers.raw,
                proxy_ssl_context=proxy.ssl_context,
                socket_options=socket_options,
                **pool_options,
            )
            return

        if scheme in ("socks5", "socks5h"):
            # Fail early with an actionable message if the optional SOCKS
            # dependency is missing.
            try:
                import socksio  # noqa
            except ImportError:  # pragma: no cover
                raise ImportError(
                    "Using SOCKS proxy, but the 'socksio' package is not installed. "
                    "Make sure to install httpx using `pip install httpx[socks]`."
                ) from None

            self._pool = httpcore.SOCKSProxy(
                proxy_url=self._httpcore_url(proxy.url),
                proxy_auth=proxy.raw_auth,
                **pool_options,
            )
            return

        raise ValueError(  # pragma: no cover
            "Proxy protocol must be either 'http', 'https', 'socks5', or 'socks5h',"
            f" but got {proxy.url.scheme!r}."
        )

    @staticmethod
    def _httpcore_url(url: URL) -> typing.Any:
        """Build an `httpcore.URL` from the raw components of an httpx `URL`."""
        import httpcore

        return httpcore.URL(
            scheme=url.raw_scheme,
            host=url.raw_host,
            port=url.port,
            target=url.raw_path,
        )

    def __enter__(self: T) -> T:  # Use generics for subclass support.
        """Enter the underlying pool's context and return this transport."""
        self._pool.__enter__()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: TracebackType | None = None,
    ) -> None:
        """Exit the underlying pool's context, translating httpcore errors."""
        with map_httpcore_exceptions():
            self._pool.__exit__(exc_type, exc_value, traceback)

    def handle_request(
        self,
        request: Request,
    ) -> Response:
        """Send `request` through the pool and return the httpx `Response`.

        The request is converted to an `httpcore.Request`; the returned body
        stream is wrapped in `ResponseStream`. Exceptions raised by the pool
        while handling the request are translated by
        `map_httpcore_exceptions()`.
        """
        assert isinstance(request.stream, SyncByteStream)
        import httpcore

        core_request = httpcore.Request(
            method=request.method,
            url=self._httpcore_url(request.url),
            headers=request.headers.raw,
            content=request.stream,
            extensions=request.extensions,
        )

        with map_httpcore_exceptions():
            core_response = self._pool.handle_request(core_request)

        assert isinstance(core_response.stream, typing.Iterable)

        return Response(
            status_code=core_response.status,
            headers=core_response.headers,
            stream=ResponseStream(core_response.stream),
            extensions=core_response.extensions,
        )

    def close(self) -> None:
        """Close the underlying httpcore pool."""
        self._pool.close()
class AsyncResponseStream(AsyncByteStream):
    """Asynchronous byte stream backed by an httpcore response stream.

    Iteration runs inside `map_httpcore_exceptions()`, so httpcore errors
    raised while reading the body surface as httpx exceptions.
    """

    def __init__(self, httpcore_stream: typing.AsyncIterable[bytes]) -> None:
        """Store the underlying async iterable of body chunks."""
        self._httpcore_stream = httpcore_stream

    async def __aiter__(self) -> typing.AsyncIterator[bytes]:
        """Yield each chunk of the wrapped stream, translating exceptions."""
        with map_httpcore_exceptions():
            async for chunk in self._httpcore_stream:
                yield chunk

    async def aclose(self) -> None:
        """Close the wrapped stream when it provides an `aclose` attribute."""
        wrapped = self._httpcore_stream
        if not hasattr(wrapped, "aclose"):
            # Plain async iterables have nothing to release.
            return
        await wrapped.aclose()
class AsyncHTTPTransport(AsyncBaseTransport):
    """Asynchronous transport that sends requests through an httpcore pool.

    Depending on the `proxy` argument the underlying pool is an
    `httpcore.AsyncConnectionPool`, an `httpcore.AsyncHTTPProxy` or an
    `httpcore.AsyncSOCKSProxy`.
    """

    def __init__(
        self,
        verify: ssl.SSLContext | str | bool = True,
        cert: CertTypes | None = None,
        trust_env: bool = True,
        http1: bool = True,
        http2: bool = False,
        limits: Limits = DEFAULT_LIMITS,
        proxy: ProxyTypes | None = None,
        uds: str | None = None,
        local_address: str | None = None,
        retries: int = 0,
        socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
    ) -> None:
        """Create the httpcore pool that backs this transport.

        `verify`, `cert` and `trust_env` are handed to `create_ssl_context`.
        `http1`, `http2` and the `limits` fields are forwarded to every pool
        type. `uds`, `local_address` and `retries` are only forwarded when no
        proxy is used; `socket_options` is forwarded for the direct pool and
        for HTTP(S) proxies, but not for SOCKS proxies. A `str` or `URL`
        passed as `proxy` is wrapped in `Proxy` first.

        Raises:
            ImportError: a SOCKS proxy was requested but `socksio` cannot be
                imported.
            ValueError: the proxy URL scheme is not one of `http`, `https`,
                `socks5` or `socks5h`.
        """
        import httpcore

        if isinstance(proxy, (str, URL)):
            proxy = Proxy(url=proxy)
        ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env)

        # Keyword arguments that every pool flavour receives.
        pool_options: dict[str, typing.Any] = {
            "ssl_context": ssl_context,
            "max_connections": limits.max_connections,
            "max_keepalive_connections": limits.max_keepalive_connections,
            "keepalive_expiry": limits.keepalive_expiry,
            "http1": http1,
            "http2": http2,
        }

        if proxy is None:
            self._pool = httpcore.AsyncConnectionPool(
                uds=uds,
                local_address=local_address,
                retries=retries,
                socket_options=socket_options,
                **pool_options,
            )
            return

        scheme = proxy.url.scheme

        if scheme in ("http", "https"):
            self._pool = httpcore.AsyncHTTPProxy(
                proxy_url=self._httpcore_url(proxy.url),
                proxy_auth=proxy.raw_auth,
                proxy_headers=proxy.headers.raw,
                proxy_ssl_context=proxy.ssl_context,
                socket_options=socket_options,
                **pool_options,
            )
            return

        if scheme in ("socks5", "socks5h"):
            # Fail early with an actionable message if the optional SOCKS
            # dependency is missing.
            try:
                import socksio  # noqa
            except ImportError:  # pragma: no cover
                raise ImportError(
                    "Using SOCKS proxy, but the 'socksio' package is not installed. "
                    "Make sure to install httpx using `pip install httpx[socks]`."
                ) from None

            self._pool = httpcore.AsyncSOCKSProxy(
                proxy_url=self._httpcore_url(proxy.url),
                proxy_auth=proxy.raw_auth,
                **pool_options,
            )
            return

        # The second fragment must be an f-string; without the prefix the
        # message would contain the literal text "{proxy.url.scheme!r}"
        # instead of the rejected scheme.
        raise ValueError(  # pragma: no cover
            "Proxy protocol must be either 'http', 'https', 'socks5', or 'socks5h',"
            f" but got {proxy.url.scheme!r}."
        )

    @staticmethod
    def _httpcore_url(url: URL) -> typing.Any:
        """Build an `httpcore.URL` from the raw components of an httpx `URL`."""
        import httpcore

        return httpcore.URL(
            scheme=url.raw_scheme,
            host=url.raw_host,
            port=url.port,
            target=url.raw_path,
        )

    async def __aenter__(self: A) -> A:  # Use generics for subclass support.
        """Enter the underlying pool's async context and return this transport."""
        await self._pool.__aenter__()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: TracebackType | None = None,
    ) -> None:
        """Exit the underlying pool's async context, translating httpcore errors."""
        with map_httpcore_exceptions():
            await self._pool.__aexit__(exc_type, exc_value, traceback)

    async def handle_async_request(
        self,
        request: Request,
    ) -> Response:
        """Send `request` through the pool and return the httpx `Response`.

        The request is converted to an `httpcore.Request`; the returned body
        stream is wrapped in `AsyncResponseStream`. Exceptions raised by the
        pool while handling the request are translated by
        `map_httpcore_exceptions()`.
        """
        assert isinstance(request.stream, AsyncByteStream)
        import httpcore

        core_request = httpcore.Request(
            method=request.method,
            url=self._httpcore_url(request.url),
            headers=request.headers.raw,
            content=request.stream,
            extensions=request.extensions,
        )

        with map_httpcore_exceptions():
            core_response = await self._pool.handle_async_request(core_request)

        assert isinstance(core_response.stream, typing.AsyncIterable)

        return Response(
            status_code=core_response.status,
            headers=core_response.headers,
            stream=AsyncResponseStream(core_response.stream),
            extensions=core_response.extensions,
        )

    async def aclose(self) -> None:
        """Close the underlying httpcore pool."""
        await self._pool.aclose()
|