In this post, I'll automate the process of adding malicious IP addresses classified by the following third-party service https://proxycheck.io to AWS WAF.
AWS has already built-in managed lists you can leverage to protect your resources against malicious IP addresses used for Recon, DoS, or with a general bad reputation.
- AWSManagedIPReputationList
- AWSManagedReconnaissanceList
- AWSManagedIPDDoSList
But we're building our own list based on web server access logs, specifically Cloudfront.
Prerequisites
- An API key obtained from https://proxycheck.io/ ( they have a generous free plan )
- You need the necessary IAM permissions to deploy an AWS SAM application and create the needed underlying resources.
- An Elasticache Valkey cluster for caching our API call results from ProxyCheck for quota efficiency.
- A telegram bot created so you can receive notifications about WAF updates
- A SAM app initialized with AWS SAM CLI
Walkthrough
We will create a new AWS SAM template file and include the following block of YAML definition to define:
- A Lambda function running our Python script that interacts with ProxyCheck API, stores results in Elasticache Valkey along with the risk score associated with the scanned IP address, and updates the AWS WAF IP set accordingly.
- An EventBridge Rule that will act as an event listener to trigger the lambda function whenever a new CSV file is written to our S3 bucket that contains our exported list of IP addresses.
- The necessary IAM permissions attached to the lambda function to read files from S3 and update the WAF IP Set.
AWSTemplateFormatVersion: '2010-09-09' Transform: AWS::Serverless-2016-10-31 Description: > ProxyCheck WAF automation ProxyCheck WAF automation Lambda Function that reads IPs from S3, checks risk scores using ProxyCheck.io, and updates AWS WAF IP sets with high-risk IPs. # More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst Globals: Function: Timeout: 900 # 15 minutes MemorySize: 512 LoggingConfig: LogFormat: JSON Resources: ProxyCheckWAFAutomation: Type: AWS::Serverless::Function Properties: FunctionName: "ProxyCheckWAFAutomation" CodeUri: proxy_check_waf_automation/ Handler: app.lambda_handler Runtime: python3.13 Architectures: - x86_64 Timeout: 900 # 15 minutes MemorySize: 512 Environment: Variables: S3_BUCKET_NAME: "proxycheck-waf-automation" PROXYCHECK_API_KEY: "xxxx-xxxxx-xxxxx-xxxxx" TELEGRAM_BOT_TOKEN: "YOUR_TG_BOT_TOKEN" TELEGRAM_CHAT_ID: "-xxxxx" REDIS_URL: "redis://username:password@hostname/db_name" Events: Trigger: Type: EventBridgeRule Properties: Pattern: source: - "aws.s3" detail-type: - "Object Created" - "Object Updated" detail: bucket: name: - "proxycheck-waf-automation" Policies: - S3ReadPolicy: BucketName: "proxycheck-waf-automation/*" - Statement: - Effect: Allow Action: - wafv2:GetIPSet - wafv2:UpdateIPSet Resource: "*"
You need to update the environment variables values to match your S3 bucket name, proxycheck.io API key and telegram bot token and chat ID. Please use AWS Systems Manager Parameter store to store your secrets. The above template is for demo purposes only.
Below is the code needed in your Python script:
import json import csv import os import io import logging from typing import List, Set, Optional import boto3 import requests import proxycheck import botocore.exceptions import sys import redis import time # Set up logging logger = logging.getLogger() logger.setLevel(logging.INFO) # Initialize AWS clients s3_client = boto3.client('s3') wafv2_client = boto3.client('wafv2', region_name='us-east-1') # Initialize ProxyCheck client proxy_checker = proxycheck.Blocking(key=os.environ.get('PROXYCHECK_API_KEY')) # Initialize Redis client REDIS_URL = os.environ.get('REDIS_URL') redis_client = None def get_redis_client(): """Get Redis client with connection retry logic""" global redis_client if redis_client is None: try: redis_client = redis.from_url( REDIS_URL, socket_timeout=5, socket_connect_timeout=5, retry_on_timeout=True, health_check_interval=30 ) # Test connection redis_client.ping() logger.info("Redis connection established successfully") except Exception as e: logger.warning(f"Failed to connect to Redis: {str(e)}. Will proceed without caching.") redis_client = None return redis_client def lambda_handler(event, context): """Daily IP Checker Lambda Function This function: 1. Reads a CSV file from S3 containing IPs 2. Deduplicates the IPs 3. Checks risk scores using ProxyCheck.io (limited to 1000 requests) 4. Adds high-risk IPs (score > 33) to AWS WAF IP set 5. Sends Telegram notification about the update """ try: # Get configuration from environment variables s3_bucket = os.environ.get('S3_BUCKET_NAME') s3_key = event["detail"]["object"]["key"] waf_scope = 'CLOUDFRONT' waf_ip_set_name = "ProxyCheckAutomation" waf_ip_set_id = "your_id_here" telegram_bot_token = os.environ.get('TELEGRAM_BOT_TOKEN') telegram_chat_id = os.environ.get('TELEGRAM_CHAT_ID') # Validate required environment variables required_vars = [s3_bucket, s3_key, waf_ip_set_name, waf_ip_set_id] if not all(required_vars): raise ValueError("Missing required environment variables") logger.info(f"Starting daily IP check process") # Step 1: Read CSV file from S3 logger.info(f"Reading CSV file from S3: s3://{s3_bucket}/{s3_key}") ips = read_ips_from_s3(s3_bucket, s3_key) logger.info(f"Found {len(ips)} unique IPs after deduplication") # Step 2: Check IP risk scores (limited to 1000 requests) logger.info("Checking IP risk scores with ProxyCheck.io and Redis caching") risky_ips = check_ip_risk_scores(ips[:1000]) logger.info(f"Found {len(risky_ips)} risky IPs (risk score > 33)") # Step 3: Add risky IPs to AWS WAF IP set added_count = 0 if risky_ips: logger.info("Adding risky IPs to AWS WAF IP set") # Convert IPs to CIDR format networks = {f"{ip}/32" for ip in risky_ips} added_count = update_single_ipset(wafv2_client, waf_ip_set_name, waf_scope, waf_ip_set_id, networks) logger.info(f"Added {added_count} IPs to WAF IP set") else: logger.info("No risky IPs found, no updates needed") # Step 4: Send Telegram notification with cache stats if telegram_bot_token and telegram_chat_id: # Get cache efficiency stats for notification cache_stats = get_cache_stats() send_telegram_notification( telegram_bot_token, telegram_chat_id, len(ips), len(risky_ips), added_count, context, cache_stats ) return { "statusCode": 200, "body": json.dumps({ "message": "Daily IP check completed successfully", "total_ips_processed": len(ips), "risky_ips_found": len(risky_ips), "ips_added_to_waf": added_count if risky_ips else 0 }) } except Exception as e: logger.error(f"Error in daily IP check: {str(e)}") # Send error notification via Telegram if configured telegram_bot_token = os.environ.get('TELEGRAM_BOT_TOKEN') telegram_chat_id = os.environ.get('TELEGRAM_CHAT_ID') if telegram_bot_token and telegram_chat_id: send_error_notification(telegram_bot_token, telegram_chat_id, str(e), context) raise e def read_ips_from_s3(bucket: str, key: str) -> List[str]: """Read and deduplicate IPs from CSV file in S3""" try: # Download the CSV file from S3 response = s3_client.get_object(Bucket=bucket, Key=key) csv_content = response['Body'].read().decode('utf-8') # Parse CSV and extract IPs ips: Set[str] = set() csv_reader = csv.reader(io.StringIO(csv_content)) for row in csv_reader: # Skip empty rows if not row or len(row) < 2: continue # IP is in the second column (index 1) ip_value = row[1].strip() if ip_value and is_valid_ip(ip_value): ips.add(ip_value) return list(ips) except Exception as e: logger.error(f"Error reading IPs from S3: {str(e)}") raise def is_valid_ip(ip: str) -> bool: """Basic IP validation""" try: parts = ip.split('.') return len(parts) == 4 and all(0 <= int(part) <= 255 for part in parts) except (ValueError, AttributeError): return False def get_cached_risk_score(ip: str) -> Optional[int]: """Get cached risk score from Redis""" try: redis_conn = get_redis_client() if redis_conn is None: return None cache_key = f"ip_risk:{ip}" cached_score = redis_conn.get(cache_key) if cached_score is not None: return int(cached_score.decode('utf-8')) return None except Exception as e: logger.warning(f"Error reading from Redis cache for IP {ip}: {str(e)}") return None def cache_risk_score(ip: str, risk_score: int, ttl_hours: int = 720): """Cache risk score in Redis with TTL""" try: redis_conn = get_redis_client() if redis_conn is None: return cache_key = f"ip_risk:{ip}" redis_conn.setex(cache_key, ttl_hours * 3600, str(risk_score)) logger.debug(f"Cached risk score for IP {ip}: {risk_score}") except Exception as e: logger.warning(f"Error caching risk score for IP {ip}: {str(e)}") # Global cache stats for tracking cache_stats = {"cache_hits": 0, "api_calls": 0, "checked_count": 0} def get_cache_stats(): """Get current cache statistics""" return cache_stats.copy() def reset_cache_stats(): """Reset cache statistics""" global cache_stats cache_stats = {"cache_hits": 0, "api_calls": 0, "checked_count": 0} def check_ip_risk_scores(ips: List[str]) -> List[str]: """Check IP risk scores using ProxyCheck.io with Redis caching and return risky IPs""" risky_ips = [] reset_cache_stats() # Reset stats at the beginning of each run try: logger.info(f"Starting to check {len(ips)} IPs with Redis caching") for ip in ips: try: # First, check if we have a cached result cached_risk = get_cached_risk_score(ip) if cached_risk is not None: risk_score = cached_risk cache_stats["cache_hits"] += 1 logger.debug(f"Cache hit for IP {ip}: risk score {risk_score}") else: # Check individual IP using proxy_checker.ip() method ip_result = proxy_checker.ip(ip) risk_score = ip_result.risk() cache_stats["api_calls"] += 1 # Cache the result for future use cache_risk_score(ip, risk_score) logger.debug(f"API call for IP {ip}: risk score {risk_score}") cache_stats["checked_count"] += 1 if risk_score > 33: risky_ips.append(ip) logger.info(f"Risky IP found: {ip} (risk score: {risk_score})") # Log progress every 100 IPs if cache_stats["checked_count"] % 100 == 0: logger.info(f"Progress: {cache_stats['checked_count']}/{len(ips)} IPs processed, {len(risky_ips)} risky IPs found. Cache hits: {cache_stats['cache_hits']}, API calls: {cache_stats['api_calls']}") except Exception as e: logger.warning(f"Error checking IP {ip}: {str(e)}") continue except Exception as e: logger.error(f"Error checking IP risk scores: {str(e)}") raise cache_efficiency = (cache_stats["cache_hits"] / cache_stats["checked_count"] * 100) if cache_stats["checked_count"] > 0 else 0 logger.info(f"Completed checking {cache_stats['checked_count']} IPs, found {len(risky_ips)} risky IPs. Cache efficiency: {cache_stats['cache_hits']}/{cache_stats['checked_count']} ({cache_efficiency:.1f}% cache hits), API calls saved: {cache_stats['cache_hits']}") return risky_ips def get_ipset_and_token(client, name, scope, ipset_id): """ Retrieve an existing WAFv2 IPSet. Returns (lock_token, description, existing_addresses_set). """ try: resp = client.get_ip_set(Name=name, Scope=scope, Id=ipset_id) except botocore.exceptions.ClientError as e: logger.error(f"Error fetching IPSet {name}: {e}") raise ipset = resp["IPSet"] lock_token = resp["LockToken"] description = ipset.get("Description", "") existing = set(ipset.get("Addresses", [])) return lock_token, description, existing def update_single_ipset(client, name, scope, ipset_id, networks, is_ipv6=False): """ Add any networks in `networks` to the specified IPSet. `networks` is a set of CIDR network strings (e.g., '192.168.1.1/32'). Returns the number of new IPs added. """ try: # 1) Fetch existing lock_token, description, existing_addrs = get_ipset_and_token( client, name, scope, ipset_id ) # 2) Compute new to_add = networks - existing_addrs if not to_add: logger.info(f"[{name}] no new {'IPv6' if is_ipv6 else 'IPv4'} CIDRs to add.") return 0 # 3) Merge and sort updated = sorted(existing_addrs | networks) limit = 10000 if len(updated) > limit: logger.error(f"[{name}] total addresses {len(updated)} exceed limit {limit}.") raise ValueError(f"IPSet would exceed limit of {limit} addresses") # 4) Update IPSet try: resp = client.update_ip_set( Name=name, Scope=scope, Id=ipset_id, Description=description or 'Daily IP Risk Checker - High Risk IPs', Addresses=list(updated), LockToken=lock_token ) except botocore.exceptions.ClientError as e: logger.error(f"Error updating IPSet {name}: {e}") raise next_token = resp.get("NextLockToken") logger.info( f"[{name}] Inserted {len(to_add)} new items; now has {len(updated)} addresses. " f"NextLockToken: {next_token}" ) return len(to_add) except Exception as e: logger.error(f"Error updating IPSet {name}: {str(e)}") raise def send_telegram_notification(bot_token: str, chat_id: str, total_ips: int, risky_ips: int, added_ips: int, context=None, cache_stats=None): """Send Telegram notification about the IP check results""" try: request_id = context.aws_request_id if context else 'Unknown' # Build cache efficiency message cache_message = "" if cache_stats and cache_stats.get("checked_count", 0) > 0: cache_efficiency = (cache_stats["cache_hits"] / cache_stats["checked_count"] * 100) cache_message = f""" 💾 Cache Performance: • Cache hits: {cache_stats["cache_hits"]} • API calls: {cache_stats["api_calls"]} • Efficiency: {cache_efficiency:.1f}% • API calls saved: {cache_stats["cache_hits"]}""" message = f"""🛡️ Daily IP Check Report 📊 Total IPs processed: {total_ips} ⚠️ Risky IPs found: {risky_ips} 🔒 IPs added to WAF: {added_ips}{cache_message} Status: {'✅ Completed successfully' if risky_ips >= 0 else '❌ Failed'} Request ID: {request_id} """ url = f"https://api.telegram.org/bot{bot_token}/sendMessage" payload = { 'chat_id': chat_id, 'text': message, 'parse_mode': 'HTML' } response = requests.post(url, json=payload, timeout=10) response.raise_for_status() logger.info("Telegram notification sent successfully") except Exception as e: logger.error(f"Error sending Telegram notification: {str(e)}") def send_error_notification(bot_token: str, chat_id: str, error_message: str, context=None): """Send Telegram notification about errors""" try: request_id = context.aws_request_id if context else 'Unknown' message = f"""❌ Daily IP Check Error Error: {error_message} Request ID: {request_id} Please check CloudWatch logs for more details. """ url = f"https://api.telegram.org/bot{bot_token}/sendMessage" payload = { 'chat_id': chat_id, 'text': message, 'parse_mode': 'HTML' } response = requests.post(url, json=payload, timeout=10) response.raise_for_status() except Exception as e: logger.error(f"Error sending error notification: {str(e)}")
The above code:
- Reads a CSV file where IP addresses are stored in the second column when the file is uploaded to the following bucket proxycheck-waf-automation, and de-duplicates them.
- Checks their risk scores with ProxyCheck's client SDK . Once a risky IP is detected, it will be added to the WAF IP Set using the boto3 client.
- Caches scanned IP addresses in Elasticache Valkey along with their risk scores for 30 days to make sure we don't scan them again in this period.
- Sends a summary report to your Telegram chat.
You need to set your already created WAF IP Set ID in the code. In our case, we're updating the AWS WAF to protect Cloudfront distributions with an already existing rule that includes the IP Set to block IP addresses. The above example covers IPv4 addresses; you can apply the same principles to IPv6 addresses.
And below are the results !
Thanks for tuning in!
Top comments (0)