Amplify Gen2 で DynamoDB Streams を使って DynamoDB テーブル間のデータ連携を行う

Amplify Gen2 で DynamoDB Streams を使って DynamoDB テーブル間のデータ連携を行う

2025.01.05

いわさです。

Amplify Gen2 を使って、DynamoDB でスコアを管理するアプリケーションを作成しています。

このアプリケーションでは、スコアが不定期に加算・減算されるイベントが発生します。
これをスコアアクティビティイベントと呼んでおり、外部から AppSync 経由でトリガーされています。構成図にするとこんな感じでしょうか。

before.png
スコアアクティビティイベントを受信するだけのアプリ

Amplify の Data コンポーネントでスコアアクティビティテーブルを定義するだけなので、以下のみで実現出来ます。

/amplify/data/resource.ts
import { type ClientSchema, a, defineData } from '@aws-amplify/backend'; const schema = a.schema({ ScoreActivities: a .model({ team_id: a.string().required(), score_change: a.integer().required(), }) .authorization((allow) => [ allow.ownerDefinedIn("team_id").identityClaim("custom:team_id"), allow.publicApiKey() ]) }); : 

細切れにチームごとのスコアアクティビティイベントが発生するのですが、チームごとの合計スコアを管理したくなりました。スコアアクティビティテーブルをアプリケーションで集計することも出来るのですが、イベント購読やランキングなどを考えると「合計スコア」テーブルで管理出来ると良いのではと考えました。

そこで、今回は次のように DynamoDB Streams を経由して、スコアアクティビティイベント発生時に Lambda 関数経由で合計スコアテーブルを更新仕組みを実装してみたいと思います。
構成図で表現すると次のピンク色の枠の中を実装するイメージです。

after.png

TotalScore テーブルの実装

まずは合計スコアを管理するテーブル「TotalScore」を Data コンポーネント経由で定義します。
これは Amplify Gen2 でスキーマ定義を追加するだけなので簡単に実装が出来ます。

/amplify/data/resource.ts
import { type ClientSchema, a, defineData } from '@aws-amplify/backend'; const schema = a.schema({ ScoreActivities: a .model({ team_id: a.string().required(), score_change: a.integer().required(), }) .authorization((allow) => [ allow.ownerDefinedIn("team_id").identityClaim("custom:team_id"), allow.publicApiKey() ]), TotalScore: a .model({ team_id: a.string().required(), total_score: a.integer().required(), }) .identifier(['team_id']) .authorization((allow) => [ allow.ownerDefinedIn("team_id").identityClaim("custom:team_id"), allow.publicApiKey() ]), }); : 

デプロイ後、Amplify コンソールのデータマネージャーから新しいテーブルが追加されていることを確認出来ました。

5616CA8F-9A78-443C-B43E-DAE1D20CEEFF.png

DynamoDB Streams から Lambda 関数をトリガー

続いて、ScoreActivities テーブルにアイテムが追加されたタイミングで TotalScore テーブルの合計スコア値を更新したいと思いますので、DynamoDB Streams 経由で Lambda 関数をトリガーします。

Amplify Gen2 では DyanamoDB Streams がデフォルトで有効になっており、Lambda 関数を定義してイベントマッピングを作成してやれば動作します。

https://docs.amplify.aws/react/build-a-backend/functions/examples/dynamo-db-stream/

Lambda 関数は先程定義した TotalScore へリクエストを送信する必要があるので、Amplify が生成する GraphQL エンドポイント URL と API キーを、Lambda 関数に引き渡します。

先日記事に書いたのですが、このパターンは循環依存が発生するケースになりますので、次の記事を参考に Lambda 関数は Amplify 管理外のカスタムリソースとして定義しました。

https://dev.classmethod.jp/articles/amplify-gen2-circular-lambda/

最終的な実装は次のような感じに。別途記事にしたいと思いますが StartingPosition の仕様については少し注意したほうが良いです。データ損失が起きる場合があるので。

amplify/backend.ts
import { defineBackend } from '@aws-amplify/backend'; import { auth } from './auth/resource'; import { data } from './data/resource'; import { preTokenGenerationV2 } from './auth/pre-token-generation-v2/resource'; import { Effect, PolicyStatement } from 'aws-cdk-lib/aws-iam'; import { EventSourceMapping, StartingPosition } from 'aws-cdk-lib/aws-lambda'; import { NodejsFunction} from 'aws-cdk-lib/aws-lambda-nodejs'; import * as url from 'node:url'; const backend = defineBackend({ auth, data, preTokenGenerationV2, }); const { cfnUserPool } = backend.auth.resources.cfnResources cfnUserPool.addPropertyOverride("LambdaConfig.PreTokenGenerationConfig",{ LambdaVersion: 'V2_0', LambdaArn: backend.preTokenGenerationV2.resources.lambda.functionArn, }); const updateTotalScoreStack = backend.createStack('UpdateTotalScoreStack'); const funcitonUpdateTotalScoreOnActivity = new NodejsFunction( updateTotalScoreStack, 'update-totalscore-on-activity', { entry: url.fileURLToPath(new URL('./functions/update-totalscore-on-activity/handler.ts', import.meta.url)), environment: { APPSYNC_ENDPOINT: backend.data.resources.cfnResources.cfnGraphqlApi.attrGraphQlUrl, APPSYNC_API_KEY: backend.data.resources.cfnResources.cfnApiKey?.attrApiKey || '', }, initialPolicy: [ new PolicyStatement({ effect: Effect.ALLOW, actions: [ "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:ListStreams", ], resources: ["*"], }), ], } ) new EventSourceMapping( updateTotalScoreStack, "TotalScoreFunctionEventSourceMapping", { eventSourceArn: backend.data.resources.tables["ScoreActivities"].tableStreamArn, target: funcitonUpdateTotalScoreOnActivity, batchSize: 100, startingPosition: StartingPosition.LATEST, } ); 

Lambda 関数から AppSync 経由で TotalScore を更新

最後に DynamoDB Streams からトリガーされた Lambda 関数で TotalScore テーブルに書き込みを行えば完了です。

今回は、次の記事を参考に Lambda 関数で Axios を使ってリクエストを送信します。記事ではデフォルトプロバイダーで SigV4 署名していますが、私のほうは諸事情により API キーを使いました。

https://dev.classmethod.jp/articles/amplify-gen2-appsync-mutation-lambda/

イベントで受信したスコアを TotalScore に加算します。該当キーのアイテムが存在しない場合は作成も行います。
コードは生成 AI に手伝ってもらいながら手直しを入れたものです。便利な世の中になったものだ。

amplify/functions/update-totalscore-on-activity/handler.ts
import axios, { AxiosError } from "axios"; import type { DynamoDBStreamHandler } from "aws-lambda"; import { Logger } from "@aws-lambda-powertools/logger"; type TotalScoreInput = { team_id: string; total_score: number; }; type GraphQLError = { message: string; path: string[]; errorType: string; }; const logger = new Logger({ logLevel: "INFO", serviceName: "dynamodb-stream-handler", }); // GraphQL操作の定義 const CREATE_TOTAL_SCORE = ` mutation CreateTotalScore($input: CreateTotalScoreInput!) { createTotalScore(input: $input) { team_id total_score } } `; const UPDATE_TOTAL_SCORE = ` mutation UpdateTotalScore($input: UpdateTotalScoreInput!) { updateTotalScore(input: $input) { team_id total_score } } `; const GET_TOTAL_SCORE = ` query GetTotalScore($team_id: String!) { getTotalScore(team_id: $team_id) { team_id total_score } } `; async function executeGraphQLRequest(query: string, variables: any) { if (!process.env.APPSYNC_ENDPOINT || !process.env.APPSYNC_API_KEY) { throw new Error("Missing required environment variables: APPSYNC_ENDPOINT or APPSYNC_API_KEY"); } const response = await axios.post( process.env.APPSYNC_ENDPOINT, { query, variables, }, { headers: { 'Content-Type': 'application/json', 'x-api-key': process.env.APPSYNC_API_KEY, }, } ); if (response.data.errors) { throw new Error(`GraphQL Error: ${JSON.stringify(response.data.errors)}`); } return response.data; } async function updateOrCreateTotalScore(teamId: string, newTotalScore: number) { try { const updateResponse = await executeGraphQLRequest( UPDATE_TOTAL_SCORE, { input: { team_id: teamId, total_score: newTotalScore } } ); logger.info('Successfully updated total score', { teamId, newTotalScore }); return updateResponse; } catch (error) { if (error instanceof Error) { if (error.message.includes('ConditionalCheckFailedException')) { logger.info('Record not found, creating new total score', { teamId, newTotalScore }); const createResponse = await executeGraphQLRequest( CREATE_TOTAL_SCORE, { input: { team_id: teamId, total_score: newTotalScore } } ); return createResponse; } } throw error; } } export const handler: DynamoDBStreamHandler = async (event) => { const batchItemFailures: Array<{itemIdentifier: string}> = []; logger.info('Starting to process DynamoDB Stream records', { recordCount: event.Records.length }); for (const record of event.Records) { try { if (record.eventName !== 'INSERT') { logger.info(`Skipping non-INSERT event: ${record.eventName}`); continue; } const newImage = record.dynamodb?.NewImage; if (!newImage) { logger.warn('No NewImage in record, skipping'); continue; } const teamId = newImage.team_id?.S; const scoreChange = parseInt(newImage.score_change?.N ?? '0'); if (!teamId || scoreChange === 0) { logger.warn('Invalid record data', { teamId, scoreChange }); continue; } const getTotalScoreResponse = await executeGraphQLRequest( GET_TOTAL_SCORE, { team_id: teamId } ); const currentScore = getTotalScoreResponse.data.getTotalScore?.total_score ?? 0; const newTotalScore = currentScore + scoreChange; const result = await updateOrCreateTotalScore(teamId, newTotalScore); logger.info('Successfully processed score change', { teamId, scoreChange, currentScore, newTotalScore, operation: result.data.createTotalScore ? 'create' : 'update' }); } catch (error) { const errorMessage = error instanceof Error ? error.message : String(error); const errorStack = error instanceof Error ? error.stack : undefined; logger.error('Error processing record', { error: errorMessage, stack: errorStack, recordId: record.eventID }); if (record.eventID) { batchItemFailures.push({ itemIdentifier: record.eventID }); } } } return { batchItemFailures }; }; 

実行結果

ではデプロイ後に試してみます。
まずは ScoreActivities テーブルで、team1 に対して 1000 を加算します。

246F144B-256F-4A2F-B952-3F516656CE41.png

TotalScore を確認してみると、team1 の合計スコアが 1000 になっていることを確認出来ました。新規作成の部分うまく動いていそうです。

C0589BF1-05FB-487E-856B-A0797C750EE4.png

続いて同じチームに対して、さらに 200 加算します。

1BA31C1E-8686-4F33-93C5-0BF57263BB22.png

1200 に加算されました。更新も出来てますね。

804C8C59-21A3-425B-8BA6-EA0C6BE9DCA6.png

続いて違うチームのスコアを発生させてみました。

31616A32-65A7-4628-B5BF-3FA62B9F274D.png

既存のチームには影響なく新しいチームのアイテムが作成されました。
この時点では team1 のほうがスコアが高いです。

E46CD4AF-6357-4375-A2CD-08F94A30CFB9.png

ここで team1 に対して減算イベントを発生させます。-300 です。

CE305C28-AB28-4FEE-BF07-042F4901E42F.png

減算も出来てますね。team2 のほうが合計スコアが大きくなりました。

0194A02B-3471-44D6-B64B-47C447971030.png

さいごに

本日は Amplify Gen2 で DynamoDB Streams を使って DynamoDB テーブル間のデータ連携を行う方法として、 AppSync スキーマ間の連携に DynamoDB Streams 経由を使ってみました。まぁ悪くないのではと思いますがどうでしょう。

Amplify + TypeScript 力があまり高くないもので、生成 AI ベースの Lambda 関数となっているので、見る人が見ると「なんだこのコードは」となるのかもしれません。そのあたりは大目に見てください。

この記事をシェアする

FacebookHatena blogX

関連記事