| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548 |
- from __future__ import annotations
- import asyncio
- import inspect
- import json
- import logging
- import logging.config
- import os
- import socket
- import ssl
- import sys
- from collections.abc import Awaitable
- from configparser import RawConfigParser
- from pathlib import Path
- from typing import IO, Any, Callable, Literal
- import click
- from uvicorn._types import ASGIApplication
- from uvicorn.importer import ImportFromStringError, import_from_string
- from uvicorn.logging import TRACE_LOG_LEVEL
- from uvicorn.middleware.asgi2 import ASGI2Middleware
- from uvicorn.middleware.message_logger import MessageLoggerMiddleware
- from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware
- from uvicorn.middleware.wsgi import WSGIMiddleware
- HTTPProtocolType = Literal["auto", "h11", "httptools"]
- WSProtocolType = Literal["auto", "none", "websockets", "websockets-sansio", "wsproto"]
- LifespanType = Literal["auto", "on", "off"]
- LoopFactoryType = Literal["none", "auto", "asyncio", "uvloop"]
- InterfaceType = Literal["auto", "asgi3", "asgi2", "wsgi"]
- LOG_LEVELS: dict[str, int] = {
- "critical": logging.CRITICAL,
- "error": logging.ERROR,
- "warning": logging.WARNING,
- "info": logging.INFO,
- "debug": logging.DEBUG,
- "trace": TRACE_LOG_LEVEL,
- }
- HTTP_PROTOCOLS: dict[str, str] = {
- "auto": "uvicorn.protocols.http.auto:AutoHTTPProtocol",
- "h11": "uvicorn.protocols.http.h11_impl:H11Protocol",
- "httptools": "uvicorn.protocols.http.httptools_impl:HttpToolsProtocol",
- }
- WS_PROTOCOLS: dict[str, str | None] = {
- "auto": "uvicorn.protocols.websockets.auto:AutoWebSocketsProtocol",
- "none": None,
- "websockets": "uvicorn.protocols.websockets.websockets_impl:WebSocketProtocol",
- "websockets-sansio": "uvicorn.protocols.websockets.websockets_sansio_impl:WebSocketsSansIOProtocol",
- "wsproto": "uvicorn.protocols.websockets.wsproto_impl:WSProtocol",
- }
- LIFESPAN: dict[str, str] = {
- "auto": "uvicorn.lifespan.on:LifespanOn",
- "on": "uvicorn.lifespan.on:LifespanOn",
- "off": "uvicorn.lifespan.off:LifespanOff",
- }
- LOOP_FACTORIES: dict[str, str | None] = {
- "none": None,
- "auto": "uvicorn.loops.auto:auto_loop_factory",
- "asyncio": "uvicorn.loops.asyncio:asyncio_loop_factory",
- "uvloop": "uvicorn.loops.uvloop:uvloop_loop_factory",
- }
- INTERFACES: list[InterfaceType] = ["auto", "asgi3", "asgi2", "wsgi"]
- SSL_PROTOCOL_VERSION: int = ssl.PROTOCOL_TLS_SERVER
- LOGGING_CONFIG: dict[str, Any] = {
- "version": 1,
- "disable_existing_loggers": False,
- "formatters": {
- "default": {
- "()": "uvicorn.logging.DefaultFormatter",
- "fmt": "%(levelprefix)s %(message)s",
- "use_colors": None,
- },
- "access": {
- "()": "uvicorn.logging.AccessFormatter",
- "fmt": '%(levelprefix)s %(client_addr)s - "%(request_line)s" %(status_code)s', # noqa: E501
- },
- },
- "handlers": {
- "default": {
- "formatter": "default",
- "class": "logging.StreamHandler",
- "stream": "ext://sys.stderr",
- },
- "access": {
- "formatter": "access",
- "class": "logging.StreamHandler",
- "stream": "ext://sys.stdout",
- },
- },
- "loggers": {
- "uvicorn": {"handlers": ["default"], "level": "INFO", "propagate": False},
- "uvicorn.error": {"level": "INFO"},
- "uvicorn.access": {"handlers": ["access"], "level": "INFO", "propagate": False},
- },
- }
- logger = logging.getLogger("uvicorn.error")
- def create_ssl_context(
- certfile: str | os.PathLike[str],
- keyfile: str | os.PathLike[str] | None,
- password: str | None,
- ssl_version: int,
- cert_reqs: int,
- ca_certs: str | os.PathLike[str] | None,
- ciphers: str | None,
- ) -> ssl.SSLContext:
- ctx = ssl.SSLContext(ssl_version)
- get_password = (lambda: password) if password else None
- ctx.load_cert_chain(certfile, keyfile, get_password)
- ctx.verify_mode = ssl.VerifyMode(cert_reqs)
- if ca_certs:
- ctx.load_verify_locations(ca_certs)
- if ciphers:
- ctx.set_ciphers(ciphers)
- return ctx
- def is_dir(path: Path) -> bool:
- try:
- if not path.is_absolute():
- path = path.resolve()
- return path.is_dir()
- except OSError: # pragma: full coverage
- return False
- def resolve_reload_patterns(patterns_list: list[str], directories_list: list[str]) -> tuple[list[str], list[Path]]:
- directories: list[Path] = list(set(map(Path, directories_list.copy())))
- patterns: list[str] = patterns_list.copy()
- current_working_directory = Path.cwd()
- for pattern in patterns_list:
- # Special case for the .* pattern, otherwise this would only match
- # hidden directories which is probably undesired
- if pattern == ".*":
- continue # pragma: py-not-linux
- patterns.append(pattern)
- if is_dir(Path(pattern)):
- directories.append(Path(pattern))
- else:
- for match in current_working_directory.glob(pattern):
- if is_dir(match):
- directories.append(match)
- directories = list(set(directories))
- directories = list(map(Path, directories))
- directories = list(map(lambda x: x.resolve(), directories))
- directories = list({reload_path for reload_path in directories if is_dir(reload_path)})
- children = []
- for j in range(len(directories)):
- for k in range(j + 1, len(directories)): # pragma: full coverage
- if directories[j] in directories[k].parents:
- children.append(directories[k])
- elif directories[k] in directories[j].parents:
- children.append(directories[j])
- directories = list(set(directories).difference(set(children)))
- return list(set(patterns)), directories
- def _normalize_dirs(dirs: list[str] | str | None) -> list[str]:
- if dirs is None:
- return []
- if isinstance(dirs, str):
- return [dirs]
- return list(set(dirs))
- class Config:
- def __init__(
- self,
- app: ASGIApplication | Callable[..., Any] | str,
- host: str = "127.0.0.1",
- port: int = 8000,
- uds: str | None = None,
- fd: int | None = None,
- loop: LoopFactoryType | str = "auto",
- http: type[asyncio.Protocol] | HTTPProtocolType | str = "auto",
- ws: type[asyncio.Protocol] | WSProtocolType | str = "auto",
- ws_max_size: int = 16 * 1024 * 1024,
- ws_max_queue: int = 32,
- ws_ping_interval: float | None = 20.0,
- ws_ping_timeout: float | None = 20.0,
- ws_per_message_deflate: bool = True,
- lifespan: LifespanType = "auto",
- env_file: str | os.PathLike[str] | None = None,
- log_config: dict[str, Any] | str | RawConfigParser | IO[Any] | None = LOGGING_CONFIG,
- log_level: str | int | None = None,
- access_log: bool = True,
- use_colors: bool | None = None,
- interface: InterfaceType = "auto",
- reload: bool = False,
- reload_dirs: list[str] | str | None = None,
- reload_delay: float = 0.25,
- reload_includes: list[str] | str | None = None,
- reload_excludes: list[str] | str | None = None,
- workers: int | None = None,
- proxy_headers: bool = True,
- server_header: bool = True,
- date_header: bool = True,
- forwarded_allow_ips: list[str] | str | None = None,
- root_path: str = "",
- limit_concurrency: int | None = None,
- limit_max_requests: int | None = None,
- backlog: int = 2048,
- timeout_keep_alive: int = 5,
- timeout_notify: int = 30,
- timeout_graceful_shutdown: int | None = None,
- timeout_worker_healthcheck: int = 5,
- callback_notify: Callable[..., Awaitable[None]] | None = None,
- ssl_keyfile: str | os.PathLike[str] | None = None,
- ssl_certfile: str | os.PathLike[str] | None = None,
- ssl_keyfile_password: str | None = None,
- ssl_version: int = SSL_PROTOCOL_VERSION,
- ssl_cert_reqs: int = ssl.CERT_NONE,
- ssl_ca_certs: str | os.PathLike[str] | None = None,
- ssl_ciphers: str = "TLSv1",
- headers: list[tuple[str, str]] | None = None,
- factory: bool = False,
- h11_max_incomplete_event_size: int | None = None,
- ):
- self.app = app
- self.host = host
- self.port = port
- self.uds = uds
- self.fd = fd
- self.loop = loop
- self.http = http
- self.ws = ws
- self.ws_max_size = ws_max_size
- self.ws_max_queue = ws_max_queue
- self.ws_ping_interval = ws_ping_interval
- self.ws_ping_timeout = ws_ping_timeout
- self.ws_per_message_deflate = ws_per_message_deflate
- self.lifespan = lifespan
- self.log_config = log_config
- self.log_level = log_level
- self.access_log = access_log
- self.use_colors = use_colors
- self.interface = interface
- self.reload = reload
- self.reload_delay = reload_delay
- self.workers = workers or 1
- self.proxy_headers = proxy_headers
- self.server_header = server_header
- self.date_header = date_header
- self.root_path = root_path
- self.limit_concurrency = limit_concurrency
- self.limit_max_requests = limit_max_requests
- self.backlog = backlog
- self.timeout_keep_alive = timeout_keep_alive
- self.timeout_notify = timeout_notify
- self.timeout_graceful_shutdown = timeout_graceful_shutdown
- self.timeout_worker_healthcheck = timeout_worker_healthcheck
- self.callback_notify = callback_notify
- self.ssl_keyfile = ssl_keyfile
- self.ssl_certfile = ssl_certfile
- self.ssl_keyfile_password = ssl_keyfile_password
- self.ssl_version = ssl_version
- self.ssl_cert_reqs = ssl_cert_reqs
- self.ssl_ca_certs = ssl_ca_certs
- self.ssl_ciphers = ssl_ciphers
- self.headers: list[tuple[str, str]] = headers or []
- self.encoded_headers: list[tuple[bytes, bytes]] = []
- self.factory = factory
- self.h11_max_incomplete_event_size = h11_max_incomplete_event_size
- self.loaded = False
- self.configure_logging()
- self.reload_dirs: list[Path] = []
- self.reload_dirs_excludes: list[Path] = []
- self.reload_includes: list[str] = []
- self.reload_excludes: list[str] = []
- if (reload_dirs or reload_includes or reload_excludes) and not self.should_reload:
- logger.warning(
- "Current configuration will not reload as not all conditions are met, please refer to documentation."
- )
- if self.should_reload:
- reload_dirs = _normalize_dirs(reload_dirs)
- reload_includes = _normalize_dirs(reload_includes)
- reload_excludes = _normalize_dirs(reload_excludes)
- self.reload_includes, self.reload_dirs = resolve_reload_patterns(reload_includes, reload_dirs)
- self.reload_excludes, self.reload_dirs_excludes = resolve_reload_patterns(reload_excludes, [])
- reload_dirs_tmp = self.reload_dirs.copy()
- for directory in self.reload_dirs_excludes:
- for reload_directory in reload_dirs_tmp:
- if directory == reload_directory or directory in reload_directory.parents:
- try:
- self.reload_dirs.remove(reload_directory)
- except ValueError: # pragma: full coverage
- pass
- for pattern in self.reload_excludes:
- if pattern in self.reload_includes:
- self.reload_includes.remove(pattern) # pragma: full coverage
- if not self.reload_dirs:
- if reload_dirs:
- logger.warning(
- "Provided reload directories %s did not contain valid "
- + "directories, watching current working directory.",
- reload_dirs,
- )
- self.reload_dirs = [Path.cwd()]
- logger.info(
- "Will watch for changes in these directories: %s",
- sorted(list(map(str, self.reload_dirs))),
- )
- if env_file is not None:
- from dotenv import load_dotenv
- logger.info("Loading environment from '%s'", env_file)
- load_dotenv(dotenv_path=env_file)
- if workers is None and "WEB_CONCURRENCY" in os.environ:
- self.workers = int(os.environ["WEB_CONCURRENCY"])
- self.forwarded_allow_ips: list[str] | str
- if forwarded_allow_ips is None:
- self.forwarded_allow_ips = os.environ.get("FORWARDED_ALLOW_IPS", "127.0.0.1")
- else:
- self.forwarded_allow_ips = forwarded_allow_ips # pragma: full coverage
- if self.reload and self.workers > 1:
- logger.warning('"workers" flag is ignored when reloading is enabled.')
- @property
- def asgi_version(self) -> Literal["2.0", "3.0"]:
- mapping: dict[str, Literal["2.0", "3.0"]] = {
- "asgi2": "2.0",
- "asgi3": "3.0",
- "wsgi": "3.0",
- }
- return mapping[self.interface]
- @property
- def is_ssl(self) -> bool:
- return bool(self.ssl_keyfile or self.ssl_certfile)
- @property
- def use_subprocess(self) -> bool:
- return bool(self.reload or self.workers > 1)
- def configure_logging(self) -> None:
- logging.addLevelName(TRACE_LOG_LEVEL, "TRACE")
- if self.log_config is not None:
- if isinstance(self.log_config, dict):
- if self.use_colors in (True, False):
- self.log_config["formatters"]["default"]["use_colors"] = self.use_colors
- self.log_config["formatters"]["access"]["use_colors"] = self.use_colors
- logging.config.dictConfig(self.log_config)
- elif isinstance(self.log_config, str) and self.log_config.endswith(".json"):
- with open(self.log_config) as file:
- loaded_config = json.load(file)
- logging.config.dictConfig(loaded_config)
- elif isinstance(self.log_config, str) and self.log_config.endswith((".yaml", ".yml")):
- # Install the PyYAML package or the uvicorn[standard] optional
- # dependencies to enable this functionality.
- import yaml
- with open(self.log_config) as file:
- loaded_config = yaml.safe_load(file)
- logging.config.dictConfig(loaded_config)
- else:
- # See the note about fileConfig() here:
- # https://docs.python.org/3/library/logging.config.html#configuration-file-format
- logging.config.fileConfig(self.log_config, disable_existing_loggers=False)
- if self.log_level is not None:
- if isinstance(self.log_level, str):
- log_level = LOG_LEVELS[self.log_level]
- else:
- log_level = self.log_level
- logging.getLogger("uvicorn.error").setLevel(log_level)
- logging.getLogger("uvicorn.access").setLevel(log_level)
- logging.getLogger("uvicorn.asgi").setLevel(log_level)
- if self.access_log is False:
- logging.getLogger("uvicorn.access").handlers = []
- logging.getLogger("uvicorn.access").propagate = False
- def load(self) -> None:
- assert not self.loaded
- if self.is_ssl:
- assert self.ssl_certfile
- self.ssl: ssl.SSLContext | None = create_ssl_context(
- keyfile=self.ssl_keyfile,
- certfile=self.ssl_certfile,
- password=self.ssl_keyfile_password,
- ssl_version=self.ssl_version,
- cert_reqs=self.ssl_cert_reqs,
- ca_certs=self.ssl_ca_certs,
- ciphers=self.ssl_ciphers,
- )
- else:
- self.ssl = None
- encoded_headers = [(key.lower().encode("latin1"), value.encode("latin1")) for key, value in self.headers]
- self.encoded_headers = (
- [(b"server", b"uvicorn")] + encoded_headers
- if b"server" not in dict(encoded_headers) and self.server_header
- else encoded_headers
- )
- if isinstance(self.http, str):
- http_protocol_class = import_from_string(HTTP_PROTOCOLS.get(self.http, self.http))
- self.http_protocol_class: type[asyncio.Protocol] = http_protocol_class
- else:
- self.http_protocol_class = self.http
- if isinstance(self.ws, str):
- ws_protocol_class = import_from_string(WS_PROTOCOLS.get(self.ws, self.ws))
- self.ws_protocol_class: type[asyncio.Protocol] | None = ws_protocol_class
- else:
- self.ws_protocol_class = self.ws
- self.lifespan_class = import_from_string(LIFESPAN[self.lifespan])
- try:
- self.loaded_app = import_from_string(self.app)
- except ImportFromStringError as exc:
- logger.error("Error loading ASGI app. %s" % exc)
- sys.exit(1)
- try:
- self.loaded_app = self.loaded_app()
- except TypeError as exc:
- if self.factory:
- logger.error("Error loading ASGI app factory: %s", exc)
- sys.exit(1)
- else:
- if not self.factory:
- logger.warning(
- "ASGI app factory detected. Using it, but please consider setting the --factory flag explicitly."
- )
- if self.interface == "auto":
- if inspect.isclass(self.loaded_app):
- use_asgi_3 = hasattr(self.loaded_app, "__await__")
- elif inspect.isfunction(self.loaded_app):
- use_asgi_3 = inspect.iscoroutinefunction(self.loaded_app)
- else:
- call = getattr(self.loaded_app, "__call__", None)
- use_asgi_3 = inspect.iscoroutinefunction(call)
- self.interface = "asgi3" if use_asgi_3 else "asgi2"
- if self.interface == "wsgi":
- self.loaded_app = WSGIMiddleware(self.loaded_app)
- self.ws_protocol_class = None
- elif self.interface == "asgi2":
- self.loaded_app = ASGI2Middleware(self.loaded_app)
- if logger.getEffectiveLevel() <= TRACE_LOG_LEVEL:
- self.loaded_app = MessageLoggerMiddleware(self.loaded_app)
- if self.proxy_headers:
- self.loaded_app = ProxyHeadersMiddleware(self.loaded_app, trusted_hosts=self.forwarded_allow_ips)
- self.loaded = True
- def setup_event_loop(self) -> None:
- raise AttributeError(
- "The `setup_event_loop` method was replaced by `get_loop_factory` in uvicorn 0.36.0.\n"
- "None of those methods are supposed to be used directly. If you are doing it, please let me know here: "
- "https://github.com/Kludex/uvicorn/discussions/2706. Thank you, and sorry for the inconvenience."
- )
- def get_loop_factory(self) -> Callable[[], asyncio.AbstractEventLoop] | None:
- if self.loop in LOOP_FACTORIES:
- loop_factory: Callable[..., Any] | None = import_from_string(LOOP_FACTORIES[self.loop])
- else:
- try:
- return import_from_string(self.loop)
- except ImportFromStringError as exc:
- logger.error("Error loading custom loop setup function. %s" % exc)
- sys.exit(1)
- if loop_factory is None:
- return None
- return loop_factory(use_subprocess=self.use_subprocess)
- def bind_socket(self) -> socket.socket:
- logger_args: list[str | int]
- if self.uds: # pragma: py-win32
- path = self.uds
- sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- try:
- sock.bind(path)
- uds_perms = 0o666
- os.chmod(self.uds, uds_perms)
- except OSError as exc: # pragma: full coverage
- logger.error(exc)
- sys.exit(1)
- message = "Uvicorn running on unix socket %s (Press CTRL+C to quit)"
- sock_name_format = "%s"
- color_message = "Uvicorn running on " + click.style(sock_name_format, bold=True) + " (Press CTRL+C to quit)"
- logger_args = [self.uds]
- elif self.fd: # pragma: py-win32
- sock = socket.fromfd(self.fd, socket.AF_UNIX, socket.SOCK_STREAM)
- message = "Uvicorn running on socket %s (Press CTRL+C to quit)"
- fd_name_format = "%s"
- color_message = "Uvicorn running on " + click.style(fd_name_format, bold=True) + " (Press CTRL+C to quit)"
- logger_args = [sock.getsockname()]
- else:
- family = socket.AF_INET
- addr_format = "%s://%s:%d"
- if self.host and ":" in self.host: # pragma: full coverage
- # It's an IPv6 address.
- family = socket.AF_INET6
- addr_format = "%s://[%s]:%d"
- sock = socket.socket(family=family)
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- try:
- sock.bind((self.host, self.port))
- except OSError as exc: # pragma: full coverage
- logger.error(exc)
- sys.exit(1)
- message = f"Uvicorn running on {addr_format} (Press CTRL+C to quit)"
- color_message = "Uvicorn running on " + click.style(addr_format, bold=True) + " (Press CTRL+C to quit)"
- protocol_name = "https" if self.is_ssl else "http"
- logger_args = [protocol_name, self.host, sock.getsockname()[1]]
- logger.info(message, *logger_args, extra={"color_message": color_message})
- sock.set_inheritable(True)
- return sock
- @property
- def should_reload(self) -> bool:
- return isinstance(self.app, str) and self.reload
|