6
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

ZOZOAdvent Calendar 2024

Day 15

Embulkを使ったBigQueryからのデータ連携のパフォーマンス改善

Last updated at Posted at 2024-12-14

はじめに

BigQueryからAmazon RDSのPostgreSQLへのデータ転送およびテーブルの洗い替えにおいてEmbulkを利用しました。
当初、転送速度が思ったよりも出てませんでしたが、inputをembulk-input-bigqueryからembulk-input-gcsへ変更することで大きく速度を改善することが出来ました。

転送対象のデータについて

転送対象となるBigQueryデータに関しては、約1.6億レコードです。
スクリーンショット_2024-08-16_13_39_46.png

改善前のパフォーマンス

パフォーマンス改善前の実行のログを一部抜粋しました。
10:46:06からデータ転送が始まり、16:58:42にテーブルの洗い替えが完了しています。
データ連携とテーブルの更新でおよそ6時間ほどかかっています。

2024-08-09 04:31:52.899 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4
2024-08-09 04:31:52.916 +0900 [INFO] (0001:transaction): JDBC Driver = /var/lib/embulk/vendor/bundle/jruby/2.3.0/gems/embulk-output-postgresql-0.8.2/default_jdbc_driver/postgresql-9.4-1205-jdbc41.jar
2024-08-09 04:31:52.920 +0900 [INFO] (0001:transaction): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-09 04:31:52.977 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2024-08-09 04:31:52.979 +0900 [INFO] (0001:transaction): > 0.00 seconds
2024-08-09 04:31:52.980 +0900 [INFO] (0001:transaction): Using JDBC Driver PostgreSQL 9.4 JDBC4.1 (build 1205)
2024-08-09 04:31:52.980 +0900 [INFO] (0001:transaction): Using insert_direct mode
2024-08-09 04:31:53.006 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS test_table_temp;
DROP TABLE IF EXISTS test_table_old;
CREATE TABLE test_table_temp AS TABLE test_table WITH NO DATA;

2024-08-09 10:46:06.628 +0900 [INFO] (0001:transaction): > 22453.62 seconds
2024-08-09 10:46:06.642 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "public"."test_table_temp" ("column1" BIGINT, "column2" BIGINT, "column4" TIMESTAMP WITH TIME ZONE, "column4" BIGINT, "column5" BIGINT, "created_at" TIMESTAMP WITH TIME ZONE)
2024-08-09 10:46:06.643 +0900 [INFO] (0001:transaction): > 0.00 seconds
2024-08-09 10:46:06.703 +0900 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
2024-08-09 10:46:06.735 +0900 [INFO] (0022:task-0000): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-09 10:46:06.747 +0900 [INFO] (0022:task-0000): SQL: SET search_path TO "public"
2024-08-09 10:46:06.748 +0900 [INFO] (0022:task-0000): > 0.00 seconds
2024-08-09 10:46:06.749 +0900 [INFO] (0022:task-0000): Copy SQL: COPY "public"."test_table_temp" ("column1", "column2", "column3", "column4", "column5", "created_at") FROM STDIN
2024-08-09 10:46:06.754 +0900 [INFO] (0022:task-0000): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-09 10:46:06.762 +0900 [INFO] (0022:task-0000): SQL: SET search_path TO "public"
2024-08-09 10:46:06.763 +0900 [INFO] (0022:task-0000): > 0.00 seconds
2024-08-09 10:46:06.763 +0900 [INFO] (0022:task-0000): Copy SQL: COPY "public"."test_table_temp" ("column1", "column2", "column3", "column4", "column5", "created_at") FROM STDIN
2024-08-09 10:46:06.767 +0900 [INFO] (0022:task-0000): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-09 10:46:06.776 +0900 [INFO] (0022:task-0000): SQL: SET search_path TO "public"
2024-08-09 10:46:06.778 +0900 [INFO] (0022:task-0000): > 0.00 seconds
2024-08-09 10:46:06.778 +0900 [INFO] (0022:task-0000): Copy SQL: COPY "public"."test_table_temp" ("column1", "column2", "column3", "column4", "column5", "created_at") FROM STDIN
2024-08-09 10:46:06.780 +0900 [INFO] (0022:task-0000): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-09 10:46:06.797 +0900 [INFO] (0022:task-0000): SQL: SET search_path TO "public"
2024-08-09 10:46:06.798 +0900 [INFO] (0022:task-0000): > 0.00 seconds
2024-08-09 10:46:06.798 +0900 [INFO] (0022:task-0000): Copy SQL: COPY "public"."test_table_temp" ("column1", "column2", "column3", "column4", "column5", "created_at") FROM STDIN
2024-08-09 10:48:54.709 +0900 [INFO] (embulk-output-executor-0): Loading 273,296 rows (16,818,071 bytes)
2024-08-09 10:48:54.731 +0900 [INFO] (embulk-output-executor-1): Loading 273,296 rows (16,820,172 bytes)
2024-08-09 10:48:54.746 +0900 [INFO] (embulk-output-executor-2): Loading 273,296 rows (16,816,496 bytes)
2024-08-09 10:48:54.767 +0900 [INFO] (embulk-output-executor-3): Loading 273,296 rows (16,818,972 bytes)
2024-08-09 10:48:55.500 +0900 [INFO] (embulk-output-executor-1): > 0.77 seconds (loaded 273,296 rows in total)
2024-08-09 10:48:55.500 +0900 [INFO] (embulk-output-executor-0): > 0.79 seconds (loaded 273,296 rows in total)
2024-08-09 10:48:55.500 +0900 [INFO] (embulk-output-executor-2): > 0.75 seconds (loaded 273,296 rows in total)
2024-08-09 10:48:55.753 +0900 [INFO] (embulk-output-executor-3): > 0.99 seconds (loaded 273,296 rows in total)

2024-08-09 16:51:42.571 +0900 [INFO] (0001:transaction): {done:  1 / 1, running: 0}
2024-08-09 16:51:42.571 +0900 [INFO] (0001:transaction): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-09 16:51:42.578 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2024-08-09 16:51:42.579 +0900 [INFO] (0001:transaction): > 0.00 seconds
2024-08-09 16:51:42.580 +0900 [INFO] (0001:transaction): SQL: BEGIN;
ALTER TABLE test_table rename to test_table_old;
ALTER INDEX test_table_pkey RENAME TO test_table_pkey_old;
ALTER TABLE test_table_temp ADD CONSTRAINT test_table_pkey primary key (col1, col2, col3);
ALTER INDEX idx_testtable_gid_agw RENAME TO idx_testtable_gid_agw_old;
CREATE INDEX idx_testtable_gid_agw ON test_table_temp USING btree (col2, col3);
ALTER TABLE test_table_temp rename to test_table;
DROP TABLE test_table_old;
COMMIT;

2024-08-09 16:58:42.760 +0900 [INFO] (0001:transaction): > 420.18 seconds
2024-08-09 16:58:42.776 +0900 [INFO] (main): Committed.
2024-08-09 16:58:42.777 +0900 [INFO] (main): Next config diff: {"in":{},"out":{}}

ボトルネックの調査

Embulkのログを見てみたところ、以下のログがありました。

2024-08-09 04:31:52.899 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4

それぞれの用語と各値の計算については以下の記事が参考になりますのでご参照ください。

今回の場合、max_threadsが8となっていますが、output tasksでは4となっています。
そのため、本来使えるスレッド数より少ないスレッド数でoutput taskが実行されています。

実際のログを見ても、embulk-output-executor-0~embulk-output-executor-3までしか無く、4スレッドでoutput taskが実行されていることがわかります。

このoutput tasksはinput tasksの値によって決まり、input tasksはinputプラグインの種類によって決まります。FileInputプラグインの場合はファイル数、Inputプラグインの場合は1となります。
当初利用していたembulk-input-bigqueryを見てみると、InputPluginベースのプラグインとなっています。この場合、input tasks = 1となります。

embulk-input-bigqueryからembulk-input-gcsへ変更

max_threadまでスレッドを利用するため、output tasksの数を増やしたい。そのためにinput tasksの数を増やす必要があります。FileInputプラグインの場合はファイル数 = input tasksになるため、inputをFileInputプラグインに変えるのが良さそうです。

embulk-input-gcsは、FileInputPluginベースのプラグインとなっているため、inputをembulk-input-gcsへ変えることにしました。
https://github.com/embulk/embulk-input-gcs/blob/a86ca685a6d40483e512c135d4a4ac911da65252/src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java#L19-L20

inputをembulk-input-gcsに変えるにあたって、事前にBigQueryからCloud Storageへテーブルデータをエクスポートさせておく必要があります。エクスポートに関しては以下をご参照ください。

なお、エクスポートさせるテーブルデータが大きい場合、Cloud Storageへのエクスポート時に複数ファイルの分割されます。

以下がembulk-input-gcsの設定ファイルになります。

in:
  type: gcs
  bucket: test_bucket
  path_prefix: test_table/20240827
  auth_method: json_key
  json_keyfile:
    content: |
      {{env.gcp_credential}}
  parser:
    type: json
    columns:
      - { name: col1, type: long }
      - { name: col2, type: long }
      - { name: col3, type: timestamp, format: '%Y-%m-%d %H:%M:%S %Z' }
      - { name: col4, type: long }
      - { name: col5, type: long }
      - { name: created_at, type: string }

path_prefixで連携対象のファイルのprefixを指定できるため、複数ファイルに分割されていても問題なく連携可能です。

改善後のパフォーマンス

以下がinputをembulk-input-gcsへ変更後の実行ログになります。

2024-08-27 04:31:28.854 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / tasks=300
2024-08-27 04:31:28.873 +0900 [INFO] (0001:transaction): JDBC Driver = /var/lib/embulk/vendor/bundle/jruby/2.3.0/gems/embulk-output-postgresql-0.8.2/default_jdbc_driver/postgresql-9.4-1205-jdbc41.jar
2024-08-27 04:31:28.878 +0900 [INFO] (0001:transaction): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:28.924 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2024-08-27 04:31:28.926 +0900 [INFO] (0001:transaction): > 0.00 seconds
2024-08-27 04:31:28.926 +0900 [INFO] (0001:transaction): Using JDBC Driver PostgreSQL 9.4 JDBC4.1 (build 1205)
2024-08-27 04:31:28.926 +0900 [INFO] (0001:transaction): Using insert_direct mode
2024-08-27 04:31:28.962 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS test_table_temp;
DROP TABLE IF EXISTS test_table_old;
CREATE TABLE test_table_temp AS TABLE test_table WITH NO DATA;

2024-08-27 04:31:28.992 +0900 [INFO] (0001:transaction): > 0.03 seconds
2024-08-27 04:31:29.722 +0900 [INFO] (0001:transaction): {done:  0 / 300, running: 0}
2024-08-27 04:31:29.753 +0900 [INFO] (0023:task-0003): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:29.753 +0900 [INFO] (0021:task-0001): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:29.754 +0900 [INFO] (0024:task-0004): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:29.754 +0900 [INFO] (0027:task-0007): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:29.754 +0900 [INFO] (0025:task-0005): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:29.754 +0900 [INFO] (0022:task-0002): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:29.754 +0900 [INFO] (0026:task-0006): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:29.754 +0900 [INFO] (0020:task-0000): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}

...

2024-08-27 04:32:13.773 +0900 [INFO] (0022:task-0010): Copy SQL: COPY "public"."test_table_temp" ("col1", "col2", "col3", "col4", "col5", "created_at") FROM STDIN
2024-08-27 04:32:13.774 +0900 [INFO] (0026:task-0011): > 0.00 seconds
2024-08-27 04:32:13.774 +0900 [INFO] (0026:task-0011): Copy SQL: COPY "public"."test_table_temp" ("col1", "col2", "col3", "col4", "col5", "created_at") FROM STDIN
2024-08-27 04:32:14.020 +0900 [INFO] (0021:task-0001): Loading 7,749 rows (479,791 bytes)
2024-08-27 04:32:14.028 +0900 [INFO] (0020:task-0000): Loading 9,333 rows (578,218 bytes)
2024-08-27 04:32:14.049 +0900 [INFO] (0027:task-0007): > 0.60 seconds (loaded 547,088 rows in total)
2024-08-27 04:32:14.190 +0900 [INFO] (0021:task-0001): > 0.17 seconds (loaded 554,837 rows in total)
2024-08-27 04:32:14.193 +0900 [INFO] (0021:task-0012): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:32:14.200 +0900 [INFO] (0021:task-0012): SQL: SET search_path TO "public"
2024-08-27 04:32:14.201 +0900 [INFO] (0021:task-0012): > 0.00 seconds
2024-08-27 04:32:14.202 +0900 [INFO] (0021:task-0012): Copy SQL: COPY "public"."test_table_temp" ("col1", "col2", "col3", "col4", "col5", "created_at") FROM STDIN
2024-08-27 04:32:14.213 +0900 [INFO] (0027:task-0007): Loading 8,445 rows (523,202 bytes)
2024-08-27 04:32:14.226 +0900 [INFO] (0025:task-0005): > 0.60 seconds (loaded 547,088 rows in total)
2024-08-27 04:32:14.226 +0900 [INFO] (0020:task-0000): > 0.20 seconds (loaded 556,421 rows in total)
2024-08-27 04:32:14.227 +0900 [INFO] (0001:transaction): {done:  6 / 300, running: 7}
2024-08-27 04:32:14.228 +0900 [INFO] (0001:transaction): {done:  6 / 300, running: 7}
2024-08-27 04:32:14.228 +0900 [INFO] (0001:transaction): {done:  6 / 300, running: 7}

... 

2024-08-27 04:56:15.342 +0900 [INFO] (0001:transaction): {done:300 / 300, running: 0}
2024-08-27 04:56:15.343 +0900 [INFO] (0001:transaction): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:56:15.350 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2024-08-27 04:56:15.350 +0900 [INFO] (0001:transaction): > 0.00 seconds
2024-08-27 04:56:15.351 +0900 [INFO] (0001:transaction): SQL: BEGIN;
ALTER TABLE test_table rename to test_table_old;
ALTER INDEX test_table_pkey RENAME TO test_table_pkey_old;
ALTER TABLE test_table_temp ADD CONSTRAINT test_table_pkey primary key (col1, col2, col3);
ALTER INDEX idx_testtable_gid_agw RENAME TO idx_testtable_gid_agw_old;
CREATE INDEX idx_testtable_gid_agw ON test_table_temp USING btree (col2, col3);
ALTER TABLE test_table_temp rename to test_table;
DROP TABLE test_table_old;
COMMIT;

2024-08-27 05:02:09.497 +0900 [INFO] (0001:transaction): > 354.15 seconds
2024-08-27 05:02:12.915 +0900 [INFO] (main): Committed.
2024-08-27 05:02:12.915 +0900 [INFO] (main): Next config diff: {"in":{"last_path":"test_table/20240827_043058/1133830/000000000299.json"},"out":{}}

ログの時間を見ると、04:31:28から連携が開始し、05:02:12に連携が完了しており、30分程度でテーブルの洗い替えが完了しています。

ログを詳しく見てみると、max_threadが8で、tasksが300となっています。
また、ログからmaxの8スレッドでDBコネクションを貼っていることがわかります。

Using local thread executor with max_threads=8 / tasks=300

2024-08-27 04:31:29.753 +0900 [INFO] (0023:task-0003): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:29.753 +0900 [INFO] (0021:task-0001): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:29.754 +0900 [INFO] (0024:task-0004): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:29.754 +0900 [INFO] (0027:task-0007): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:29.754 +0900 [INFO] (0025:task-0005): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:29.754 +0900 [INFO] (0022:task-0002): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:29.754 +0900 [INFO] (0026:task-0006): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:29.754 +0900 [INFO] (0020:task-0000): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}

また、runningの数を見ても7とmax_thread近くまでスレッドを使えていることがわかります。

2024-08-27 04:32:14.227 +0900 [INFO] (0001:transaction): {done:  6 / 300, running: 7}
2024-08-27 04:32:14.228 +0900 [INFO] (0001:transaction): {done:  6 / 300, running: 7}
2024-08-27 04:32:14.228 +0900 [INFO] (0001:transaction): {done:  6 / 300, running: 7}

まとめ

Embulkのinputをembulk-input-bigqueryからembulk-input-gcsへ変更をすることでパフォーマンス改善が出来ました。

DWHとしてBigQueryを利用しており、BigQuery上で集計したデータをアプリケーションのDBに連携させたい場面があると思いますが、その際のパフォーマンスを改善したい方の参考になれば幸いです。

6
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?