DynamoDBストリームの利用

Pocket

こんにちは。技術開発部・配信/インフラチームの二階堂です。 弊社DSP「Bypass」ではDynamoDBを利用しております。 今回はその機能の一つであるDynamoDBストリームを紹介したいと思います。

DynamoDBストリームとは

DynamoDBストリームはDynamoDBへの書き込み・更新・削除処理(ttlによる自動削除を含む)を最大24時間保存する機能です。 テーブル設定で「ストリーム有効」にすると保存されるようになります。 保存内容は「キーのみ」「新しいイメージ」「古いイメージ」「新旧イメージ」の4種類から選択可能で、必要な情報のみを保存することで使用料を抑えることが可能です。

ストリームレコードの保存形式

DynamoDBストリームに保存されたデータを扱うにあたり、まずはデータがどのような形式で保存されるかを説明したいと思います。低レベルAPIを利用する場合はこの知識が必須です。

DynamoDBストリームのデータは下図のように複数のシャードから構成され、シャードは複数のストリームレコードから構成されます。 ストリームレコードはDynamoDBへの書き込み処理などリクエスト一つ一つに対応します。 シャードには順番が存在し各シャードには次のシャードのidが保存されています。

(図は公式ページより引用)

ストリームレコードの取得方法

低レベルAPIを利用してデータを取得する際には次の手順で取得します。

  1. DynamoDBのテーブル名からdescribe-tableでLatestStreamArnを取得する
  2. DynamoDBStreamsのdescribe-streamで取得したLatestStreamArnからシャードのリストを取得する
  3. リストの一番最初のシャードのシャードidとLatestStreamArnを使ってget-shard-iteratorでイテレータを取得する
  4. 取得したイテレータを使ってget-recordsでストリームレコードのリストを取得する
  5. ストリームレコードを各々処理する
  6. 2で取得したシャードそれぞれに対して3~5を行う

この手順の注意点は2,4でリストを取得する時に対象が多すぎると一度に全て取得することができない点です。 手順2ではLastEvaluatedShardIdが入っていた場合、手順4ではNextShardIteratorが入っていた場合にはその値を使って再度取得する必要があります。

また、手順4ではシャードが閉じていない場合には次のストリームレコードが存在しなくてもNextShardIteratorが入っています。 そのため常時実行するタイプの処理ではない場合には、数回連続でget-recordsの結果が空だった場合にはNextShardIteratorが入っていても中断するなどの処理が必要になります。

まとめ

DynamoDBストリームはttlによる削除検知など痒いところに手が届く便利な機能ですが取得方法が複雑で躓きやすい部分でもあります。 この記事にがその一助となれば幸いです。

今回は以上となります。

AWS Glue + Athena構成を試す
APIを利用したCloudWatchの設定
AWS ストレージサービス「S3」