
Randika Madhushan Perera
Creating AWS Timestream Database Using A Lambda Function

Introduction

Amazon Timestream is a fast, scalable, serverless time series database service for IoT and operational applications. AWS Lambda is a compute service that lets you run code without provisioning or managing servers. This guide walks you through creating a Timestream database and inserting data from an Excel file stored in an S3 bucket, all from a Lambda function.

(Architecture diagram)

1. Create an initial Lambda function

Follow Developing AWS Lambda Functions Locally to create the initial Lambda function.
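If you'd rather scaffold the project directly, a minimal SAM CLI setup might look like this (the project name and runtime below are just the ones used in this post):

```bash
# Scaffold a new Python Lambda project (names here match this post's example)
sam init --name CSVTimestream --runtime python3.9 \
    --app-template hello-world --package-type Zip
cd CSVTimestream
```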

2. Create additional scripts

I have created a Python script to create an S3 bucket and upload the Excel file to it.

create_and_upload.py

```python
import boto3
import botocore
import os


def create_s3_bucket(bucket_name, region=None):
    s3_client = boto3.client('s3', region_name=region)
    try:
        if region is None:
            s3_client.create_bucket(Bucket=bucket_name)
        else:
            location = {'LocationConstraint': region}
            s3_client.create_bucket(Bucket=bucket_name,
                                    CreateBucketConfiguration=location)
    except botocore.exceptions.ClientError as e:
        print(f"Error occurred while creating bucket: {e}")
        return False
    return True


def upload_file_to_s3(file_name, bucket_name, object_name=None):
    # Check if the file exists
    if not os.path.exists(file_name):
        print(f"Error: The file {file_name} does not exist.")
        return False

    if object_name is None:
        # Extracts just the file name from the full file path
        object_name = os.path.basename(file_name)

    s3_client = boto3.client('s3')
    try:
        s3_client.upload_file(file_name, bucket_name, object_name)
        print(f"File '{file_name}' has been uploaded to bucket '{bucket_name}' as '{object_name}'")
        return True
    except Exception as e:
        print(f"Error occurred while uploading file: {str(e)}")
        return False


def main():
    bucket_name = 's3-bucket-name'  # Replace with your unique bucket name
    region = 'region-name'  # Replace with your desired region
    excel_file_path = r'excel-file-path.xlsx'  # Replace with your local Excel file path

    # Create S3 bucket
    if create_s3_bucket(bucket_name, region):
        print(f"Bucket '{bucket_name}' created successfully.")

    # Upload file to S3
    if upload_file_to_s3(excel_file_path, bucket_name):
        print(f"File '{excel_file_path}' uploaded successfully to '{bucket_name}'.")


if __name__ == '__main__':
    main()
```
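Running the script is straightforward; it only assumes boto3 is installed and your AWS credentials are configured (for example via aws configure):

```bash
pip install boto3
python create_and_upload.py
```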

3. Write the lambda function

3.1 File Structure

```text
CSVTimestream
|
|--events
|  |---event.json
|
|--timestream
|  |---app.py
|  |---samconfig.toml
|
|---template.yaml
```

My template.yaml file is as follows.

3.2 Code

template.yaml

```yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  CSVTimestream

  Sample SAM Template for CSVTimestream

# More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst
Globals:
  Function:
    Timeout: 300
    MemorySize: 128

Resources:
  TimestreamLambdaFunction:
    # More info about Function Resource: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#awsserverlessfunction
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: timestream/
      Handler: app.lambda_handler
      Runtime: python3.9
      Architectures:
        - x86_64
      Role: !GetAtt LambdaExecutionRole.Arn

  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: [ lambda.amazonaws.com ]
            Action: [ 'sts:AssumeRole' ]
      Policies:
        - PolicyName: TimestreamAccessPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action: [ 'timestream:*' ]
                Resource: '*'
        - PolicyName: S3BucketAccessPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action: [ 's3:GetObject' ]
                Resource: '*'
```
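Note that both policies above use Resource: '*' to keep the walkthrough simple. If you want to follow least privilege, you could scope the S3 policy to the specific bucket instead; a sketch, assuming the bucket name from earlier:

```yaml
- PolicyName: S3BucketAccessPolicy
  PolicyDocument:
    Version: '2012-10-17'
    Statement:
      - Effect: Allow
        Action: [ 's3:GetObject' ]
        Resource: 'arn:aws:s3:::s3-bucket-name/*'
```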

My Lambda function is as follows.

app.py

```python
import boto3
import pandas as pd
from botocore.exceptions import ClientError
from io import BytesIO

# Initialize clients
s3_client = boto3.client('s3')
timestream_write = boto3.client('timestream-write')

# Constants
database_name = 'timestream-db-name'
BUCKET_NAME = 's3-bucket-name'
OBJECT_KEY = 'excel-file-name.xlsx'


def create_database(database_name):
    try:
        timestream_write.create_database(DatabaseName=database_name)
        print(f"Database {database_name} created.")
    except ClientError as e:
        print(f"Database creation failed: {e}")


def create_table(table_name):
    try:
        retention_properties = {
            'MemoryStoreRetentionPeriodInHours': 24,
            'MagneticStoreRetentionPeriodInDays': 7
        }
        timestream_write.create_table(DatabaseName=database_name,
                                      TableName=table_name,
                                      RetentionProperties=retention_properties)
        print(f"Table {table_name} created.")
    except ClientError as e:
        print(f"Table creation failed: {e}")


def get_excel_file(bucket, key):
    response = s3_client.get_object(Bucket=bucket, Key=key)
    return BytesIO(response['Body'].read())


def process_excel_file(excel_file):
    # Read the Excel file
    xls = pd.ExcelFile(excel_file)

    # Process each sheet in the Excel file
    for sheet_name in xls.sheet_names:
        df = pd.read_excel(xls, sheet_name=sheet_name)

        # Create a table for each sheet
        create_table(sheet_name)

        # Write records to Timestream
        write_records(df, sheet_name)


def write_records(df, table_name):
    records = []
    version = 1  # Start with a base version number

    for index, row in df.iterrows():
        time_str = row['time'].replace('"', '')
        time_dt = pd.to_datetime(time_str)
        timestamp_ms = int(time_dt.timestamp() * 1000)
        # measure_value = row['measure_value::double']

        # Build the list of dimensions.
        dimensions = [
            {'Name': 'col_1_name', 'Value': str(row['col_1_name'])},
            {'Name': 'col_2_name', 'Value': str(row['col_2_name'])},
            # ... continue this based on your Excel file columns
        ]

        # Include additional dimensions based on the sheet structure.
        if 'addi_col' in df.columns:
            dimensions.append({'Name': 'addi_col', 'Value': str(row['addi_col'])})

        record = {
            'Dimensions': dimensions,
            'MeasureName': row['col_name'],
            'MeasureValue': str(row['col_name::double']),  # I have added this based on my Excel file
            'MeasureValueType': 'DOUBLE',
            'Time': str(timestamp_ms),
            'Version': version  # Adding a version number
        }
        records.append(record)
        version += 1  # Increment the version for each record

    try:
        result = timestream_write.write_records(DatabaseName=database_name,
                                                TableName=table_name,
                                                Records=records,
                                                CommonAttributes={})
        print(f"Records written to table {table_name} successfully with status: "
              f"{result['ResponseMetadata']['HTTPStatusCode']}")
    except timestream_write.exceptions.RejectedRecordsException as e:
        print("Error writing records:", e)
        for rejected_record in e.response['RejectedRecords']:
            print("Rejected Record:", rejected_record)
    except ClientError as e:
        print(f"Error writing records: {e}")


def lambda_handler(event, context):
    # Create the Timestream database
    create_database(database_name)

    # Get the Excel file from S3
    excel_file = get_excel_file(BUCKET_NAME, OBJECT_KEY)

    # Process the Excel file
    process_excel_file(excel_file)

    return {
        'statusCode': 200,
        'body': "Data loaded to Timestream successfully."
    }
```
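One caveat: Timestream's WriteRecords API accepts at most 100 records per call, so very large sheets will be rejected if written in one shot. A minimal batching sketch, reusing the timestream_write client and database_name from app.py (the helper name is my own):

```python
def write_records_in_batches(records, table_name, batch_size=100):
    # WriteRecords accepts at most 100 records per call, so split
    # the full record list into batches before writing.
    for start in range(0, len(records), batch_size):
        batch = records[start:start + batch_size]
        timestream_write.write_records(DatabaseName=database_name,
                                       TableName=table_name,
                                       Records=batch,
                                       CommonAttributes={})
```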

My samconfig.toml file is as follows.

samconfig.toml

```toml
# More information about the configuration file can be found here:
# https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-config.html
version = 0.1

[default]
[default.global.parameters]
stack_name = "CSVTimestream"

[default.build.parameters]
cached = true
parallel = true

[default.validate.parameters]
lint = true

[default.deploy.parameters]
capabilities = "CAPABILITY_IAM"
confirm_changeset = true
resolve_s3 = true
s3_prefix = "CSVTimestream"
region = "aws-region"
image_repositories = []

[default.package.parameters]
resolve_s3 = true

[default.sync.parameters]
watch = true

[default.local_start_api.parameters]
warm_containers = "EAGER"

[default.local_start_lambda.parameters]
warm_containers = "EAGER"
```
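If you don't have a samconfig.toml yet, SAM can generate one for you on the first deployment:

```bash
# Walks through stack name, region, etc. and saves the answers to samconfig.toml
sam deploy --guided
```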

4. Finally

Deploy the Lambda function and test it using the local invoke command. You'll see that the Timestream database has been created, along with its tables and data.
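For reference, the commands I mean are roughly these (the event file is whatever you have under events/):

```bash
# Test locally against the sample event payload, then build and deploy
sam local invoke TimestreamLambdaFunction --event events/event.json
sam build
sam deploy
```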
