from sentry_sdk.utils import Dsn, logger, capture_internal_exceptions from sentry_sdk.worker import BackgroundWorker from sentry_sdk.envelope import Envelope, get_event_data_category
from sentry_sdk._types import MYPY
if MYPY: from typing import Type from typing import Any from typing import Optional from typing import Dict from typing import Union from typing import Callable from urllib3.poolmanager import PoolManager # type: ignore from urllib3.poolmanager import ProxyManager
from sentry_sdk._types import Event
try: from urllib.request import getproxies except ImportError: from urllib import getproxies # type: ignore
class Transport(object):
    """Baseclass for all transports.

    A transport is used to send an event to sentry.
    """

    def capture_event(
        self, event  # type: Event
    ):
        # type: (...) -> None
        """This gets invoked with the event dictionary when an event should
        be sent to sentry.

        The base implementation always raises ``NotImplementedError``, so a
        concrete transport has to override it.
        """
        raise NotImplementedError()

    def capture_envelope(
        self, envelope  # type: Envelope
    ):
        # type: (...) -> None
        """This gets invoked with an envelope when an event should
        be sent to sentry.

        The default implementation invokes `capture_event` if the envelope
        contains an event and ignores all other envelopes.
        """
        event = envelope.get_event()
        # Envelopes without an event payload are silently dropped here.
        if event is not None:
            self.capture_event(event)
        return None

    def flush(
        self,
        timeout,  # type: float
        callback=None,  # type: Optional[Any]
    ):
        # type: (...) -> None
        """Wait `timeout` seconds for the current events to be sent out.

        The base implementation does nothing and ignores both arguments.
        """
        pass
# new sentries with more rate limit insights. We honor this header # no matter of the status code to update our internal rate limits.
header = response.headers.get("x-sentry-rate-limit") if header: for limit in header.split(","): try:
retry_after, categories, _ = limit.strip().split(":", 2)
retry_after = datetime.utcnow() + timedelta(
seconds=int(retry_after)
) for category in categories.split(";") or (None,):
self._disabled_until[category] = retry_after except (LookupError, ValueError): continue
# old sentries only communicate global rate limit hits via the # retry-after header on 429. This header can also be emitted on new # sentries if a proxy in front wants to globally slow things down. elif response.status == 429:
self._disabled_until[None] = datetime.utcnow() + timedelta(
seconds=self._retry.get_retry_after(response) or 60
)
# remove all items from the envelope which are over quota
envelope.items[:] = [
x for x in envelope.items ifnot self._check_disabled(x.data_category)
] ifnot envelope.items: returnNone
body = io.BytesIO() with gzip.GzipFile(fileobj=body, mode="w") as f:
envelope.serialize_into(f)
# If no transport is given, we use the http transport class if ref_transport isNone:
transport_cls = HttpTransport # type: Type[Transport] elif isinstance(ref_transport, Transport): return ref_transport elif isinstance(ref_transport, type) and issubclass(ref_transport, Transport):
transport_cls = ref_transport elif callable(ref_transport): return _FunctionTransport(ref_transport) # type: ignore
# if a transport class is given only instanciate it if the dsn is not # empty or None if options["dsn"]: return transport_cls(options)
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.