transport.py 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899
  1. from abc import ABC, abstractmethod
  2. import io
  3. import os
  4. import gzip
  5. import socket
  6. import ssl
  7. import time
  8. import warnings
  9. from datetime import datetime, timedelta, timezone
  10. from collections import defaultdict
  11. from urllib.request import getproxies
  12. try:
  13. import brotli # type: ignore
  14. except ImportError:
  15. brotli = None
  16. import urllib3
  17. import certifi
  18. import sentry_sdk
  19. from sentry_sdk.consts import EndpointType
  20. from sentry_sdk.utils import Dsn, logger, capture_internal_exceptions
  21. from sentry_sdk.worker import BackgroundWorker
  22. from sentry_sdk.envelope import Envelope, Item, PayloadRef
  23. from typing import TYPE_CHECKING, cast, List, Dict
  24. if TYPE_CHECKING:
  25. from typing import Any
  26. from typing import Callable
  27. from typing import DefaultDict
  28. from typing import Iterable
  29. from typing import Mapping
  30. from typing import Optional
  31. from typing import Self
  32. from typing import Tuple
  33. from typing import Type
  34. from typing import Union
  35. from urllib3.poolmanager import PoolManager
  36. from urllib3.poolmanager import ProxyManager
  37. from sentry_sdk._types import Event, EventDataCategory
  38. KEEP_ALIVE_SOCKET_OPTIONS = []
  39. for option in [
  40. (socket.SOL_SOCKET, lambda: getattr(socket, "SO_KEEPALIVE"), 1), # noqa: B009
  41. (socket.SOL_TCP, lambda: getattr(socket, "TCP_KEEPIDLE"), 45), # noqa: B009
  42. (socket.SOL_TCP, lambda: getattr(socket, "TCP_KEEPINTVL"), 10), # noqa: B009
  43. (socket.SOL_TCP, lambda: getattr(socket, "TCP_KEEPCNT"), 6), # noqa: B009
  44. ]:
  45. try:
  46. KEEP_ALIVE_SOCKET_OPTIONS.append((option[0], option[1](), option[2]))
  47. except AttributeError:
  48. # a specific option might not be available on specific systems,
  49. # e.g. TCP_KEEPIDLE doesn't exist on macOS
  50. pass
  51. class Transport(ABC):
  52. """Baseclass for all transports.
  53. A transport is used to send an event to sentry.
  54. """
  55. parsed_dsn = None # type: Optional[Dsn]
  56. def __init__(self, options=None):
  57. # type: (Self, Optional[Dict[str, Any]]) -> None
  58. self.options = options
  59. if options and options["dsn"] is not None and options["dsn"]:
  60. self.parsed_dsn = Dsn(options["dsn"])
  61. else:
  62. self.parsed_dsn = None
  63. def capture_event(self, event):
  64. # type: (Self, Event) -> None
  65. """
  66. DEPRECATED: Please use capture_envelope instead.
  67. This gets invoked with the event dictionary when an event should
  68. be sent to sentry.
  69. """
  70. warnings.warn(
  71. "capture_event is deprecated, please use capture_envelope instead!",
  72. DeprecationWarning,
  73. stacklevel=2,
  74. )
  75. envelope = Envelope()
  76. envelope.add_event(event)
  77. self.capture_envelope(envelope)
  78. @abstractmethod
  79. def capture_envelope(self, envelope):
  80. # type: (Self, Envelope) -> None
  81. """
  82. Send an envelope to Sentry.
  83. Envelopes are a data container format that can hold any type of data
  84. submitted to Sentry. We use it to send all event data (including errors,
  85. transactions, crons check-ins, etc.) to Sentry.
  86. """
  87. pass
  88. def flush(
  89. self,
  90. timeout,
  91. callback=None,
  92. ):
  93. # type: (Self, float, Optional[Any]) -> None
  94. """
  95. Wait `timeout` seconds for the current events to be sent out.
  96. The default implementation is a no-op, since this method may only be relevant to some transports.
  97. Subclasses should override this method if necessary.
  98. """
  99. return None
  100. def kill(self):
  101. # type: (Self) -> None
  102. """
  103. Forcefully kills the transport.
  104. The default implementation is a no-op, since this method may only be relevant to some transports.
  105. Subclasses should override this method if necessary.
  106. """
  107. return None
  108. def record_lost_event(
  109. self,
  110. reason, # type: str
  111. data_category=None, # type: Optional[EventDataCategory]
  112. item=None, # type: Optional[Item]
  113. *,
  114. quantity=1, # type: int
  115. ):
  116. # type: (...) -> None
  117. """This increments a counter for event loss by reason and
  118. data category by the given positive-int quantity (default 1).
  119. If an item is provided, the data category and quantity are
  120. extracted from the item, and the values passed for
  121. data_category and quantity are ignored.
  122. When recording a lost transaction via data_category="transaction",
  123. the calling code should also record the lost spans via this method.
  124. When recording lost spans, `quantity` should be set to the number
  125. of contained spans, plus one for the transaction itself. When
  126. passing an Item containing a transaction via the `item` parameter,
  127. this method automatically records the lost spans.
  128. """
  129. return None
  130. def is_healthy(self):
  131. # type: (Self) -> bool
  132. return True
  133. def _parse_rate_limits(header, now=None):
  134. # type: (str, Optional[datetime]) -> Iterable[Tuple[Optional[EventDataCategory], datetime]]
  135. if now is None:
  136. now = datetime.now(timezone.utc)
  137. for limit in header.split(","):
  138. try:
  139. parameters = limit.strip().split(":")
  140. retry_after_val, categories = parameters[:2]
  141. retry_after = now + timedelta(seconds=int(retry_after_val))
  142. for category in categories and categories.split(";") or (None,):
  143. yield category, retry_after # type: ignore
  144. except (LookupError, ValueError):
  145. continue
  146. class BaseHttpTransport(Transport):
  147. """The base HTTP transport."""
  148. TIMEOUT = 30 # seconds
  149. def __init__(self, options):
  150. # type: (Self, Dict[str, Any]) -> None
  151. from sentry_sdk.consts import VERSION
  152. Transport.__init__(self, options)
  153. assert self.parsed_dsn is not None
  154. self.options = options # type: Dict[str, Any]
  155. self._worker = BackgroundWorker(queue_size=options["transport_queue_size"])
  156. self._auth = self.parsed_dsn.to_auth("sentry.python/%s" % VERSION)
  157. self._disabled_until = {} # type: Dict[Optional[EventDataCategory], datetime]
  158. # We only use this Retry() class for the `get_retry_after` method it exposes
  159. self._retry = urllib3.util.Retry()
  160. self._discarded_events = defaultdict(int) # type: DefaultDict[Tuple[EventDataCategory, str], int]
  161. self._last_client_report_sent = time.time()
  162. self._pool = self._make_pool()
  163. # Backwards compatibility for deprecated `self.hub_class` attribute
  164. self._hub_cls = sentry_sdk.Hub
  165. experiments = options.get("_experiments", {})
  166. compression_level = experiments.get(
  167. "transport_compression_level",
  168. experiments.get("transport_zlib_compression_level"),
  169. )
  170. compression_algo = experiments.get(
  171. "transport_compression_algo",
  172. (
  173. "gzip"
  174. # if only compression level is set, assume gzip for backwards compatibility
  175. # if we don't have brotli available, fallback to gzip
  176. if compression_level is not None or brotli is None
  177. else "br"
  178. ),
  179. )
  180. if compression_algo == "br" and brotli is None:
  181. logger.warning(
  182. "You asked for brotli compression without the Brotli module, falling back to gzip -9"
  183. )
  184. compression_algo = "gzip"
  185. compression_level = None
  186. if compression_algo not in ("br", "gzip"):
  187. logger.warning(
  188. "Unknown compression algo %s, disabling compression", compression_algo
  189. )
  190. self._compression_level = 0
  191. self._compression_algo = None
  192. else:
  193. self._compression_algo = compression_algo
  194. if compression_level is not None:
  195. self._compression_level = compression_level
  196. elif self._compression_algo == "gzip":
  197. self._compression_level = 9
  198. elif self._compression_algo == "br":
  199. self._compression_level = 4
  200. def record_lost_event(
  201. self,
  202. reason, # type: str
  203. data_category=None, # type: Optional[EventDataCategory]
  204. item=None, # type: Optional[Item]
  205. *,
  206. quantity=1, # type: int
  207. ):
  208. # type: (...) -> None
  209. if not self.options["send_client_reports"]:
  210. return
  211. if item is not None:
  212. data_category = item.data_category
  213. quantity = 1 # If an item is provided, we always count it as 1 (except for attachments, handled below).
  214. if data_category == "transaction":
  215. # Also record the lost spans
  216. event = item.get_transaction_event() or {}
  217. # +1 for the transaction itself
  218. span_count = (
  219. len(cast(List[Dict[str, object]], event.get("spans") or [])) + 1
  220. )
  221. self.record_lost_event(reason, "span", quantity=span_count)
  222. elif data_category == "attachment":
  223. # quantity of 0 is actually 1 as we do not want to count
  224. # empty attachments as actually empty.
  225. quantity = len(item.get_bytes()) or 1
  226. elif data_category is None:
  227. raise TypeError("data category not provided")
  228. self._discarded_events[data_category, reason] += quantity
  229. def _get_header_value(self, response, header):
  230. # type: (Self, Any, str) -> Optional[str]
  231. return response.headers.get(header)
  232. def _update_rate_limits(self, response):
  233. # type: (Self, Union[urllib3.BaseHTTPResponse, httpcore.Response]) -> None
  234. # new sentries with more rate limit insights. We honor this header
  235. # no matter of the status code to update our internal rate limits.
  236. header = self._get_header_value(response, "x-sentry-rate-limits")
  237. if header:
  238. logger.warning("Rate-limited via x-sentry-rate-limits")
  239. self._disabled_until.update(_parse_rate_limits(header))
  240. # old sentries only communicate global rate limit hits via the
  241. # retry-after header on 429. This header can also be emitted on new
  242. # sentries if a proxy in front wants to globally slow things down.
  243. elif response.status == 429:
  244. logger.warning("Rate-limited via 429")
  245. retry_after_value = self._get_header_value(response, "Retry-After")
  246. retry_after = (
  247. self._retry.parse_retry_after(retry_after_value)
  248. if retry_after_value is not None
  249. else None
  250. ) or 60
  251. self._disabled_until[None] = datetime.now(timezone.utc) + timedelta(
  252. seconds=retry_after
  253. )
  254. def _send_request(
  255. self,
  256. body,
  257. headers,
  258. endpoint_type=EndpointType.ENVELOPE,
  259. envelope=None,
  260. ):
  261. # type: (Self, bytes, Dict[str, str], EndpointType, Optional[Envelope]) -> None
  262. def record_loss(reason):
  263. # type: (str) -> None
  264. if envelope is None:
  265. self.record_lost_event(reason, data_category="error")
  266. else:
  267. for item in envelope.items:
  268. self.record_lost_event(reason, item=item)
  269. headers.update(
  270. {
  271. "User-Agent": str(self._auth.client),
  272. "X-Sentry-Auth": str(self._auth.to_header()),
  273. }
  274. )
  275. try:
  276. response = self._request(
  277. "POST",
  278. endpoint_type,
  279. body,
  280. headers,
  281. )
  282. except Exception:
  283. self.on_dropped_event("network")
  284. record_loss("network_error")
  285. raise
  286. try:
  287. self._update_rate_limits(response)
  288. if response.status == 429:
  289. # if we hit a 429. Something was rate limited but we already
  290. # acted on this in `self._update_rate_limits`. Note that we
  291. # do not want to record event loss here as we will have recorded
  292. # an outcome in relay already.
  293. self.on_dropped_event("status_429")
  294. pass
  295. elif response.status >= 300 or response.status < 200:
  296. logger.error(
  297. "Unexpected status code: %s (body: %s)",
  298. response.status,
  299. getattr(response, "data", getattr(response, "content", None)),
  300. )
  301. self.on_dropped_event("status_{}".format(response.status))
  302. record_loss("network_error")
  303. finally:
  304. response.close()
  305. def on_dropped_event(self, _reason):
  306. # type: (Self, str) -> None
  307. return None
  308. def _fetch_pending_client_report(self, force=False, interval=60):
  309. # type: (Self, bool, int) -> Optional[Item]
  310. if not self.options["send_client_reports"]:
  311. return None
  312. if not (force or self._last_client_report_sent < time.time() - interval):
  313. return None
  314. discarded_events = self._discarded_events
  315. self._discarded_events = defaultdict(int)
  316. self._last_client_report_sent = time.time()
  317. if not discarded_events:
  318. return None
  319. return Item(
  320. PayloadRef(
  321. json={
  322. "timestamp": time.time(),
  323. "discarded_events": [
  324. {"reason": reason, "category": category, "quantity": quantity}
  325. for (
  326. (category, reason),
  327. quantity,
  328. ) in discarded_events.items()
  329. ],
  330. }
  331. ),
  332. type="client_report",
  333. )
  334. def _flush_client_reports(self, force=False):
  335. # type: (Self, bool) -> None
  336. client_report = self._fetch_pending_client_report(force=force, interval=60)
  337. if client_report is not None:
  338. self.capture_envelope(Envelope(items=[client_report]))
  339. def _check_disabled(self, category):
  340. # type: (str) -> bool
  341. def _disabled(bucket):
  342. # type: (Any) -> bool
  343. ts = self._disabled_until.get(bucket)
  344. return ts is not None and ts > datetime.now(timezone.utc)
  345. return _disabled(category) or _disabled(None)
  346. def _is_rate_limited(self):
  347. # type: (Self) -> bool
  348. return any(
  349. ts > datetime.now(timezone.utc) for ts in self._disabled_until.values()
  350. )
  351. def _is_worker_full(self):
  352. # type: (Self) -> bool
  353. return self._worker.full()
  354. def is_healthy(self):
  355. # type: (Self) -> bool
  356. return not (self._is_worker_full() or self._is_rate_limited())
  357. def _send_envelope(self, envelope):
  358. # type: (Self, Envelope) -> None
  359. # remove all items from the envelope which are over quota
  360. new_items = []
  361. for item in envelope.items:
  362. if self._check_disabled(item.data_category):
  363. if item.data_category in ("transaction", "error", "default", "statsd"):
  364. self.on_dropped_event("self_rate_limits")
  365. self.record_lost_event("ratelimit_backoff", item=item)
  366. else:
  367. new_items.append(item)
  368. # Since we're modifying the envelope here make a copy so that others
  369. # that hold references do not see their envelope modified.
  370. envelope = Envelope(headers=envelope.headers, items=new_items)
  371. if not envelope.items:
  372. return None
  373. # since we're already in the business of sending out an envelope here
  374. # check if we have one pending for the stats session envelopes so we
  375. # can attach it to this enveloped scheduled for sending. This will
  376. # currently typically attach the client report to the most recent
  377. # session update.
  378. client_report_item = self._fetch_pending_client_report(interval=30)
  379. if client_report_item is not None:
  380. envelope.items.append(client_report_item)
  381. content_encoding, body = self._serialize_envelope(envelope)
  382. assert self.parsed_dsn is not None
  383. logger.debug(
  384. "Sending envelope [%s] project:%s host:%s",
  385. envelope.description,
  386. self.parsed_dsn.project_id,
  387. self.parsed_dsn.host,
  388. )
  389. headers = {
  390. "Content-Type": "application/x-sentry-envelope",
  391. }
  392. if content_encoding:
  393. headers["Content-Encoding"] = content_encoding
  394. self._send_request(
  395. body.getvalue(),
  396. headers=headers,
  397. endpoint_type=EndpointType.ENVELOPE,
  398. envelope=envelope,
  399. )
  400. return None
  401. def _serialize_envelope(self, envelope):
  402. # type: (Self, Envelope) -> tuple[Optional[str], io.BytesIO]
  403. content_encoding = None
  404. body = io.BytesIO()
  405. if self._compression_level == 0 or self._compression_algo is None:
  406. envelope.serialize_into(body)
  407. else:
  408. content_encoding = self._compression_algo
  409. if self._compression_algo == "br" and brotli is not None:
  410. body.write(
  411. brotli.compress(
  412. envelope.serialize(), quality=self._compression_level
  413. )
  414. )
  415. else: # assume gzip as we sanitize the algo value in init
  416. with gzip.GzipFile(
  417. fileobj=body, mode="w", compresslevel=self._compression_level
  418. ) as f:
  419. envelope.serialize_into(f)
  420. return content_encoding, body
  421. def _get_pool_options(self):
  422. # type: (Self) -> Dict[str, Any]
  423. raise NotImplementedError()
  424. def _in_no_proxy(self, parsed_dsn):
  425. # type: (Self, Dsn) -> bool
  426. no_proxy = getproxies().get("no")
  427. if not no_proxy:
  428. return False
  429. for host in no_proxy.split(","):
  430. host = host.strip()
  431. if parsed_dsn.host.endswith(host) or parsed_dsn.netloc.endswith(host):
  432. return True
  433. return False
  434. def _make_pool(self):
  435. # type: (Self) -> Union[PoolManager, ProxyManager, httpcore.SOCKSProxy, httpcore.HTTPProxy, httpcore.ConnectionPool]
  436. raise NotImplementedError()
  437. def _request(
  438. self,
  439. method,
  440. endpoint_type,
  441. body,
  442. headers,
  443. ):
  444. # type: (Self, str, EndpointType, Any, Mapping[str, str]) -> Union[urllib3.BaseHTTPResponse, httpcore.Response]
  445. raise NotImplementedError()
  446. def capture_envelope(
  447. self,
  448. envelope, # type: Envelope
  449. ):
  450. # type: (...) -> None
  451. def send_envelope_wrapper():
  452. # type: () -> None
  453. with capture_internal_exceptions():
  454. self._send_envelope(envelope)
  455. self._flush_client_reports()
  456. if not self._worker.submit(send_envelope_wrapper):
  457. self.on_dropped_event("full_queue")
  458. for item in envelope.items:
  459. self.record_lost_event("queue_overflow", item=item)
  460. def flush(
  461. self,
  462. timeout,
  463. callback=None,
  464. ):
  465. # type: (Self, float, Optional[Callable[[int, float], None]]) -> None
  466. logger.debug("Flushing HTTP transport")
  467. if timeout > 0:
  468. self._worker.submit(lambda: self._flush_client_reports(force=True))
  469. self._worker.flush(timeout, callback)
  470. def kill(self):
  471. # type: (Self) -> None
  472. logger.debug("Killing HTTP transport")
  473. self._worker.kill()
  474. @staticmethod
  475. def _warn_hub_cls():
  476. # type: () -> None
  477. """Convenience method to warn users about the deprecation of the `hub_cls` attribute."""
  478. warnings.warn(
  479. "The `hub_cls` attribute is deprecated and will be removed in a future release.",
  480. DeprecationWarning,
  481. stacklevel=3,
  482. )
  483. @property
  484. def hub_cls(self):
  485. # type: (Self) -> type[sentry_sdk.Hub]
  486. """DEPRECATED: This attribute is deprecated and will be removed in a future release."""
  487. HttpTransport._warn_hub_cls()
  488. return self._hub_cls
  489. @hub_cls.setter
  490. def hub_cls(self, value):
  491. # type: (Self, type[sentry_sdk.Hub]) -> None
  492. """DEPRECATED: This attribute is deprecated and will be removed in a future release."""
  493. HttpTransport._warn_hub_cls()
  494. self._hub_cls = value
  495. class HttpTransport(BaseHttpTransport):
  496. if TYPE_CHECKING:
  497. _pool: Union[PoolManager, ProxyManager]
  498. def _get_pool_options(self):
  499. # type: (Self) -> Dict[str, Any]
  500. num_pools = self.options.get("_experiments", {}).get("transport_num_pools")
  501. options = {
  502. "num_pools": 2 if num_pools is None else int(num_pools),
  503. "cert_reqs": "CERT_REQUIRED",
  504. "timeout": urllib3.Timeout(total=self.TIMEOUT),
  505. }
  506. socket_options = None # type: Optional[List[Tuple[int, int, int | bytes]]]
  507. if self.options["socket_options"] is not None:
  508. socket_options = self.options["socket_options"]
  509. if self.options["keep_alive"]:
  510. if socket_options is None:
  511. socket_options = []
  512. used_options = {(o[0], o[1]) for o in socket_options}
  513. for default_option in KEEP_ALIVE_SOCKET_OPTIONS:
  514. if (default_option[0], default_option[1]) not in used_options:
  515. socket_options.append(default_option)
  516. if socket_options is not None:
  517. options["socket_options"] = socket_options
  518. options["ca_certs"] = (
  519. self.options["ca_certs"] # User-provided bundle from the SDK init
  520. or os.environ.get("SSL_CERT_FILE")
  521. or os.environ.get("REQUESTS_CA_BUNDLE")
  522. or certifi.where()
  523. )
  524. options["cert_file"] = self.options["cert_file"] or os.environ.get(
  525. "CLIENT_CERT_FILE"
  526. )
  527. options["key_file"] = self.options["key_file"] or os.environ.get(
  528. "CLIENT_KEY_FILE"
  529. )
  530. return options
  531. def _make_pool(self):
  532. # type: (Self) -> Union[PoolManager, ProxyManager]
  533. if self.parsed_dsn is None:
  534. raise ValueError("Cannot create HTTP-based transport without valid DSN")
  535. proxy = None
  536. no_proxy = self._in_no_proxy(self.parsed_dsn)
  537. # try HTTPS first
  538. https_proxy = self.options["https_proxy"]
  539. if self.parsed_dsn.scheme == "https" and (https_proxy != ""):
  540. proxy = https_proxy or (not no_proxy and getproxies().get("https"))
  541. # maybe fallback to HTTP proxy
  542. http_proxy = self.options["http_proxy"]
  543. if not proxy and (http_proxy != ""):
  544. proxy = http_proxy or (not no_proxy and getproxies().get("http"))
  545. opts = self._get_pool_options()
  546. if proxy:
  547. proxy_headers = self.options["proxy_headers"]
  548. if proxy_headers:
  549. opts["proxy_headers"] = proxy_headers
  550. if proxy.startswith("socks"):
  551. use_socks_proxy = True
  552. try:
  553. # Check if PySocks dependency is available
  554. from urllib3.contrib.socks import SOCKSProxyManager
  555. except ImportError:
  556. use_socks_proxy = False
  557. logger.warning(
  558. "You have configured a SOCKS proxy (%s) but support for SOCKS proxies is not installed. Disabling proxy support. Please add `PySocks` (or `urllib3` with the `[socks]` extra) to your dependencies.",
  559. proxy,
  560. )
  561. if use_socks_proxy:
  562. return SOCKSProxyManager(proxy, **opts)
  563. else:
  564. return urllib3.PoolManager(**opts)
  565. else:
  566. return urllib3.ProxyManager(proxy, **opts)
  567. else:
  568. return urllib3.PoolManager(**opts)
  569. def _request(
  570. self,
  571. method,
  572. endpoint_type,
  573. body,
  574. headers,
  575. ):
  576. # type: (Self, str, EndpointType, Any, Mapping[str, str]) -> urllib3.BaseHTTPResponse
  577. return self._pool.request(
  578. method,
  579. self._auth.get_api_url(endpoint_type),
  580. body=body,
  581. headers=headers,
  582. )
  583. try:
  584. import httpcore
  585. import h2 # noqa: F401
  586. except ImportError:
  587. # Sorry, no Http2Transport for you
  588. class Http2Transport(HttpTransport):
  589. def __init__(self, options):
  590. # type: (Self, Dict[str, Any]) -> None
  591. super().__init__(options)
  592. logger.warning(
  593. "You tried to use HTTP2Transport but don't have httpcore[http2] installed. Falling back to HTTPTransport."
  594. )
  595. else:
  596. class Http2Transport(BaseHttpTransport): # type: ignore
  597. """The HTTP2 transport based on httpcore."""
  598. TIMEOUT = 15
  599. if TYPE_CHECKING:
  600. _pool: Union[
  601. httpcore.SOCKSProxy, httpcore.HTTPProxy, httpcore.ConnectionPool
  602. ]
  603. def _get_header_value(self, response, header):
  604. # type: (Self, httpcore.Response, str) -> Optional[str]
  605. return next(
  606. (
  607. val.decode("ascii")
  608. for key, val in response.headers
  609. if key.decode("ascii").lower() == header
  610. ),
  611. None,
  612. )
  613. def _request(
  614. self,
  615. method,
  616. endpoint_type,
  617. body,
  618. headers,
  619. ):
  620. # type: (Self, str, EndpointType, Any, Mapping[str, str]) -> httpcore.Response
  621. response = self._pool.request(
  622. method,
  623. self._auth.get_api_url(endpoint_type),
  624. content=body,
  625. headers=headers, # type: ignore
  626. extensions={
  627. "timeout": {
  628. "pool": self.TIMEOUT,
  629. "connect": self.TIMEOUT,
  630. "write": self.TIMEOUT,
  631. "read": self.TIMEOUT,
  632. }
  633. },
  634. )
  635. return response
  636. def _get_pool_options(self):
  637. # type: (Self) -> Dict[str, Any]
  638. options = {
  639. "http2": self.parsed_dsn is not None
  640. and self.parsed_dsn.scheme == "https",
  641. "retries": 3,
  642. } # type: Dict[str, Any]
  643. socket_options = (
  644. self.options["socket_options"]
  645. if self.options["socket_options"] is not None
  646. else []
  647. )
  648. used_options = {(o[0], o[1]) for o in socket_options}
  649. for default_option in KEEP_ALIVE_SOCKET_OPTIONS:
  650. if (default_option[0], default_option[1]) not in used_options:
  651. socket_options.append(default_option)
  652. options["socket_options"] = socket_options
  653. ssl_context = ssl.create_default_context()
  654. ssl_context.load_verify_locations(
  655. self.options["ca_certs"] # User-provided bundle from the SDK init
  656. or os.environ.get("SSL_CERT_FILE")
  657. or os.environ.get("REQUESTS_CA_BUNDLE")
  658. or certifi.where()
  659. )
  660. cert_file = self.options["cert_file"] or os.environ.get("CLIENT_CERT_FILE")
  661. key_file = self.options["key_file"] or os.environ.get("CLIENT_KEY_FILE")
  662. if cert_file is not None:
  663. ssl_context.load_cert_chain(cert_file, key_file)
  664. options["ssl_context"] = ssl_context
  665. return options
  666. def _make_pool(self):
  667. # type: (Self) -> Union[httpcore.SOCKSProxy, httpcore.HTTPProxy, httpcore.ConnectionPool]
  668. if self.parsed_dsn is None:
  669. raise ValueError("Cannot create HTTP-based transport without valid DSN")
  670. proxy = None
  671. no_proxy = self._in_no_proxy(self.parsed_dsn)
  672. # try HTTPS first
  673. https_proxy = self.options["https_proxy"]
  674. if self.parsed_dsn.scheme == "https" and (https_proxy != ""):
  675. proxy = https_proxy or (not no_proxy and getproxies().get("https"))
  676. # maybe fallback to HTTP proxy
  677. http_proxy = self.options["http_proxy"]
  678. if not proxy and (http_proxy != ""):
  679. proxy = http_proxy or (not no_proxy and getproxies().get("http"))
  680. opts = self._get_pool_options()
  681. if proxy:
  682. proxy_headers = self.options["proxy_headers"]
  683. if proxy_headers:
  684. opts["proxy_headers"] = proxy_headers
  685. if proxy.startswith("socks"):
  686. try:
  687. if "socket_options" in opts:
  688. socket_options = opts.pop("socket_options")
  689. if socket_options:
  690. logger.warning(
  691. "You have defined socket_options but using a SOCKS proxy which doesn't support these. We'll ignore socket_options."
  692. )
  693. return httpcore.SOCKSProxy(proxy_url=proxy, **opts)
  694. except RuntimeError:
  695. logger.warning(
  696. "You have configured a SOCKS proxy (%s) but support for SOCKS proxies is not installed. Disabling proxy support.",
  697. proxy,
  698. )
  699. else:
  700. return httpcore.HTTPProxy(proxy_url=proxy, **opts)
  701. return httpcore.ConnectionPool(**opts)
  702. class _FunctionTransport(Transport):
  703. """
  704. DEPRECATED: Users wishing to provide a custom transport should subclass
  705. the Transport class, rather than providing a function.
  706. """
  707. def __init__(
  708. self,
  709. func, # type: Callable[[Event], None]
  710. ):
  711. # type: (...) -> None
  712. Transport.__init__(self)
  713. self._func = func
  714. def capture_event(
  715. self,
  716. event, # type: Event
  717. ):
  718. # type: (...) -> None
  719. self._func(event)
  720. return None
  721. def capture_envelope(self, envelope: Envelope) -> None:
  722. # Since function transports expect to be called with an event, we need
  723. # to iterate over the envelope and call the function for each event, via
  724. # the deprecated capture_event method.
  725. event = envelope.get_event()
  726. if event is not None:
  727. self.capture_event(event)
  728. def make_transport(options):
  729. # type: (Dict[str, Any]) -> Optional[Transport]
  730. ref_transport = options["transport"]
  731. use_http2_transport = options.get("_experiments", {}).get("transport_http2", False)
  732. # By default, we use the http transport class
  733. transport_cls = Http2Transport if use_http2_transport else HttpTransport # type: Type[Transport]
  734. if isinstance(ref_transport, Transport):
  735. return ref_transport
  736. elif isinstance(ref_transport, type) and issubclass(ref_transport, Transport):
  737. transport_cls = ref_transport
  738. elif callable(ref_transport):
  739. warnings.warn(
  740. "Function transports are deprecated and will be removed in a future release."
  741. "Please provide a Transport instance or subclass, instead.",
  742. DeprecationWarning,
  743. stacklevel=2,
  744. )
  745. return _FunctionTransport(ref_transport)
  746. # if a transport class is given only instantiate it if the dsn is not
  747. # empty or None
  748. if options["dsn"]:
  749. return transport_cls(options)
  750. return None