Skip to content

Commit 239ac76

Browse files
authored
Lambda improvements (elastic#1489)
* Fix service_name handling * Replace service.id with faas.id * Update transaction.name to use API resource * Respect service_version if configured explicitly * API gateway service.origin.name * faas.name and faas.version * Fix sqs/sns messageattributes -> headers * Add flushed=True to manual flushes in transport * Create the Client object and instrument lazily * Fix client creation timing * CHANGELOG
1 parent 515ee32 commit 239ac76

File tree

9 files changed

+143
-52
lines changed

9 files changed

+143
-52
lines changed

CHANGELOG.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ endif::[]
4242
===== Bug fixes
4343
4444
* Fix Sanic integration to properly respect the `capture_body` config {pull}1485[#1485]
45+
* Lambda fixes to align with the cross-agent spec {pull}1489[#1489]
4546
4647
4748
[[release-notes-6.x]]

elasticapm/contrib/serverless/aws.py

Lines changed: 60 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
from typing import Optional
3939

4040
import elasticapm
41-
from elasticapm.base import Client, get_client
41+
from elasticapm.base import Client
4242
from elasticapm.conf import constants
4343
from elasticapm.utils import encoding, get_name_from_func, nested_key
4444
from elasticapm.utils.disttracing import TraceParent
@@ -66,25 +66,25 @@ def handler(event, context):
6666
return {"statusCode": r.status_code, "body": "Success!"}
6767
"""
6868

69-
def __init__(self, name: Optional[str] = None, **kwargs) -> None:
69+
def __init__(self, name: Optional[str] = None, elasticapm_client: Optional[Client] = None, **kwargs) -> None:
7070
self.name = name
7171
self.event = {}
7272
self.context = {}
7373
self.response = None
74+
self.instrumented = False
75+
self.client = elasticapm_client # elasticapm_client is intended for testing only
7476

7577
# Disable all background threads except for transport
7678
kwargs["metrics_interval"] = "0ms"
7779
kwargs["central_config"] = False
7880
kwargs["cloud_provider"] = "none"
7981
kwargs["framework_name"] = "AWS Lambda"
80-
if "service_name" not in kwargs:
82+
if "service_name" not in kwargs and "ELASTIC_APM_SERVICE_NAME" not in os.environ:
8183
kwargs["service_name"] = os.environ["AWS_LAMBDA_FUNCTION_NAME"]
84+
if "service_version" not in kwargs and "ELASTIC_APM_SERVICE_VERSION" not in os.environ:
85+
kwargs["service_version"] = os.environ.get("AWS_LAMBDA_FUNCTION_VERSION")
8286

83-
self.client = get_client()
84-
if not self.client:
85-
self.client = Client(**kwargs)
86-
if not self.client.config.debug and self.client.config.instrument and self.client.config.enabled:
87-
elasticapm.instrument()
87+
self.client_kwargs = kwargs
8888

8989
def __call__(self, func):
9090
self.name = self.name or get_name_from_func(func)
@@ -96,6 +96,21 @@ def decorated(*args, **kwds):
9696
self.event, self.context = args
9797
else:
9898
self.event, self.context = {}, {}
99+
# We delay client creation until the function is called, so that
100+
# multiple @capture_serverless instances in the same file don't create
101+
# multiple clients
102+
if not self.client:
103+
# Don't use get_client() as we may have a config mismatch due to **kwargs
104+
self.client = Client(**self.client_kwargs)
105+
if (
106+
not self.instrumented
107+
and not self.client.config.debug
108+
and self.client.config.instrument
109+
and self.client.config.enabled
110+
):
111+
elasticapm.instrument()
112+
self.instrumented = True
113+
99114
if not self.client.config.debug and self.client.config.instrument and self.client.config.enabled:
100115
with self:
101116
self.response = func(*args, **kwds)
@@ -124,10 +139,21 @@ def __enter__(self):
124139
)
125140
if self.httpmethod: # API Gateway
126141
self.source = "api"
127-
if os.environ.get("AWS_LAMBDA_FUNCTION_NAME"):
128-
transaction_name = "{} {}".format(self.httpmethod, os.environ["AWS_LAMBDA_FUNCTION_NAME"])
142+
if nested_key(self.event, "requestContext", "httpMethod"):
143+
# API v1
144+
resource = "/{}{}".format(
145+
nested_key(self.event, "requestContext", "stage"),
146+
nested_key(self.event, "requestContext", "resourcePath"),
147+
)
129148
else:
130-
transaction_name = self.name
149+
# API v2
150+
route_key = nested_key(self.event, "requestContext", "routeKey")
151+
route_key = f"/{route_key}" if route_key.startswith("$") else route_key.split(" ", 1)[-1]
152+
resource = "/{}{}".format(
153+
nested_key(self.event, "requestContext", "stage"),
154+
route_key,
155+
)
156+
transaction_name = "{} {}".format(self.httpmethod, resource)
131157
elif "Records" in self.event and len(self.event["Records"]) == 1:
132158
record = self.event["Records"][0]
133159
if record.get("eventSource") == "aws:s3": # S3
@@ -203,21 +229,17 @@ def set_metadata_and_context(self, coldstart: bool) -> None:
203229
faas["coldstart"] = coldstart
204230
faas["trigger"] = {"type": "other"}
205231
faas["execution"] = self.context.aws_request_id
232+
arn = self.context.invoked_function_arn
233+
if len(arn.split(":")) > 7:
234+
arn = ":".join(arn.split(":")[:7])
235+
faas["id"] = arn
236+
faas["name"] = os.environ.get("AWS_LAMBDA_FUNCTION_NAME")
237+
faas["version"] = os.environ.get("AWS_LAMBDA_FUNCTION_VERSION")
206238

207239
if self.source == "api":
208240
faas["trigger"]["type"] = "http"
209241
faas["trigger"]["request_id"] = self.event["requestContext"]["requestId"]
210-
path = (
211-
self.event["requestContext"].get("resourcePath")
212-
or self.event["requestContext"]["http"]["path"].split(self.event["requestContext"]["stage"])[-1]
213-
)
214-
service_context["origin"] = {
215-
"name": "{} {}/{}".format(
216-
self.httpmethod,
217-
self.event["requestContext"]["stage"],
218-
path,
219-
)
220-
}
242+
service_context["origin"] = {"name": self.event["requestContext"]["domainName"]}
221243
service_context["origin"]["id"] = self.event["requestContext"]["apiId"]
222244
service_context["origin"]["version"] = self.event.get("version", "1.0")
223245
cloud_context["origin"] = {}
@@ -236,13 +258,18 @@ def set_metadata_and_context(self, coldstart: bool) -> None:
236258
cloud_context["origin"]["region"] = record["awsRegion"]
237259
cloud_context["origin"]["account"] = {"id": record["eventSourceARN"].split(":")[4]}
238260
cloud_context["origin"]["provider"] = "aws"
239-
message_context["queue"] = service_context["origin"]["name"]
261+
message_context["queue"] = {"name": service_context["origin"]["name"]}
240262
if "SentTimestamp" in record["attributes"]:
241263
message_context["age"] = {"ms": int((time.time() * 1000) - int(record["attributes"]["SentTimestamp"]))}
242264
if self.client.config.capture_body in ("transactions", "all") and "body" in record:
243265
message_context["body"] = record["body"]
244266
if self.client.config.capture_headers and record.get("messageAttributes"):
245-
message_context["headers"] = record["messageAttributes"]
267+
headers = {}
268+
for k, v in record["messageAttributes"].items():
269+
if v and v.get("stringValue"):
270+
headers[k] = v.get("stringValue")
271+
if headers:
272+
message_context["headers"] = headers
246273
elif self.source == "sns":
247274
record = self.event["Records"][0]
248275
faas["trigger"]["type"] = "pubsub"
@@ -256,7 +283,7 @@ def set_metadata_and_context(self, coldstart: bool) -> None:
256283
cloud_context["origin"]["region"] = record["Sns"]["TopicArn"].split(":")[3]
257284
cloud_context["origin"]["account_id"] = record["Sns"]["TopicArn"].split(":")[4]
258285
cloud_context["origin"]["provider"] = "aws"
259-
message_context["queue"] = service_context["origin"]["name"]
286+
message_context["queue"] = {"name": service_context["origin"]["name"]}
260287
if "Timestamp" in record["Sns"]:
261288
message_context["age"] = {
262289
"ms": int(
@@ -270,7 +297,12 @@ def set_metadata_and_context(self, coldstart: bool) -> None:
270297
if self.client.config.capture_body in ("transactions", "all") and "Message" in record["Sns"]:
271298
message_context["body"] = record["Sns"]["Message"]
272299
if self.client.config.capture_headers and record["Sns"].get("MessageAttributes"):
273-
message_context["headers"] = record["Sns"]["MessageAttributes"]
300+
headers = {}
301+
for k, v in record["Sns"]["MessageAttributes"].items():
302+
if v and v.get("Type") == "String":
303+
headers[k] = v.get("Value")
304+
if headers:
305+
message_context["headers"] = headers
274306
elif self.source == "s3":
275307
record = self.event["Records"][0]
276308
faas["trigger"]["type"] = "datasource"
@@ -291,11 +323,7 @@ def set_metadata_and_context(self, coldstart: bool) -> None:
291323
"name": os.environ.get("AWS_EXECUTION_ENV"),
292324
"version": platform.python_version(),
293325
}
294-
arn = self.context.invoked_function_arn
295-
if len(arn.split(":")) > 7:
296-
arn = ":".join(arn.split(":")[:7])
297-
metadata["service"]["id"] = arn
298-
metadata["service"]["version"] = os.environ.get("AWS_LAMBDA_FUNCTION_VERSION")
326+
metadata["service"]["version"] = self.client.config.service_version
299327
metadata["service"]["node"] = {"configured_name": os.environ.get("AWS_LAMBDA_LOG_STREAM_NAME")}
300328
# This is the one piece of metadata that requires deep merging. We add it manually
301329
# here to avoid having to deep merge in _transport.add_metadata()
@@ -315,7 +343,7 @@ def set_metadata_and_context(self, coldstart: bool) -> None:
315343
# faas doesn't actually belong in context, but we handle this in to_dict
316344
elasticapm.set_context(faas, "faas")
317345
if message_context:
318-
elasticapm.set_context(service_context, "message")
346+
elasticapm.set_context(message_context, "message")
319347
self.client._transport.add_metadata(metadata)
320348

321349

elasticapm/transport/base.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,8 @@ def _process_queue(self):
153153

154154
queue_size = 0 if buffer.fileobj is None else buffer.fileobj.tell()
155155

156-
if flush:
156+
forced_flush = flush
157+
if forced_flush:
157158
logger.debug("forced flush")
158159
elif timed_out or timeout == 0:
159160
# update last flush time, as we might have waited for a non trivial amount of time in
@@ -172,7 +173,12 @@ def _process_queue(self):
172173
flush = True
173174
if flush:
174175
if buffer_written:
175-
self._flush(buffer)
176+
self._flush(buffer, forced_flush=forced_flush)
177+
elif forced_flush and "/localhost:" in self.client.config.server_url:
178+
# No data on buffer, but due to manual flush we should send
179+
# an empty payload with flushed=true query param, but only
180+
# to a local APM server (or lambda extension)
181+
self.send(None, flushed=True)
176182
self._last_flush = timeit.default_timer()
177183
buffer = self._init_buffer()
178184
buffer_written = False
@@ -248,7 +254,7 @@ def _init_event_queue(self, chill_until, max_chill_time):
248254
else:
249255
return _queue.Queue(maxsize=10000)
250256

251-
def _flush(self, buffer):
257+
def _flush(self, buffer, forced_flush=False):
252258
"""
253259
Flush the queue. This method should only be called from the event processing queue
254260
:return: None
@@ -262,7 +268,7 @@ def _flush(self, buffer):
262268
# StringIO on Python 2 does not have getbuffer, so we need to fall back to getvalue
263269
data = fileobj.getbuffer() if hasattr(fileobj, "getbuffer") else fileobj.getvalue()
264270
try:
265-
self.send(data)
271+
self.send(data, forced_flush=forced_flush)
266272
self.handle_transport_success()
267273
except Exception as e:
268274
self.handle_transport_fail(e)
@@ -279,7 +285,7 @@ def start_thread(self, pid=None):
279285
except RuntimeError:
280286
pass
281287

282-
def send(self, data):
288+
def send(self, data, forced_flush=False):
283289
"""
284290
You need to override this to do something with the actual
285291
data. Usually - this is sending to a server

elasticapm/transport/http.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,13 +70,15 @@ def __init__(self, url: str, *args, **kwargs) -> None:
7070
self._http = None
7171
self._url = url
7272

73-
def send(self, data):
73+
def send(self, data, forced_flush=False):
7474
response = None
7575

7676
headers = self._headers.copy() if self._headers else {}
7777
headers.update(self.auth_headers)
7878

7979
url = self._url
80+
if forced_flush:
81+
url = f"{url}?flushed=true"
8082
try:
8183
try:
8284
response = self.http.urlopen(

elasticapm/transport/http_base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ def __init__(
5959
self._server_info_url = "".join((base, constants.SERVER_INFO_PATH, tail))
6060
super(HTTPTransportBase, self).__init__(client, compress_level=compress_level, **kwargs)
6161

62-
def send(self, data):
62+
def send(self, data, forced_flush=False):
6363
"""
6464
Sends a request to a remote APM Server using HTTP POST.
6565

tests/contrib/serverless/aws_sns_test_data.json

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,20 @@
1515
"Signature": "YBGjMCe1m0QQ0DIWq4gZLy3/0bEyXhLPZJzeo4JYMa2P9ercshfn9s+x9nqd6HSYfO3RG0ebCmzxddgO8UCmaddXbhhMRWYjsIDv3+OvUitG8+bFqvpH/rQVHdCEWla5l+NDcye6d2cl9zuYFliTIFUsBmFcqbiroyZbIIHOczUpxNKK9oQcAXU6RgIl6y30DBgxYmzdMm4FMXPpden84v0LwVOyfqVm2gmeMnlccEOB0TRMe8sLsv7OfWLA3GBl3b14MOUZfvUz4Btb15ssCq++QVHoTQWZnbJ5dA7P3ljMauQCagub0Zefx7uUmWAlczxe/5kREJt8rEfl+pN7Mg==",
1616
"SigningCertUrl": "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-010a507c1833636cd94bdb98bd93083a.pem",
1717
"UnsubscribeUrl": "https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:268121251715:basepiwstesttopic:4763d52a-56d1-4dc5-99eb-ffb4315587af",
18-
"MessageAttributes": {}
18+
"MessageAttributes": {
19+
"Greeting": {
20+
"Type": "Binary",
21+
"Value": "SGVsbG8sIFdvcmxkIQ=="
22+
},
23+
"Population": {
24+
"Type": "String",
25+
"Value": "1250800"
26+
},
27+
"City": {
28+
"Type": "String",
29+
"Value": "Any City"
30+
}
31+
}
1932
}
2033
}
2134
]

tests/contrib/serverless/aws_sqs_test_data.json

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,26 @@
1010
"SenderId": "268121251715",
1111
"ApproximateFirstReceiveTimestamp": "1626973700075"
1212
},
13-
"messageAttributes": {},
13+
"messageAttributes": {
14+
"Greeting": {
15+
"binaryValue": "SGVsbG8sIFdvcmxkIQ==",
16+
"stringListValues": [],
17+
"binaryListValues": [],
18+
"dataType": "Binary"
19+
},
20+
"Population": {
21+
"stringValue": "1250800",
22+
"stringListValues": [],
23+
"binaryListValues": [],
24+
"dataType": "Number"
25+
},
26+
"City": {
27+
"stringValue": "Any City",
28+
"stringListValues": [],
29+
"binaryListValues": [],
30+
"dataType": "String"
31+
}
32+
},
1433
"md5OfBody": "5eb63bbbe01eeed093cb22bb8f5acdc3",
1534
"eventSource": "aws:sqs",
1635
"eventSourceARN": "arn:aws:sqs:us-east-1:268121251715:testqueue",

0 commit comments

Comments
 (0)