Introduction
As organizations increasingly rely on AWS WAF (Web Application Firewall) to protect their applications, effective log management and security analytics become critical. In this post, we'll walk through building a complete, production-ready pipeline that automatically moves WAF logs from CloudWatch to S3 and integrates with Palo Alto Cortex XDR for advanced security analytics.
What is Cortex XDR?
Cortex XDR (Extended Detection and Response) is Palo Alto Networks' cloud-native security platform that provides comprehensive threat detection, investigation, and response capabilities. It's designed to:
Unified Security Operations:
- Correlate data across endpoints, networks, and cloud environments
- Detect advanced threats using machine learning and behavioral analytics
- Automate incident response with playbooks and orchestration
- Provide centralized visibility into your entire security ecosystem
Why Cortex XDR for WAF Logs:
- Web attack detection: Identifies application-layer attacks, bot traffic, and malicious patterns
- Threat intelligence integration: Correlates WAF events with global threat data
- Automated blocking: Can trigger automated firewall rules based on WAF patterns
- Compliance support: Helps meet security logging and monitoring requirements
For organizations serious about application security, integrating WAF logs with Cortex XDR transforms raw log data into actionable security intelligence.
The Challenge
WAF logs provide invaluable security insights, but managing them effectively presents several challenges:
- Volume: WAF can generate massive amounts of log data
- Cost: Storing logs in CloudWatch can become expensive at scale
- Analysis: Raw logs need to be processed and analyzed for security insights
- Integration: Security teams need logs in their SIEM/SOAR platforms
Our Solution Architecture
We'll build an automated pipeline using AWS CDK that:
- Captures WAF logs from CloudWatch Logs
- Processes logs via Lambda function
- Stores compressed logs in S3 with lifecycle management
- Notifies Cortex XDR via SQS for real-time analysis
- Provides secure access through IAM roles
Architecture Diagram
Data Flow:
- WAF generates security logs from web traffic
- CloudWatch Logs captures and stores log events
- Lambda processes and compresses logs for efficient storage
- S3 stores compressed logs with automatic lifecycle transitions
- SQS notifies Cortex XDR of new log files for real-time analysis
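The hand-off in step 3 is worth a closer look: CloudWatch Logs delivers subscription events to Lambda as base64-encoded, gzip-compressed JSON. A minimal sketch of the decode step (the sample payload here is hand-built for illustration, not captured from AWS):

```python
import base64
import gzip
import json

def decode_cloudwatch_event(event: dict) -> dict:
    """Decode the base64-encoded, gzip-compressed payload CloudWatch Logs sends to Lambda."""
    compressed = base64.b64decode(event["awslogs"]["data"])
    return json.loads(gzip.decompress(compressed))

# Build a synthetic event the same way CloudWatch Logs would, for local testing
payload = {"logGroup": "aws-waf-logs-company",
           "logEvents": [{"timestamp": 0, "message": "{}"}]}
raw = base64.b64encode(gzip.compress(json.dumps(payload).encode("utf-8"))).decode("utf-8")

decoded = decode_cloudwatch_event({"awslogs": {"data": raw}})
print(decoded["logGroup"])  # aws-waf-logs-company
```

The same decode logic appears inside the pipeline's Lambda handler later in this post.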
Implementation with AWS CDK
Let's build this infrastructure using AWS CDK in Python:
Key Architectural Decisions
Before diving into the implementation, it's worth explaining a crucial design choice we made regarding log processing timing.
Real-time vs Batch Processing
When configuring CloudWatch Logs subscription filters, you have two main options:
- Real-time processing: Logs are sent to Lambda immediately as they arrive
- Batch processing: Logs are buffered and sent in 5-minute intervals
We chose real-time processing for several critical reasons:
The Hidden Cost Most Analyses Miss
Many cost comparisons focus only on Lambda execution costs, but there's a crucial piece missing from the traditional analysis: CloudWatch Logs Insights charges. This oversight completely changes the cost equation.
What Traditional Analysis Misses:
The 5-minute batch approach requires CloudWatch Logs Insights queries to retrieve and batch the logs before sending to Lambda. Most analyses only compare:
- ✅ Lambda invocation costs
- ✅ Lambda compute costs
- ✅ S3 PUT request costs
- ❌ CloudWatch Logs Insights scanning charges (the biggest expense!)
5-Minute Batch Approach (Hidden Costs):
- Lambda invocations: 8,640/month × $0.0000002 = ~$0.002
- Lambda compute: Lower cost ✅
- S3 PUTs: Fewer PUTs ✅
- CloudWatch Logs Insights: 8,640 queries × data scanned ❌
Real-time Subscription Approach:
- Lambda invocations: 720,000/month × $0.0000002 = $0.14
- Lambda compute: Higher cost ❌
- S3 PUTs: More PUTs ❌
- CloudWatch Logs Insights: $0 ✅
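The per-invocation arithmetic above is easy to verify. Using the $0.0000002 Lambda request price and the monthly invocation counts quoted in the two breakdowns:

```python
PRICE_PER_INVOCATION = 0.0000002  # Lambda request price used in the figures above

batch_invocations = 8_640        # one batch every 5 minutes, per month
realtime_invocations = 720_000   # per month, from the real-time estimate above

batch_cost = batch_invocations * PRICE_PER_INVOCATION
realtime_cost = realtime_invocations * PRICE_PER_INVOCATION

print(f"Batch invocations:     ${batch_cost:.4f}/month")
print(f"Real-time invocations: ${realtime_cost:.2f}/month")
```

Invocation costs are rounding error either way; the decisive line item is the Logs Insights scanning charge, which only the batch approach incurs.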
Security Response Time: WAF logs contain potential security threats and attack patterns. In cybersecurity, every minute counts. Real-time processing ensures that:
- Suspicious activity is detected immediately
- Security teams can respond to active attacks quickly
- Cortex XDR can trigger automated responses without delay
Operational Visibility: Real-time logs provide immediate insight into:
- Application performance issues
- Traffic anomalies
- DDoS attack patterns
- Bot detection events
Cost vs. Value Trade-off:
When you factor in CloudWatch Logs Insights charges, real-time processing becomes significantly MORE cost-effective than batch processing. Here's the actual cost breakdown:
Estimated Daily Savings
For moderate WAF traffic (1GB logs/day):
| Approach | Lambda Invocations | Lambda Duration | CW API | Total/Day |
|---|---|---|---|---|
| Real-time | $0.005 | $0.015 | $0 | $0.02 |
| Batch (5-min) | $0.0001 | $0.008 | $0.50 | $0.51 |
Key Insights:
- CloudWatch API penalty: Batch processing requires CloudWatch Logs API calls to query and retrieve logs, costing $0.50/day, 25x more than the entire real-time processing cost.
- Real-time processing eliminates API costs by streaming logs directly via subscription filters, avoiding expensive CloudWatch queries entirely.
The analysis shows real-time processing is 96% cheaper ($0.02 vs $0.51/day) while providing:
- Immediate threat detection
- Real-time security response capabilities
- Better error isolation and recovery
- No CloudWatch API scanning charges
Monthly savings: $14.70 ($0.60 vs $15.30). Real-time pays for itself and saves money.
Cortex XDR Integration:
Security platforms like Cortex XDR are designed for real-time threat detection. Delayed log ingestion can:
- Miss time-sensitive attack patterns
- Reduce the effectiveness of machine learning models
- Impact incident response capabilities
This architectural choice reflects our security-first approach, prioritizing threat detection speed over minor cost optimizations.
Project Structure
```
waf-log-pipeline/
├── app.py
├── requirements.txt
├── waf_log_pipeline/
│   ├── __init__.py
│   └── waf_log_pipeline_stack.py
└── tests/
    └── unit/
        └── test_waf_log_pipeline_stack.py
```
Core CDK Stack
```python
import aws_cdk as cdk
from aws_cdk import (
    Stack,
    aws_s3 as s3,
    aws_lambda as _lambda,
    aws_iam as iam,
    aws_logs as logs,
    aws_logs_destinations as logs_destinations,
    aws_sqs as sqs,
    aws_s3_notifications as s3n,
    CfnOutput,
    Duration,
    RemovalPolicy,
)
from constructs import Construct


class WafLogPipelineStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # S3 bucket for log storage with lifecycle management
        self.waf_logs_bucket = s3.Bucket(
            self, "WafLogsBucket",
            bucket_name="company-waf-logs-for-cortex",
            encryption=s3.BucketEncryption.S3_MANAGED,
            versioned=False,
            block_public_access=s3.BlockPublicAccess.BLOCK_ALL,
            lifecycle_rules=[
                s3.LifecycleRule(
                    id="waf-logs-lifecycle",
                    enabled=True,
                    transitions=[
                        s3.Transition(
                            storage_class=s3.StorageClass.INFREQUENT_ACCESS,
                            transition_after=Duration.days(30),
                        ),
                        s3.Transition(
                            storage_class=s3.StorageClass.GLACIER,
                            transition_after=Duration.days(90),
                        ),
                        s3.Transition(
                            storage_class=s3.StorageClass.DEEP_ARCHIVE,
                            transition_after=Duration.days(365),
                        ),
                    ],
                    expiration=Duration.days(2555),  # 7 years
                )
            ],
            removal_policy=RemovalPolicy.DESTROY,
        )

        # Dead-letter queue for failed messages
        cortex_dlq = sqs.Queue(
            self, "CortexXDRNotificationDLQ",
            queue_name="cortex-xdr-waf-logs-notifications-dlq",
            retention_period=Duration.days(14),
        )

        # SQS queue for Cortex XDR notifications
        self.cortex_notification_queue = sqs.Queue(
            self, "CortexXDRNotificationQueue",
            queue_name="cortex-xdr-waf-logs-notifications",
            visibility_timeout=Duration.minutes(15),
            dead_letter_queue=sqs.DeadLetterQueue(
                max_receive_count=3,
                queue=cortex_dlq,
            ),
        )

        # Lambda function for log processing
        self.waf_log_processor = _lambda.Function(
            self, "WafLogProcessor",
            runtime=_lambda.Runtime.PYTHON_3_12,
            handler="index.handler",
            code=_lambda.Code.from_inline(self._get_lambda_code()),
            timeout=Duration.minutes(5),
            memory_size=512,
            environment={
                "S3_BUCKET": self.waf_logs_bucket.bucket_name,
                "S3_PREFIX": "raw",
            },
        )

        # IAM permissions for Lambda
        self.waf_logs_bucket.grant_write(self.waf_log_processor)

        # CloudWatch Logs subscription filter
        self.subscription_filter = logs.SubscriptionFilter(
            self, "WafLogSubscriptionFilter",
            log_group=logs.LogGroup.from_log_group_name(
                self, "WafLogGroup", "aws-waf-logs-company"
            ),
            destination=logs_destinations.LambdaDestination(self.waf_log_processor),
            filter_pattern=logs.FilterPattern.all_events(),
        )

        # S3 bucket notifications to SQS
        self.waf_logs_bucket.add_event_notification(
            s3.EventType.OBJECT_CREATED,
            s3n.SqsDestination(self.cortex_notification_queue),
            s3.NotificationKeyFilter(prefix="raw/", suffix=".jsonl.gz"),
        )

        # IAM role for Cortex XDR access
        self.cortex_xdr_role = iam.Role(
            self, "CortexXDRRole",
            role_name="CortexXDR-WAFLogs-AssumedRole",
            assumed_by=iam.ServicePrincipal("cortex.paloaltonetworks.com"),
            external_ids=["your-external-id-here"],
            inline_policies={
                "CortexXDRS3Access": iam.PolicyDocument(
                    statements=[
                        iam.PolicyStatement(
                            effect=iam.Effect.ALLOW,
                            actions=["s3:GetObject"],
                            resources=[f"{self.waf_logs_bucket.bucket_arn}/*"],
                        ),
                        iam.PolicyStatement(
                            effect=iam.Effect.ALLOW,
                            actions=["s3:ListBucket"],
                            resources=[self.waf_logs_bucket.bucket_arn],
                        ),
                    ]
                ),
                "CortexXDRSQSAccess": iam.PolicyDocument(
                    statements=[
                        iam.PolicyStatement(
                            effect=iam.Effect.ALLOW,
                            actions=[
                                "sqs:ReceiveMessage",
                                "sqs:DeleteMessage",
                                "sqs:ChangeMessageVisibility",
                            ],
                            resources=[self.cortex_notification_queue.queue_arn],
                        )
                    ]
                ),
            },
        )

        # Stack outputs consumed by the Cortex XDR data source configuration
        CfnOutput(self, "CortexQueueUrl", value=self.cortex_notification_queue.queue_url)
        CfnOutput(self, "CortexRoleArn", value=self.cortex_xdr_role.role_arn)

    def _get_lambda_code(self) -> str:
        return '''
import json
import boto3
import gzip
import base64
from datetime import datetime
import os

s3_client = boto3.client('s3')

def handler(event, context):
    """Process CloudWatch Logs and store in S3."""
    s3_bucket = os.environ['S3_BUCKET']
    s3_prefix = os.environ['S3_PREFIX']

    # Parse CloudWatch Logs event (base64-encoded, gzip-compressed JSON)
    cw_data = event['awslogs']['data']
    compressed_payload = base64.b64decode(cw_data)
    uncompressed_payload = gzip.decompress(compressed_payload)
    log_data = json.loads(uncompressed_payload)

    # Process log events
    processed_logs = []
    for log_event in log_data['logEvents']:
        try:
            # Parse WAF log (already in JSON format)
            waf_log = json.loads(log_event['message'])
            processed_logs.append(json.dumps(waf_log))
        except json.JSONDecodeError:
            # Handle non-JSON log entries
            processed_logs.append(json.dumps({
                "timestamp": log_event['timestamp'],
                "message": log_event['message']
            }))

    # Create filename with timestamp
    timestamp = datetime.utcnow().strftime('%Y%m%d-%H%M%S-%f')[:-3]
    filename = f"{s3_prefix}/waf-logs-{timestamp}.jsonl.gz"

    # Compress and upload to S3
    if processed_logs:
        content = '\\n'.join(processed_logs)
        compressed_content = gzip.compress(content.encode('utf-8'))

        s3_client.put_object(
            Bucket=s3_bucket,
            Key=filename,
            Body=compressed_content,
            ContentType='application/gzip',
            ContentEncoding='gzip'
        )
        print(f"Uploaded {len(processed_logs)} log entries to s3://{s3_bucket}/{filename}")

    return {
        'statusCode': 200,
        'body': json.dumps(f'Processed {len(processed_logs)} log entries')
    }
'''
```
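The handler's parse-or-wrap loop can be exercised locally without AWS credentials. This standalone sketch mirrors that logic (the sample events are invented for illustration):

```python
import json

def process_log_events(log_events: list) -> list:
    """Parse each WAF message as JSON; wrap anything unparseable with its timestamp."""
    processed = []
    for log_event in log_events:
        try:
            processed.append(json.dumps(json.loads(log_event["message"])))
        except json.JSONDecodeError:
            processed.append(json.dumps({
                "timestamp": log_event["timestamp"],
                "message": log_event["message"],
            }))
    return processed

events = [
    {"timestamp": 1, "message": '{"action": "BLOCK", "httpRequest": {"uri": "/login"}}'},
    {"timestamp": 2, "message": "not json"},
]
lines = process_log_events(events)
print(len(lines))  # 2
print(json.loads(lines[1])["message"])  # not json
```

The fallback branch matters in practice: a single malformed message should become a wrapped entry, not a lost batch.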
Testing Infrastructure
Create comprehensive but simple tests to validate your infrastructure:
```python
import aws_cdk as core
from aws_cdk import assertions

from waf_log_pipeline.waf_log_pipeline_stack import WafLogPipelineStack


def test_s3_bucket_exists():
    """Test that the S3 bucket is created."""
    app = core.App()
    stack = WafLogPipelineStack(app, "test-stack")
    template = assertions.Template.from_stack(stack)
    template.resource_count_is("AWS::S3::Bucket", 1)


def test_lambda_function_exists():
    """Test that the Lambda function is created."""
    app = core.App()
    stack = WafLogPipelineStack(app, "test-stack")
    template = assertions.Template.from_stack(stack)
    template.resource_count_is("AWS::Lambda::Function", 2)  # Main + CDK notification helper


def test_sqs_queues_exist():
    """Test that the SQS queues are created."""
    app = core.App()
    stack = WafLogPipelineStack(app, "test-stack")
    template = assertions.Template.from_stack(stack)
    template.resource_count_is("AWS::SQS::Queue", 2)  # Main + DLQ


def test_cortex_role_exists():
    """Test that the Cortex XDR role is created."""
    app = core.App()
    stack = WafLogPipelineStack(app, "test-stack")
    template = assertions.Template.from_stack(stack)
    template.has_resource_properties("AWS::IAM::Role", {
        "RoleName": "CortexXDR-WAFLogs-AssumedRole"
    })
```
Deployment
Deploy your infrastructure with these simple commands:
```shell
# Install dependencies
pip install -r requirements.txt

# Bootstrap CDK (first time only)
cdk bootstrap

# Deploy the stack
cdk deploy

# Run tests
python -m pytest tests/ -v
```
Cortex XDR Integration
Once deployed, configure Cortex XDR to consume the logs:
1. Get Integration Details
```shell
# Retrieve configuration values
aws cloudformation describe-stacks \
  --stack-name waf-log-pipeline \
  --query 'Stacks[0].Outputs'
```
2. Configure in Cortex XDR
Navigate to: Settings → Data Sources → Add Data Source → Amazon S3
Configuration:
- SQS URL: Use the queue URL from stack outputs
- Role ARN: Use the Cortex role ARN from outputs
- External ID: Your configured external ID
- Log Type: Generic
- Log Format: JSON
- Compression: gzip
- Vendor: AWS
- Product: WAF
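Under the hood, Cortex XDR polls the SQS queue and reads S3 event notifications to learn which objects to fetch. The message body follows the standard S3 event notification shape; a sketch of extracting bucket and key from such a message (the sample body is hand-built, not captured from AWS):

```python
import json

def extract_new_objects(message_body: str) -> list:
    """Return (bucket, key) pairs from an S3 event notification body."""
    body = json.loads(message_body)
    return [
        (record["s3"]["bucket"]["name"], record["s3"]["object"]["key"])
        for record in body.get("Records", [])
    ]

# Hand-built sample mimicking the notification our bucket emits on OBJECT_CREATED
sample = json.dumps({
    "Records": [{
        "s3": {
            "bucket": {"name": "company-waf-logs-for-cortex"},
            "object": {"key": "raw/waf-logs-20240101-000000-000.jsonl.gz"},
        }
    }]
})
print(extract_new_objects(sample))
```

This is also the shape to inspect when debugging messages that land in the DLQ.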
Key Features & Benefits
Cost Optimization
- S3 Lifecycle Management: Automatically transitions logs to cheaper storage classes
- Compressed Storage: Gzip compression reduces storage costs by ~70%
- CloudWatch Log Offloading: Reduces expensive CloudWatch Logs storage
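The ~70% figure depends on your log content, but repetitive JSON compresses well. A quick sanity check with synthetic WAF-like records (real WAF logs, with their repeated field names, typically compress at least this well):

```python
import gzip
import json

# Synthetic, repetitive WAF-like records standing in for real log lines
records = [
    json.dumps({
        "action": "ALLOW",
        "httpRequest": {"clientIp": f"10.0.0.{i % 255}", "uri": "/index.html"},
    })
    for i in range(1000)
]
raw = "\n".join(records).encode("utf-8")
compressed = gzip.compress(raw)

ratio = 1 - len(compressed) / len(raw)
print(f"raw={len(raw)} bytes, gzip={len(compressed)} bytes, saved {ratio:.0%}")
```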
Scalability
- Lambda Auto-scaling: Handles varying log volumes automatically
- SQS Buffering: Manages traffic spikes and processing delays
- S3 Infinite Scale: No storage capacity concerns
Reliability
- Dead Letter Queue: Captures failed message processing
- Error Handling: Robust Lambda error handling and retries
- Infrastructure as Code: Consistent, repeatable deployments
Security
- IAM Least Privilege: Minimal required permissions for each component
- Encryption: S3 server-side encryption enabled
- VPC Integration: Can be deployed in VPC for additional isolation
Monitoring and Troubleshooting
CloudWatch Metrics to Monitor
- Lambda function errors and duration
- SQS queue depth and message age
- S3 PUT/GET request metrics
Common Issues and Solutions
- Lambda timeouts: Increase memory allocation or timeout duration
- SQS message accumulation: Check Lambda error logs and the DLQ
- Missing logs in Cortex: Verify IAM permissions and SQS configuration
Cost Analysis
For a typical deployment processing 1GB of WAF logs daily:
- S3 Storage: ~$0.50/month (with lifecycle transitions)
- Lambda Execution: ~$2.00/month
- SQS Messages: ~$0.10/month
- Data Transfer: Minimal within same region
Total: ~$2.60/month vs ~$15/month keeping logs in CloudWatch
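The monthly total is just the sum of the component estimates above (the $15 CloudWatch baseline is the rough figure quoted in this post, not a quoted AWS price):

```python
costs = {
    "S3 storage": 0.50,       # with lifecycle transitions
    "Lambda execution": 2.00,
    "SQS messages": 0.10,
}
pipeline_total = sum(costs.values())
cloudwatch_baseline = 15.00  # approximate cost of retaining the same logs in CloudWatch

print(f"Pipeline: ${pipeline_total:.2f}/month")
print(f"Savings vs CloudWatch: ${cloudwatch_baseline - pipeline_total:.2f}/month")
```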
Conclusion
This pipeline provides a robust, cost-effective solution for WAF log management and security analytics. By leveraging Infrastructure as Code with AWS CDK, we've created a maintainable, scalable system that integrates seamlessly with modern security platforms like Cortex XDR.
The combination of automated processing, cost optimization, and real-time security analytics makes this architecture ideal for organizations serious about application security monitoring.
References:
- Cortex XDR
- CloudWatch Pricing
- Ingest generic logs from Amazon S3

Ready to implement this in your environment?