0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

DynamoDBでIoTデータを検索する手法について検討してみた

Posted at

前置き

AWSの中でも特に有名なサービスのひとつとしてDynamoDBがあります。手軽に使えるNoSQLデータベースとして中心的な存在ですが、一般的なRDBとは仕様が大きく異なっているため、活用には敷居が高いという話もよく聞きます。

IoTデータとDynamoDB

一例としてこのようなIoTシステムのデータを考えてみましょう。

システムには8つのデバイスがあり、それぞれの装置が8つのチャンネルを持っています。チャンネルごとに測定値が時系列データとして保存されており、用途に応じてこれらからデータを検索してくるというのがミッションです。

DynamoDBでは、検索(Query)に利用できるキーとして

  • プライマリキー
  • ソートキー

の2つを用意することができる、という仕様があります。言い換えると、一般的なDBとは違って自由なキーで検索することは(データベースの機能だけでは)できません。ですから、事前にどのような検索を行いたいかを検討し、これらのキーを上手に設定してやることが肝心です。

IoTデータによくある検索の例としてはこのようなものがあるでしょう。

  • デバイス名、チャンネル番号を指定して、ある時間帯のデータを出力したい

このような検索をかんたんに行うためのキー設計を考えると、ちょっと困ったことになってしまいます。

  • デバイス名で検索したいので、プライマリキーをデバイス名に設定
  • 時間を範囲指定したいので、測定時間をソートキーに設定

と、ここまでは良いのですが、これではチャンネル番号で検索することができなくなります。

  • デバイス名で検索したいので、プライマリキーをデバイス名に設定
  • チャンネル番号でも検索したいので、ソートキーをチャンネル番号に設定

こういうふうに考えるかもしれませんが、DynamoDBには

  • プライマリキー + ソートキーの組み合わせがユニークでなければならない

という強い制約があるので、そもそもデータを格納することができなくなります。同一のデバイス名、同一のチャンネル番号を持つデータは時間を追って大量に増えていきますから、ユニークどころの騒ぎではありません。

更に困ったことに、ひとつのデバイスが複数の測定チャンネルを持つような構成で、かつ全てのチャンネル測定が同時に行われる(正確には、全てのチャンネルのタイムスタンプが同一になる)場合には、上記のような作戦はいっさい使えなくなります。というのは、同じデバイス名、同じタイムスタンプのデータがチャンネルの数だけ同時に存在することになるため、DynamoDBの制約が満たせなくなるからです。

さてどうしようか…というのが本記事の内容になります。

まずは基礎から

DynamoDBテーブル作成

まずは、本件を検討するためのデータベースを作ってみましょう。まずはこのようなcsvファイルを用意してみます。これは8つのデバイスのそれぞれ8つのチャンネルについて10msごとのデータを持つ、2,560,000行、214MBのcsvファイルです。

csv test.csv
id,timestamp,name,ch,value
7e302c45-9abf-45b0-8021-4f6dc0cb9a53,1737272803.776,device-1,ch1,2.129
baf76549-94e4-4527-a0b2-c106002f49fa,1737272803.776,device-1,ch2,1.665
4feb75c5-e353-4521-824a-6e8020c1591a,1737272803.776,device-1,ch3,2.025
b787b894-80ff-4a51-8fcf-5468d576dfa5,1737272803.776,device-1,ch4,2.390
04f0d5d4-1bfd-41ce-8f68-5de672517bfd,1737272803.776,device-1,ch5,2.031
8e865a85-a7c8-4592-b981-0dfb7db3f7de,1737272803.776,device-1,ch6,2.658
fc141054-ce22-4b6c-b178-726cb8aacf32,1737272803.776,device-1,ch7,2.458
4f780123-dda9-4bd7-ab04-78f93d3eddba,1737272803.776,device-1,ch8,2.281
f0c331d3-04b8-4cd8-8854-de6add1c8687,1737272803.786,device-1,ch1,2.633
58d7c167-0530-4aa1-a905-cb7a6c368c4e,1737272803.786,device-1,ch2,1.743

本来のデータに加え、固有のid(uuid4)が追加されているのが分かります。これをパーティションキーとして利用することで、DynamoDBの制約であるキーのユニーク性を保証することができるようになります。

これをS3にアップロードした後、DynamoDBにインポートしてやります。パーティションキーはid、ソートキーはタイムスタンプです。

$ aws dynamodb import-table --s3-bucket-source S3Bucket=bucket,S3KeyPrefix=test.csv \
--input-format CSV \
--table-creation-parameters '{"TableName":"ddb-test",\
"KeySchema": [{"AttributeName":"id","KeyType":"HASH"}, {"AttributeName":"timestamp","KeyType":"RANGE"}],\
"AttributeDefinitions":[{"AttributeName":"id","AttributeType":"S"},{"AttributeName":"timestamp","AttributeType":"N"}],\
"BillingMode":"PAY_PER_REQUEST"}' \
--input-format-options '{"Csv":{"Delimiter": ","}}'
{
    "ImportTableDescription": {
        "ImportArn": "*",
        "ImportStatus": "IN_PROGRESS",
        "TableArn": "*",
        "TableId": "*",
        "ClientToken": "*",
        "S3BucketSource": {
            "S3BucketOwner": "++++++++++",
            "S3Bucket": "bucket",
            "S3KeyPrefix": "test_s.csv"
        },
        "ErrorCount": 0,
        "InputFormat": "CSV",
        "InputFormatOptions": {
            "Csv": {
                "Delimiter": ","
            }
        },
        "InputCompressionType": "NONE",
        "TableCreationParameters": {
            "TableName": "ddb-test-s",
            "AttributeDefinitions": [
                {
                    "AttributeName": "id",
                    "AttributeType": "S"
                },
                {
                    "AttributeName": "timestamp",
                    "AttributeType": "N"
                }
            ],
            "KeySchema": [
                {
                    "AttributeName": "id",
                    "KeyType": "HASH"
                },
                {
                    "AttributeName": "timestamp",
                    "KeyType": "RANGE"
                }
            ],
            "BillingMode": "PAY_PER_REQUEST"
        },
        "StartTime": "2025-01-19T07:31:16.721000+00:00",
        "ProcessedItemCount": 0,
        "ImportedItemCount": 0
    }
}

こんな感じのテーブルが生成されました。ただし結構な時間が(10分ほど)かかりますのでのんびりお待ちください。

中身はこういう感じです。

素朴に検索してみよう

さて準備ができたので、まずは素朴に検索してみましょう。デバイス名「device-4」のチャンネル番号3のデータを抽出してみます。

前述の通り、DynamoDBでは事前にキーとして指定したものしか検索に用いることができません。なので、素朴な方法では

  • 全件を取得
  • 受け取った側で抽出

という流れで処理を行うことになります。この方法のメリットとして

  • 設計が単純で、開発工数が少ない

反面、デメリットとして

  • 検索効率が極めて悪い

という点が挙げられます。検索ごとに全件スキャンが発生するため、処理時間、コストのいずれも非常に効率が悪くなっています。なので

  • 開発初期のテスト段階
  • 検索頻度が低く、効率が悪くても特に問題が無い場合

などは、素直にこの方法を採用することができるでしょう。ちなみに今回のデータでは件数が多すぎるため、素朴な方法では実用的に検索することが困難でした。

ちょっと工夫してみよう

GSIの活用

DynamoDBにはGSI(Global Secondary Index, グローバルセカンダリインデックス)という仕組みが用意されています。これは元々のテーブルとは別に、異なったインデックスを持つ仮想的なテーブルを作成することができるというものです。今回のケースではデバイス名をキーに含めたGSIを作成することで、元々のテーブルでは行うことができない「デバイス名による検索」ができるようにします。

$ aws dynamodb update-table --no-cli-pager \
--table-name "ddb-test" \
--attribute-definitions '[{"AttributeName":"name","AttributeType":"S"},{"AttributeName":"timestamp","AttributeType":"N"}]' \
--global-secondary-index-updates '[{"Create": {"IndexName": "test-1","KeySchema": [{"AttributeName":"name","KeyType":"HASH"},{"AttributeName":"timestamp","KeyType":"RANGE"}],\
"Projection":{"ProjectionType":"ALL"}}}]'
{
    "TableDescription": {
        "AttributeDefinitions": [
            {
                "AttributeName": "id",
                "AttributeType": "S"
            },
            {
                "AttributeName": "name",
                "AttributeType": "S"
            },
            {
                "AttributeName": "timestamp",
                "AttributeType": "N"
            }
        ],
        "TableName": "ddb-test-s",
        "KeySchema": [
            {
                "AttributeName": "id",
                "KeyType": "HASH"
            },
            {
                "AttributeName": "timestamp",
                "KeyType": "RANGE"
            }
        ],
        "TableStatus": "UPDATING",
        "CreationDateTime": "2025-01-19T07:58:57.298000+00:00",
        "ProvisionedThroughput": {
            "NumberOfDecreasesToday": 0,
            "ReadCapacityUnits": 0,
            "WriteCapacityUnits": 0
        },
        "TableSizeBytes": 0,
        "ItemCount": 0,
        "TableArn": "*",
        "TableId": "*",
        "BillingModeSummary": {
            "BillingMode": "PAY_PER_REQUEST",
            "LastUpdateToPayPerRequestDateTime": "2025-01-19T08:01:33.795000+00:00"
        },
        "GlobalSecondaryIndexes": [
            {
                "IndexName": "test-1",
                "KeySchema": [
                    {
                        "AttributeName": "name",
                        "KeyType": "HASH"
                    },
                    {
                        "AttributeName": "timestamp",
                        "KeyType": "RANGE"
                    }
                ],
                "Projection": {
                    "ProjectionType": "ALL"
                },
                "IndexStatus": "CREATING",
                "Backfilling": false,
                "ProvisionedThroughput": {
                    "NumberOfDecreasesToday": 0,
                    "ReadCapacityUnits": 0,
                    "WriteCapacityUnits": 0
                },
                "IndexSizeBytes": 0,
                "ItemCount": 0,
                "IndexArn": "*"
            }
        ],
        "DeletionProtectionEnabled": false,
        "WarmThroughput": {
            "ReadUnitsPerSecond": 12000,
            "WriteUnitsPerSecond": 4000,
            "Status": "ACTIVE"
        }
    }
}

これまた時間がかかりますがゆっくりと待ちましょう。GSIを作成したことにより、「デバイス名による検索」、即ちデバイス名をキーとしたQueryができるようになって検索効率がぐっと向上します。

ただ、この場合でもチャンネル番号による絞り込みはできないため、デバイス名での検索結果を全件取得した後にアプリケーション側でチャンネル番号による抽出を行う必要があります。素朴な方法ではちょっとまずい、という場合には有効ですが、まだまだ工夫の余地がありそうです。

もう一工夫してみよう

複合ソートキー

DynamoDBでは複合キーという機能があり、複数のキーからなる仮想のソートキーを設定してやることができます。この機能をGSIといっしょに使うことで様々な検索を行うことが可能です。

再びテーブル作成

テーブル作成時に用いる元データ(csvファイル)に、複合キーとして用いるためのデータを追加してやりましょう。具体的には

  • [チャンネル番号]#[タイムスタンプ]

という列を追加してやります。こんな感じのデータです。sk1という列が追加されており、チャンネル番号とタイムスタンプが#を挟んで連結された形になっています。

csv test2.csv
id,timestamp,name,ch,sk1,value
f8f781e2-929d-416c-9fa8-3286399a67de,1737289416.506,device-1,ch1,ch1#1737289416.506,1.592
7004036c-2ab8-459f-80f1-6b52c08ed3a7,1737289416.506,device-1,ch2,ch2#1737289416.506,1.696
b12cc187-17b3-4cd7-be28-4dfb085bf839,1737289416.506,device-1,ch3,ch3#1737289416.506,2.364
f989e86b-de5c-4915-bb6a-34a8a1bd9694,1737289416.506,device-1,ch4,ch4#1737289416.506,1.771
9845bbab-14b4-4d61-a064-03d2fd1e00f2,1737289416.506,device-1,ch5,ch5#1737289416.506,1.931
06ca4eae-243d-4b25-be5e-f591bfd52523,1737289416.506,device-1,ch6,ch6#1737289416.506,2.228
925ad7bb-bb97-4a62-a1c7-4a77471882e6,1737289416.506,device-1,ch7,ch7#1737289416.506,2.450
90befb22-33f8-4b91-8b11-fab9b5ade783,1737289416.506,device-1,ch8,ch8#1737289416.506,2.420
b1a1932b-ea78-4742-8f62-dcd30b92f87c,1737289416.516,device-1,ch1,ch1#1737289416.516,2.457
bbee23ee-f623-4ab4-b48a-45f9bd13cdc0,1737289416.516,device-1,ch2,ch2#1737289416.516,2.307
b80426c2-d99e-4dde-828c-2c4996def4cc,1737289416.516,device-1,ch3,ch3#1737289416.516,2.005

このデータから同様にテーブルを生成します。

$ aws dynamodb import-table --no-cli-pager \
--s3-bucket-source S3Bucket=bucket,S3KeyPrefix=test2.csv \
--input-format CSV \
--table-creation-parameters '{"TableName":"ddb-test2","KeySchema": [{"AttributeName":"id","KeyType":"HASH"},{"AttributeName":"timestamp","KeyType":"RANGE"}],\
"AttributeDefinitions":[{"AttributeName":"id","AttributeType":"S"},{"AttributeName":"timestamp","AttributeType":"N"}],\
"BillingMode":"PAY_PER_REQUEST"}' \
--input-format-options '{"Csv":{"Delimiter": ","}}'
{
    "ImportTableDescription": {
        "ImportArn": "*",
        "ImportStatus": "IN_PROGRESS",
        "TableArn": "*",
        "TableId": "*",
        "ClientToken": "*",
        "S3BucketSource": {
            "S3BucketOwner": "+++++++++++++",
            "S3Bucket": "bucket",
            "S3KeyPrefix": "test2.csv"
        },
        "ErrorCount": 0,
        "InputFormat": "CSV",
        "InputFormatOptions": {
            "Csv": {
                "Delimiter": ","
            }
        },
        "InputCompressionType": "NONE",
        "TableCreationParameters": {
            "TableName": "ddb-test2",
            "AttributeDefinitions": [
                {
                    "AttributeName": "id",
                    "AttributeType": "S"
                },
                {
                    "AttributeName": "timestamp",
                    "AttributeType": "N"
                }
            ],
            "KeySchema": [
                {
                    "AttributeName": "id",
                    "KeyType": "HASH"
                },
                {
                    "AttributeName": "timestamp",
                    "KeyType": "RANGE"
                }
            ],
            "BillingMode": "PAY_PER_REQUEST"
        },
        "StartTime": "2025-01-19T12:27:09.158000+00:00",
        "ProcessedItemCount": 0,
        "ImportedItemCount": 0
    }
}

GSI追加

同様にGSIを追加します。

$ aws dynamodb update-table --no-cli-pager \
--table-name "ddb-test2" \
--attribute-definitions '[{"AttributeName":"name","AttributeType":"S"},{"AttributeName":"sk1","AttributeType":"S"}]' \
--global-secondary-index-updates '[{"Create": {"IndexName": "test-1","KeySchema": \
[{"AttributeName":"name","KeyType":"HASH"},{"AttributeName":"sk1","KeyType":"RANGE"}],\
"Projection":{"ProjectionType":"ALL"}}}]'
{
    "TableDescription": {
        "AttributeDefinitions": [
            {
                "AttributeName": "id",
                "AttributeType": "S"
            },
            {
                "AttributeName": "name",
                "AttributeType": "S"
            },
            {
                "AttributeName": "sk1",
                "AttributeType": "S"
            },
            {
                "AttributeName": "timestamp",
                "AttributeType": "N"
            }
        ],
        "TableName": "ddb-test2-s",
        "KeySchema": [
            {
                "AttributeName": "id",
                "KeyType": "HASH"
            },
            {
                "AttributeName": "timestamp",
                "KeyType": "RANGE"
            }
        ],
        "TableStatus": "UPDATING",
        "CreationDateTime": "2025-01-19T12:44:37.001000+00:00",
        "ProvisionedThroughput": {
            "NumberOfDecreasesToday": 0,
            "ReadCapacityUnits": 0,
            "WriteCapacityUnits": 0
        },
        "TableSizeBytes": 0,
        "ItemCount": 0,
        "TableArn": "*",
        "TableId": "*",
        "BillingModeSummary": {
            "BillingMode": "PAY_PER_REQUEST",
            "LastUpdateToPayPerRequestDateTime": "2025-01-19T12:46:51.490000+00:00"
        },
        "GlobalSecondaryIndexes": [
            {
                "IndexName": "test-1",
                "KeySchema": [
                    {
                        "AttributeName": "name",
                        "KeyType": "HASH"
                    },
                    {
                        "AttributeName": "sk1",
                        "KeyType": "RANGE"
                    }
                ],
                "Projection": {
                    "ProjectionType": "ALL"
                },
                "IndexStatus": "CREATING",
                "Backfilling": false,
                "ProvisionedThroughput": {
                    "NumberOfDecreasesToday": 0,
                    "ReadCapacityUnits": 0,
                    "WriteCapacityUnits": 0
                },
                "IndexSizeBytes": 0,
                "ItemCount": 0,
                "IndexArn": "*"
            }
        ],
        "DeletionProtectionEnabled": false,
        "WarmThroughput": {
            "ReadUnitsPerSecond": 12000,
            "WriteUnitsPerSecond": 4000,
            "Status": "ACTIVE"
        }
    }
}

これでデバイス名 + チャンネル番号を使って検索できるようになりました。

検索してみよう

$ aws dynamodb query --no-cli-pager \
--table-name "ddb-test2" \
--index-name "test-1" \
--key-condition-expression '#n = :n AND begins_with(sk1,:c)' \
--expression-attribute-name '{"#n": "name"}' \
--expression-attribute-values '{":n": {"S": "device-4"},":c": {"S": "ch3"}}' \
> result.json

得られた結果はこんなJSONになっています。

json result.json
{
    "Items": [
        {
            "value": {
                "S": "2.086"
            },
            "sk1": {
                "S": "ch3#1737291820.603"
            },
            "id": {
                "S": "658b1f23-a54d-452f-8733-eae84ebe9676"
            },
            "name": {
                "S": "device-4"
            },
            "ch": {
                "S": "ch3"
            },
            "timestamp": {
                "N": "1737291820.603"
            }
        },
        {
            "value": {
                "S": "2.223"
            },
            "sk1": {
                "S": "ch3#1737291820.613"
            },
            "id": {
                "S": "e85fc970-47dd-49ea-aaf5-d25b0245fe0b"
            },
            "name": {
                "S": "device-4"
            },
            "ch": {
                "S": "ch3"
            },
            "timestamp": {
                "N": "1737291820.613"
            }
        }, ......

蛇足ですが、この状態では使いづらいのでjqコマンドを使うと便利です。csv形式に変換する場合はこんな感じ;

$ jq -r '.Items[] | [.timestamp.N, .name.S, .ch.S, .value.S] | @csv' result.json
"1737291820.603","device-4","ch3","2.086"
"1737291820.613","device-4","ch3","2.223"
"1737291820.623","device-4","ch3","2.639"
"1737291820.633","device-4","ch3","1.772"
"1737291820.643","device-4","ch3","2.28"
"1737291820.653","device-4","ch3","2.175"
"1737291820.663","device-4","ch3","1.981"
"1737291820.673","device-4","ch3","2.106"
"1737291820.683","device-4","ch3","2.168"
"1737291820.693","device-4","ch3","2.782"

4万件のデータを抽出するのにかかった時間は約1.8秒でした。次に、タイムスタンプによる絞り込みを行ってみます。この例ではunix timeの1737332890.000(2025年 1月20日 月曜日 09時28分10秒 JST)からの1秒間のデータを抽出しますが、その際

  • sk1が「ch3#1737332890.000」と「ch3#1737332891.000」の間

つまり「チャンネル番号#時刻」という形で検索条件を指定してやります。

$ aws dynamodb query --no-cli-pager \
--table-name "ddb-test2" \
--index-name "test-1" \
--key-condition-expression '#n = :n AND sk1 BETWEEN :c1 AND :c2' \
--expression-attribute-name '{"#n": "name"}' \
--expression-attribute-values '{":n": {"S": "device-4"},":c1": {"S": "ch3#1737332890.000"},":c2": {"S": "ch3#1737332891.000"}}' \
| jq -r '.Items[] | [.timestamp.N, .name.S, .ch.S, .value.S] | @csv'
"1737332890.006","device-4","ch3","2.414"
"1737332890.016","device-4","ch3","3.109"
"1737332890.026","device-4","ch3","3.115"
"1737332890.036","device-4","ch3","2.762"
"1737332890.046","device-4","ch3","2.923"
"1737332890.056","device-4","ch3","2.732"
...
"1737332890.996","device-4","ch3","1.204"

40件の抽出を約0.4秒で終えることができました。

この先まだまだ

ご紹介したとおり、複合ソートキーを用いることでIoTデータの検索を効率よく行うことができました。しかし、まだまだやるべきことは残っています。

特有の要件への対応

GSIを利用するメリットはまだあります。例えば測定対象がカテゴリ分けされており、チャンネルが別々のカテゴリに接続されている場合などです。特にセンサーの付け替えなどが発生し、チャンネルとカテゴリの対応が変わる場合などは有効です。

この場合、テーブルにカテゴリを追加してやります。例えば

id,timestamp,name,ch,category,value
7e302c45-9abf-45b0-8021-4f6dc0cb9a53,1737272803.776,device-1,ch1,motor,2.129
baf76549-94e4-4527-a0b2-c106002f49fa,1737272803.776,device-1,ch2,heater,1.665
4feb75c5-e353-4521-824a-6e8020c1591a,1737272803.776,device-1,ch3,motor,2.025
b787b894-80ff-4a51-8fcf-5468d576dfa5,1737272803.776,device-1,ch4,heater,2.390
04f0d5d4-1bfd-41ce-8f68-5de672517bfd,1737272803.776,device-1,ch5,heater,2.031
8e865a85-a7c8-4592-b981-0dfb7db3f7de,1737272803.776,device-1,ch6,motor,2.658
fc141054-ce22-4b6c-b178-726cb8aacf32,1737272803.776,device-1,ch7,cooler,2.458
4f780123-dda9-4bd7-ab04-78f93d3eddba,1737272803.776,device-1,ch8,motor,2.281
f0c331d3-04b8-4cd8-8854-de6add1c8687,1737272803.786,device-1,ch1,motor,2.633
58d7c167-0530-4aa1-a905-cb7a6c368c4e,1737272803.786,device-1,ch2,heater,1.743

このようにカテゴリというフィールドを作っておき、GSIと複合ソートキーを使って検索可能にしておきます。これでカテゴリごとの集計を高速に行うことができ、かつ途中でカテゴリが変わっても正しい集計が可能です。

コスト削減

GSIを利用するにはコストがかかりますが、GSIに反映される属性の数を減らすことで削減が可能です。例えばチャンネル番号とタイムスタンプを用いたGSIを作成する場合、一般的なユースケースではidやカテゴリの情報は必要ありません。GSI作成の際、これらの不要な属性を排除してやることでコスト削減が可能です。

Athenaの利用を検討する

見てきたとおり、DynamoDBを使った複雑な検索を行うには入念な準備が必要になります。場合によってはデータベースの実装が困難なケースもあるでしょう。かといって、通常のRDSを使うと運用コストが増大し、事業性に大きな影響を与えてしまいます。

そういう場合はAmazon Athenaの利用を検討してみましょう。Amazon AthenaはS3上に置かれたデータファイル(csv,jsonなど)に対し、SQLを用いた直接検索が可能になるサービスです。既にデータファイルがS3上に存在する場合には容易に試すことができますので、ぜひ一度体験してみてください。

参考情報

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?