| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103 |
- from __future__ import annotations
- import functools
- import sys
- from collections.abc import Awaitable, Generator
- from contextlib import AbstractAsyncContextManager, contextmanager
- from typing import Any, Callable, Generic, Protocol, TypeVar, overload
- from starlette.types import Scope
- if sys.version_info >= (3, 13): # pragma: no cover
- from inspect import iscoroutinefunction
- from typing import TypeIs
- else: # pragma: no cover
- from asyncio import iscoroutinefunction
- from typing_extensions import TypeIs
- has_exceptiongroups = True
- if sys.version_info < (3, 11): # pragma: no cover
- try:
- from exceptiongroup import BaseExceptionGroup # type: ignore[unused-ignore,import-not-found]
- except ImportError:
- has_exceptiongroups = False
- T = TypeVar("T")
- AwaitableCallable = Callable[..., Awaitable[T]]
- @overload
- def is_async_callable(obj: AwaitableCallable[T]) -> TypeIs[AwaitableCallable[T]]: ...
- @overload
- def is_async_callable(obj: Any) -> TypeIs[AwaitableCallable[Any]]: ...
- def is_async_callable(obj: Any) -> Any:
- while isinstance(obj, functools.partial):
- obj = obj.func
- return iscoroutinefunction(obj) or (callable(obj) and iscoroutinefunction(obj.__call__))
- T_co = TypeVar("T_co", covariant=True)
- class AwaitableOrContextManager(Awaitable[T_co], AbstractAsyncContextManager[T_co], Protocol[T_co]): ...
- class SupportsAsyncClose(Protocol):
- async def close(self) -> None: ... # pragma: no cover
- SupportsAsyncCloseType = TypeVar("SupportsAsyncCloseType", bound=SupportsAsyncClose, covariant=False)
- class AwaitableOrContextManagerWrapper(Generic[SupportsAsyncCloseType]):
- __slots__ = ("aw", "entered")
- def __init__(self, aw: Awaitable[SupportsAsyncCloseType]) -> None:
- self.aw = aw
- def __await__(self) -> Generator[Any, None, SupportsAsyncCloseType]:
- return self.aw.__await__()
- async def __aenter__(self) -> SupportsAsyncCloseType:
- self.entered = await self.aw
- return self.entered
- async def __aexit__(self, *args: Any) -> None | bool:
- await self.entered.close()
- return None
- @contextmanager
- def collapse_excgroups() -> Generator[None, None, None]:
- try:
- yield
- except BaseException as exc:
- if has_exceptiongroups: # pragma: no cover
- while isinstance(exc, BaseExceptionGroup) and len(exc.exceptions) == 1:
- exc = exc.exceptions[0]
- raise exc
- def get_route_path(scope: Scope) -> str:
- path: str = scope["path"]
- root_path = scope.get("root_path", "")
- if not root_path:
- return path
- if not path.startswith(root_path):
- return path
- if path == root_path:
- return ""
- if path[len(root_path)] == "/":
- return path[len(root_path) :]
- return path
|