AWS
DynamoDB
EMR

Data Pipelineを使ってS3にDUMPしたDynamoDBテーブルをEMRを使って処理する方法

相当ニッチなことを書きます。

DynamoDBのデータをエクスポートするとData Pipelineが起動してS3へ1行1Jsonのデータが出力されます

sample
{"id":"{"s":"00001"},"name":{"s":"鷺沢 文香"}}

みたいなのです。

通常は、Data Pipelineを使用して書き戻すので特に何か考える必要は無いのですが、コレを敢えてEMRを使用して書き戻す方法を書きます。
具体的には、Data Pipelineで何がされているかと言った流れです。

Step1: JSONデータの取り込み

まず、HiveのテーブルにS3に保存されたJSONデータを取り込みます。

取り込み(マッピング)
CREATE EXTERNAL TABLE dynamodb_json(
 id    STRUCT<s:STRING>,
 name  STRUCT<s:STRING>
)
ROW FORMAT
SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION 's3://<path to dynamodb data>';

このように外部テーブルをマッピングします。
この状態でqueryを発行すると・・・

query
SELECT id,name FROM dynamodb_json LIMIT 1;
{"s":"00001"}   {"s":"鷺沢 文香"}

こんな感じでマッピングされます。
値だけをちゃんと出力するには、

query
SELECT 
    id.s,
    name.s 
FROM dynamodb_json LIMIT 1;

という風にオブジェクトにアクセスすることで必要な値を取得することができます。
S3に書き出されたデータをHiveに取り込むことができました。

Step2: DynamoDBテーブルとHDFSを紐づける

DynamoDBTable: my_idol
HiveTavle: my_idol

テーブルのマッピング
CREATE EXTERNAL TABLE IF NOT EXISTS my_idol (
  id              STRING,
  name            STRING
  )
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES (
"dynamodb.table.name" = "my_idol",
"dynamodb.column.mapping" = "id:id,name:name"
);

これで、EMRからqueryを発行することでDynamoDBに問い合わせを行うことが出来るようになりました。

sample
SELECT name FROM my_idol WHERE id='00001';
結果
鷺沢 文香

同名で作成したのでわかりにくいかもしれませんが、これはHiveの腹持ちテーブルに問い合わせを行うことで(そのテーブルの実態がDynamoDBなので)データを取得しています。

Step3 DynamoDBの更新

S3からインポートしたデータを使ってDynamoDBのテーブルに更新をかけます

一括更新
SET hive.execution.engine=mr;
SET dynamodb.throughput.write.percent=1.0;

INSERT OVERWRITE TABLE my_idol
SELECT
  id.s,
  name.s
FROM dynamodb_json;

emrをデフォルトで起動するとtezが動くのですが、色々試した結果DynamoDBへの書き込みを行うときはmrのほうが体感で早かったのでこっちを指定しています。

あとは終わるまで待つだけ。