JSON
CSV
DynamoDB
DataPipeline
Athena

AWS DynamoDB から Data Pipeline 使ってエクスポートした JSON ファイルから Athena を使って CSV で全データや集計データを s3 に出力する。

More than 1 year has passed since last update.

対象の DynamoDB テーブル

確認

aws dynamodb describe-table --table-name testtable01
{
    "Table": {
        "TableArn": "arn:aws:dynamodb:ap-northeast-1:999999999999:table/testtable01",
        "AttributeDefinitions": [
            {
                "AttributeName": "id",
                "AttributeType": "S"
            },
            {
                "AttributeName": "timestamp",
                "AttributeType": "N"
            }
        ],
        "ProvisionedThroughput": {
            "NumberOfDecreasesToday": 0,
            "WriteCapacityUnits": 6,
            "LastIncreaseDateTime": 1511722689.859,
            "ReadCapacityUnits": 1412,
            "LastDecreaseDateTime": 1511724276.347
        },
        "TableSizeBytes": 713551555,
        "TableName": "testtable01",
        "TableStatus": "ACTIVE",
        "KeySchema": [
            {
                "KeyType": "HASH",
                "AttributeName": "id"
            },
            {
                "KeyType": "RANGE",
                "AttributeName": "timestamp"
            }
        ],
        "ItemCount": 10216301,
        "CreationDateTime": 1505361453.146
    }
}
aws dynamodb get-item --table-name testtable01 --key '{"id":{"S":"40148031"},"timestamp":{"N":"1506051935"}}'
{
    "Item": {
        "jst": {
            "S": "2017-09-22T12:45:34+09:00"
        },
        "timestamp": {
            "N": "1506051935"
        },
        "value": {
            "N": "871"
        },
        "id": {
            "S": "40148031"
        },
        "user": {
            "S": "40148031"
        }
    }
}

こんな感じのテーブル

id timestamp user value jst
40148031 1506051935 40148031 871 2017-09-22T12:45:34+09:00

対象のJSON

{"timestamp":{"n":"1510053720"},"user":{"s":"51260761"},"id":{"s":"51260761"},"value":{"n":"20"},"jst":{"s":"2017-07-07T20:21:48+09:00"}}
{"timestamp":{"n":"1509943350"},"user":{"s":"99531284"},"id":{"s":"99531284"},"value":{"n":"18"},"jst":{"s":"2017-07-06T13:42:14+09:00"}}
{"timestamp":{"n":"1510734948"},"user":{"s":"60704661"},"id":{"s":"60704661"},"value":{"n":"34"},"jst":{"s":"2017-07-15T17:35:32+09:00"}}
{"timestamp":{"n":"1509301681"},"user":{"s":"22945310"},"id":{"s":"22945310"},"value":{"n":"38"},"jst":{"s":"2017-07-30T03:27:43+09:00"}}
{"timestamp":{"n":"1510668309"},"user":{"s":"14038924"},"id":{"s":"14038924"},"value":{"n":"49"},"jst":{"s":"2017-07-14T23:04:57+09:00"}}

Athena でテーブル作成クエリ

create external table if not exists dynamodb.testtable01_atn (
  `timestamp` struct<n:string>,
  `user`      struct<s:string>,
  `id`        struct<s:string>,
  `value`     struct<n:string>,
  `jst`       struct<s:string> 
)
row format serde 'org.openx.data.jsonserde.JsonSerDe'
with serdeproperties (
  'serialization.format' = '1'
) location 's3://dynamodb-export-bucket/export/2017-11-24-12-18-30/'
tblproperties ('has_encrypted_data'='false')
;

全件CSV出力クエリ

select 
  timestamp.n as timestamp,
  user.s      as user,
  id.s        as id,
  value.n     as value,
  jst.s       as jst
from 
  testtable01_atn
order by
  jst,
  user
;

日付(文字列)毎にカウントした値をCSVで出力クエリ

select 
  SUBSTR(jst.s, 1, 10) as jst,
  count(*)             as count
from
  testtable01_atn
group by
  1
order by
  jst
;