| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149 |
- from __future__ import annotations
- import io
- import itertools
- import sys
- import typing
- from .._models import Request, Response
- from .._types import SyncByteStream
- from .base import BaseTransport
- if typing.TYPE_CHECKING:
- from _typeshed import OptExcInfo # pragma: no cover
- from _typeshed.wsgi import WSGIApplication # pragma: no cover
- _T = typing.TypeVar("_T")
- __all__ = ["WSGITransport"]
- def _skip_leading_empty_chunks(body: typing.Iterable[_T]) -> typing.Iterable[_T]:
- body = iter(body)
- for chunk in body:
- if chunk:
- return itertools.chain([chunk], body)
- return []
class WSGIByteStream(SyncByteStream):
    """
    Byte stream wrapping the iterable returned by a WSGI application.

    On construction any leading empty chunks of the iterable are consumed
    (via `_skip_leading_empty_chunks`), and the iterable's `close` attribute,
    if it has one, is remembered so that `close()` can invoke it later.
    """

    def __init__(self, result: typing.Iterable[bytes]) -> None:
        # Look up `close` on the application's own iterable *before* wrapping
        # it: the wrapped iterable does not expose that attribute.
        self._close = getattr(result, "close", None)
        self._result = _skip_leading_empty_chunks(result)

    def __iter__(self) -> typing.Iterator[bytes]:
        """Yield the remaining body chunks in order."""
        yield from self._result

    def close(self) -> None:
        """Call the wrapped iterable's `close` hook, when it has one."""
        closer = self._close
        if closer is None:
            return
        closer()
class WSGITransport(BaseTransport):
    """
    A custom transport that handles sending requests directly to a WSGI app.
    The simplest way to use this functionality is to use the `app` argument.

    ```
    client = httpx.Client(app=app)
    ```

    Alternatively, you can setup the transport instance explicitly.
    This allows you to include any additional configuration arguments specific
    to the WSGITransport class:

    ```
    transport = httpx.WSGITransport(
        app=app,
        script_name="/submount",
        remote_addr="1.2.3.4"
    )
    client = httpx.Client(transport=transport)
    ```

    Arguments:

    * `app` - The WSGI application.
    * `raise_app_exceptions` - Boolean indicating if exceptions in the application
       should be raised. Default to `True`. Can be set to `False` for use cases
       such as testing the content of a client 500 response.
    * `script_name` - The root path on which the WSGI application should be mounted.
    * `remote_addr` - A string indicating the client IP of incoming requests.
    * `wsgi_errors` - A text stream exposed to the application as `wsgi.errors`.
       When not given, `sys.stderr` is used.
    """

    def __init__(
        self,
        app: WSGIApplication,
        raise_app_exceptions: bool = True,
        script_name: str = "",
        remote_addr: str = "127.0.0.1",
        wsgi_errors: typing.TextIO | None = None,
    ) -> None:
        self.app = app
        self.raise_app_exceptions = raise_app_exceptions
        self.script_name = script_name
        self.remote_addr = remote_addr
        self.wsgi_errors = wsgi_errors

    def handle_request(self, request: Request) -> Response:
        """
        Call the WSGI application with an environ built from `request` and
        wrap what it returns in a `Response`.

        The request body is read in full and exposed as `wsgi.input`. Request
        headers become `HTTP_*` environ entries (`Content-Type` and
        `Content-Length` map to `CONTENT_TYPE` / `CONTENT_LENGTH`). Header
        values are transcoded as ISO-8859-1, the encoding PEP 3333 prescribes
        for WSGI header strings, and repeated `HTTP_*` headers are merged into
        one comma-separated value rather than overwriting each other.

        Raises:
            The exception carried in the `exc_info` the application passed to
            `start_response`, when `raise_app_exceptions` is true. The
            application's result iterable is closed before re-raising.
            AssertionError: if the application did not call `start_response`
            by the time its first non-empty body chunk was produced.
        """
        # Load the whole body so it can be handed over as a file-like object.
        request.read()
        wsgi_input = io.BytesIO(request.content)

        # Fall back to the scheme's default port when the URL has none.
        port = request.url.port or {"http": 80, "https": 443}[request.url.scheme]
        environ: dict[str, typing.Any] = {
            "wsgi.version": (1, 0),
            "wsgi.url_scheme": request.url.scheme,
            "wsgi.input": wsgi_input,
            "wsgi.errors": self.wsgi_errors or sys.stderr,
            "wsgi.multithread": True,
            "wsgi.multiprocess": False,
            "wsgi.run_once": False,
            "REQUEST_METHOD": request.method,
            "SCRIPT_NAME": self.script_name,
            "PATH_INFO": request.url.path,
            "QUERY_STRING": request.url.query.decode("ascii"),
            "SERVER_NAME": request.url.host,
            "SERVER_PORT": str(port),
            "SERVER_PROTOCOL": "HTTP/1.1",
            "REMOTE_ADDR": self.remote_addr,
        }
        for header_key, header_value in request.headers.raw:
            name = header_key.decode("ascii").upper().replace("-", "_")
            # Latin-1 maps every byte to a code point, so this never fails and
            # is identical to the ASCII decoding for pure-ASCII values.
            value = header_value.decode("latin-1")
            if name in ("CONTENT_TYPE", "CONTENT_LENGTH"):
                # These two carry no `HTTP_` prefix; a repeat simply replaces
                # the earlier value (joining would corrupt e.g. the length).
                environ[name] = value
                continue
            key = "HTTP_" + name
            if key in environ:
                # The same header was sent more than once: keep every value by
                # folding them into a single comma-separated entry.
                value = f"{environ[key]},{value}"
            environ[key] = value

        # Filled in by `start_response` below when the application calls it.
        seen_status: str | None = None
        seen_response_headers: list[tuple[str, str]] | None = None
        seen_exc_info: OptExcInfo | None = None

        def start_response(
            status: str,
            response_headers: list[tuple[str, str]],
            exc_info: OptExcInfo | None = None,
        ) -> typing.Callable[[bytes], typing.Any]:
            nonlocal seen_status, seen_response_headers, seen_exc_info
            seen_status = status
            seen_response_headers = response_headers
            seen_exc_info = exc_info
            # The returned `write()` callable discards whatever it is given.
            return lambda _: None

        result = self.app(environ, start_response)
        # Building the stream consumes the leading empty chunks of `result`,
        # which gives a lazily-evaluated application (e.g. a generator) the
        # chance to call `start_response` before the checks below.
        stream = WSGIByteStream(result)

        assert seen_status is not None
        assert seen_response_headers is not None
        if seen_exc_info and seen_exc_info[0] and self.raise_app_exceptions:
            # The stream is never handed to the caller on this path, so close
            # the application's iterable here; otherwise nobody could. The
            # application's exception is raised even if closing itself fails.
            try:
                stream.close()
            finally:
                raise seen_exc_info[1]

        # The status line looks like "200 OK"; only the numeric code is kept.
        status_code = int(seen_status.split()[0])
        headers = [
            (key.encode("ascii"), value.encode("latin-1"))
            for key, value in seen_response_headers
        ]

        return Response(status_code, headers=headers, stream=stream)
|