sources/new-dynamodb-stream-event/new-dynamodb-stream-event.mjs
import common from "../../common/common-dynamodb-streams.mjs";

/**
 * Source component that polls a DynamoDB stream and emits one event per
 * stream record.
 *
 * State: a map of `shardId -> shard iterator` is kept in `this.db` under the
 * key "shardIterators". `deploy` seeds it with a "LATEST" iterator for every
 * open shard; each `run` reads up to 100 records per shard and stores the
 * iterator returned for the next read.
 */
export default {
  ...common,
  key: "aws-new-dynamodb-stream-event",
  name: "New DynamoDB Stream Event",
  description: "Emit new event when a DynamoDB stream receives new events. [See the docs here](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html)",
  type: "source",
  version: "0.0.6",
  dedupe: "unique",
  props: {
    ...common.props,
    stream: {
      type: "string",
      label: "Stream",
      description: "ARN of the Stream to watch for new events",
      /**
       * Builds one page of selectable streams.
       *
       * @param {object} opts
       * @param {object} opts.prevContext - context returned by the previous
       *   page; its `lastEvaluatedStreamArn` is used as the pagination cursor.
       * @returns {Promise<{options: Array<{label: string, value: string}>, context: object}>}
       */
      async options({ prevContext }) {
        const {
          streams,
          lastEvaluatedStreamArn,
        } = await this.listEnabledStreams({
          ExclusiveStartStreamArn: prevContext.lastEvaluatedStreamArn,
        });
        const options = streams.map((stream) => ({
          label: `${stream.TableName} - Created ${stream.StreamLabel}`,
          value: stream.StreamArn,
        }));
        return {
          options,
          context: {
            lastEvaluatedStreamArn,
          },
        };
      },
    },
  },
  hooks: {
    /**
     * Seeds the stored shard iterators with a "LATEST" iterator for every
     * shard of the configured stream that has no ending sequence number.
     *
     * @throws {Error} "No shards found in stream" when the stream description
     *   is missing or lists no shards.
     * @throws {Error} "No active shards found" when every shard is closed.
     */
    async deploy() {
      const { stream } = this;
      const shardIterators = this._getShardIterators();

      // NOTE(review): only a single page of up to 100 shards is requested;
      // confirm whether `listShards` paginates internally.
      const { StreamDescription: streamDescription } = await this.listShards({
        StreamArn: stream,
        Limit: 100,
      });

      // Treat a missing description / missing Shards array the same as an
      // empty one. A strict `=== 0` comparison lets `undefined` through and
      // the `.filter` call below would then fail with an opaque TypeError.
      if (!streamDescription?.Shards?.length) {
        throw new Error("No shards found in stream");
      }

      // A shard whose sequence-number range has no end is still open.
      const activeShards = streamDescription.Shards
        .filter((shard) => !shard.SequenceNumberRange.EndingSequenceNumber);
      if (activeShards.length === 0) {
        throw new Error("No active shards found");
      }

      // Iterators are requested one shard at a time, in shard order.
      for (const { ShardId: shardId } of activeShards) {
        const { ShardIterator: shardIterator } = await this.getShardIterator({
          ShardId: shardId,
          StreamArn: stream,
          ShardIteratorType: "LATEST",
        });
        shardIterators[shardId] = shardIterator;
      }

      this._setShardIterators(shardIterators);
    },
  },
  methods: {
    ...common.methods,
    /**
     * @returns {object} the stored `shardId -> iterator` map, or an empty
     *   object when nothing has been stored yet.
     */
    _getShardIterators() {
      return this.db.get("shardIterators") || {};
    },
    /**
     * Persists the `shardId -> iterator` map.
     *
     * @param {object} value - map to store under "shardIterators".
     */
    _setShardIterators(value) {
      this.db.set("shardIterators", value);
    },
    /**
     * Builds the emit metadata for one stream record.
     *
     * @param {object} record - a stream record
     * @param {string} record.eventID - used as the dedupe id
     * @param {string} record.eventName - shown in the summary
     * @param {object} record.dynamodb - its `ApproximateCreationDateTime`
     *   is parsed into the event timestamp
     * @returns {{id: string, summary: string, ts: number}}
     */
    generateMeta({
      eventID,
      eventName,
      dynamodb,
    }) {
      return {
        id: eventID,
        summary: `New Event: ${eventName}`,
        // NOTE(review): Date.parse coerces its argument to a string first;
        // confirm the type the client returns for this field.
        ts: Date.parse(dynamodb.ApproximateCreationDateTime),
      };
    },
  },
  /**
   * Reads up to 100 records from every shard that still has a stored
   * iterator, emits each record, and stores the iterator for the next read.
   *
   * @throws {Error} "Stream is no longer enabled." when the configured
   *   stream fails the `isStreamEnabled` check.
   */
  async run() {
    const { stream } = this;

    if (!(await this.isStreamEnabled(stream))) {
      throw new Error("Stream is no longer enabled.");
    }

    const shardIterators = this._getShardIterators();

    try {
      for (const [
        shardId,
        shardIterator,
      ] of Object.entries(shardIterators)) {
        // A shard without an iterator has nothing left to read; skip it.
        if (!shardIterator) {
          continue;
        }

        const {
          Records: records,
          NextShardIterator: nextShardIterator,
        } = await this.getRecords({
          ShardIterator: shardIterator,
          Limit: 100,
        });

        for (const record of records) {
          const meta = this.generateMeta(record);
          this.$emit(record, meta);
        }

        // Advance only after every record of this page has been emitted.
        shardIterators[shardId] = nextShardIterator;
      }
    } finally {
      // Persist progress even when a later shard fails, so shards that were
      // already read are not re-read on the next run. The error (if any)
      // still propagates to the caller.
      this._setShardIterators(shardIterators);
    }
  },
};