Skip to content

Commit 202f1cc

Browse files
committed
simplify how to wrap methods with push_node and pop_node and bug fixes
1 parent 4250a9c commit 202f1cc

File tree

5 files changed

+237
-204
lines changed

5 files changed

+237
-204
lines changed

datadog_lambda/__init__.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,21 @@
1+
from datadog_lambda.cold_start import is_cold_start, wrap_find_spec
2+
3+
if is_cold_start():
4+
import os
5+
6+
if (
7+
os.environ.get("DD_TRACE_ENABLED", "true").lower() == "true"
8+
and os.environ.get("DD_COLD_START_TRACING", "true").lower() == "true"
9+
):
10+
from sys import version_info, meta_path
11+
12+
if version_info >= (3, 7): # current implementation only support version > 3.7
13+
for importer in meta_path:
14+
try:
15+
importer.find_spec = wrap_find_spec(importer.find_spec)
16+
except:
17+
pass
18+
119
# The minor version corresponds to the Lambda layer version.
220
# E.g.,, version 0.5.0 gets packaged into layer version 5.
321
try:
@@ -6,13 +24,7 @@
624
import importlib_metadata
725

826
__version__ = importlib_metadata.version(__name__)
9-
import sys
10-
print(f"__INIT__BEFORE_INSTALL {sys.meta_path}")
11-
from datadog_lambda.module import ModuleWatchdog
12-
ModuleWatchdog.install()
13-
print(f"__INIT__AFTER_INSTALL {sys.meta_path}")
1427

15-
import os
1628
import logging
1729

1830
logger = logging.getLogger(__name__)

datadog_lambda/cold_start.py

Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
1+
import time
2+
import os
3+
from importlib.abc import Loader
4+
from typing import List
5+
16
_cold_start = True
27
_lambda_container_initialized = False
8+
root_nodes = []
9+
import_stack = []
310

411

512
def set_cold_start():
@@ -21,3 +28,176 @@ def is_cold_start():
2128
def get_cold_start_tag():
2229
"""Returns the cold start tag to be used in metrics"""
2330
return "cold_start:{}".format(str(is_cold_start()).lower())
31+
32+
33+
class ImportNode(object):
34+
def __init__(self, module_name, full_file_path, start_time_ns, end_time_ns=None):
35+
self.module_name = module_name
36+
self.full_file_path = full_file_path
37+
self.start_time_ns = start_time_ns
38+
self.end_time_ns = end_time_ns
39+
self.children = []
40+
41+
42+
def push_node(module_name, file_path):
43+
node = ImportNode(module_name, file_path, time.time_ns())
44+
if import_stack:
45+
import_stack[-1].children.append(node)
46+
import_stack.append(node)
47+
48+
49+
def pop_node(module_name):
50+
if not import_stack:
51+
return
52+
node = import_stack.pop()
53+
if node.module_name != module_name:
54+
return
55+
end_time_ns = time.time_ns()
56+
node.end_time_ns = end_time_ns
57+
if not import_stack: # import_stack empty, a root node has been found
58+
root_nodes.append(node)
59+
60+
61+
def wrap_exec_module(original_exec_module):
62+
def wrapped_method(module):
63+
should_pop = False
64+
spec = module.__spec__
65+
try:
66+
push_node(spec.name, spec.origin)
67+
should_pop = True
68+
except:
69+
pass
70+
try:
71+
return original_exec_module(module)
72+
finally:
73+
if should_pop:
74+
pop_node(spec.name)
75+
76+
return wrapped_method
77+
78+
79+
def wrap_load_module(original_load_module):
80+
def wrapped_method(fullname):
81+
should_pop = False
82+
try:
83+
push_node(fullname, fullname)
84+
should_pop = True
85+
except:
86+
pass
87+
try:
88+
return original_load_module(fullname)
89+
finally:
90+
if should_pop:
91+
pop_node(fullname)
92+
93+
return wrapped_method
94+
95+
96+
def wrap_find_spec(original_find_spec):
97+
def wrapped_find_spec(*args, **kwargs):
98+
spec = original_find_spec(*args, **kwargs)
99+
if spec is None:
100+
return None
101+
loader = getattr(spec, "loader", None)
102+
if loader is not None:
103+
if hasattr(loader, "exec_module") and hasattr(loader, "create_module"):
104+
loader.exec_module = wrap_exec_module(loader.exec_module)
105+
if hasattr(loader, "load_module"): # legacy support
106+
loader.load_module = wrap_load_module(loader.load_module)
107+
return spec
108+
109+
return wrapped_find_spec
110+
111+
112+
class ColdStartTracer(object):
113+
def __init__(
114+
self,
115+
tracer,
116+
function_name,
117+
cold_start_span_finish_time_ns,
118+
trace_ctx,
119+
min_duration_ms: int,
120+
ignored_libs: List[str] = [],
121+
):
122+
self._tracer = tracer
123+
self.function_name = function_name
124+
self.cold_start_span_finish_time_ns = cold_start_span_finish_time_ns
125+
self.min_duration_ms = min_duration_ms
126+
self.trace_ctx = trace_ctx
127+
self.ignored_libs = ignored_libs
128+
self.need_to_reactivate_context = True
129+
130+
def trace(self, root_nodes: List[ImportNode] = root_nodes):
131+
if not root_nodes:
132+
return
133+
cold_start_span_start_time_ns = root_nodes[0].start_time_ns
134+
cold_start_span = self.create_cold_start_span(cold_start_span_start_time_ns)
135+
while root_nodes:
136+
root_node = root_nodes.pop()
137+
self.trace_tree(root_node, cold_start_span)
138+
self.finish_span(cold_start_span, self.cold_start_span_finish_time_ns)
139+
140+
def trace_tree(self, import_node: ImportNode, parent_span):
141+
if (
142+
import_node.end_time_ns - import_node.start_time_ns
143+
< self.min_duration_ms * 1e6
144+
or import_node.module_name in self.ignored_libs
145+
):
146+
return
147+
148+
span = self.start_span(
149+
"aws.lambda.import", import_node.module_name, import_node.start_time_ns
150+
)
151+
tags = {
152+
"resource_names": import_node.module_name,
153+
"resource.name": import_node.module_name,
154+
"filename": import_node.full_file_path,
155+
"operation_name": self.get_operation_name(import_node.full_file_path),
156+
}
157+
span.set_tags(tags)
158+
if parent_span:
159+
span.parent_id = parent_span.span_id
160+
for child_node in import_node.children:
161+
self.trace_tree(child_node, span)
162+
self.finish_span(span, import_node.end_time_ns)
163+
164+
def create_cold_start_span(self, start_time_ns):
165+
span = self.start_span("aws.lambda.load", self.function_name, start_time_ns)
166+
tags = {
167+
"resource_names": self.function_name,
168+
"resource.name": self.function_name,
169+
"operation_name": "aws.lambda.load",
170+
}
171+
span.set_tags(tags)
172+
return span
173+
174+
def start_span(self, span_type, resource, start_time_ns):
175+
if self.need_to_reactivate_context:
176+
self._tracer.context_provider.activate(
177+
self.trace_ctx
178+
) # reactivate required after each finish() call
179+
self.need_to_reactivate_context = False
180+
span_kwargs = {
181+
"service": "aws.lambda",
182+
"resource": resource,
183+
"span_type": span_type,
184+
}
185+
span = self._tracer.trace(span_type, **span_kwargs)
186+
span.start_ns = start_time_ns
187+
return span
188+
189+
def finish_span(self, span, finish_time_ns):
190+
span.finish(finish_time_ns / 1e9)
191+
self.need_to_reactivate_context = True
192+
193+
def get_operation_name(self, filename: str):
194+
if filename is None:
195+
return "aws.lambda.import_core_module"
196+
if not isinstance(filename, str):
197+
return "aws.lambda.import"
198+
if filename.startswith("/opt/"):
199+
return "aws.lambda.import_layer"
200+
elif filename.startswith("/var/lang/"):
201+
return "aws.lambda.import_runtime"
202+
else:
203+
return "aws.lambda.import"

0 commit comments

Comments
 (0)