embulk-output-commandからgsutilコマンドにデータを渡してストリーミングアップロードしてディスクレスにしてみた話。
embulkってなにって方はこちらから。
追記
2020-09-13
この記事の gsutil と embulk-output-command 行っていたストリーミングデータ転送処理をEmbulkプラグインで書きました。
https://github.com/irotoris/embulk-output-gcs_streaming
なんでやったの
EmbulkでMySQLやOracleからデータをBigQueryにデータ連携する際、データレイクとしてひとまずGoogle Cloud Storage(GCS)にファイルをアップロードしています。
最初はembulk-output-gcsを使ってたんですけど、1TBのテーブルとか出てきてマシンのストレージが枯渇しちゃったんですよね。
そこで、embulk-output-commandからgsutilコマンドにデータを渡して、ストリーミングアップロードしてディスクレスにしてみました。
GCSのストリーミングアップロードに関してはこちら。
https://cloud.google.com/storage/docs/streaming
どうやったの
環境
- ubuntu:bionic
- openjdk-8-jdk
- Embulk v0.9.17
- embulk-input-mysql v0.10.1
- embulk-output-gcs v0.4.4
- embulk-output-command v0.1.4
- gsutil 4.47
embulkはGKEでコンテナ実行
Before (embulk-outout-gcs)
embulk-output-gcsを使うとこんな感じ。
ローカルの/tmpにファイルがに書き込まれて、GCSアップロード後に削除されているのが分かります。
in:
type: mysql
host: localhost
port: 3306
database: test
user: mysql_user
password: xxxxxxx
query: select * from test_table
out:
type: gcs
auth_method: compute_engine
bucket: bucket_name
path_prefix: mysql/test/test_table
file_ext: .csv
formatter:
type: csv
...
2020-02-15 13:11:17.756 +0000 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4
2020-02-15 13:11:17.790 +0000 [INFO] (0001:transaction): {done: 0 / 1, running: 0}
2020-02-15 13:11:19.186 +0000 [INFO] (0016:task-0000): Fetch size is 10000. Using server-side prepared statement.
2020-02-15 13:11:19.187 +0000 [INFO] (0016:task-0000): Connecting to jdbc:mysql://localhost:3306/test options {useCompression=true, socketTimeout=1800000, useSSL=false, user=admin, useLegacyDatetimeCode=false, tcpKeepAlive=true, useCursorFetch=true, connectTimeout=300000, password=***, zeroDateTimeBehavior=convertToNull}
2020-02-15 13:11:19.289 +0000 [INFO] (0016:task-0000): SQL: select * from test
2020-02-15 13:11:19.311 +0000 [INFO] (0016:task-0000): > 0.01 seconds
2020-02-15 13:11:20.679 +0000 [INFO] (0016:task-0000): Local Hash(MD5): XTIT3ANlo6QqZnwNBdzGCA== / Remote Hash(MD5): XTIT3ANlo6QqZnwNBdzGCA==
2020-02-15 13:11:20.679 +0000 [INFO] (0016:task-0000): Delete generated file: /tmp/embulk/2020-02-15 13-11-16.996 UTC/0016_task-0000_5418765600230876857.tmp > true
2020-02-15 13:11:20.679 +0000 [INFO] (0016:task-0000): Uploaded 'bucket_name/mysql/test/test_table.000.01.csv' to 930bytes
...
After (embulk-output-command + gsutil)
embulk-output-commandからgsutilでストリーミングアップロードするとこんな感じ。
inputの設定は同じなので省略してます。
gsutilが標準入力からデータを受け取ってアップロードしているログが出てます。
out:
command: "gsutil cp - gs://bucket_name/mysql/test/test_table/data.$INDEX.$SEQID.csv"
formatter:
type: csv
type: command
...
2020-02-15 13:09:59.514 +0000 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4
2020-02-15 13:09:59.543 +0000 [INFO] (0001:transaction): {done: 0 / 1, running: 0}
2020-02-15 13:09:59.559 +0000 [INFO] (0016:task-0000): Using command [sh, -c, gsutil cp - gs://bucket_name/mysql/test/test_table.$INDEX.$SEQID.csv]
2020-02-15 13:09:59.634 +0000 [INFO] (0016:task-0000): Using command [sh, -c, gsutil cp - gs://bucket_name/mysql/test/test_table.$INDEX.$SEQID.csv]
2020-02-15 13:09:59.638 +0000 [INFO] (0016:task-0000): Using command [sh, -c, gsutil cp - gs://bucket_name/mysql/test/test_table.$INDEX.$SEQID.csv]
2020-02-15 13:09:59.643 +0000 [INFO] (0016:task-0000): Using command [sh, -c, gsutil cp - gs://bucket_name/mysql/test/test_table.$INDEX.$SEQID.csv]
2020-02-15 13:09:59.700 +0000 [INFO] (0016:task-0000): Fetch size is 10000. Using server-side prepared statement.
2020-02-15 13:09:59.701 +0000 [INFO] (0016:task-0000): Connecting to jdbc:mysql://localhost:3306/test options {useCompression=true, socketTimeout=1800000, useSSL=false, user=admin, useLegacyDatetimeCode=false, tcpKeepAlive=true, useCursorFetch=true, connectTimeout=300000, password=***, zeroDateTimeBehavior=convertToNull}
2020-02-15 13:09:59.814 +0000 [INFO] (0016:task-0000): SQL: select * from test_table
2020-02-15 13:09:59.838 +0000 [INFO] (0016:task-0000): > 0.01 seconds
Copying from <STDIN>...
Copying from <STDIN>... 0.0 B]
Copying from <STDIN>... 0.0 B]
Copying from <STDIN>... 0.0 B]
/ [1 files][ 0.0 B/ 0.0 B]
Operation completed over 1 objects.
/ [1 files][ 0.0 B/ 0.0 B]
Operation completed over 1 objects.
/ [1 files][ 0.0 B/ 0.0 B]
Operation completed over 1 objects.
/ [1 files][ 0.0 B/ 0.0 B]
Operation completed over 1 objects.
...
Pros / Cons
Pros
- ディスクレスでembulkでクラウドストレージにアップロードできる
- SSDって値段高いよね。
- コンテナ on k8sで動かそうとした日にはPersistent Volumeがでてきて管理が面倒です。
- ステートレス最高。
Cons
- gsutilのストリーミングアップロードはチェックサム計算をしてくれない
- なので自分でデータの検証を書く必要がります。もはやローカルにファイル持ってないので、チェックサムによる検証はできません。いまはBigQueryにロードできるかどうか(CSVファイルとして正しいか)と、MySQLとBigQueryのレコード数が一致しているかを見ています。
性能
OracleからGCSに入れる処理で比較。
- ネットワーク経路:
Oracle---(DedicatedIntterConnect)---GKE--(PrivateAccess)---GCS
- コンピュート: 8 vCPU / 7.5GB Mem / 50GB SSD
- データ量: 2,000,000レコード / 7ファイル / 5.2GB / CSV
処理 | 時間 | コマンド |
---|---|---|
embulk-output-gcs | 6m18.374s | embulk run job.yaml |
embulk-output-command + gsutil cp(streaming) | 3m5.471s | embulk run job.yaml |
embulk-output-file + gsutil cp(bulk) | 3m15.211s | embulk run job.yaml && gsutil -m cp -r /tmp/embulk gs://bucket_name/tmp/embulk |
embulk-output-gcsは内部で使ってるGCSのAPIバージョンが古いので、結構遅かった。
なので、embulkでファイル化したあとに普通にgsutil cp
(bulk upload)した処理も計測。
コンピュートリソースやネットワーク構成、input側も影響するので一概には言えませんが、5.2GBくらいだったらストリーミングアップロードにしても問題なさそうでした。
ちなみに2TBのテーブルでやったときは50時間ほどかかったので、時間は線形増加にはならなさそう。
メモ
embulk-output-commandは、ScatterExecutorの恩恵を受ける。
outputタスク数(デフォルトだとCPUコア数、設定で指定可能)の分だけ並列に処理してくれる。
Embulk の LocalExecutor プラグインの振る舞いについて整理
あときっとembulk-output-s3もawscli使えば同じようにストリーミングアップロードができると思う。
検証中にgsutilコマンドがうまく動かないと思ったらこれだった。
https://github.com/googleapis/google-api-python-client/issues/803
httplib2をダウングレードして解決。
$ pip install httplib2==0.15.0