前置き
AWSの中でも特に有名なサービスのひとつとしてDynamoDBがあります。手軽に使えるNoSQLデータベースとして中心的な存在ですが、一般的なRDBとは仕様が大きく異なっているため、活用には敷居が高いという話もよく聞きます。
IoTデータとDynamoDB
一例としてこのようなIoTシステムのデータを考えてみましょう。
システムには8つのデバイスがあり、それぞれの装置が8つのチャンネルを持っています。チャンネルごとに測定値が時系列データとして保存されており、用途に応じてこれらからデータを検索してくるというのがミッションです。
DynamoDBでは、検索(Query)に利用できるキーとして
- プライマリキー
- ソートキー
の2つを用意することができる、という仕様があります。言い換えると、一般的なDBとは違って自由なキーで検索することは(データベースの機能だけでは)できません。ですから、事前にどのような検索を行いたいかを検討し、これらのキーを上手に設定してやることが肝心です。
IoTデータによくある検索の例としてはこのようなものがあるでしょう。
- デバイス名、チャンネル番号を指定して、ある時間帯のデータを出力したい
このような検索をかんたんに行うためのキー設計を考えると、ちょっと困ったことになってしまいます。
- デバイス名で検索したいので、プライマリキーをデバイス名に設定
- 時間を範囲指定したいので、測定時間をソートキーに設定
と、ここまでは良いのですが、これではチャンネル番号で検索することができなくなります。
- デバイス名で検索したいので、プライマリキーをデバイス名に設定
- チャンネル番号でも検索したいので、ソートキーをチャンネル番号に設定
こういうふうに考えるかもしれませんが、DynamoDBには
- プライマリキー + ソートキーの組み合わせがユニークでなければならない
という強い制約があるので、そもそもデータを格納することができなくなります。同一のデバイス名、同一のチャンネル番号を持つデータは時間を追って大量に増えていきますから、ユニークどころの騒ぎではありません。
更に困ったことに、ひとつのデバイスが複数の測定チャンネルを持つような構成で、かつ全てのチャンネル測定が同時に行われる(正確には、全てのチャンネルのタイムスタンプが同一になる)場合には、上記のような作戦はいっさい使えなくなります。というのは、同じデバイス名、同じタイムスタンプのデータがチャンネルの数だけ同時に存在することになるため、DynamoDBの制約が満たせなくなるからです。
さてどうしようか…というのが本記事の内容になります。
まずは基礎から
DynamoDBテーブル作成
まずは、本件を検討するためのデータベースを作ってみましょう。まずはこのようなcsvファイルを用意してみます。これは8つのデバイスのそれぞれ8つのチャンネルについて10msごとのデータを持つ、2,560,000行、214MBのcsvファイルです。
id,timestamp,name,ch,value
7e302c45-9abf-45b0-8021-4f6dc0cb9a53,1737272803.776,device-1,ch1,2.129
baf76549-94e4-4527-a0b2-c106002f49fa,1737272803.776,device-1,ch2,1.665
4feb75c5-e353-4521-824a-6e8020c1591a,1737272803.776,device-1,ch3,2.025
b787b894-80ff-4a51-8fcf-5468d576dfa5,1737272803.776,device-1,ch4,2.390
04f0d5d4-1bfd-41ce-8f68-5de672517bfd,1737272803.776,device-1,ch5,2.031
8e865a85-a7c8-4592-b981-0dfb7db3f7de,1737272803.776,device-1,ch6,2.658
fc141054-ce22-4b6c-b178-726cb8aacf32,1737272803.776,device-1,ch7,2.458
4f780123-dda9-4bd7-ab04-78f93d3eddba,1737272803.776,device-1,ch8,2.281
f0c331d3-04b8-4cd8-8854-de6add1c8687,1737272803.786,device-1,ch1,2.633
58d7c167-0530-4aa1-a905-cb7a6c368c4e,1737272803.786,device-1,ch2,1.743
本来のデータに加え、固有のid(uuid4)が追加されているのが分かります。これをパーティションキーとして利用することで、DynamoDBの制約であるキーのユニーク性を保証することができるようになります。
これをS3にアップロードした後、DynamoDBにインポートしてやります。パーティションキーはid、ソートキーはタイムスタンプです。
$ aws dynamodb import-table --s3-bucket-source S3Bucket=bucket,S3KeyPrefix=test.csv \
--input-format CSV \
--table-creation-parameters '{"TableName":"ddb-test",\
"KeySchema": [{"AttributeName":"id","KeyType":"HASH"}, {"AttributeName":"timestamp","KeyType":"RANGE"}],\
"AttributeDefinitions":[{"AttributeName":"id","AttributeType":"S"},{"AttributeName":"timestamp","AttributeType":"N"}],\
"BillingMode":"PAY_PER_REQUEST"}' \
--input-format-options '{"Csv":{"Delimiter": ","}}'
{
"ImportTableDescription": {
"ImportArn": "*",
"ImportStatus": "IN_PROGRESS",
"TableArn": "*",
"TableId": "*",
"ClientToken": "*",
"S3BucketSource": {
"S3BucketOwner": "++++++++++",
"S3Bucket": "bucket",
"S3KeyPrefix": "test_s.csv"
},
"ErrorCount": 0,
"InputFormat": "CSV",
"InputFormatOptions": {
"Csv": {
"Delimiter": ","
}
},
"InputCompressionType": "NONE",
"TableCreationParameters": {
"TableName": "ddb-test-s",
"AttributeDefinitions": [
{
"AttributeName": "id",
"AttributeType": "S"
},
{
"AttributeName": "timestamp",
"AttributeType": "N"
}
],
"KeySchema": [
{
"AttributeName": "id",
"KeyType": "HASH"
},
{
"AttributeName": "timestamp",
"KeyType": "RANGE"
}
],
"BillingMode": "PAY_PER_REQUEST"
},
"StartTime": "2025-01-19T07:31:16.721000+00:00",
"ProcessedItemCount": 0,
"ImportedItemCount": 0
}
}
こんな感じのテーブルが生成されました。ただし結構な時間が(10分ほど)かかりますのでのんびりお待ちください。
中身はこういう感じです。
素朴に検索してみよう
さて準備ができたので、まずは素朴に検索してみましょう。デバイス名「device-4」のチャンネル番号3のデータを抽出してみます。
前述の通り、DynamoDBでは事前にキーとして指定したものしか検索に用いることができません。なので、素朴な方法では
- 全件を取得
- 受け取った側で抽出
という流れで処理を行うことになります。この方法のメリットとして
- 設計が単純で、開発工数が少ない
反面、デメリットとして
- 検索効率が極めて悪い
という点が挙げられます。検索ごとに全件スキャンが発生するため、処理時間、コストのいずれも非常に効率が悪くなっています。なので
- 開発初期のテスト段階
- 検索頻度が低く、効率が悪くても特に問題が無い場合
などは、素直にこの方法を採用することができるでしょう。ちなみに今回のデータでは件数が多すぎるため、素朴な方法では実用的に検索することが困難でした。
ちょっと工夫してみよう
GSIの活用
DynamoDBにはGSI(Global Secondary Index, グローバルセカンダリインデックス)という仕組みが用意されています。これは元々のテーブルとは別に、異なったインデックスを持つ仮想的なテーブルを作成することができるというものです。今回のケースではデバイス名をキーに含めたGSIを作成することで、元々のテーブルでは行うことができない「デバイス名による検索」ができるようにします。
$ aws dynamodb update-table --no-cli-pager \
--table-name "ddb-test" \
--attribute-definitions '[{"AttributeName":"name","AttributeType":"S"},{"AttributeName":"timestamp","AttributeType":"N"}]' \
--global-secondary-index-updates '[{"Create": {"IndexName": "test-1","KeySchema": [{"AttributeName":"name","KeyType":"HASH"},{"AttributeName":"timestamp","KeyType":"RANGE"}],\
"Projection":{"ProjectionType":"ALL"}}}]'
{
"TableDescription": {
"AttributeDefinitions": [
{
"AttributeName": "id",
"AttributeType": "S"
},
{
"AttributeName": "name",
"AttributeType": "S"
},
{
"AttributeName": "timestamp",
"AttributeType": "N"
}
],
"TableName": "ddb-test-s",
"KeySchema": [
{
"AttributeName": "id",
"KeyType": "HASH"
},
{
"AttributeName": "timestamp",
"KeyType": "RANGE"
}
],
"TableStatus": "UPDATING",
"CreationDateTime": "2025-01-19T07:58:57.298000+00:00",
"ProvisionedThroughput": {
"NumberOfDecreasesToday": 0,
"ReadCapacityUnits": 0,
"WriteCapacityUnits": 0
},
"TableSizeBytes": 0,
"ItemCount": 0,
"TableArn": "*",
"TableId": "*",
"BillingModeSummary": {
"BillingMode": "PAY_PER_REQUEST",
"LastUpdateToPayPerRequestDateTime": "2025-01-19T08:01:33.795000+00:00"
},
"GlobalSecondaryIndexes": [
{
"IndexName": "test-1",
"KeySchema": [
{
"AttributeName": "name",
"KeyType": "HASH"
},
{
"AttributeName": "timestamp",
"KeyType": "RANGE"
}
],
"Projection": {
"ProjectionType": "ALL"
},
"IndexStatus": "CREATING",
"Backfilling": false,
"ProvisionedThroughput": {
"NumberOfDecreasesToday": 0,
"ReadCapacityUnits": 0,
"WriteCapacityUnits": 0
},
"IndexSizeBytes": 0,
"ItemCount": 0,
"IndexArn": "*"
}
],
"DeletionProtectionEnabled": false,
"WarmThroughput": {
"ReadUnitsPerSecond": 12000,
"WriteUnitsPerSecond": 4000,
"Status": "ACTIVE"
}
}
}
これまた時間がかかりますがゆっくりと待ちましょう。GSIを作成したことにより、「デバイス名による検索」、即ちデバイス名をキーとしたQueryができるようになって検索効率がぐっと向上します。
ただ、この場合でもチャンネル番号による絞り込みはできないため、デバイス名での検索結果を全件取得した後にアプリケーション側でチャンネル番号による抽出を行う必要があります。素朴な方法ではちょっとまずい、という場合には有効ですが、まだまだ工夫の余地がありそうです。
もう一工夫してみよう
複合ソートキー
DynamoDBでは複合キーという機能があり、複数のキーからなる仮想のソートキーを設定してやることができます。この機能をGSIといっしょに使うことで様々な検索を行うことが可能です。
再びテーブル作成
テーブル作成時に用いる元データ(csvファイル)に、複合キーとして用いるためのデータを追加してやりましょう。具体的には
- [チャンネル番号]#[タイムスタンプ]
という列を追加してやります。こんな感じのデータです。sk1という列が追加されており、チャンネル番号とタイムスタンプが#を挟んで連結された形になっています。
id,timestamp,name,ch,sk1,value
f8f781e2-929d-416c-9fa8-3286399a67de,1737289416.506,device-1,ch1,ch1#1737289416.506,1.592
7004036c-2ab8-459f-80f1-6b52c08ed3a7,1737289416.506,device-1,ch2,ch2#1737289416.506,1.696
b12cc187-17b3-4cd7-be28-4dfb085bf839,1737289416.506,device-1,ch3,ch3#1737289416.506,2.364
f989e86b-de5c-4915-bb6a-34a8a1bd9694,1737289416.506,device-1,ch4,ch4#1737289416.506,1.771
9845bbab-14b4-4d61-a064-03d2fd1e00f2,1737289416.506,device-1,ch5,ch5#1737289416.506,1.931
06ca4eae-243d-4b25-be5e-f591bfd52523,1737289416.506,device-1,ch6,ch6#1737289416.506,2.228
925ad7bb-bb97-4a62-a1c7-4a77471882e6,1737289416.506,device-1,ch7,ch7#1737289416.506,2.450
90befb22-33f8-4b91-8b11-fab9b5ade783,1737289416.506,device-1,ch8,ch8#1737289416.506,2.420
b1a1932b-ea78-4742-8f62-dcd30b92f87c,1737289416.516,device-1,ch1,ch1#1737289416.516,2.457
bbee23ee-f623-4ab4-b48a-45f9bd13cdc0,1737289416.516,device-1,ch2,ch2#1737289416.516,2.307
b80426c2-d99e-4dde-828c-2c4996def4cc,1737289416.516,device-1,ch3,ch3#1737289416.516,2.005
このデータから同様にテーブルを生成します。
$ aws dynamodb import-table --no-cli-pager \
--s3-bucket-source S3Bucket=bucket,S3KeyPrefix=test2.csv \
--input-format CSV \
--table-creation-parameters '{"TableName":"ddb-test2","KeySchema": [{"AttributeName":"id","KeyType":"HASH"},{"AttributeName":"timestamp","KeyType":"RANGE"}],\
"AttributeDefinitions":[{"AttributeName":"id","AttributeType":"S"},{"AttributeName":"timestamp","AttributeType":"N"}],\
"BillingMode":"PAY_PER_REQUEST"}' \
--input-format-options '{"Csv":{"Delimiter": ","}}'
{
"ImportTableDescription": {
"ImportArn": "*",
"ImportStatus": "IN_PROGRESS",
"TableArn": "*",
"TableId": "*",
"ClientToken": "*",
"S3BucketSource": {
"S3BucketOwner": "+++++++++++++",
"S3Bucket": "bucket",
"S3KeyPrefix": "test2.csv"
},
"ErrorCount": 0,
"InputFormat": "CSV",
"InputFormatOptions": {
"Csv": {
"Delimiter": ","
}
},
"InputCompressionType": "NONE",
"TableCreationParameters": {
"TableName": "ddb-test2",
"AttributeDefinitions": [
{
"AttributeName": "id",
"AttributeType": "S"
},
{
"AttributeName": "timestamp",
"AttributeType": "N"
}
],
"KeySchema": [
{
"AttributeName": "id",
"KeyType": "HASH"
},
{
"AttributeName": "timestamp",
"KeyType": "RANGE"
}
],
"BillingMode": "PAY_PER_REQUEST"
},
"StartTime": "2025-01-19T12:27:09.158000+00:00",
"ProcessedItemCount": 0,
"ImportedItemCount": 0
}
}
GSI追加
同様にGSIを追加します。
$ aws dynamodb update-table --no-cli-pager \
--table-name "ddb-test2" \
--attribute-definitions '[{"AttributeName":"name","AttributeType":"S"},{"AttributeName":"sk1","AttributeType":"S"}]' \
--global-secondary-index-updates '[{"Create": {"IndexName": "test-1","KeySchema": \
[{"AttributeName":"name","KeyType":"HASH"},{"AttributeName":"sk1","KeyType":"RANGE"}],\
"Projection":{"ProjectionType":"ALL"}}}]'
{
"TableDescription": {
"AttributeDefinitions": [
{
"AttributeName": "id",
"AttributeType": "S"
},
{
"AttributeName": "name",
"AttributeType": "S"
},
{
"AttributeName": "sk1",
"AttributeType": "S"
},
{
"AttributeName": "timestamp",
"AttributeType": "N"
}
],
"TableName": "ddb-test2-s",
"KeySchema": [
{
"AttributeName": "id",
"KeyType": "HASH"
},
{
"AttributeName": "timestamp",
"KeyType": "RANGE"
}
],
"TableStatus": "UPDATING",
"CreationDateTime": "2025-01-19T12:44:37.001000+00:00",
"ProvisionedThroughput": {
"NumberOfDecreasesToday": 0,
"ReadCapacityUnits": 0,
"WriteCapacityUnits": 0
},
"TableSizeBytes": 0,
"ItemCount": 0,
"TableArn": "*",
"TableId": "*",
"BillingModeSummary": {
"BillingMode": "PAY_PER_REQUEST",
"LastUpdateToPayPerRequestDateTime": "2025-01-19T12:46:51.490000+00:00"
},
"GlobalSecondaryIndexes": [
{
"IndexName": "test-1",
"KeySchema": [
{
"AttributeName": "name",
"KeyType": "HASH"
},
{
"AttributeName": "sk1",
"KeyType": "RANGE"
}
],
"Projection": {
"ProjectionType": "ALL"
},
"IndexStatus": "CREATING",
"Backfilling": false,
"ProvisionedThroughput": {
"NumberOfDecreasesToday": 0,
"ReadCapacityUnits": 0,
"WriteCapacityUnits": 0
},
"IndexSizeBytes": 0,
"ItemCount": 0,
"IndexArn": "*"
}
],
"DeletionProtectionEnabled": false,
"WarmThroughput": {
"ReadUnitsPerSecond": 12000,
"WriteUnitsPerSecond": 4000,
"Status": "ACTIVE"
}
}
}
これでデバイス名 + チャンネル番号を使って検索できるようになりました。
検索してみよう
$ aws dynamodb query --no-cli-pager \
--table-name "ddb-test2" \
--index-name "test-1" \
--key-condition-expression '#n = :n AND begins_with(sk1,:c)' \
--expression-attribute-name '{"#n": "name"}' \
--expression-attribute-values '{":n": {"S": "device-4"},":c": {"S": "ch3"}}' \
> result.json
得られた結果はこんなJSONになっています。
{
"Items": [
{
"value": {
"S": "2.086"
},
"sk1": {
"S": "ch3#1737291820.603"
},
"id": {
"S": "658b1f23-a54d-452f-8733-eae84ebe9676"
},
"name": {
"S": "device-4"
},
"ch": {
"S": "ch3"
},
"timestamp": {
"N": "1737291820.603"
}
},
{
"value": {
"S": "2.223"
},
"sk1": {
"S": "ch3#1737291820.613"
},
"id": {
"S": "e85fc970-47dd-49ea-aaf5-d25b0245fe0b"
},
"name": {
"S": "device-4"
},
"ch": {
"S": "ch3"
},
"timestamp": {
"N": "1737291820.613"
}
}, ......
蛇足ですが、この状態では使いづらいのでjqコマンドを使うと便利です。csv形式に変換する場合はこんな感じ;
$ jq -r '.Items[] | [.timestamp.N, .name.S, .ch.S, .value.S] | @csv' result.json
"1737291820.603","device-4","ch3","2.086"
"1737291820.613","device-4","ch3","2.223"
"1737291820.623","device-4","ch3","2.639"
"1737291820.633","device-4","ch3","1.772"
"1737291820.643","device-4","ch3","2.28"
"1737291820.653","device-4","ch3","2.175"
"1737291820.663","device-4","ch3","1.981"
"1737291820.673","device-4","ch3","2.106"
"1737291820.683","device-4","ch3","2.168"
"1737291820.693","device-4","ch3","2.782"
4万件のデータを抽出するのにかかった時間は約1.8秒でした。次に、タイムスタンプによる絞り込みを行ってみます。この例ではunix timeの1737332890.000(2025年 1月20日 月曜日 09時28分10秒 JST)からの1秒間のデータを抽出しますが、その際
- sk1が「ch3#1737332890.000」と「ch3#1737332891.000」の間
つまり「チャンネル番号#時刻」という形で検索条件を指定してやります。
$ aws dynamodb query --no-cli-pager \
--table-name "ddb-test2" \
--index-name "test-1" \
--key-condition-expression '#n = :n AND sk1 BETWEEN :c1 AND :c2' \
--expression-attribute-name '{"#n": "name"}' \
--expression-attribute-values '{":n": {"S": "device-4"},":c1": {"S": "ch3#1737332890.000"},":c2": {"S": "ch3#1737332891.000"}}' \
| jq -r '.Items[] | [.timestamp.N, .name.S, .ch.S, .value.S] | @csv'
"1737332890.006","device-4","ch3","2.414"
"1737332890.016","device-4","ch3","3.109"
"1737332890.026","device-4","ch3","3.115"
"1737332890.036","device-4","ch3","2.762"
"1737332890.046","device-4","ch3","2.923"
"1737332890.056","device-4","ch3","2.732"
...
"1737332890.996","device-4","ch3","1.204"
40件の抽出を約0.4秒で終えることができました。
この先まだまだ
ご紹介したとおり、複合ソートキーを用いることでIoTデータの検索を効率よく行うことができました。しかし、まだまだやるべきことは残っています。
特有の要件への対応
GSIを利用するメリットはまだあります。例えば測定対象がカテゴリ分けされており、チャンネルが別々のカテゴリに接続されている場合などです。特にセンサーの付け替えなどが発生し、チャンネルとカテゴリの対応が変わる場合などは有効です。
この場合、テーブルにカテゴリを追加してやります。例えば
id,timestamp,name,ch,category,value
7e302c45-9abf-45b0-8021-4f6dc0cb9a53,1737272803.776,device-1,ch1,motor,2.129
baf76549-94e4-4527-a0b2-c106002f49fa,1737272803.776,device-1,ch2,heater,1.665
4feb75c5-e353-4521-824a-6e8020c1591a,1737272803.776,device-1,ch3,motor,2.025
b787b894-80ff-4a51-8fcf-5468d576dfa5,1737272803.776,device-1,ch4,heater,2.390
04f0d5d4-1bfd-41ce-8f68-5de672517bfd,1737272803.776,device-1,ch5,heater,2.031
8e865a85-a7c8-4592-b981-0dfb7db3f7de,1737272803.776,device-1,ch6,motor,2.658
fc141054-ce22-4b6c-b178-726cb8aacf32,1737272803.776,device-1,ch7,cooler,2.458
4f780123-dda9-4bd7-ab04-78f93d3eddba,1737272803.776,device-1,ch8,motor,2.281
f0c331d3-04b8-4cd8-8854-de6add1c8687,1737272803.786,device-1,ch1,motor,2.633
58d7c167-0530-4aa1-a905-cb7a6c368c4e,1737272803.786,device-1,ch2,heater,1.743
このようにカテゴリというフィールドを作っておき、GSIと複合ソートキーを使って検索可能にしておきます。これでカテゴリごとの集計を高速に行うことができ、かつ途中でカテゴリが変わっても正しい集計が可能です。
コスト削減
GSIを利用するにはコストがかかりますが、GSIに反映される属性の数を減らすことで削減が可能です。例えばチャンネル番号とタイムスタンプを用いたGSIを作成する場合、一般的なユースケースではidやカテゴリの情報は必要ありません。GSI作成の際、これらの不要な属性を排除してやることでコスト削減が可能です。
Athenaの利用を検討する
見てきたとおり、DynamoDBを使った複雑な検索を行うには入念な準備が必要になります。場合によってはデータベースの実装が困難なケースもあるでしょう。かといって、通常のRDSを使うと運用コストが増大し、事業性に大きな影響を与えてしまいます。
そういう場合はAmazon Athenaの利用を検討してみましょう。Amazon AthenaはS3上に置かれたデータファイル(csv,jsonなど)に対し、SQLを用いた直接検索が可能になるサービスです。既にデータファイルがS3上に存在する場合には容易に試すことができますので、ぜひ一度体験してみてください。