Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,17 @@ endif::[]
//[float]
//===== Bug fixes

=== Unreleased

//[float]
//===== Features


[float]
===== Bug fixes
* Fix an issue where compressed spans would count against `transaction_max_spans` {pull}1377[#1377]
* Make sure HTTP connections are not re-used after a process fork {pull}1374[#1374]

[[release-notes-6.x]]
=== Python Agent version 6.x

Expand All @@ -39,7 +50,6 @@ endif::[]
===== Bug fixes

* Fix some context fields and metadata handling in AWS Lambda support {pull}1368[#1368]
* Fix an issue where compressed spans would count against `transaction_max_spans` {pull}1377[#1377]

[[release-notes-6.6.0]]
==== 6.6.0 - 2021-10-18
Expand Down
5 changes: 5 additions & 0 deletions elasticapm/transport/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ def _flush(self, buffer):
def start_thread(self, pid=None):
super(Transport, self).start_thread(pid=pid)
if (not self._thread or self.pid != self._thread.pid) and not self._closed:
self.handle_fork()
try:
self._thread = threading.Thread(target=self._process_queue, name="eapm event processor thread")
self._thread.daemon = True
Expand Down Expand Up @@ -321,6 +322,10 @@ def handle_transport_fail(self, exception=None, **kwargs):
logger.error("Failed to submit message: %r", message, exc_info=getattr(exception, "print_trace", True))
self.state.set_fail()

def handle_fork(self) -> None:
"""Helper method to run code after a fork has been detected"""
pass


# left for backwards compatibility
AsyncTransport = Transport
Expand Down
45 changes: 29 additions & 16 deletions elasticapm/transport/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,24 +52,21 @@


class Transport(HTTPTransportBase):
def __init__(self, url, *args, **kwargs):
def __init__(self, url: str, *args, **kwargs) -> None:
super(Transport, self).__init__(url, *args, **kwargs)
url_parts = compat.urlparse.urlparse(url)
pool_kwargs = {"cert_reqs": "CERT_REQUIRED", "ca_certs": self.ca_certs, "block": True}
if self._server_cert and url_parts.scheme != "http":
pool_kwargs.update(
{"assert_fingerprint": self.cert_fingerprint, "assert_hostname": False, "cert_reqs": ssl.CERT_NONE}
)
del pool_kwargs["ca_certs"]
elif not self._verify_server_cert and url_parts.scheme != "http":
pool_kwargs["cert_reqs"] = ssl.CERT_NONE
pool_kwargs["assert_hostname"] = False
proxies = compat.getproxies_environment()
proxy_url = proxies.get("https", proxies.get("http", None))
if proxy_url and not compat.proxy_bypass_environment(url_parts.netloc):
self.http = urllib3.ProxyManager(proxy_url, **pool_kwargs)
else:
self.http = urllib3.PoolManager(**pool_kwargs)
if url.startswith("https"):
if self._server_cert:
pool_kwargs.update(
{"assert_fingerprint": self.cert_fingerprint, "assert_hostname": False, "cert_reqs": ssl.CERT_NONE}
)
del pool_kwargs["ca_certs"]
elif not self._verify_server_cert:
pool_kwargs["cert_reqs"] = ssl.CERT_NONE
pool_kwargs["assert_hostname"] = False
self._pool_kwargs = pool_kwargs
self._http = None
self._url = url

def send(self, data):
response = None
Expand Down Expand Up @@ -113,6 +110,22 @@ def send(self, data):
if response:
response.close()

@property
def http(self) -> urllib3.PoolManager:
if not self._http:
url_parts = compat.urlparse.urlparse(self._url)
proxies = compat.getproxies_environment()
proxy_url = proxies.get("https", proxies.get("http", None))
if proxy_url and not compat.proxy_bypass_environment(url_parts.netloc):
self._http = urllib3.ProxyManager(proxy_url, **self._pool_kwargs)
else:
self._http = urllib3.PoolManager(**self._pool_kwargs)
return self._http

def handle_fork(self) -> None:
# reset http pool to avoid sharing connections with the parent process
self._http = None

def get_config(self, current_version=None, keys=None):
"""
Gets configuration from a remote APM Server
Expand Down