LoginSignup
36
39

More than 5 years have passed since last update.

大規模データを確実にMySQL/Redshiftに入れる—Treasure Dataのクエリ結果書き出し機能の実装

Last updated at Posted at 2014-12-14

こんにちは。古橋です。今日はいつものはてなブログから趣向を変えて、QiitaでTDアドベントカレンダー14日目の投稿です。

Hiveのクエリ結果をRDBに書き出したい

MapReduceはメモリに収まりきらないデータをJOINしたり集計したりできる信頼性の高いアーキテクチャですが、どうしても1発のクエリを実行するのに時間がかかるので、人間がいじりながら使う可視化ツールに直接繋ぎ込むには向いていません。

そこで Prestoを使って集計する 方法もありますが、やはりMapReduceの方が向いているケースもあります。例えば、

  • Webサイトに一度は来てくれたのに、その後1週間アクセスのない人が、最後に見ていったページはどこだろう?
  • 過去にアイテムAを買った人が良く買っている別のアイテムは何だろう? (バスケット分析のクエリ例

といった、巨大テーブル同士のJOINや自己結合が必要なケースは、やはりMapReduce + Hiveの出番ですね。バッチ集計は時間がかかっても良いので、その結果はMySQLやRedshiftに保存しておこう、という使い方です。

そこで気になるのは、MapReduce + Hiveでバッチ解析した結果をMySQLやRedshiftなどに書き出しておいて、それらをインタラクティブな可視化ツールに繋ぎ込むことはできないだろうか? という方法です。そう、Treasure Dataなら簡単にできてしまいます…!

ココを押して、書き出し先DBの情報を入れるだけ:
クエリ結果の書き出し

MySQL & PostgreSQL への追記

さて、ときに数GBを超える集計結果を、MySQLやRedshiftに確実に投入するにはどうしたら良いか。

Treasure Dataのクエリの結果をMySQLやPostgreSQLに書き出すとき、いくつかのモードを選べます。そのうちappendモードでは、データをテーブルの末尾に追記します。ここで問題になるのは、書き込み中にエラーが起きたらどうなるのか? ということです。

一番困るのは、中途半端なデータが書き込まれて、一部のデータがダブってしまうケースです。特に大きなクエリ結果を書き込む場合、データの転送途中でエラーが発生することはあり得る問題です。

そこでappendモードでは、まず一時的なテーブルを作り、そこに少しずつデータを書き込んでいきます。すべてのデータの転送が完了したら、最後に一気に目的のテーブルに書き込みます:

appendモード

最後のINSERTはトランザクショナルに行われるので、中途半端なデータが書き込まれたままになることはありません。また、一時テーブルにはインデックスを作成しません。このため高速にロードすることができます。

あわせて読みたい:Something awesome in InnoDB -- the insert buffer

冪等な更新 — updateモード

データの追記は冪等な処理ではないので、どうしてもリトライ可能な処理ではありません。→リトライと冪等性のデザインパターン - Blog by Sadayuki Furuhashi
一方、PRIMARY KEY が重複したら上書きする update モードは、何度でもリトライできる信頼性の高い方法です。

updateモードでも、一時テーブルにデータをロードするまでは同じです。最後にINSERTする代わりに REPLACE INTO文を実行することで、重複した行を上書きします:

updateモード

PostgreSQLではREPLACE文もMERGE文もサポートされていないので、PRIMARY KEYやUNIQUE KEYを指定する代わりにキー名を書き出し時にオプションで指定する必要がありますが、INSERTとUPDATEを発行することで同じ挙動を再現しています。

Redshiftの場合

さて、書き出し先がRedshiftの場合はどうするか?

Redshiftへ大規模データを高速かつ確実に書き出すには AWS Blog (Best Practices for Micro-Batch Loading on Amazon Redshift) で解説されているように、結構骨が折れる最適化が必要でした:

Treasure Dataでクエリを実行して結果をRedshiftへ書き出す

並列アップロード & COPY

まず、Treasure DataのサーバはUS East (N.Virginia)リージョンにあります。書き出し先のRedshiftがAsia Pacific (Tokyo)リージョンにあれば、Asia PacificリージョンのS3 bucketにデータをアップロードしていきます。
アップロードは小さなチャンクに区切って行われ、1チャンクのアップロードが完了次第、RedshiftへCOPYを始めます。この各チャンクのアップロードやCOPYは並列して実行されます。

ここで問題になるのは、S3へのデータ転送には無視できない確率でエラーが起きることです。そこで、このアップロードはリトライを実装してあり、確実にデータ転送されるまで2度、3度とリトライします。

長い文字列

また、Redshiftではtext型は自動的に varchar(256) に変換され、256より長い文字列を弾くようになっています。また、COPYするときに1行でも varchar(256) に収まらない文字列があると、COPY全体が失敗してしまいます。これは結構ハマりました。
Treasure Dataから結果を書き込むときは、まず文字列型に明示的に varchar(65535) を使うことで、長い文字列でも保存できるようにしています。また長すぎる文字列は長さを切り詰めることで、COPY全体が失敗してしまわないように防いでいます。

STATUPDATE OFF

さらに、デフォルトのRedshiftはCOPYが走るたびに統計情報の再計算を行って、データを再度圧縮したりします。これはかなり大きなオーバーヘッドで、毎回この処理が走ると通常のSELECT文にまで影響が出てしまいます。
そこでTreasure Dataからの書き出しでは、STATUPDATE や COMPUPDATE を無効化して高速にバルクロードし、統計情報の再計算が走るタイミングを自由にコントロールできるようにしています。

まとめ

Treasure Dataのクエリ結果をデータベースに書き出す機能の実装方法について紹介しました。
ここでは触れませんでしたが、HiveだけでなくPrestoのクエリ結果を書き出したり、書き出し先にもFTPサーバやTableau Server、Google Spreadsheet、Amazon S3 などを選ぶこともできます。

Treasure Dataでは、データベースや分散処理に興味があるエンジニアを募集中ですよ。

36
39
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
36
39