Skip to content

Commit cd8a7ca

Browse files
committed
Remove regex from ingest pipeline
1 parent a8a8075 commit cd8a7ca

File tree

3 files changed

+88
-70
lines changed

3 files changed

+88
-70
lines changed

packages/awsfirehose/changelog.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
11
# newer versions go on top
2+
- version: "1.8.2"
3+
changes:
4+
- description: Remove regex from ingest pipeline
5+
type: bugfix
6+
link: https://github.com/elastic/integrations/pull/14845
27
- version: "1.8.1"
38
changes:
49
- description: Update correct link for pull request

packages/awsfirehose/data_stream/logs/elasticsearch/ingest_pipeline/default.yml

Lines changed: 82 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -16,113 +16,126 @@ processors:
1616
}
1717
1818
// AWS WAF Logs
19-
if (ctx['aws.kinesis.name'] != null && ctx['aws.kinesis.name'].contains('aws-waf-logs-')) {
20-
ctx.event.dataset = 'aws.waf';
21-
}
22-
else if (ctx['aws.cloudwatch.log_group'] != null && ctx['aws.cloudwatch.log_group'].contains('aws-waf-logs-')) {
19+
// Lower-cased copy of the message used by the substring checks below.
// Fall back to an empty string when the message is missing so that the
// checks based on Kinesis name / log group / log stream can still run.
def message_lower = ctx.message != null ? ctx.message.toLowerCase() : '';
20+
21+
if (ctx['aws.kinesis.name'] != null && ctx['aws.kinesis.name'].contains('aws-waf-logs-') ||
22+
(ctx['aws.cloudwatch.log_group'] != null && ctx['aws.cloudwatch.log_group'].contains('aws-waf-logs-'))) {
2323
ctx.event.dataset = 'aws.waf';
2424
}
25-
else if (ctx.message.contains('webaclld') && ctx.message.contains('terminatingRule') &&
26-
ctx.message.contains('httpSource') && ctx.message.contains('ruleGroupList') && ctx.message.contains('rateBasedRuleList') &&
27-
ctx.message.contains('nonTerminatingMatchingRules') && ctx.message.contains('httpRequest') && ctx.message.contains('labels')) {
25+
else if (message_lower.contains('webaclid') && message_lower.contains('terminatingrule') &&
26+
message_lower.contains('httpsource') && message_lower.contains('rulegrouplist')) {
2827
ctx.event.dataset = 'aws.waf';
2928
}
29+
3030
// AWS CloudTrail Logs
3131
else if (ctx['aws.cloudwatch.log_stream'] != null && ctx['aws.cloudwatch.log_stream'].contains('CloudTrail')) {
3232
ctx.event.dataset = 'aws.cloudtrail';
3333
}
34+
3435
// AWS VPC Flow Logs
35-
else if (ctx.message.splitOnToken(" ").length == 14) {
36+
// The accept/reject alternatives must be grouped: '&&' binds tighter than '||',
// so without the parentheses any message containing "reject" matched this branch.
else if (message_lower.contains('"eni-') && message_lower.contains('"vpc-') && (message_lower.contains('accept') || message_lower.contains('reject'))) {
3637
ctx.event.dataset = 'aws.vpcflow';
3738
}
39+
3840
// AWS Firewall Logs
39-
else if (ctx.message.contains('firewall_name') && ctx.message.contains('availability_zone') &&
40-
ctx.message.contains('event_timestamp') && ctx.message.contains('event')) {
41+
else if (message_lower.contains('"firewall_name":') && message_lower.contains('"availability_zone":') &&
42+
message_lower.contains('"event_timestamp":') && message_lower.contains('"event":')) {
4143
ctx.event.dataset = 'aws.firewall_logs';
4244
}
43-
// AWS Route53 Resolver Logs - This needs to be before the Route53 Public Logs
44-
else if (ctx.message != null && ctx.message.contains('version') && ctx.message.contains('account_id') && ctx.message.contains('region') &&
45-
ctx.message.contains('vpc_id') && ctx.message.contains('query_timestamp') && ctx.message.contains('query_name') &&
46-
ctx.message.contains('query_type') && ctx.message.contains('query_class') && ctx.message.contains('rcode') &&
47-
ctx.message.contains('answers') && ctx.message.contains('srcaddr') && ctx.message.contains('srcport') &&
48-
ctx.message.contains('transport') && ctx.message.contains('srcids')) {
45+
46+
// AWS Route53 Resolver Logs
47+
else if (message_lower.contains('"version":') && message_lower.contains('"account_id":') && message_lower.contains('"region":') &&
48+
message_lower.contains('"vpc_id":') && message_lower.contains('"query_timestamp":')) {
4949
ctx.event.dataset = 'aws.route53_resolver_logs';
5050
}
51+
5152
// AWS Route53 Public Logs
52-
else if (ctx['aws.cloudwatch.log_stream'] != null && ctx['aws.cloudwatch.log_group'] != null &&
53-
ctx['aws.cloudwatch.log_group'].contains('/aws/route53/')) {
54-
def split_log_stream_name = ctx['aws.cloudwatch.log_stream'].splitOnToken('/');
55-
if (split_log_stream_name.length == 2) {
56-
def hosted_zone_id = split_log_stream_name[0];
57-
def edge_location_id = split_log_stream_name[1];
58-
if (ctx.message != null && ctx.message.contains(hosted_zone_id) && ctx.message.contains(edge_location_id)) {
59-
ctx.event.dataset = 'aws.route53_public_logs';
60-
}
53+
else if (ctx['aws.cloudwatch.log_group'] != null && ctx['aws.cloudwatch.log_group'].contains('/aws/route53/')) {
54+
if (ctx.message.contains("T") && ctx.message.contains("Z") && ctx.message.contains("NOERROR") && ctx.message.contains("UDP")) {
55+
ctx.event.dataset = 'aws.route53_public_logs';
6156
}
6257
}
63-
// AWS API Gateway Logs - This needs to be before the S3 Access Logs
64-
else if ((ctx.message != null && ctx.message.contains('requestId') && ctx.message.contains('ip')
65-
&& ctx.message.contains('requestTime') && ctx.message.contains('httpMethod') && ctx.message.contains('routeKey')
66-
&& ctx.message.contains('status') && ctx.message.contains('protocol') && ctx.message.contains('responseLength'))
67-
|| (ctx.message != null && ctx.message.contains('requestId') && ctx.message.contains('ip') && ctx.message.contains('caller')
68-
&& ctx.message.contains('user') && ctx.message.contains('requestTime') && ctx.message.contains('httpMethod')
69-
&& ctx.message.contains('resourcePath') && ctx.message.contains('status') && ctx.message.contains('protocol')
70-
&& ctx.message.contains('responseLength'))
71-
|| (ctx.message != null && ctx.message.contains('requestId') && ctx.message.contains('ip') && ctx.message.contains('caller')
72-
&& ctx.message.contains('user') && ctx.message.contains('requestTime') && ctx.message.contains('eventType')
73-
&& ctx.message.contains('routeKey') && ctx.message.contains('status') && ctx.message.contains('connectionId'))) {
58+
59+
// AWS API Gateway Logs
60+
else if (message_lower.contains('"requestid":') && message_lower.contains('"ip":')
61+
&& (message_lower.contains('"requesttime":') || message_lower.contains('"request_time":'))
62+
&& (message_lower.contains('"httpmethod":') || message_lower.contains('"eventtype":'))) {
7463
ctx.event.dataset = 'aws.apigateway_logs';
7564
}
76-
// AWS S3 Access Logs
77-
else if (ctx.message.length() > 0) {
78-
int tokenCount = 1;
65+
66+
// AWS S3 Access Logs, CloudFront, and ELB Logs
67+
else {
68+
// Inlined logic for splitting tokens
69+
def tokens_result = new ArrayList();
7970
StringBuilder currentToken = new StringBuilder();
80-
String hostHeader = "-";
8171
boolean insideQuotes = false;
8272
for (int i = 0; i < ctx.message.length(); i++) {
83-
String c = String.valueOf(ctx.message.charAt(i));
84-
if (c.equals(" ") && !insideQuotes) {
85-
tokenCount++;
86-
if (tokenCount == 24) {
87-
hostHeader = currentToken.toString();
88-
}
89-
currentToken = new StringBuilder();
90-
} else if (c.equals("\"")) {
73+
char c = ctx.message.charAt(i);
74+
if (c.toString().equals("\"")) {
9175
insideQuotes = !insideQuotes;
9276
currentToken.append(c);
77+
} else if (c.toString().equals(" ") && !insideQuotes) {
78+
if (currentToken.length() > 0) {
79+
tokens_result.add(currentToken.toString());
80+
}
81+
currentToken = new StringBuilder();
9382
} else {
9483
currentToken.append(c);
9584
}
9685
}
97-
if (hostHeader != "-" && hostHeader.contains('s3') && hostHeader.contains('amazonaws.com')) {
98-
ctx.event.dataset = 'aws.s3access';
99-
} else if (tokenCount == 25) {
86+
if (currentToken.length() > 0) {
87+
tokens_result.add(currentToken.toString()); // Add the last token
88+
}
89+
def tokens = tokens_result.toArray(new String[0]);
90+
def tokenCount = tokens.length;
91+
92+
// Check for S3 Access logs first using a more reliable token count
93+
if (tokenCount >= 24) {
94+
def hostHeader = tokens[23];
95+
if (hostHeader.contains('s3') && hostHeader.contains('amazonaws.com')) {
96+
ctx.event.dataset = 'aws.s3access';
97+
}
98+
}
99+
if (tokenCount == 25) {
100100
ctx.event.dataset = 'aws.s3access';
101-
} else {
102-
tokenCount = 1;
103-
insideQuotes = false;
104-
for (int i = 0; i < ctx.message.length(); i++) {
105-
String c = String.valueOf(ctx.message.charAt(i));
106-
if (c.equals(" ") && !insideQuotes) {
107-
tokenCount++;
108-
} else if (c.equals("\"")) {
109-
insideQuotes = !insideQuotes;
101+
}
102+
103+
// Fallback to CloudFront and ELB if S3 check fails
104+
if (ctx.event.dataset == null) {
105+
// AWS CloudFront Logs - Updated logic
106+
if (tokenCount == 33) {
107+
String date = tokens[0];
108+
String time = tokens[1];
109+
if (date != null && time != null &&
110+
date.length() == 10 && date.substring(4, 5).equals("-") && date.substring(7, 8).equals("-") &&
111+
time.length() == 8 && time.substring(2, 3).equals(":") && time.substring(5, 6).equals(":")) {
112+
ctx.event.dataset = 'aws.cloudfront_logs';
110113
}
111114
}
112-
// AWS CloudFront Logs
113-
if (tokenCount==33 && ctx.message =~ /^\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}\s[a-zA-Z0-9-]+\s\d+\s(\d+\.\d+\.\d+\.\d+|[a-fA-F0-9:]+)/) {
114-
ctx.event.dataset = 'aws.cloudfront_logs';
115+
// AWS ELB Logs - Updated logic to handle multiple log types
116+
else if (tokens.length > 0 && (tokens[0].equals("http") || tokens[0].equals("https") || tokens[0].equals("tcp") || tokens[0].equals("tls") || tokens[0].equals("udp"))) {
117+
// This identifies ALBs and NLBs
118+
ctx.event.dataset = 'aws.elb_logs';
115119
}
116-
//AWS ELB Logs
117-
else if ((tokenCount == 15 || tokenCount == 29 || tokenCount == 22) &&
118-
(ctx.message =~ /.*\s(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{1,5})|([0-9a-fA-F:.]+:\d{1,5})\s(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{1,5})|([0-9a-fA-F:.]+:\d{1,5})\s-?\d+(\.\d+)?\s/)) {
119-
ctx.event.dataset = 'aws.elb_logs';
120+
else if ((tokenCount == 15 || tokenCount == 29 || tokenCount == 22)) { // For classic ELBs
121+
// tokens[2], tokens[3] and tokens[4] are read below, so at least 5 tokens are required.
if (tokens.length >= 5) {
122+
boolean token2IsIpPort = tokens[2].contains(':');
123+
boolean token3IsIpPort = tokens[3].contains(':');
124+
125+
try {
126+
Double.parseDouble(tokens[4]);
127+
if (token2IsIpPort && token3IsIpPort) {
128+
ctx.event.dataset = 'aws.elb_logs';
129+
}
130+
} catch (Exception e) {
131+
// Token 4 is not a number, so this is not an ELB log
132+
}
133+
}
120134
}
121135
}
122136
}
123-
124137
ignore_failure: true
125-
138+
126139
on_failure:
127140
- set:
128141
field: error.message

packages/awsfirehose/manifest.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
format_version: "3.3.0"
22
name: awsfirehose
33
title: Amazon Data Firehose
4-
version: 1.8.1
4+
version: 1.8.2
55
description: Stream logs and metrics from Amazon Data Firehose into Elastic Cloud.
66
type: integration
77
categories:

0 commit comments

Comments
 (0)