- Notifications
You must be signed in to change notification settings - Fork 569
Add async transport #4614
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add async transport #4614
Changes from 40 commits
9f24136 001f36c 401b1bc 3f43d8f 15fa295 ef780f3 f63e46f 1804271 11da869 779a0d6 0895d23 bbf426b 744dc8a fcc8040 9a43d9b b5eda0e 9e380b8 ee44621 d9f7383 d2e647b 859a0e2 4a58ce7 cbecde7 c8bb55a 05a7de7 38246d0 8b226cb 823215e 4eed4fd afd494d fcc7ac3 f659514 8c542ce 30dde67 3392e0e 6c85500 ae5a864 9c537e6 f7554b2 6cb72ad 9171c5d 111861b 4744817 File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| | @@ -6,6 +6,7 @@ | |
| import socket | ||
| import ssl | ||
| import time | ||
| import asyncio | ||
| from datetime import datetime, timedelta, timezone | ||
| from collections import defaultdict | ||
| from urllib.request import getproxies | ||
| | @@ -17,18 +18,27 @@ | |
| | ||
| try: | ||
| import httpcore | ||
| except ImportError: | ||
| httpcore = None # type: ignore | ||
| | ||
| try: | ||
| import h2 # noqa: F401 | ||
| | ||
| HTTP2_ENABLED = True | ||
| HTTP2_ENABLED = httpcore is not None | ||
| except ImportError: | ||
| HTTP2_ENABLED = False | ||
| | ||
| try: | ||
| ASYNC_TRANSPORT_ENABLED = httpcore is not None | ||
| except ImportError: | ||
sl0thentr0py marked this conversation as resolved. Show resolved Hide resolved | ||
| ASYNC_TRANSPORT_ENABLED = False | ||
cursor[bot] marked this conversation as resolved. Show resolved Hide resolved antonpirker marked this conversation as resolved. Show resolved Hide resolved There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: Unnecessary ImportError HandlingThe Locations (1) Contributor Author There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The import throwing the error happens in the next PR, #4615, apologies for the slightly unclean separation here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: Redundant ImportError HandlingThe There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: Redundant Import Handling in Async TransportThe | ||
| | ||
| import urllib3 | ||
| import certifi | ||
| | ||
| from sentry_sdk.consts import EndpointType | ||
| from sentry_sdk.utils import Dsn, logger, capture_internal_exceptions | ||
| from sentry_sdk.worker import BackgroundWorker | ||
| from sentry_sdk.worker import BackgroundWorker, Worker, AsyncWorker | ||
| from sentry_sdk.envelope import Envelope, Item, PayloadRef | ||
| | ||
| from typing import TYPE_CHECKING | ||
| | @@ -173,7 +183,7 @@ def __init__(self: Self, options: Dict[str, Any]) -> None: | |
| Transport.__init__(self, options) | ||
| assert self.parsed_dsn is not None | ||
| self.options: Dict[str, Any] = options | ||
| self._worker = BackgroundWorker(queue_size=options["transport_queue_size"]) | ||
| self._worker = self._create_worker(options) | ||
| self._auth = self.parsed_dsn.to_auth("sentry.python/%s" % VERSION) | ||
| self._disabled_until: Dict[Optional[str], datetime] = {} | ||
| # We only use this Retry() class for the `get_retry_after` method it exposes | ||
| | @@ -224,6 +234,19 @@ def __init__(self: Self, options: Dict[str, Any]) -> None: | |
| elif self._compression_algo == "br": | ||
| self._compression_level = 4 | ||
| | ||
| def _create_worker(self, options: dict[str, Any]) -> Worker: | ||
| Member There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
| ||
| async_enabled = options.get("_experiments", {}).get("transport_async", False) | ||
antonpirker marked this conversation as resolved. Outdated Show resolved Hide resolved | ||
| try: | ||
| asyncio.get_running_loop() | ||
| worker_cls = ( | ||
| AsyncWorker | ||
| if async_enabled and ASYNC_TRANSPORT_ENABLED | ||
| else BackgroundWorker | ||
| ) | ||
| except RuntimeError: | ||
| worker_cls = BackgroundWorker | ||
| return worker_cls(queue_size=options["transport_queue_size"]) | ||
cursor[bot] marked this conversation as resolved. Outdated Show resolved Hide resolved | ||
| | ||
| def record_lost_event( | ||
| self: Self, | ||
| reason: str, | ||
| | @@ -567,6 +590,240 @@ def flush( | |
| self._worker.flush(timeout, callback) | ||
| | ||
| | ||
| if not ASYNC_TRANSPORT_ENABLED: | ||
| # Sorry, no AsyncHttpTransport for you | ||
| class AsyncHttpTransport(BaseHttpTransport): | ||
| def __init__(self: Self, options: Dict[str, Any]) -> None: | ||
| super().__init__(options) | ||
| logger.warning( | ||
| "You tried to use AsyncHttpTransport but don't have httpcore[asyncio] installed. Falling back to sync transport." | ||
| ||
| ) | ||
| | ||
| else: | ||
| | ||
| class AsyncHttpTransport(HttpTransportCore): # type: ignore | ||
| def __init__(self: Self, options: Dict[str, Any]) -> None: | ||
| super().__init__(options) | ||
| # Requires event loop at init time | ||
| self.loop = asyncio.get_running_loop() | ||
| self.background_tasks: set[asyncio.Task[None]] = set() | ||
| | ||
| def _get_header_value(self: Self, response: Any, header: str) -> Optional[str]: | ||
| return next( | ||
| ( | ||
| val.decode("ascii") | ||
| for key, val in response.headers | ||
| if key.decode("ascii").lower() == header | ||
| ), | ||
| None, | ||
| ) | ||
| | ||
| async def _send_envelope(self: Self, envelope: Envelope) -> None: | ||
| _prepared_envelope = self._prepare_envelope(envelope) | ||
| if _prepared_envelope is not None: | ||
| envelope, body, headers = _prepared_envelope | ||
| await self._send_request( | ||
| body.getvalue(), | ||
| headers=headers, | ||
| endpoint_type=EndpointType.ENVELOPE, | ||
| envelope=envelope, | ||
| ) | ||
| return None | ||
| | ||
| async def _send_request( | ||
| self: Self, | ||
| body: bytes, | ||
| headers: Dict[str, str], | ||
| endpoint_type: EndpointType, | ||
| envelope: Optional[Envelope], | ||
| ) -> None: | ||
| self._update_headers(headers) | ||
| try: | ||
| response = await self._request( | ||
| "POST", | ||
| endpoint_type, | ||
| body, | ||
| headers, | ||
| ) | ||
| except Exception: | ||
| self._handle_request_error(envelope=envelope, loss_reason="network") | ||
| raise | ||
| try: | ||
| self._handle_response(response=response, envelope=envelope) | ||
| finally: | ||
| await response.aclose() | ||
| | ||
| async def _request( # type: ignore[override] | ||
| self: Self, | ||
| method: str, | ||
| endpoint_type: EndpointType, | ||
| body: Any, | ||
| headers: Mapping[str, str], | ||
| ) -> httpcore.Response: | ||
| return await self._pool.request( | ||
| method, | ||
| self._auth.get_api_url(endpoint_type), | ||
| content=body, | ||
| headers=headers, # type: ignore | ||
| extensions={ | ||
| "timeout": { | ||
| "pool": self.TIMEOUT, | ||
| "connect": self.TIMEOUT, | ||
| "write": self.TIMEOUT, | ||
| "read": self.TIMEOUT, | ||
| } | ||
| }, | ||
| ) | ||
| | ||
| async def _flush_client_reports(self: Self, force: bool = False) -> None: | ||
| Member There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. think this does not need to be Member There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. merging it in, we will take care of this when we do the mixin Contributor Author There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is async because otherwise the worker process_callback needs the case distinction between coroutines and sync functions. See: | ||
| client_report = self._fetch_pending_client_report(force=force, interval=60) | ||
| if client_report is not None: | ||
| self.capture_envelope(Envelope(items=[client_report])) | ||
| | ||
| async def _capture_envelope(self: Self, envelope: Envelope) -> None: | ||
| async def send_envelope_wrapper() -> None: | ||
| with capture_internal_exceptions(): | ||
| await self._send_envelope(envelope) | ||
| await self._flush_client_reports() | ||
| | ||
| if not self._worker.submit(send_envelope_wrapper): | ||
| self.on_dropped_event("full_queue") | ||
| for item in envelope.items: | ||
| self.record_lost_event("queue_overflow", item=item) | ||
| | ||
| def capture_envelope(self: Self, envelope: Envelope) -> None: | ||
| # Synchronous entry point | ||
| try: | ||
| asyncio.get_running_loop() | ||
| # We are on the main thread running the event loop | ||
| task = asyncio.create_task(self._capture_envelope(envelope)) | ||
| self.background_tasks.add(task) | ||
| task.add_done_callback(self.background_tasks.discard) | ||
| except RuntimeError: | ||
| # We are in a background thread, not running an event loop, | ||
| # have to launch the task on the loop in a threadsafe way. | ||
| if self.loop and self.loop.is_running(): | ||
| asyncio.run_coroutine_threadsafe( | ||
| self._capture_envelope(envelope), | ||
| self.loop, | ||
| ) | ||
antonpirker marked this conversation as resolved. Outdated Show resolved Hide resolved | ||
| else: | ||
| # The event loop is no longer running | ||
| logger.warning("Async Transport is not running in an event loop.") | ||
| self.on_dropped_event("internal_sdk_error") | ||
| for item in envelope.items: | ||
| self.record_lost_event("internal_sdk_error", item=item) | ||
| | ||
| def flush( # type: ignore[override] | ||
| self: Self, | ||
| timeout: float, | ||
| callback: Optional[Callable[[int, float], None]] = None, | ||
| ) -> Optional[asyncio.Task[None]]: | ||
| logger.debug("Flushing HTTP transport") | ||
| | ||
| if timeout > 0: | ||
| self._worker.submit(lambda: self._flush_client_reports(force=True)) | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: Async Method Not Awaited in Synchronous ContextThe Locations (1)There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: Async Method Not Awaited Causes Flush FailureThe | ||
| return self._worker.flush(timeout, callback) # type: ignore[func-returns-value] | ||
| return None | ||
antonpirker marked this conversation as resolved. Show resolved Hide resolved | ||
| | ||
| def _get_pool_options(self: Self) -> Dict[str, Any]: | ||
| options: Dict[str, Any] = { | ||
| "http2": False, # no HTTP2 for now | ||
| "retries": 3, | ||
| } | ||
| | ||
| socket_options = ( | ||
| self.options["socket_options"] | ||
| if self.options["socket_options"] is not None | ||
| else [] | ||
| ) | ||
| | ||
| used_options = {(o[0], o[1]) for o in socket_options} | ||
| for default_option in KEEP_ALIVE_SOCKET_OPTIONS: | ||
| if (default_option[0], default_option[1]) not in used_options: | ||
| socket_options.append(default_option) | ||
| | ||
| options["socket_options"] = socket_options | ||
| | ||
| ssl_context = ssl.create_default_context() | ||
| ssl_context.load_verify_locations( | ||
| self.options["ca_certs"] # User-provided bundle from the SDK init | ||
| or os.environ.get("SSL_CERT_FILE") | ||
| or os.environ.get("REQUESTS_CA_BUNDLE") | ||
| or certifi.where() | ||
| ) | ||
| cert_file = self.options["cert_file"] or os.environ.get("CLIENT_CERT_FILE") | ||
| key_file = self.options["key_file"] or os.environ.get("CLIENT_KEY_FILE") | ||
| if cert_file is not None: | ||
| ssl_context.load_cert_chain(cert_file, key_file) | ||
| | ||
| options["ssl_context"] = ssl_context | ||
| | ||
| return options | ||
| | ||
| def _make_pool( | ||
| self: Self, | ||
| ) -> Union[ | ||
| httpcore.AsyncSOCKSProxy, | ||
| httpcore.AsyncHTTPProxy, | ||
| httpcore.AsyncConnectionPool, | ||
| ]: | ||
| if self.parsed_dsn is None: | ||
| raise ValueError("Cannot create HTTP-based transport without valid DSN") | ||
| proxy = None | ||
| no_proxy = self._in_no_proxy(self.parsed_dsn) | ||
| | ||
| # try HTTPS first | ||
| https_proxy = self.options["https_proxy"] | ||
| if self.parsed_dsn.scheme == "https" and (https_proxy != ""): | ||
| proxy = https_proxy or (not no_proxy and getproxies().get("https")) | ||
| | ||
| # maybe fallback to HTTP proxy | ||
| http_proxy = self.options["http_proxy"] | ||
| if not proxy and (http_proxy != ""): | ||
| proxy = http_proxy or (not no_proxy and getproxies().get("http")) | ||
| | ||
| opts = self._get_pool_options() | ||
| | ||
| if proxy: | ||
| proxy_headers = self.options["proxy_headers"] | ||
| if proxy_headers: | ||
| opts["proxy_headers"] = proxy_headers | ||
| | ||
| if proxy.startswith("socks"): | ||
| try: | ||
| if "socket_options" in opts: | ||
| socket_options = opts.pop("socket_options") | ||
| if socket_options: | ||
| logger.warning( | ||
| "You have defined socket_options but using a SOCKS proxy which doesn't support these. We'll ignore socket_options." | ||
| ) | ||
| return httpcore.AsyncSOCKSProxy(proxy_url=proxy, **opts) | ||
| except RuntimeError: | ||
| logger.warning( | ||
| "You have configured a SOCKS proxy (%s) but support for SOCKS proxies is not installed. Disabling proxy support.", | ||
| proxy, | ||
| ) | ||
| else: | ||
| return httpcore.AsyncHTTPProxy(proxy_url=proxy, **opts) | ||
| | ||
| return httpcore.AsyncConnectionPool(**opts) | ||
| | ||
| def kill(self: Self) -> Optional[asyncio.Task[None]]: # type: ignore | ||
| | ||
| logger.debug("Killing HTTP transport") | ||
| self._worker.kill() | ||
| for task in self.background_tasks: | ||
| task.cancel() | ||
| self.background_tasks.clear() | ||
| try: | ||
| # Return the pool cleanup task so caller can await it if needed | ||
| return self.loop.create_task(self._pool.aclose()) # type: ignore | ||
| except RuntimeError: | ||
| logger.warning("Event loop not running, aborting kill.") | ||
| return None | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: Async Transport Flush IssuesThe Furthermore, the Lastly, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: AsyncHttpTransport Kill Method Fails Resource CleanupThe
Additional Locations (1) Contributor Author There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will remove the background task set, it should not be necessary anymore I think | ||
| | ||
antonpirker marked this conversation as resolved. Show resolved Hide resolved cursor[bot] marked this conversation as resolved. Show resolved Hide resolved | ||
| | ||
| class HttpTransport(BaseHttpTransport): | ||
| if TYPE_CHECKING: | ||
| _pool: Union[PoolManager, ProxyManager] | ||
| | @@ -812,11 +1069,18 @@ def make_transport(options: Dict[str, Any]) -> Optional[Transport]: | |
| ref_transport = options["transport"] | ||
| | ||
| use_http2_transport = options.get("_experiments", {}).get("transport_http2", False) | ||
| | ||
| use_async_transport = options.get("_experiments", {}).get("transport_async", False) | ||
| # By default, we use the http transport class | ||
| transport_cls: Type[Transport] = ( | ||
| Http2Transport if use_http2_transport else HttpTransport | ||
| ) | ||
| if use_async_transport: | ||
| ||
| try: | ||
| asyncio.get_running_loop() | ||
| transport_cls: Type[Transport] = AsyncHttpTransport | ||
| except RuntimeError: | ||
| # No event loop running, fall back to sync transport | ||
| logger.warning("No event loop running, falling back to sync transport.") | ||
| transport_cls = Http2Transport if use_http2_transport else HttpTransport | ||
| else: | ||
| transport_cls = Http2Transport if use_http2_transport else HttpTransport | ||
| | ||
| if isinstance(ref_transport, Transport): | ||
| return ref_transport | ||
| | ||
Uh oh!
There was an error while loading. Please reload this page.