はじめに
BigQueryからAmazon RDSのPostgreSQLへのデータ転送およびテーブルの洗い替えにおいてEmbulkを利用しました。
当初、転送速度が思ったよりも出てませんでしたが、inputをembulk-input-bigquery
からembulk-input-gcs
へ変更することで大きく速度を改善することが出来ました。
転送対象のデータについて
転送対象となるBigQueryデータに関しては、約1.6億レコードです。
改善前のパフォーマンス
パフォーマンス改善前の実行のログを一部抜粋しました。
10:46:06
からデータ転送が始まり、16:58:42
にテーブルの洗い替えが完了しています。
データ連携とテーブルの更新でおよそ6時間ほどかかっています。
2024-08-09 04:31:52.899 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4
2024-08-09 04:31:52.916 +0900 [INFO] (0001:transaction): JDBC Driver = /var/lib/embulk/vendor/bundle/jruby/2.3.0/gems/embulk-output-postgresql-0.8.2/default_jdbc_driver/postgresql-9.4-1205-jdbc41.jar
2024-08-09 04:31:52.920 +0900 [INFO] (0001:transaction): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-09 04:31:52.977 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2024-08-09 04:31:52.979 +0900 [INFO] (0001:transaction): > 0.00 seconds
2024-08-09 04:31:52.980 +0900 [INFO] (0001:transaction): Using JDBC Driver PostgreSQL 9.4 JDBC4.1 (build 1205)
2024-08-09 04:31:52.980 +0900 [INFO] (0001:transaction): Using insert_direct mode
2024-08-09 04:31:53.006 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS test_table_temp;
DROP TABLE IF EXISTS test_table_old;
CREATE TABLE test_table_temp AS TABLE test_table WITH NO DATA;
2024-08-09 10:46:06.628 +0900 [INFO] (0001:transaction): > 22453.62 seconds
2024-08-09 10:46:06.642 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "public"."test_table_temp" ("column1" BIGINT, "column2" BIGINT, "column4" TIMESTAMP WITH TIME ZONE, "column4" BIGINT, "column5" BIGINT, "created_at" TIMESTAMP WITH TIME ZONE)
2024-08-09 10:46:06.643 +0900 [INFO] (0001:transaction): > 0.00 seconds
2024-08-09 10:46:06.703 +0900 [INFO] (0001:transaction): {done: 0 / 1, running: 0}
2024-08-09 10:46:06.735 +0900 [INFO] (0022:task-0000): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-09 10:46:06.747 +0900 [INFO] (0022:task-0000): SQL: SET search_path TO "public"
2024-08-09 10:46:06.748 +0900 [INFO] (0022:task-0000): > 0.00 seconds
2024-08-09 10:46:06.749 +0900 [INFO] (0022:task-0000): Copy SQL: COPY "public"."test_table_temp" ("column1", "column2", "column3", "column4", "column5", "created_at") FROM STDIN
2024-08-09 10:46:06.754 +0900 [INFO] (0022:task-0000): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-09 10:46:06.762 +0900 [INFO] (0022:task-0000): SQL: SET search_path TO "public"
2024-08-09 10:46:06.763 +0900 [INFO] (0022:task-0000): > 0.00 seconds
2024-08-09 10:46:06.763 +0900 [INFO] (0022:task-0000): Copy SQL: COPY "public"."test_table_temp" ("column1", "column2", "column3", "column4", "column5", "created_at") FROM STDIN
2024-08-09 10:46:06.767 +0900 [INFO] (0022:task-0000): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-09 10:46:06.776 +0900 [INFO] (0022:task-0000): SQL: SET search_path TO "public"
2024-08-09 10:46:06.778 +0900 [INFO] (0022:task-0000): > 0.00 seconds
2024-08-09 10:46:06.778 +0900 [INFO] (0022:task-0000): Copy SQL: COPY "public"."test_table_temp" ("column1", "column2", "column3", "column4", "column5", "created_at") FROM STDIN
2024-08-09 10:46:06.780 +0900 [INFO] (0022:task-0000): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-09 10:46:06.797 +0900 [INFO] (0022:task-0000): SQL: SET search_path TO "public"
2024-08-09 10:46:06.798 +0900 [INFO] (0022:task-0000): > 0.00 seconds
2024-08-09 10:46:06.798 +0900 [INFO] (0022:task-0000): Copy SQL: COPY "public"."test_table_temp" ("column1", "column2", "column3", "column4", "column5", "created_at") FROM STDIN
2024-08-09 10:48:54.709 +0900 [INFO] (embulk-output-executor-0): Loading 273,296 rows (16,818,071 bytes)
2024-08-09 10:48:54.731 +0900 [INFO] (embulk-output-executor-1): Loading 273,296 rows (16,820,172 bytes)
2024-08-09 10:48:54.746 +0900 [INFO] (embulk-output-executor-2): Loading 273,296 rows (16,816,496 bytes)
2024-08-09 10:48:54.767 +0900 [INFO] (embulk-output-executor-3): Loading 273,296 rows (16,818,972 bytes)
2024-08-09 10:48:55.500 +0900 [INFO] (embulk-output-executor-1): > 0.77 seconds (loaded 273,296 rows in total)
2024-08-09 10:48:55.500 +0900 [INFO] (embulk-output-executor-0): > 0.79 seconds (loaded 273,296 rows in total)
2024-08-09 10:48:55.500 +0900 [INFO] (embulk-output-executor-2): > 0.75 seconds (loaded 273,296 rows in total)
2024-08-09 10:48:55.753 +0900 [INFO] (embulk-output-executor-3): > 0.99 seconds (loaded 273,296 rows in total)
2024-08-09 16:51:42.571 +0900 [INFO] (0001:transaction): {done: 1 / 1, running: 0}
2024-08-09 16:51:42.571 +0900 [INFO] (0001:transaction): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-09 16:51:42.578 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2024-08-09 16:51:42.579 +0900 [INFO] (0001:transaction): > 0.00 seconds
2024-08-09 16:51:42.580 +0900 [INFO] (0001:transaction): SQL: BEGIN;
ALTER TABLE test_table rename to test_table_old;
ALTER INDEX test_table_pkey RENAME TO test_table_pkey_old;
ALTER TABLE test_table_temp ADD CONSTRAINT test_table_pkey primary key (col1, col2, col3);
ALTER INDEX idx_testtable_gid_agw RENAME TO idx_testtable_gid_agw_old;
CREATE INDEX idx_testtable_gid_agw ON test_table_temp USING btree (col2, col3);
ALTER TABLE test_table_temp rename to test_table;
DROP TABLE test_table_old;
COMMIT;
2024-08-09 16:58:42.760 +0900 [INFO] (0001:transaction): > 420.18 seconds
2024-08-09 16:58:42.776 +0900 [INFO] (main): Committed.
2024-08-09 16:58:42.777 +0900 [INFO] (main): Next config diff: {"in":{},"out":{}}
ボトルネックの調査
Embulkのログを見てみたところ、以下のログがありました。
2024-08-09 04:31:52.899 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4
それぞれの用語と各値の計算については以下の記事が参考になりますのでご参照ください。
今回の場合、max_threads
が8となっていますが、output tasks
では4となっています。
そのため、本来使えるスレッド数より少ないスレッド数でoutput taskが実行されています。
実際のログを見ても、embulk-output-executor-0
~embulk-output-executor-3
までしか無く、4スレッドでoutput taskが実行されていることがわかります。
このoutput tasksはinput tasksの値によって決まり、input tasksはinputプラグインの種類によって決まります。FileInputプラグインの場合はファイル数、Inputプラグインの場合は1となります。
当初利用していたembulk-input-bigquery
を見てみると、InputPluginベースのプラグインとなっています。この場合、input tasks = 1となります。
embulk-input-bigquery
からembulk-input-gcs
へ変更
max_thread
までスレッドを利用するため、output tasksの数を増やしたい。そのためにinput tasksの数を増やす必要があります。FileInputプラグインの場合はファイル数 = input tasksになるため、inputをFileInputプラグインに変えるのが良さそうです。
embulk-input-gcs
は、FileInputPluginベースのプラグインとなっているため、inputをembulk-input-gcs
へ変えることにしました。
https://github.com/embulk/embulk-input-gcs/blob/a86ca685a6d40483e512c135d4a4ac911da65252/src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java#L19-L20
inputをembulk-input-gcs
に変えるにあたって、事前にBigQueryからCloud Storageへテーブルデータをエクスポートさせておく必要があります。エクスポートに関しては以下をご参照ください。
なお、エクスポートさせるテーブルデータが大きい場合、Cloud Storageへのエクスポート時に複数ファイルの分割されます。
以下がembulk-input-gcs
の設定ファイルになります。
in:
type: gcs
bucket: test_bucket
path_prefix: test_table/20240827
auth_method: json_key
json_keyfile:
content: |
{{env.gcp_credential}}
parser:
type: json
columns:
- { name: col1, type: long }
- { name: col2, type: long }
- { name: col3, type: timestamp, format: '%Y-%m-%d %H:%M:%S %Z' }
- { name: col4, type: long }
- { name: col5, type: long }
- { name: created_at, type: string }
path_prefix
で連携対象のファイルのprefixを指定できるため、複数ファイルに分割されていても問題なく連携可能です。
改善後のパフォーマンス
以下がinputをembulk-input-gcs
へ変更後の実行ログになります。
2024-08-27 04:31:28.854 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / tasks=300
2024-08-27 04:31:28.873 +0900 [INFO] (0001:transaction): JDBC Driver = /var/lib/embulk/vendor/bundle/jruby/2.3.0/gems/embulk-output-postgresql-0.8.2/default_jdbc_driver/postgresql-9.4-1205-jdbc41.jar
2024-08-27 04:31:28.878 +0900 [INFO] (0001:transaction): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:28.924 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2024-08-27 04:31:28.926 +0900 [INFO] (0001:transaction): > 0.00 seconds
2024-08-27 04:31:28.926 +0900 [INFO] (0001:transaction): Using JDBC Driver PostgreSQL 9.4 JDBC4.1 (build 1205)
2024-08-27 04:31:28.926 +0900 [INFO] (0001:transaction): Using insert_direct mode
2024-08-27 04:31:28.962 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS test_table_temp;
DROP TABLE IF EXISTS test_table_old;
CREATE TABLE test_table_temp AS TABLE test_table WITH NO DATA;
2024-08-27 04:31:28.992 +0900 [INFO] (0001:transaction): > 0.03 seconds
2024-08-27 04:31:29.722 +0900 [INFO] (0001:transaction): {done: 0 / 300, running: 0}
2024-08-27 04:31:29.753 +0900 [INFO] (0023:task-0003): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:29.753 +0900 [INFO] (0021:task-0001): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:29.754 +0900 [INFO] (0024:task-0004): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:29.754 +0900 [INFO] (0027:task-0007): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:29.754 +0900 [INFO] (0025:task-0005): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:29.754 +0900 [INFO] (0022:task-0002): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:29.754 +0900 [INFO] (0026:task-0006): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:29.754 +0900 [INFO] (0020:task-0000): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
...
2024-08-27 04:32:13.773 +0900 [INFO] (0022:task-0010): Copy SQL: COPY "public"."test_table_temp" ("col1", "col2", "col3", "col4", "col5", "created_at") FROM STDIN
2024-08-27 04:32:13.774 +0900 [INFO] (0026:task-0011): > 0.00 seconds
2024-08-27 04:32:13.774 +0900 [INFO] (0026:task-0011): Copy SQL: COPY "public"."test_table_temp" ("col1", "col2", "col3", "col4", "col5", "created_at") FROM STDIN
2024-08-27 04:32:14.020 +0900 [INFO] (0021:task-0001): Loading 7,749 rows (479,791 bytes)
2024-08-27 04:32:14.028 +0900 [INFO] (0020:task-0000): Loading 9,333 rows (578,218 bytes)
2024-08-27 04:32:14.049 +0900 [INFO] (0027:task-0007): > 0.60 seconds (loaded 547,088 rows in total)
2024-08-27 04:32:14.190 +0900 [INFO] (0021:task-0001): > 0.17 seconds (loaded 554,837 rows in total)
2024-08-27 04:32:14.193 +0900 [INFO] (0021:task-0012): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:32:14.200 +0900 [INFO] (0021:task-0012): SQL: SET search_path TO "public"
2024-08-27 04:32:14.201 +0900 [INFO] (0021:task-0012): > 0.00 seconds
2024-08-27 04:32:14.202 +0900 [INFO] (0021:task-0012): Copy SQL: COPY "public"."test_table_temp" ("col1", "col2", "col3", "col4", "col5", "created_at") FROM STDIN
2024-08-27 04:32:14.213 +0900 [INFO] (0027:task-0007): Loading 8,445 rows (523,202 bytes)
2024-08-27 04:32:14.226 +0900 [INFO] (0025:task-0005): > 0.60 seconds (loaded 547,088 rows in total)
2024-08-27 04:32:14.226 +0900 [INFO] (0020:task-0000): > 0.20 seconds (loaded 556,421 rows in total)
2024-08-27 04:32:14.227 +0900 [INFO] (0001:transaction): {done: 6 / 300, running: 7}
2024-08-27 04:32:14.228 +0900 [INFO] (0001:transaction): {done: 6 / 300, running: 7}
2024-08-27 04:32:14.228 +0900 [INFO] (0001:transaction): {done: 6 / 300, running: 7}
...
2024-08-27 04:56:15.342 +0900 [INFO] (0001:transaction): {done:300 / 300, running: 0}
2024-08-27 04:56:15.343 +0900 [INFO] (0001:transaction): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:56:15.350 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2024-08-27 04:56:15.350 +0900 [INFO] (0001:transaction): > 0.00 seconds
2024-08-27 04:56:15.351 +0900 [INFO] (0001:transaction): SQL: BEGIN;
ALTER TABLE test_table rename to test_table_old;
ALTER INDEX test_table_pkey RENAME TO test_table_pkey_old;
ALTER TABLE test_table_temp ADD CONSTRAINT test_table_pkey primary key (col1, col2, col3);
ALTER INDEX idx_testtable_gid_agw RENAME TO idx_testtable_gid_agw_old;
CREATE INDEX idx_testtable_gid_agw ON test_table_temp USING btree (col2, col3);
ALTER TABLE test_table_temp rename to test_table;
DROP TABLE test_table_old;
COMMIT;
2024-08-27 05:02:09.497 +0900 [INFO] (0001:transaction): > 354.15 seconds
2024-08-27 05:02:12.915 +0900 [INFO] (main): Committed.
2024-08-27 05:02:12.915 +0900 [INFO] (main): Next config diff: {"in":{"last_path":"test_table/20240827_043058/1133830/000000000299.json"},"out":{}}
ログの時間を見ると、04:31:28
から連携が開始し、05:02:12
に連携が完了しており、30分程度でテーブルの洗い替えが完了しています。
ログを詳しく見てみると、max_thread
が8で、tasks
が300となっています。
また、ログからmaxの8スレッドでDBコネクションを貼っていることがわかります。
Using local thread executor with max_threads=8 / tasks=300
2024-08-27 04:31:29.753 +0900 [INFO] (0023:task-0003): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:29.753 +0900 [INFO] (0021:task-0001): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:29.754 +0900 [INFO] (0024:task-0004): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:29.754 +0900 [INFO] (0027:task-0007): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:29.754 +0900 [INFO] (0025:task-0005): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:29.754 +0900 [INFO] (0022:task-0002): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:29.754 +0900 [INFO] (0026:task-0006): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:29.754 +0900 [INFO] (0020:task-0000): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
また、runningの数を見ても7とmax_thread
近くまでスレッドを使えていることがわかります。
2024-08-27 04:32:14.227 +0900 [INFO] (0001:transaction): {done: 6 / 300, running: 7}
2024-08-27 04:32:14.228 +0900 [INFO] (0001:transaction): {done: 6 / 300, running: 7}
2024-08-27 04:32:14.228 +0900 [INFO] (0001:transaction): {done: 6 / 300, running: 7}
まとめ
Embulkのinputをembulk-input-bigquery
からembulk-input-gcs
へ変更をすることでパフォーマンス改善が出来ました。
DWHとしてBigQueryを利用しており、BigQuery上で集計したデータをアプリケーションのDBに連携させたい場面があると思いますが、その際のパフォーマンスを改善したい方の参考になれば幸いです。