9
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Embulkでローカルディスクを使わずにクラウドストレージにアップロードする

Last updated at Posted at 2020-02-16

embulk-output-commandからgsutilコマンドにデータを渡してストリーミングアップロードしてディスクレスにしてみた話。

embulkってなにって方はこちらから。

追記

2020-09-13

この記事の gsutil と embulk-output-command 行っていたストリーミングデータ転送処理をEmbulkプラグインで書きました。
https://github.com/irotoris/embulk-output-gcs_streaming

なんでやったの

EmbulkでMySQLやOracleからデータをBigQueryにデータ連携する際、データレイクとしてひとまずGoogle Cloud Storage(GCS)にファイルをアップロードしています。

mysql-to-bq-embulk.png

最初はembulk-output-gcsを使ってたんですけど、1TBのテーブルとか出てきてマシンのストレージが枯渇しちゃったんですよね。

そこで、embulk-output-commandからgsutilコマンドにデータを渡して、ストリーミングアップロードしてディスクレスにしてみました。

GCSのストリーミングアップロードに関してはこちら。
https://cloud.google.com/storage/docs/streaming

どうやったの

環境

  • ubuntu:bionic
  • openjdk-8-jdk
  • Embulk v0.9.17
  • embulk-input-mysql v0.10.1
  • embulk-output-gcs v0.4.4
  • embulk-output-command v0.1.4
  • gsutil 4.47

embulkはGKEでコンテナ実行

Before (embulk-outout-gcs)

embulk-output-gcsを使うとこんな感じ。
ローカルの/tmpにファイルがに書き込まれて、GCSアップロード後に削除されているのが分かります。

job.yaml
in:
  type: mysql
  host: localhost
  port: 3306
  database: test
  user: mysql_user
  password: xxxxxxx
  query: select * from test_table
out:
  type: gcs
  auth_method: compute_engine
  bucket: bucket_name
  path_prefix: mysql/test/test_table
  file_ext: .csv
  formatter:
    type: csv
...
2020-02-15 13:11:17.756 +0000 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4
2020-02-15 13:11:17.790 +0000 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
2020-02-15 13:11:19.186 +0000 [INFO] (0016:task-0000): Fetch size is 10000. Using server-side prepared statement.
2020-02-15 13:11:19.187 +0000 [INFO] (0016:task-0000): Connecting to jdbc:mysql://localhost:3306/test options {useCompression=true, socketTimeout=1800000, useSSL=false, user=admin, useLegacyDatetimeCode=false, tcpKeepAlive=true, useCursorFetch=true, connectTimeout=300000, password=***, zeroDateTimeBehavior=convertToNull}
2020-02-15 13:11:19.289 +0000 [INFO] (0016:task-0000): SQL: select * from test
2020-02-15 13:11:19.311 +0000 [INFO] (0016:task-0000): > 0.01 seconds
2020-02-15 13:11:20.679 +0000 [INFO] (0016:task-0000): Local Hash(MD5): XTIT3ANlo6QqZnwNBdzGCA== / Remote Hash(MD5): XTIT3ANlo6QqZnwNBdzGCA==
2020-02-15 13:11:20.679 +0000 [INFO] (0016:task-0000): Delete generated file: /tmp/embulk/2020-02-15 13-11-16.996 UTC/0016_task-0000_5418765600230876857.tmp > true
2020-02-15 13:11:20.679 +0000 [INFO] (0016:task-0000): Uploaded 'bucket_name/mysql/test/test_table.000.01.csv' to 930bytes
...

After (embulk-output-command + gsutil)

embulk-output-commandからgsutilでストリーミングアップロードするとこんな感じ。
inputの設定は同じなので省略してます。
gsutilが標準入力からデータを受け取ってアップロードしているログが出てます。

job.yaml
out:
  command: "gsutil cp - gs://bucket_name/mysql/test/test_table/data.$INDEX.$SEQID.csv"
  formatter:
    type: csv
  type: command
...
2020-02-15 13:09:59.514 +0000 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4
2020-02-15 13:09:59.543 +0000 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
2020-02-15 13:09:59.559 +0000 [INFO] (0016:task-0000): Using command [sh, -c, gsutil cp - gs://bucket_name/mysql/test/test_table.$INDEX.$SEQID.csv]
2020-02-15 13:09:59.634 +0000 [INFO] (0016:task-0000): Using command [sh, -c, gsutil cp - gs://bucket_name/mysql/test/test_table.$INDEX.$SEQID.csv]
2020-02-15 13:09:59.638 +0000 [INFO] (0016:task-0000): Using command [sh, -c, gsutil cp - gs://bucket_name/mysql/test/test_table.$INDEX.$SEQID.csv]
2020-02-15 13:09:59.643 +0000 [INFO] (0016:task-0000): Using command [sh, -c, gsutil cp - gs://bucket_name/mysql/test/test_table.$INDEX.$SEQID.csv]
2020-02-15 13:09:59.700 +0000 [INFO] (0016:task-0000): Fetch size is 10000. Using server-side prepared statement.
2020-02-15 13:09:59.701 +0000 [INFO] (0016:task-0000): Connecting to jdbc:mysql://localhost:3306/test options {useCompression=true, socketTimeout=1800000, useSSL=false, user=admin, useLegacyDatetimeCode=false, tcpKeepAlive=true, useCursorFetch=true, connectTimeout=300000, password=***, zeroDateTimeBehavior=convertToNull}
2020-02-15 13:09:59.814 +0000 [INFO] (0016:task-0000): SQL: select * from test_table
2020-02-15 13:09:59.838 +0000 [INFO] (0016:task-0000): > 0.01 seconds
Copying from <STDIN>...
Copying from <STDIN>...   0.0 B]                                                
Copying from <STDIN>...   0.0 B]                                                
Copying from <STDIN>...   0.0 B]                                                
/ [1 files][    0.0 B/    0.0 B]                                                
Operation completed over 1 objects.                                              
/ [1 files][    0.0 B/    0.0 B]                                                
Operation completed over 1 objects.                                              
/ [1 files][    0.0 B/    0.0 B]                                                
Operation completed over 1 objects.                                              
/ [1 files][    0.0 B/    0.0 B]                                                
Operation completed over 1 objects.                                              
...

Pros / Cons

Pros

  • ディスクレスでembulkでクラウドストレージにアップロードできる
    • SSDって値段高いよね。
    • コンテナ on k8sで動かそうとした日にはPersistent Volumeがでてきて管理が面倒です。
    • ステートレス最高。

Cons

  • gsutilのストリーミングアップロードはチェックサム計算をしてくれない
    • なので自分でデータの検証を書く必要がります。もはやローカルにファイル持ってないので、チェックサムによる検証はできません。いまはBigQueryにロードできるかどうか(CSVファイルとして正しいか)と、MySQLとBigQueryのレコード数が一致しているかを見ています。

性能

OracleからGCSに入れる処理で比較。

  • ネットワーク経路: Oracle---(DedicatedIntterConnect)---GKE--(PrivateAccess)---GCS
  • コンピュート: 8 vCPU / 7.5GB Mem / 50GB SSD
  • データ量: 2,000,000レコード / 7ファイル / 5.2GB / CSV
処理 時間 コマンド
embulk-output-gcs 6m18.374s embulk run job.yaml
embulk-output-command + gsutil cp(streaming) 3m5.471s embulk run job.yaml
embulk-output-file + gsutil cp(bulk) 3m15.211s embulk run job.yaml && gsutil -m cp -r /tmp/embulk gs://bucket_name/tmp/embulk

embulk-output-gcsは内部で使ってるGCSのAPIバージョンが古いので、結構遅かった。
なので、embulkでファイル化したあとに普通にgsutil cp (bulk upload)した処理も計測。
コンピュートリソースやネットワーク構成、input側も影響するので一概には言えませんが、5.2GBくらいだったらストリーミングアップロードにしても問題なさそうでした。

ちなみに2TBのテーブルでやったときは50時間ほどかかったので、時間は線形増加にはならなさそう。

メモ

embulk-output-commandは、ScatterExecutorの恩恵を受ける。
outputタスク数(デフォルトだとCPUコア数、設定で指定可能)の分だけ並列に処理してくれる。
Embulk の LocalExecutor プラグインの振る舞いについて整理

あときっとembulk-output-s3もawscli使えば同じようにストリーミングアップロードができると思う。

検証中にgsutilコマンドがうまく動かないと思ったらこれだった。
https://github.com/googleapis/google-api-python-client/issues/803
httplib2をダウングレードして解決。

$ pip install httplib2==0.15.0
9
7
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?