You can use the script below to check whether you are receiving the metrics correctly.
import boto3
from datetime import datetime, timedelta
import time
import pytz

# Module-level CloudWatch client, created with boto3's defaults
# (no explicit region or credentials are passed here).
cloudwatch = boto3.client('cloudwatch')


def get_cloudwatch_metrics(namespace, metric_name, dimensions, start_time, end_time):
    """Query CloudWatch for the ``Sum`` of a single metric over a time window.

    Args:
        namespace: CloudWatch namespace the metric lives in.
        metric_name: Name of the metric to query.
        dimensions: List of ``{'Name': ..., 'Value': ...}`` dicts identifying
            the metric.
        start_time: Start of the query window, forwarded as ``StartTime``.
        end_time: End of the query window, forwarded as ``EndTime``.

    Returns:
        The ``MetricDataResults`` list from the response, or an empty list if
        anything goes wrong (the error is printed, not raised).
    """
    # A single query ("m1") asking for the metric summed with a Period of 30.
    metric_query = {
        'Id': 'm1',
        'MetricStat': {
            'Metric': {
                'Namespace': namespace,
                'MetricName': metric_name,
                'Dimensions': dimensions,
            },
            'Period': 30,
            'Stat': 'Sum',
        },
    }

    try:
        response = cloudwatch.get_metric_data(
            StartTime=start_time,
            EndTime=end_time,
            MetricDataQueries=[metric_query],
        )
        return response['MetricDataResults']
    except Exception as exc:
        # Best-effort diagnostic tool: report the failure and let the caller
        # keep polling.
        print(f"Error fetching metrics: {exc}")
        return []


def main():
    """Poll CloudWatch in an endless loop and print the raw metric results.

    Every iteration queries a window of ``collection_interval_seconds`` that
    ends 10 seconds before "now", prints the current time, the window bounds
    and the returned data, then sleeps for 10 seconds.
    """
    namespace = "CWAgent"
    metric_name = "<MetricsName>"
    dimensions = [
        {'Name': 'path', 'Value': 'pathName'},
        {'Name': 'metric_type', 'Value': 'metric_type_value'},
    ]
    # Only used by the optional (commented-out) threshold check below.
    target_value = 1000
    collection_interval_seconds = 30
    timestamp_format = '%Y-%m-%dT%H:%M:%S'
    # utc_now = datetime.now(pytz.UTC) - timedelta(seconds=300)

    while True:
        # The window ends 10 seconds in the past and spans one collection interval.
        window_end = datetime.now(pytz.UTC) - timedelta(seconds=10)
        window_start = window_end - timedelta(seconds=collection_interval_seconds)
        start_time_str = window_start.strftime(timestamp_format)
        end_time_str = window_end.strftime(timestamp_format)

        metrics_data = get_cloudwatch_metrics(
            namespace, metric_name, dimensions, start_time_str, end_time_str
        )

        print("Currrent Time: ", datetime.now(pytz.UTC))
        print("Start Time: ", start_time_str)
        print("End Time: ", end_time_str)
        print(metrics_data)

        # Optional threshold check -- uncomment to compare against target_value.
        # if metrics_data:
        #     aggregated_value = sum(metrics_data[0]['Values']) if metrics_data[0]['Values'] else None
        #     if aggregated_value and aggregated_value >= target_value:
        #         print(f"Target metric value met or exceeded: {aggregated_value}")
        #     else:
        #         print(f"Current aggregated value: {aggregated_value}, below target.")
        # else:
        #     print("No metrics data received.")

        print(f"Sleeping for 10 seconds...")
        time.sleep(10)
        print()
        print()


if __name__ == "__main__":
    main()
Requirements.txt:
boto3==1.34.117
botocore==1.34.117
jmespath==1.0.1
python-dateutil==2.9.0.post0
pytz==2024.1
s3transfer==0.10.1
six==1.16.0
urllib3==2.2.1
I have already explained each parameter and its function. For more information, you can refer to this blog:

Scale Deployments Based on HTTP Requests with Keda
CodeWithVed ・ Sep 17
In the above Python script, some parts are commented out. It is entirely up to you how you want to use this script. Since I only wanted to check the metrics, how often the values are printed, and at what time, I simply print the timestamps and the raw result. You can also modify `collection_interval_seconds` and the `Period` value based on your needs.
Alternatively, here is the equivalent Go code, which may help you if your primary codebase is written in Go:
package main

import (
	"fmt"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/cloudwatch"
)

// main polls CloudWatch every 10 seconds for the Sum of one metric over the
// last collectionIntervalSeconds seconds and reports whether the aggregated
// value has reached targetValue. It runs until the process is stopped, and
// exits early only if the AWS session cannot be created.
func main() {
	// Pause between consecutive polls, also applied after a failed request.
	const pollInterval = 10 * time.Second

	namespace := "CWAgent"
	metricName := "ingestion/commandservice/totalRequest_count"
	dimensions := []*cloudwatch.Dimension{
		{Name: aws.String("path"), Value: aws.String("pathName")},
		{Name: aws.String("metric_type"), Value: aws.String("metricsTypeValue")},
	}
	targetValue := float64(100)
	collectionIntervalSeconds := int64(30)

	// The session and client never change between iterations, so they are
	// created once instead of on every pass through the loop.
	sess, err := session.NewSession(&aws.Config{
		Region: aws.String("us-east-1"),
	})
	if err != nil {
		fmt.Println("Error creating session:", err)
		return
	}
	svc := cloudwatch.New(sess)

	for {
		// Query window: the last collectionIntervalSeconds seconds, in UTC.
		endTime := time.Now().UTC()
		startTime := endTime.Add(-time.Duration(collectionIntervalSeconds) * time.Second)

		// Optional: query a fixed window instead (useful for debugging).
		// var endTime, startTime time.Time
		// layout := "2006-01-02T15:04:05Z"
		// endTime, err := time.Parse(layout, "2024-06-05T05:52:33Z")
		// startTime, err = time.Parse(layout, "2024-06-05T05:51:33Z")
		// if err != nil {
		// 	fmt.Println("Error parsing time:", err)
		// 	return
		// }

		params := &cloudwatch.GetMetricDataInput{
			EndTime:   &endTime,
			StartTime: &startTime,
			MetricDataQueries: []*cloudwatch.MetricDataQuery{
				{
					Id: aws.String("m1"),
					MetricStat: &cloudwatch.MetricStat{
						Metric: &cloudwatch.Metric{
							Namespace:  aws.String(namespace),
							MetricName: aws.String(metricName),
							Dimensions: dimensions,
						},
						Period: aws.Int64(10),
						Stat:   aws.String("Sum"),
					},
				},
			},
		}

		result, err := svc.GetMetricData(params)
		if err != nil {
			fmt.Println("Error getting metric data:", err)
			// Back off before retrying; without this pause a persistent
			// failure would turn the loop into a tight busy-loop of API calls.
			time.Sleep(pollInterval)
			continue
		}

		// Add up every datapoint returned for the first (and only) query.
		var aggregatedValue float64
		if len(result.MetricDataResults) > 0 && result.MetricDataResults[0].Values != nil {
			aggregatedValue = sumFloat64Slice(result.MetricDataResults[0].Values)
		}

		fmt.Printf("Start Time: %s\n", startTime.Format(time.RFC3339))
		fmt.Printf("End Time: %s\n", endTime.Format(time.RFC3339))

		// Optional: print every individual datapoint.
		// for _, value := range result.MetricDataResults[0].Values {
		// 	fmt.Println(*value)
		// }

		if aggregatedValue >= targetValue {
			fmt.Printf("Target metric value met or exceeded: %.2f\n", aggregatedValue)
		} else {
			fmt.Printf("Current aggregated value: %.2f, below target.\n", aggregatedValue)
		}

		time.Sleep(pollInterval)
	}
}

// sumFloat64Slice returns the sum of the values pointed to by slice.
// Nil pointers are skipped so a missing element cannot cause a panic;
// a nil or empty slice yields 0.
func sumFloat64Slice(slice []*float64) float64 {
	sum := float64(0)
	for _, v := range slice {
		if v == nil {
			continue
		}
		sum += *v
	}
	return sum
}
Top comments (0)