Introduction
AWS Timestream is a fast, scalable, and serverless time series database service for IoT and operational applications. Lambda is a computing service that lets you run code without provisioning or managing servers. This guide will walk you through creating a Timestream database and inserting the data from the s3 bucket's Excel file using a Lambda function.
1. Create an initial lambda function
Follow Developing AWS Lambda Functions In Locally to create the initial lambda function.
2. Create additional scripts
I have created a Python script to create a S3 bucket and upload the Excel file into it.
create_and_upload.py
import boto3 import botocore import os def create_s3_bucket(bucket_name, region=None): s3_client = boto3.client('s3', region_name=region) try: if region is None: s3_client.create_bucket(Bucket=bucket_name) else: location = {'LocationConstraint': region} s3_client.create_bucket(Bucket=bucket_name, CreateBucketConfiguration=location) except botocore.exceptions.ClientError as e: print(f"Error occurred while creating bucket: {e}") return False return True def upload_file_to_s3(file_name, bucket_name, object_name=None): # Check if the file exists if not os.path.exists(file_name): print(f"Error: The file {file_name} does not exist.") return False if object_name is None: object_name = os.path.basename(file_name) # Extracts just the file name from the full file path s3_client = boto3.client('s3') try: s3_client.upload_file(file_name, bucket_name, object_name) print(f"File '{file_name}' has been uploaded to bucket '{bucket_name}' as '{object_name}'") return True except Exception as e: print(f"Error occurred while uploading file: {str(e)}") return False def main(): bucket_name = 's3-bucket-name' # Replace with your unique bucket name region = 'region-name' # Replace with your desired region excel_file_path = r'excel-file-path.xlsx' # Replace with your local Excel file path # Create S3 bucket if create_s3_bucket(bucket_name, region): print(f"Bucket '{bucket_name}' created successfully.") # Upload file to S3 if upload_file_to_s3(excel_file_path, bucket_name): print(f"File '{excel_file_path}' uploaded successfully to '{bucket_name}'.") if __name__ == '__main__': main()
3. Write the lambda function
3.1 File Structure
CSVTimestream | |--events | |---event.json | |--timestream | |---app.py | |---samconfig.toml |---template.yaml
My template.yaml file will be the as follows.
3.2 Codes
template.yaml
AWSTemplateFormatVersion: '2010-09-09' Transform: AWS::Serverless-2016-10-31 Description: > CSVTimestream Sample SAM Template for CSVTimestream # More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst Globals: Function: Timeout: 300 MemorySize: 128 Resources: TimestreamLambdaFunction: Type: AWS::Serverless::Function # More info about Function Resource: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#awsserverlessfunction Properties: CodeUri: timestream/ Handler: app.lambda_handler Runtime: python3.9 Architectures: - x86_64 Role: !GetAtt LambdaExecutionRole.Arn LambdaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: [ lambda.amazonaws.com ] Action: [ 'sts:AssumeRole' ] Policies: - PolicyName: TimestreamAccessPolicy PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: [ 'timestream:*' ] Resource: '*' - PolicyName: S3BucketAccessPolicy PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: [ 's3:GetObject' ] Resource: '*'
My lambda function will be the as follows.
app.py
import boto3 import pandas as pd from botocore.exceptions import ClientError from io import BytesIO # Initialize clients s3_client = boto3.client('s3') timestream_write = boto3.client('timestream-write') # Constants database_name = 'timestream-db-name' BUCKET_NAME = 's3-bucket-name' OBJECT_KEY = 'excel-file-name.xlsx' def create_database(database_name): try: timestream_write.create_database(DatabaseName=database_name) print(f"Database {database_name} created.") except ClientError as e: print(f"Database creation failed: {e}") def create_table(table_name): try: retention_properties = { 'MemoryStoreRetentionPeriodInHours': 24, 'MagneticStoreRetentionPeriodInDays': 7 } timestream_write.create_table(DatabaseName=database_name, TableName=table_name, RetentionProperties=retention_properties) print(f"Table {table_name} created.") except ClientError as e: print(f"Table creation failed: {e}") def get_excel_file(bucket, key): s3_client = boto3.client('s3') response = s3_client.get_object(Bucket=bucket, Key=key) return BytesIO(response['Body'].read()) def process_excel_file(excel_file): # Read the Excel file xls = pd.ExcelFile(excel_file) # Process each sheet in the Excel file for sheet_name in xls.sheet_names: df = pd.read_excel(xls, sheet_name=sheet_name) # Create a table for each sheet create_table(sheet_name) # Write records to Timestream write_records(df, sheet_name) def write_records(df, table_name): records = [] version = 1 # Start with a base version number for index, row in df.iterrows(): time_str = row['time'].replace('"', '') time_dt = pd.to_datetime(time_str) timestamp_ms = int(time_dt.timestamp() * 1000) # measure_value = row['measure_value::double'] # Build the list of dimensions. dimensions = [ {'Name': 'col_1_name', 'Value': str(row['col_1_name'])}, {'Name': 'col_2_name', 'Value': str(row['col_2_name'])} . . .#continue this based on your Excel file columns ] # Include additional dimensions based on the sheet structure. if 'addi_col' in df.columns: dimensions.append({'Name': 'addi_col', 'Value': str(row['addi_col'])}) record = { 'Dimensions': dimensions, 'MeasureName': row['col_name'], 'MeasureValue': str(row['col_name::double']), # i have added this based on my Excel file 'MeasureValueType': 'DOUBLE', 'Time': str(timestamp_ms), 'Version': version # Adding a version number } records.append(record) version += 1 # Increment the version for each record try: result = timestream_write.write_records(DatabaseName=database_name, TableName=table_name, Records=records, CommonAttributes={}) print( f"Records written to table {table_name} successfully with status: {result['ResponseMetadata']['HTTPStatusCode']}") except timestream_write.exceptions.RejectedRecordsException as e: print("Error writing records:", e) for rejected_record in e.response['RejectedRecords']: print("Rejected Record:", rejected_record) except ClientError as e: print(f"Error writing records: {e}") def lambda_handler(event, context): # Create the Timestream database create_database(database_name) # Get the Excel file from S3 excel_file = get_excel_file(BUCKET_NAME, OBJECT_KEY) # Process the Excel file process_excel_file(excel_file) return { 'statusCode': 200, 'body': "Data loaded to Timestream successfully." }
samconfig.toml will be as follows.
samconfig.toml
# More information about the configuration file can be found here: # https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-config.html version = 0.1 [default] [default.global.parameters] stack_name = "CSVTimestream" [default.build.parameters] cached = true parallel = true [default.validate.parameters] lint = true [default.deploy.parameters] capabilities = "CAPABILITY_IAM" confirm_changeset = true resolve_s3 = true s3_prefix = "CSVTimestream" region = "aws-region" image_repositories = [] [default.package.parameters] resolve_s3 = true [default.sync.parameters] watch = true [default.local_start_api.parameters] warm_containers = "EAGER" [default.local_start_lambda.parameters] warm_containers = "EAGER"
4. Finally
Deploy the lambda function and test it using the local invoke
command. You'll see the Timestream DB has been created and its tables with data.
Top comments (0)