| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899 |
- from abc import ABC, abstractmethod
- import io
- import os
- import gzip
- import socket
- import ssl
- import time
- import warnings
- from datetime import datetime, timedelta, timezone
- from collections import defaultdict
- from urllib.request import getproxies
- try:
- import brotli # type: ignore
- except ImportError:
- brotli = None
- import urllib3
- import certifi
- import sentry_sdk
- from sentry_sdk.consts import EndpointType
- from sentry_sdk.utils import Dsn, logger, capture_internal_exceptions
- from sentry_sdk.worker import BackgroundWorker
- from sentry_sdk.envelope import Envelope, Item, PayloadRef
- from typing import TYPE_CHECKING, cast, List, Dict
- if TYPE_CHECKING:
- from typing import Any
- from typing import Callable
- from typing import DefaultDict
- from typing import Iterable
- from typing import Mapping
- from typing import Optional
- from typing import Self
- from typing import Tuple
- from typing import Type
- from typing import Union
- from urllib3.poolmanager import PoolManager
- from urllib3.poolmanager import ProxyManager
- from sentry_sdk._types import Event, EventDataCategory
- KEEP_ALIVE_SOCKET_OPTIONS = []
- for option in [
- (socket.SOL_SOCKET, lambda: getattr(socket, "SO_KEEPALIVE"), 1), # noqa: B009
- (socket.SOL_TCP, lambda: getattr(socket, "TCP_KEEPIDLE"), 45), # noqa: B009
- (socket.SOL_TCP, lambda: getattr(socket, "TCP_KEEPINTVL"), 10), # noqa: B009
- (socket.SOL_TCP, lambda: getattr(socket, "TCP_KEEPCNT"), 6), # noqa: B009
- ]:
- try:
- KEEP_ALIVE_SOCKET_OPTIONS.append((option[0], option[1](), option[2]))
- except AttributeError:
- # a specific option might not be available on specific systems,
- # e.g. TCP_KEEPIDLE doesn't exist on macOS
- pass
- class Transport(ABC):
- """Baseclass for all transports.
- A transport is used to send an event to sentry.
- """
- parsed_dsn = None # type: Optional[Dsn]
- def __init__(self, options=None):
- # type: (Self, Optional[Dict[str, Any]]) -> None
- self.options = options
- if options and options["dsn"] is not None and options["dsn"]:
- self.parsed_dsn = Dsn(options["dsn"])
- else:
- self.parsed_dsn = None
- def capture_event(self, event):
- # type: (Self, Event) -> None
- """
- DEPRECATED: Please use capture_envelope instead.
- This gets invoked with the event dictionary when an event should
- be sent to sentry.
- """
- warnings.warn(
- "capture_event is deprecated, please use capture_envelope instead!",
- DeprecationWarning,
- stacklevel=2,
- )
- envelope = Envelope()
- envelope.add_event(event)
- self.capture_envelope(envelope)
- @abstractmethod
- def capture_envelope(self, envelope):
- # type: (Self, Envelope) -> None
- """
- Send an envelope to Sentry.
- Envelopes are a data container format that can hold any type of data
- submitted to Sentry. We use it to send all event data (including errors,
- transactions, crons check-ins, etc.) to Sentry.
- """
- pass
- def flush(
- self,
- timeout,
- callback=None,
- ):
- # type: (Self, float, Optional[Any]) -> None
- """
- Wait `timeout` seconds for the current events to be sent out.
- The default implementation is a no-op, since this method may only be relevant to some transports.
- Subclasses should override this method if necessary.
- """
- return None
- def kill(self):
- # type: (Self) -> None
- """
- Forcefully kills the transport.
- The default implementation is a no-op, since this method may only be relevant to some transports.
- Subclasses should override this method if necessary.
- """
- return None
- def record_lost_event(
- self,
- reason, # type: str
- data_category=None, # type: Optional[EventDataCategory]
- item=None, # type: Optional[Item]
- *,
- quantity=1, # type: int
- ):
- # type: (...) -> None
- """This increments a counter for event loss by reason and
- data category by the given positive-int quantity (default 1).
- If an item is provided, the data category and quantity are
- extracted from the item, and the values passed for
- data_category and quantity are ignored.
- When recording a lost transaction via data_category="transaction",
- the calling code should also record the lost spans via this method.
- When recording lost spans, `quantity` should be set to the number
- of contained spans, plus one for the transaction itself. When
- passing an Item containing a transaction via the `item` parameter,
- this method automatically records the lost spans.
- """
- return None
- def is_healthy(self):
- # type: (Self) -> bool
- return True
- def _parse_rate_limits(header, now=None):
- # type: (str, Optional[datetime]) -> Iterable[Tuple[Optional[EventDataCategory], datetime]]
- if now is None:
- now = datetime.now(timezone.utc)
- for limit in header.split(","):
- try:
- parameters = limit.strip().split(":")
- retry_after_val, categories = parameters[:2]
- retry_after = now + timedelta(seconds=int(retry_after_val))
- for category in categories and categories.split(";") or (None,):
- yield category, retry_after # type: ignore
- except (LookupError, ValueError):
- continue
- class BaseHttpTransport(Transport):
- """The base HTTP transport."""
- TIMEOUT = 30 # seconds
- def __init__(self, options):
- # type: (Self, Dict[str, Any]) -> None
- from sentry_sdk.consts import VERSION
- Transport.__init__(self, options)
- assert self.parsed_dsn is not None
- self.options = options # type: Dict[str, Any]
- self._worker = BackgroundWorker(queue_size=options["transport_queue_size"])
- self._auth = self.parsed_dsn.to_auth("sentry.python/%s" % VERSION)
- self._disabled_until = {} # type: Dict[Optional[EventDataCategory], datetime]
- # We only use this Retry() class for the `get_retry_after` method it exposes
- self._retry = urllib3.util.Retry()
- self._discarded_events = defaultdict(int) # type: DefaultDict[Tuple[EventDataCategory, str], int]
- self._last_client_report_sent = time.time()
- self._pool = self._make_pool()
- # Backwards compatibility for deprecated `self.hub_class` attribute
- self._hub_cls = sentry_sdk.Hub
- experiments = options.get("_experiments", {})
- compression_level = experiments.get(
- "transport_compression_level",
- experiments.get("transport_zlib_compression_level"),
- )
- compression_algo = experiments.get(
- "transport_compression_algo",
- (
- "gzip"
- # if only compression level is set, assume gzip for backwards compatibility
- # if we don't have brotli available, fallback to gzip
- if compression_level is not None or brotli is None
- else "br"
- ),
- )
- if compression_algo == "br" and brotli is None:
- logger.warning(
- "You asked for brotli compression without the Brotli module, falling back to gzip -9"
- )
- compression_algo = "gzip"
- compression_level = None
- if compression_algo not in ("br", "gzip"):
- logger.warning(
- "Unknown compression algo %s, disabling compression", compression_algo
- )
- self._compression_level = 0
- self._compression_algo = None
- else:
- self._compression_algo = compression_algo
- if compression_level is not None:
- self._compression_level = compression_level
- elif self._compression_algo == "gzip":
- self._compression_level = 9
- elif self._compression_algo == "br":
- self._compression_level = 4
- def record_lost_event(
- self,
- reason, # type: str
- data_category=None, # type: Optional[EventDataCategory]
- item=None, # type: Optional[Item]
- *,
- quantity=1, # type: int
- ):
- # type: (...) -> None
- if not self.options["send_client_reports"]:
- return
- if item is not None:
- data_category = item.data_category
- quantity = 1 # If an item is provided, we always count it as 1 (except for attachments, handled below).
- if data_category == "transaction":
- # Also record the lost spans
- event = item.get_transaction_event() or {}
- # +1 for the transaction itself
- span_count = (
- len(cast(List[Dict[str, object]], event.get("spans") or [])) + 1
- )
- self.record_lost_event(reason, "span", quantity=span_count)
- elif data_category == "attachment":
- # quantity of 0 is actually 1 as we do not want to count
- # empty attachments as actually empty.
- quantity = len(item.get_bytes()) or 1
- elif data_category is None:
- raise TypeError("data category not provided")
- self._discarded_events[data_category, reason] += quantity
- def _get_header_value(self, response, header):
- # type: (Self, Any, str) -> Optional[str]
- return response.headers.get(header)
- def _update_rate_limits(self, response):
- # type: (Self, Union[urllib3.BaseHTTPResponse, httpcore.Response]) -> None
- # new sentries with more rate limit insights. We honor this header
- # no matter of the status code to update our internal rate limits.
- header = self._get_header_value(response, "x-sentry-rate-limits")
- if header:
- logger.warning("Rate-limited via x-sentry-rate-limits")
- self._disabled_until.update(_parse_rate_limits(header))
- # old sentries only communicate global rate limit hits via the
- # retry-after header on 429. This header can also be emitted on new
- # sentries if a proxy in front wants to globally slow things down.
- elif response.status == 429:
- logger.warning("Rate-limited via 429")
- retry_after_value = self._get_header_value(response, "Retry-After")
- retry_after = (
- self._retry.parse_retry_after(retry_after_value)
- if retry_after_value is not None
- else None
- ) or 60
- self._disabled_until[None] = datetime.now(timezone.utc) + timedelta(
- seconds=retry_after
- )
- def _send_request(
- self,
- body,
- headers,
- endpoint_type=EndpointType.ENVELOPE,
- envelope=None,
- ):
- # type: (Self, bytes, Dict[str, str], EndpointType, Optional[Envelope]) -> None
- def record_loss(reason):
- # type: (str) -> None
- if envelope is None:
- self.record_lost_event(reason, data_category="error")
- else:
- for item in envelope.items:
- self.record_lost_event(reason, item=item)
- headers.update(
- {
- "User-Agent": str(self._auth.client),
- "X-Sentry-Auth": str(self._auth.to_header()),
- }
- )
- try:
- response = self._request(
- "POST",
- endpoint_type,
- body,
- headers,
- )
- except Exception:
- self.on_dropped_event("network")
- record_loss("network_error")
- raise
- try:
- self._update_rate_limits(response)
- if response.status == 429:
- # if we hit a 429. Something was rate limited but we already
- # acted on this in `self._update_rate_limits`. Note that we
- # do not want to record event loss here as we will have recorded
- # an outcome in relay already.
- self.on_dropped_event("status_429")
- pass
- elif response.status >= 300 or response.status < 200:
- logger.error(
- "Unexpected status code: %s (body: %s)",
- response.status,
- getattr(response, "data", getattr(response, "content", None)),
- )
- self.on_dropped_event("status_{}".format(response.status))
- record_loss("network_error")
- finally:
- response.close()
- def on_dropped_event(self, _reason):
- # type: (Self, str) -> None
- return None
- def _fetch_pending_client_report(self, force=False, interval=60):
- # type: (Self, bool, int) -> Optional[Item]
- if not self.options["send_client_reports"]:
- return None
- if not (force or self._last_client_report_sent < time.time() - interval):
- return None
- discarded_events = self._discarded_events
- self._discarded_events = defaultdict(int)
- self._last_client_report_sent = time.time()
- if not discarded_events:
- return None
- return Item(
- PayloadRef(
- json={
- "timestamp": time.time(),
- "discarded_events": [
- {"reason": reason, "category": category, "quantity": quantity}
- for (
- (category, reason),
- quantity,
- ) in discarded_events.items()
- ],
- }
- ),
- type="client_report",
- )
- def _flush_client_reports(self, force=False):
- # type: (Self, bool) -> None
- client_report = self._fetch_pending_client_report(force=force, interval=60)
- if client_report is not None:
- self.capture_envelope(Envelope(items=[client_report]))
- def _check_disabled(self, category):
- # type: (str) -> bool
- def _disabled(bucket):
- # type: (Any) -> bool
- ts = self._disabled_until.get(bucket)
- return ts is not None and ts > datetime.now(timezone.utc)
- return _disabled(category) or _disabled(None)
- def _is_rate_limited(self):
- # type: (Self) -> bool
- return any(
- ts > datetime.now(timezone.utc) for ts in self._disabled_until.values()
- )
- def _is_worker_full(self):
- # type: (Self) -> bool
- return self._worker.full()
- def is_healthy(self):
- # type: (Self) -> bool
- return not (self._is_worker_full() or self._is_rate_limited())
- def _send_envelope(self, envelope):
- # type: (Self, Envelope) -> None
- # remove all items from the envelope which are over quota
- new_items = []
- for item in envelope.items:
- if self._check_disabled(item.data_category):
- if item.data_category in ("transaction", "error", "default", "statsd"):
- self.on_dropped_event("self_rate_limits")
- self.record_lost_event("ratelimit_backoff", item=item)
- else:
- new_items.append(item)
- # Since we're modifying the envelope here make a copy so that others
- # that hold references do not see their envelope modified.
- envelope = Envelope(headers=envelope.headers, items=new_items)
- if not envelope.items:
- return None
- # since we're already in the business of sending out an envelope here
- # check if we have one pending for the stats session envelopes so we
- # can attach it to this enveloped scheduled for sending. This will
- # currently typically attach the client report to the most recent
- # session update.
- client_report_item = self._fetch_pending_client_report(interval=30)
- if client_report_item is not None:
- envelope.items.append(client_report_item)
- content_encoding, body = self._serialize_envelope(envelope)
- assert self.parsed_dsn is not None
- logger.debug(
- "Sending envelope [%s] project:%s host:%s",
- envelope.description,
- self.parsed_dsn.project_id,
- self.parsed_dsn.host,
- )
- headers = {
- "Content-Type": "application/x-sentry-envelope",
- }
- if content_encoding:
- headers["Content-Encoding"] = content_encoding
- self._send_request(
- body.getvalue(),
- headers=headers,
- endpoint_type=EndpointType.ENVELOPE,
- envelope=envelope,
- )
- return None
- def _serialize_envelope(self, envelope):
- # type: (Self, Envelope) -> tuple[Optional[str], io.BytesIO]
- content_encoding = None
- body = io.BytesIO()
- if self._compression_level == 0 or self._compression_algo is None:
- envelope.serialize_into(body)
- else:
- content_encoding = self._compression_algo
- if self._compression_algo == "br" and brotli is not None:
- body.write(
- brotli.compress(
- envelope.serialize(), quality=self._compression_level
- )
- )
- else: # assume gzip as we sanitize the algo value in init
- with gzip.GzipFile(
- fileobj=body, mode="w", compresslevel=self._compression_level
- ) as f:
- envelope.serialize_into(f)
- return content_encoding, body
- def _get_pool_options(self):
- # type: (Self) -> Dict[str, Any]
- raise NotImplementedError()
- def _in_no_proxy(self, parsed_dsn):
- # type: (Self, Dsn) -> bool
- no_proxy = getproxies().get("no")
- if not no_proxy:
- return False
- for host in no_proxy.split(","):
- host = host.strip()
- if parsed_dsn.host.endswith(host) or parsed_dsn.netloc.endswith(host):
- return True
- return False
- def _make_pool(self):
- # type: (Self) -> Union[PoolManager, ProxyManager, httpcore.SOCKSProxy, httpcore.HTTPProxy, httpcore.ConnectionPool]
- raise NotImplementedError()
- def _request(
- self,
- method,
- endpoint_type,
- body,
- headers,
- ):
- # type: (Self, str, EndpointType, Any, Mapping[str, str]) -> Union[urllib3.BaseHTTPResponse, httpcore.Response]
- raise NotImplementedError()
- def capture_envelope(
- self,
- envelope, # type: Envelope
- ):
- # type: (...) -> None
- def send_envelope_wrapper():
- # type: () -> None
- with capture_internal_exceptions():
- self._send_envelope(envelope)
- self._flush_client_reports()
- if not self._worker.submit(send_envelope_wrapper):
- self.on_dropped_event("full_queue")
- for item in envelope.items:
- self.record_lost_event("queue_overflow", item=item)
- def flush(
- self,
- timeout,
- callback=None,
- ):
- # type: (Self, float, Optional[Callable[[int, float], None]]) -> None
- logger.debug("Flushing HTTP transport")
- if timeout > 0:
- self._worker.submit(lambda: self._flush_client_reports(force=True))
- self._worker.flush(timeout, callback)
- def kill(self):
- # type: (Self) -> None
- logger.debug("Killing HTTP transport")
- self._worker.kill()
- @staticmethod
- def _warn_hub_cls():
- # type: () -> None
- """Convenience method to warn users about the deprecation of the `hub_cls` attribute."""
- warnings.warn(
- "The `hub_cls` attribute is deprecated and will be removed in a future release.",
- DeprecationWarning,
- stacklevel=3,
- )
- @property
- def hub_cls(self):
- # type: (Self) -> type[sentry_sdk.Hub]
- """DEPRECATED: This attribute is deprecated and will be removed in a future release."""
- HttpTransport._warn_hub_cls()
- return self._hub_cls
- @hub_cls.setter
- def hub_cls(self, value):
- # type: (Self, type[sentry_sdk.Hub]) -> None
- """DEPRECATED: This attribute is deprecated and will be removed in a future release."""
- HttpTransport._warn_hub_cls()
- self._hub_cls = value
- class HttpTransport(BaseHttpTransport):
- if TYPE_CHECKING:
- _pool: Union[PoolManager, ProxyManager]
- def _get_pool_options(self):
- # type: (Self) -> Dict[str, Any]
- num_pools = self.options.get("_experiments", {}).get("transport_num_pools")
- options = {
- "num_pools": 2 if num_pools is None else int(num_pools),
- "cert_reqs": "CERT_REQUIRED",
- "timeout": urllib3.Timeout(total=self.TIMEOUT),
- }
- socket_options = None # type: Optional[List[Tuple[int, int, int | bytes]]]
- if self.options["socket_options"] is not None:
- socket_options = self.options["socket_options"]
- if self.options["keep_alive"]:
- if socket_options is None:
- socket_options = []
- used_options = {(o[0], o[1]) for o in socket_options}
- for default_option in KEEP_ALIVE_SOCKET_OPTIONS:
- if (default_option[0], default_option[1]) not in used_options:
- socket_options.append(default_option)
- if socket_options is not None:
- options["socket_options"] = socket_options
- options["ca_certs"] = (
- self.options["ca_certs"] # User-provided bundle from the SDK init
- or os.environ.get("SSL_CERT_FILE")
- or os.environ.get("REQUESTS_CA_BUNDLE")
- or certifi.where()
- )
- options["cert_file"] = self.options["cert_file"] or os.environ.get(
- "CLIENT_CERT_FILE"
- )
- options["key_file"] = self.options["key_file"] or os.environ.get(
- "CLIENT_KEY_FILE"
- )
- return options
- def _make_pool(self):
- # type: (Self) -> Union[PoolManager, ProxyManager]
- if self.parsed_dsn is None:
- raise ValueError("Cannot create HTTP-based transport without valid DSN")
- proxy = None
- no_proxy = self._in_no_proxy(self.parsed_dsn)
- # try HTTPS first
- https_proxy = self.options["https_proxy"]
- if self.parsed_dsn.scheme == "https" and (https_proxy != ""):
- proxy = https_proxy or (not no_proxy and getproxies().get("https"))
- # maybe fallback to HTTP proxy
- http_proxy = self.options["http_proxy"]
- if not proxy and (http_proxy != ""):
- proxy = http_proxy or (not no_proxy and getproxies().get("http"))
- opts = self._get_pool_options()
- if proxy:
- proxy_headers = self.options["proxy_headers"]
- if proxy_headers:
- opts["proxy_headers"] = proxy_headers
- if proxy.startswith("socks"):
- use_socks_proxy = True
- try:
- # Check if PySocks dependency is available
- from urllib3.contrib.socks import SOCKSProxyManager
- except ImportError:
- use_socks_proxy = False
- logger.warning(
- "You have configured a SOCKS proxy (%s) but support for SOCKS proxies is not installed. Disabling proxy support. Please add `PySocks` (or `urllib3` with the `[socks]` extra) to your dependencies.",
- proxy,
- )
- if use_socks_proxy:
- return SOCKSProxyManager(proxy, **opts)
- else:
- return urllib3.PoolManager(**opts)
- else:
- return urllib3.ProxyManager(proxy, **opts)
- else:
- return urllib3.PoolManager(**opts)
- def _request(
- self,
- method,
- endpoint_type,
- body,
- headers,
- ):
- # type: (Self, str, EndpointType, Any, Mapping[str, str]) -> urllib3.BaseHTTPResponse
- return self._pool.request(
- method,
- self._auth.get_api_url(endpoint_type),
- body=body,
- headers=headers,
- )
- try:
- import httpcore
- import h2 # noqa: F401
- except ImportError:
- # Sorry, no Http2Transport for you
- class Http2Transport(HttpTransport):
- def __init__(self, options):
- # type: (Self, Dict[str, Any]) -> None
- super().__init__(options)
- logger.warning(
- "You tried to use HTTP2Transport but don't have httpcore[http2] installed. Falling back to HTTPTransport."
- )
- else:
- class Http2Transport(BaseHttpTransport): # type: ignore
- """The HTTP2 transport based on httpcore."""
- TIMEOUT = 15
- if TYPE_CHECKING:
- _pool: Union[
- httpcore.SOCKSProxy, httpcore.HTTPProxy, httpcore.ConnectionPool
- ]
- def _get_header_value(self, response, header):
- # type: (Self, httpcore.Response, str) -> Optional[str]
- return next(
- (
- val.decode("ascii")
- for key, val in response.headers
- if key.decode("ascii").lower() == header
- ),
- None,
- )
- def _request(
- self,
- method,
- endpoint_type,
- body,
- headers,
- ):
- # type: (Self, str, EndpointType, Any, Mapping[str, str]) -> httpcore.Response
- response = self._pool.request(
- method,
- self._auth.get_api_url(endpoint_type),
- content=body,
- headers=headers, # type: ignore
- extensions={
- "timeout": {
- "pool": self.TIMEOUT,
- "connect": self.TIMEOUT,
- "write": self.TIMEOUT,
- "read": self.TIMEOUT,
- }
- },
- )
- return response
- def _get_pool_options(self):
- # type: (Self) -> Dict[str, Any]
- options = {
- "http2": self.parsed_dsn is not None
- and self.parsed_dsn.scheme == "https",
- "retries": 3,
- } # type: Dict[str, Any]
- socket_options = (
- self.options["socket_options"]
- if self.options["socket_options"] is not None
- else []
- )
- used_options = {(o[0], o[1]) for o in socket_options}
- for default_option in KEEP_ALIVE_SOCKET_OPTIONS:
- if (default_option[0], default_option[1]) not in used_options:
- socket_options.append(default_option)
- options["socket_options"] = socket_options
- ssl_context = ssl.create_default_context()
- ssl_context.load_verify_locations(
- self.options["ca_certs"] # User-provided bundle from the SDK init
- or os.environ.get("SSL_CERT_FILE")
- or os.environ.get("REQUESTS_CA_BUNDLE")
- or certifi.where()
- )
- cert_file = self.options["cert_file"] or os.environ.get("CLIENT_CERT_FILE")
- key_file = self.options["key_file"] or os.environ.get("CLIENT_KEY_FILE")
- if cert_file is not None:
- ssl_context.load_cert_chain(cert_file, key_file)
- options["ssl_context"] = ssl_context
- return options
- def _make_pool(self):
- # type: (Self) -> Union[httpcore.SOCKSProxy, httpcore.HTTPProxy, httpcore.ConnectionPool]
- if self.parsed_dsn is None:
- raise ValueError("Cannot create HTTP-based transport without valid DSN")
- proxy = None
- no_proxy = self._in_no_proxy(self.parsed_dsn)
- # try HTTPS first
- https_proxy = self.options["https_proxy"]
- if self.parsed_dsn.scheme == "https" and (https_proxy != ""):
- proxy = https_proxy or (not no_proxy and getproxies().get("https"))
- # maybe fallback to HTTP proxy
- http_proxy = self.options["http_proxy"]
- if not proxy and (http_proxy != ""):
- proxy = http_proxy or (not no_proxy and getproxies().get("http"))
- opts = self._get_pool_options()
- if proxy:
- proxy_headers = self.options["proxy_headers"]
- if proxy_headers:
- opts["proxy_headers"] = proxy_headers
- if proxy.startswith("socks"):
- try:
- if "socket_options" in opts:
- socket_options = opts.pop("socket_options")
- if socket_options:
- logger.warning(
- "You have defined socket_options but using a SOCKS proxy which doesn't support these. We'll ignore socket_options."
- )
- return httpcore.SOCKSProxy(proxy_url=proxy, **opts)
- except RuntimeError:
- logger.warning(
- "You have configured a SOCKS proxy (%s) but support for SOCKS proxies is not installed. Disabling proxy support.",
- proxy,
- )
- else:
- return httpcore.HTTPProxy(proxy_url=proxy, **opts)
- return httpcore.ConnectionPool(**opts)
- class _FunctionTransport(Transport):
- """
- DEPRECATED: Users wishing to provide a custom transport should subclass
- the Transport class, rather than providing a function.
- """
- def __init__(
- self,
- func, # type: Callable[[Event], None]
- ):
- # type: (...) -> None
- Transport.__init__(self)
- self._func = func
- def capture_event(
- self,
- event, # type: Event
- ):
- # type: (...) -> None
- self._func(event)
- return None
- def capture_envelope(self, envelope: Envelope) -> None:
- # Since function transports expect to be called with an event, we need
- # to iterate over the envelope and call the function for each event, via
- # the deprecated capture_event method.
- event = envelope.get_event()
- if event is not None:
- self.capture_event(event)
- def make_transport(options):
- # type: (Dict[str, Any]) -> Optional[Transport]
- ref_transport = options["transport"]
- use_http2_transport = options.get("_experiments", {}).get("transport_http2", False)
- # By default, we use the http transport class
- transport_cls = Http2Transport if use_http2_transport else HttpTransport # type: Type[Transport]
- if isinstance(ref_transport, Transport):
- return ref_transport
- elif isinstance(ref_transport, type) and issubclass(ref_transport, Transport):
- transport_cls = ref_transport
- elif callable(ref_transport):
- warnings.warn(
- "Function transports are deprecated and will be removed in a future release."
- "Please provide a Transport instance or subclass, instead.",
- DeprecationWarning,
- stacklevel=2,
- )
- return _FunctionTransport(ref_transport)
- # if a transport class is given only instantiate it if the dsn is not
- # empty or None
- if options["dsn"]:
- return transport_cls(options)
- return None
|