
Commit f5b957e

Rework Cloudwatch log group tags cache logic (DataDog#786)
- Add client config for limiting max retries
- Add a separate tags cache file per log group
- Init the cache differently
- Allow individual cache file updates w/o having a lock file
1 parent 94d776b commit f5b957e

16 files changed: +528, -65 lines
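For context, a minimal sketch of the retry-limiting change described in the commit message (client setup only; the variable name is illustrative): botocore's "standard" retry mode bounds the CloudWatch Logs client at 3 attempts by default, in place of the legacy retry behavior.

import boto3
from botocore.config import Config

# "standard" retry mode defaults to a maximum of 3 attempts, which keeps
# a throttled ListTagsLogGroup call from retrying aggressively.
logs_client = boto3.client(
    "logs",
    config=Config(retries={"mode": "standard"}),
)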

aws/logs_monitoring/caching/cloudwatch_log_group_cache.py

Lines changed: 138 additions & 48 deletions
@@ -1,45 +1,41 @@
+import json
+import logging
 import os
+from random import randint
+from time import time
+
 import boto3
-from caching.base_tags_cache import BaseTagsCache
+from botocore.config import Config
 from caching.common import sanitize_aws_tag_string
-from telemetry import send_forwarder_internal_metrics
 from settings import (
-    DD_S3_LOG_GROUP_CACHE_FILENAME,
-    DD_S3_LOG_GROUP_CACHE_LOCK_FILENAME,
+    DD_S3_BUCKET_NAME,
+    DD_S3_LOG_GROUP_CACHE_DIRNAME,
+    DD_TAGS_CACHE_TTL_SECONDS,
 )
+from telemetry import send_forwarder_internal_metrics


-class CloudwatchLogGroupTagsCache(BaseTagsCache):
+class CloudwatchLogGroupTagsCache:
     def __init__(self, prefix):
-        super().__init__(
-            prefix, DD_S3_LOG_GROUP_CACHE_FILENAME, DD_S3_LOG_GROUP_CACHE_LOCK_FILENAME
+        self.cache_dirname = DD_S3_LOG_GROUP_CACHE_DIRNAME
+        self.cache_ttl_seconds = DD_TAGS_CACHE_TTL_SECONDS
+        self.bucket_name = DD_S3_BUCKET_NAME
+        self.cache_prefix = prefix
+        self.tags_by_log_group = {}
+        # We need to use the standard retry mode for the Cloudwatch Logs client that defaults to 3 retries
+        self.cloudwatch_logs_client = boto3.client(
+            "logs", config=Config(retries={"mode": "standard"})
         )
-        self.cloudwatch_logs_client = boto3.client("logs")
-
-    def should_fetch_tags(self):
-        return os.environ.get("DD_FETCH_LOG_GROUP_TAGS", "false").lower() == "true"
+        self.s3_client = boto3.client("s3")

-    def build_tags_cache(self):
-        """Makes API calls to GetResources to get the live tags of the account's Lambda functions
-
-        Returns an empty dict instead of fetching custom tags if the tag fetch env variable is not set to true
-
-        Returns:
-            tags_by_arn_cache (dict<str, str[]>): each Lambda's tags in a dict keyed by ARN
-        """
-        new_tags = {}
-        for log_group in self.tags_by_id.keys():
-            log_group_tags = self._get_log_group_tags(log_group)
-            # If we didn't get back log group tags we'll use the locally cached ones if they exist
-            # This avoids losing tags on a failed api call
-            if log_group_tags is None:
-                log_group_tags = self.tags_by_id.get(log_group, [])
-            new_tags[log_group] = log_group_tags
-
-        self.logger.debug(
-            "All tags in Cloudwatch Log Groups refresh: {}".format(new_tags)
+        self.logger = logging.getLogger()
+        self.logger.setLevel(
+            logging.getLevelName(os.environ.get("DD_LOG_LEVEL", "INFO").upper())
         )
-        return True, new_tags
+
+        # Initialize the cache
+        if self._should_fetch_tags():
+            self._build_tags_cache()

     def get(self, log_group):
         """Get the tags for the Cloudwatch Log Group from the cache
@@ -53,34 +49,128 @@ def get(self, log_group):
         Returns:
             log_group_tags (str[]): the list of "key:value" Datadog tag strings
         """
-        if self._is_expired():
-            send_forwarder_internal_metrics("cw_log_group_tags_cache_expired")
-            self.logger.debug("Local cache expired, fetching cache from S3")
-            self._refresh()
-
-        log_group_tags = self.tags_by_id.get(log_group, None)
-        if log_group_tags is None:
-            # If the custom tag fetch env var is not set to true do not fetch
-            if not self.should_fetch_tags():
-                self.logger.debug(
-                    "Not fetching custom tags because the env variable DD_FETCH_LOG_GROUP_TAGS is "
-                    "not set to true"
+        # If the custom tag fetch env var is not set to true do not fetch tags
+        if not self._should_fetch_tags():
+            self.logger.debug(
+                "Not fetching custom tags because the env variable DD_FETCH_LOG_GROUP_TAGS is "
+                "not set to true"
+            )
+            return []
+
+        return self._fetch_log_group_tags(log_group)
+
+    def _should_fetch_tags(self):
+        return os.environ.get("DD_FETCH_LOG_GROUP_TAGS", "false").lower() == "true"
+
+    def _build_tags_cache(self):
+        try:
+            prefix = self._get_cache_file_prefix()
+            response = self.s3_client.list_objects_v2(
+                Bucket=DD_S3_BUCKET_NAME, Prefix=prefix
+            )
+            cache_files = [content["Key"] for content in response.get("Contents", [])]
+            for cache_file in cache_files:
+                log_group_tags, last_modified = self._get_log_group_tags_from_cache(
+                    cache_file
                 )
-                return []
-            log_group_tags = self._get_log_group_tags(log_group) or []
-            self.tags_by_id[log_group] = log_group_tags
+                if log_group_tags and not self._is_expired(last_modified):
+                    log_group = cache_file.split("/")[-1].split(".")[0]
+                    self.tags_by_log_group[log_group] = {
+                        "tags": log_group_tags,
+                        "last_modified": last_modified,
+                    }
+            self.logger.debug(
+                f"loggroup_tags_cache initialized successfully {self.tags_by_log_group}"
+            )
+        except Exception:
+            self.logger.exception("failed to build log group tags cache", exc_info=True)
+
+    def _fetch_log_group_tags(self, log_group):
+        # first, check in-memory cache
+        log_group_tags_struct = self.tags_by_log_group.get(log_group, None)
+        if log_group_tags_struct and not self._is_expired(
+            log_group_tags_struct.get("last_modified", None)
+        ):
+            return log_group_tags_struct.get("tags", [])
+
+        # then, check cache file, update and return
+        cache_file_name = self._get_cache_file_name(log_group)
+        log_group_tags, last_modified = self._get_log_group_tags_from_cache(
+            cache_file_name
+        )
+        if log_group_tags and not self._is_expired(last_modified):
+            self.tags_by_log_group[log_group] = {
+                "tags": log_group_tags,
+                "last_modified": time(),
+            }
+            return log_group_tags
+
+        # finally, make an api call, update and return
+        log_group_tags = self._get_log_group_tags(log_group) or []
+        self._update_log_group_tags_cache(log_group, log_group_tags)
+        self.tags_by_log_group[log_group] = {
+            "tags": log_group_tags,
+            "last_modified": time(),
+        }

         return log_group_tags

+    def _get_log_group_tags_from_cache(self, cache_file_name):
+        try:
+            response = self.s3_client.get_object(
+                Bucket=self.bucket_name, Key=cache_file_name
+            )
+            tags_cache = json.loads(response.get("Body").read().decode("utf-8"))
+            last_modified_unix_time = int(response.get("LastModified").timestamp())
+        except Exception:
+            send_forwarder_internal_metrics("s3_cache_fetch_failure")
+            self.logger.exception(
+                "Failed to get log group tags from cache", exc_info=True
+            )
+            return None, -1
+
+        return tags_cache, last_modified_unix_time
+
+    def _update_log_group_tags_cache(self, log_group, tags):
+        cache_file_name = self._get_cache_file_name(log_group)
+        try:
+            self.s3_client.put_object(
+                Bucket=self.bucket_name,
+                Key=cache_file_name,
+                Body=(bytes(json.dumps(tags).encode("UTF-8"))),
+            )
+        except Exception:
+            send_forwarder_internal_metrics("s3_cache_write_failure")
+            self.logger.exception(
+                "Failed to update log group tags cache", exc_info=True
+            )
+
+    def _is_expired(self, last_modified):
+        if not last_modified:
+            return True
+
+        # add a random number of seconds to avoid having all tags refetched at the same time
+        earliest_time_to_refetch_tags = (
+            last_modified + self.cache_ttl_seconds + randint(1, 100)
+        )
+        return time() > earliest_time_to_refetch_tags
+
+    def _get_cache_file_name(self, log_group):
+        log_group_name = log_group.replace("/", "_")
+        return f"{self._get_cache_file_prefix()}/{log_group_name}.json"
+
+    def _get_cache_file_prefix(self):
+        return f"{self.cache_dirname}/{self.cache_prefix}"
+
     def _get_log_group_tags(self, log_group):
         response = None
         try:
             send_forwarder_internal_metrics("list_tags_log_group_api_call")
             response = self.cloudwatch_logs_client.list_tags_log_group(
                 logGroupName=log_group
             )
-        except Exception as e:
-            self.logger.exception(f"Failed to get log group tags due to {e}")
+        except Exception:
+            self.logger.exception("Failed to get log group tags", exc_info=True)
         formatted_tags = None
         if response is not None:
             formatted_tags = [
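For orientation, a minimal usage sketch of the reworked cache (not part of the commit; the prefix and log group names are hypothetical, and real calls require AWS credentials plus permission to call logs:ListTagsLogGroup):

import os

# Opt in to tag fetching; get() returns [] otherwise.
os.environ["DD_FETCH_LOG_GROUP_TAGS"] = "true"

from caching.cloudwatch_log_group_cache import CloudwatchLogGroupTagsCache

# The prefix namespaces this forwarder's objects under log-group-cache/ in S3.
cache = CloudwatchLogGroupTagsCache("my-forwarder")  # hypothetical prefix

# get() now resolves through three tiers: the in-memory dict, then the
# per-log-group S3 cache file, then a ListTagsLogGroup API call; slower
# tiers backfill the faster ones on a hit.
tags = cache.get("/aws/lambda/my-function")  # hypothetical log group

Because each log group now has its own S3 object, a stale entry can be rewritten with a single put_object call, which is what removes the need for the old shared lock file.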

aws/logs_monitoring/settings.py

Lines changed: 2 additions & 2 deletions
@@ -258,13 +258,13 @@ def __init__(self, name, pattern, placeholder):
 # These default cache names remain unchanged so we can get existing cache data for these
 DD_S3_CACHE_FILENAME = "cache.json"
 DD_S3_CACHE_LOCK_FILENAME = "cache.lock"
-DD_S3_LOG_GROUP_CACHE_FILENAME = "log-group-cache.json"
-DD_S3_LOG_GROUP_CACHE_LOCK_FILENAME = "log-group-cache.lock"
 DD_S3_STEP_FUNCTIONS_CACHE_FILENAME = "step-functions-cache.json"
 DD_S3_STEP_FUNCTIONS_CACHE_LOCK_FILENAME = "step-functions-cache.lock"
 DD_S3_TAGS_CACHE_FILENAME = "s3-cache.json"
 DD_S3_TAGS_CACHE_LOCK_FILENAME = "s3-cache.lock"

+DD_S3_LOG_GROUP_CACHE_DIRNAME = "log-group-cache"
+
 DD_TAGS_CACHE_TTL_SECONDS = int(get_env_var("DD_TAGS_CACHE_TTL_SECONDS", default=300))
 DD_S3_CACHE_LOCK_TTL_SECONDS = 60
 GET_RESOURCES_LAMBDA_FILTER = "lambda"
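The settings change replaces the single cache file (plus lock file) with a directory-style key space. A small illustrative helper (hypothetical, mirroring _get_cache_file_name and _get_cache_file_prefix above) shows the resulting object keys:

DD_S3_LOG_GROUP_CACHE_DIRNAME = "log-group-cache"

def cache_key(cache_prefix, log_group):
    # Slashes in the log group name become underscores in the object key.
    log_group_name = log_group.replace("/", "_")
    return f"{DD_S3_LOG_GROUP_CACHE_DIRNAME}/{cache_prefix}/{log_group_name}.json"

print(cache_key("my-forwarder", "/aws/lambda/my-function"))
# log-group-cache/my-forwarder/_aws_lambda_my-function.json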

aws/logs_monitoring/template.yaml

Lines changed: 3 additions & 1 deletion
@@ -742,7 +742,9 @@ Resources:
               - Fn::Sub: "arn:aws:s3:::${DdForwarderExistingBucketName}"
             Condition:
               StringLike:
-                s3:prefix: "retry/*"
+                s3:prefix:
+                  - "retry/*"
+                  - "log-group-cache/*"
             Effect: Allow
           - Ref: AWS::NoValue
           - Action:
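The widened s3:prefix condition matters because _build_tags_cache lists objects at startup; a sketch of the call the policy must now permit (the bucket name is hypothetical):

import boto3

s3_client = boto3.client("s3")
response = s3_client.list_objects_v2(
    Bucket="my-forwarder-bucket",           # hypothetical bucket
    Prefix="log-group-cache/my-forwarder",  # covered by "log-group-cache/*"
)
cache_files = [obj["Key"] for obj in response.get("Contents", [])]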

aws/logs_monitoring/tests/run_unit_tests.sh

Lines changed: 1 addition & 0 deletions
@@ -2,4 +2,5 @@

 export DD_API_KEY=11111111111111111111111111111111
 export DD_ADDITIONAL_TARGET_LAMBDAS=ironmaiden,megadeth
+export DD_S3_BUCKET_NAME=dd-s3-bucket
 python3 -m unittest discover .

aws/logs_monitoring/tests/test_awslogs_handler.py

Lines changed: 7 additions & 1 deletion
@@ -35,7 +35,8 @@


 class TestAWSLogsHandler(unittest.TestCase):
-    def test_awslogs_handler_rds_postgresql(self):
+    @patch("caching.cloudwatch_log_group_cache.CloudwatchLogGroupTagsCache.__init__")
+    def test_awslogs_handler_rds_postgresql(self, mock_cache_init):
         event = {
             "awslogs": {
                 "data": base64.b64encode(
@@ -63,6 +64,7 @@ def test_awslogs_handler_rds_postgresql(self):
         }
         context = None
         metadata = {"ddsource": "postgresql", "ddtags": "env:dev"}
+        mock_cache_init.return_value = None
         cache_layer = CacheLayer("")
         cache_layer._cloudwatch_log_group_cache.get = MagicMock(
             return_value=["test_tag_key:test_tag_value"]
@@ -71,10 +73,12 @@ def test_awslogs_handler_rds_postgresql(self):
         verify_as_json(list(awslogs_handler(event, context, metadata, cache_layer)))
         verify_as_json(metadata, options=NamerFactory.with_parameters("metadata"))

+    @patch("caching.cloudwatch_log_group_cache.CloudwatchLogGroupTagsCache.__init__")
     @patch("caching.cloudwatch_log_group_cache.send_forwarder_internal_metrics")
     def test_awslogs_handler_step_functions_tags_added_properly(
         self,
         mock_forward_metrics,
+        mock_cache_init,
     ):
         event = {
             "awslogs": {
@@ -106,10 +110,12 @@ def test_awslogs_handler_step_functions_tags_added_properly(
         context = None
         metadata = {"ddsource": "postgresql", "ddtags": "env:dev"}
         mock_forward_metrics.side_effect = MagicMock()
+        mock_cache_init.return_value = None
         cache_layer = CacheLayer("")
         cache_layer._step_functions_cache.get = MagicMock(
             return_value=["test_tag_key:test_tag_value"]
         )
+        cache_layer._cloudwatch_log_group_cache.get = MagicMock()

         verify_as_json(list(awslogs_handler(event, context, metadata, cache_layer)))
         verify_as_json(metadata, options=NamerFactory.with_parameters("metadata"))
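These tests patch CloudwatchLogGroupTagsCache.__init__ because the reworked constructor creates boto3 clients and pre-warms the cache from S3. A self-contained sketch of the pattern (class and names are illustrative, not from the repo):

import unittest
from unittest.mock import MagicMock, patch

class ExpensiveCache:
    def __init__(self):
        raise RuntimeError("would call AWS")  # stands in for boto3/S3 setup

class TestPattern(unittest.TestCase):
    @patch(f"{__name__}.ExpensiveCache.__init__")
    def test_uses_stubbed_cache(self, mock_init):
        mock_init.return_value = None  # __init__ must return None
        cache = ExpensiveCache()       # no AWS calls happen
        cache.get = MagicMock(return_value=["team:platform"])
        self.assertEqual(cache.get("any"), ["team:platform"])

if __name__ == "__main__":
    unittest.main()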

aws/logs_monitoring/tests/test_cloudtrail_s3.py

Lines changed: 3 additions & 1 deletion
@@ -97,11 +97,12 @@ def get_test_data_gzipped(self) -> io.BytesIO:
             gzip.compress(json.dumps(copy.deepcopy(test_data)).encode("utf-8"))
         )

+    @patch("caching.cloudwatch_log_group_cache.CloudwatchLogGroupTagsCache.__init__")
     @patch("caching.base_tags_cache.boto3")
     @patch("steps.handlers.s3_handler.boto3")
     @patch("lambda_function.boto3")
     def test_s3_cloudtrail_pasing_and_enrichment(
-        self, lambda_boto3, parsing_boto3, cache_boto3
+        self, lambda_boto3, parsing_boto3, cache_boto3, mock_cache_init
     ):
         context = Context()
         boto3 = parsing_boto3.client()
@@ -117,6 +118,7 @@ def test_s3_cloudtrail_pasing_and_enrichment(
                 },
             }
         }
+        mock_cache_init.return_value = None
         cache_layer = CacheLayer("")
         cache_layer._s3_tags_cache.get = MagicMock(return_value=[])
         cache_layer._lambda_cache.get = MagicMock(return_value=[])
