Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 66 additions & 47 deletions packages/stepfunction-event-logger/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,64 +62,83 @@ export class StepFunctionEventLogger extends Construct {
targets: [new events_targets.SqsQueue(mainQueue)]
}
)
var datastoreArn = "";
var SQSMessageProcessorFunction = props.lambda;
// If a lambda is provided to the construct, it's the user's responsibility
// to build a datastore and ensure the lambda is connected to it.
// Otherwise, generate the DynamoDB/Postgres table
if (props.lambda === undefined) {
SQSMessageProcessorFunction = new lambda.Function(
this, "SQSMessageProcessorFunction",
{
runtime: lambda.Runtime.PYTHON_3_8,
code: lambda.Code.fromAsset("lambda"),
handler: "event_logger.handler",
timeout: Duration.minutes(1),
events: [new lambda_event_sources.SqsEventSource(mainQueue)]
}
);

// grants message processor lambda permission to consume messages from SQS Queue
mainQueue.grantConsumeMessages(SQSMessageProcessorFunction)

// grants message processor lambda permsisions to read stepfunction execution history
props.stepfunctions.forEach(sf => { sf.grantRead(SQSMessageProcessorFunction!) });

if (props.datastore === "Dynamodb") {
const dynamodb_datastore = new dynamodb.Table(
this, "EventsDatastore", {
partitionKey: {
name: "execution_arn",
type: dynamodb.AttributeType.STRING
},
sortKey: {
name: "timestamp",
type: dynamodb.AttributeType.STRING
}
});
// If a lambda is provided to the construct, it's the user's responsibility
// to build a datastore and ensure the lambda is connected to it.
const SQSMessageProcessorFunction = props.lambda || this.createMessageProcessorFunction(mainQueue);

// grants message processor lambda permission to write to DynamoDB
dynamodb_datastore.grantWriteData(SQSMessageProcessorFunction)
// grants message processor lambda permission to consume messages from SQS Queue
mainQueue.grantConsumeMessages(SQSMessageProcessorFunction)

datastoreArn = dynamodb_datastore.tableArn;
// grants message processor lambda permsisions to read stepfunction execution history
props.stepfunctions.forEach(sf => { sf.grantRead(SQSMessageProcessorFunction) });
}

} else if (props.datastore === "Postgres") {
// TODO:
// const postgres_datastore = new ...
createMessageProcessorFunction (
mainQueue: sqs.Queue,
eventLoggingLevel?: EventLoggingLevel,
datastore?: Datastore
) {
const SQSMessageProcessorFunction = new lambda.Function(
this, "SQSMessageProcessorFunction",
{
runtime: lambda.Runtime.PYTHON_3_8,
code: lambda.Code.fromAsset("lambda"),
handler: "event_logger.handler",
timeout: Duration.minutes(1),
events: [new lambda_event_sources.SqsEventSource(mainQueue)]
}
);

// TODO: grant lambda access to postgres_datastore
// datastoreArn = postgres_datastore.tableArn;
};

// TODO: is this prop really optional?
if (eventLoggingLevel) {
SQSMessageProcessorFunction.addEnvironment(
"EVENT_LOGGING_LEVEL", props.eventLoggingLevel!
"EVENT_LOGGING_LEVEL", eventLoggingLevel
)
}

// TODO: is this prop really optional?
if (datastore) {
SQSMessageProcessorFunction.addEnvironment(
"DATASTORE_TYPE", props.datastore!
"DATASTORE_TYPE", datastore
)

this.addDatastoreForLambda(SQSMessageProcessorFunction, datastore);
}

return SQSMessageProcessorFunction;
}

addDatastoreForLambda(
SQSMessageProcessorFunction: lambda.Function,
datastore: Datastore
) {
if (datastore === Datastore.DYNAMODB) {
const dynamodb_datastore = new dynamodb.Table(
this, "EventsDatastore", {
partitionKey: {
name: "execution_arn",
type: dynamodb.AttributeType.STRING
},
sortKey: {
name: "timestamp",
type: dynamodb.AttributeType.STRING
}
});

// grants message processor lambda permission to write to DynamoDB
dynamodb_datastore.grantWriteData(SQSMessageProcessorFunction)

SQSMessageProcessorFunction.addEnvironment(
"DATASTORE_ARN", datastoreArn!
"DATASTORE_ARN", dynamodb_datastore.tableArn
)
}
} else if (datastore === Datastore.POSTGRES) {
// TODO:
// const postgres_datastore = new ...

// TODO: grant lambda access to postgres_datastore
// datastoreArn = postgres_datastore.tableArn;
};
}
}
2 changes: 1 addition & 1 deletion packages/stepfunction-event-logger/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,4 @@
"@aws-cdk/aws-stepfunctions": "^1.73.0",
"@aws-cdk/core": "^1.73.0"
}
}
}