ZOZO Advent Calendar 2024

Day 15

Posted at 2024-12-14


We used Embulk to transfer data from BigQuery to PostgreSQL on Amazon RDS and to fully replace (reload) the destination table.





2024-08-09 04:31:52.899 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4
2024-08-09 04:31:52.916 +0900 [INFO] (0001:transaction): JDBC Driver = /var/lib/embulk/vendor/bundle/jruby/2.3.0/gems/embulk-output-postgresql-0.8.2/default_jdbc_driver/postgresql-9.4-1205-jdbc41.jar
2024-08-09 04:31:52.920 +0900 [INFO] (0001:transaction): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-09 04:31:52.977 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2024-08-09 04:31:52.979 +0900 [INFO] (0001:transaction): > 0.00 seconds
2024-08-09 04:31:52.980 +0900 [INFO] (0001:transaction): Using JDBC Driver PostgreSQL 9.4 JDBC4.1 (build 1205)
2024-08-09 04:31:52.980 +0900 [INFO] (0001:transaction): Using insert_direct mode
2024-08-09 04:31:53.006 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS test_table_temp;
DROP TABLE IF EXISTS test_table_old;
CREATE TABLE test_table_temp AS TABLE test_table WITH NO DATA;

2024-08-09 10:46:06.628 +0900 [INFO] (0001:transaction): > 22453.62 seconds
2024-08-09 10:46:06.642 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "public"."test_table_temp" ("column1" BIGINT, "column2" BIGINT, "column3" TIMESTAMP WITH TIME ZONE, "column4" BIGINT, "column5" BIGINT, "created_at" TIMESTAMP WITH TIME ZONE)
2024-08-09 10:46:06.643 +0900 [INFO] (0001:transaction): > 0.00 seconds
2024-08-09 10:46:06.703 +0900 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
2024-08-09 10:46:06.735 +0900 [INFO] (0022:task-0000): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-09 10:46:06.747 +0900 [INFO] (0022:task-0000): SQL: SET search_path TO "public"
2024-08-09 10:46:06.748 +0900 [INFO] (0022:task-0000): > 0.00 seconds
2024-08-09 10:46:06.749 +0900 [INFO] (0022:task-0000): Copy SQL: COPY "public"."test_table_temp" ("column1", "column2", "column3", "column4", "column5", "created_at") FROM STDIN
2024-08-09 10:46:06.754 +0900 [INFO] (0022:task-0000): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-09 10:46:06.762 +0900 [INFO] (0022:task-0000): SQL: SET search_path TO "public"
2024-08-09 10:46:06.763 +0900 [INFO] (0022:task-0000): > 0.00 seconds
2024-08-09 10:46:06.763 +0900 [INFO] (0022:task-0000): Copy SQL: COPY "public"."test_table_temp" ("column1", "column2", "column3", "column4", "column5", "created_at") FROM STDIN
2024-08-09 10:46:06.767 +0900 [INFO] (0022:task-0000): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-09 10:46:06.776 +0900 [INFO] (0022:task-0000): SQL: SET search_path TO "public"
2024-08-09 10:46:06.778 +0900 [INFO] (0022:task-0000): > 0.00 seconds
2024-08-09 10:46:06.778 +0900 [INFO] (0022:task-0000): Copy SQL: COPY "public"."test_table_temp" ("column1", "column2", "column3", "column4", "column5", "created_at") FROM STDIN
2024-08-09 10:46:06.780 +0900 [INFO] (0022:task-0000): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-09 10:46:06.797 +0900 [INFO] (0022:task-0000): SQL: SET search_path TO "public"
2024-08-09 10:46:06.798 +0900 [INFO] (0022:task-0000): > 0.00 seconds
2024-08-09 10:46:06.798 +0900 [INFO] (0022:task-0000): Copy SQL: COPY "public"."test_table_temp" ("column1", "column2", "column3", "column4", "column5", "created_at") FROM STDIN
2024-08-09 10:48:54.709 +0900 [INFO] (embulk-output-executor-0): Loading 273,296 rows (16,818,071 bytes)
2024-08-09 10:48:54.731 +0900 [INFO] (embulk-output-executor-1): Loading 273,296 rows (16,820,172 bytes)
2024-08-09 10:48:54.746 +0900 [INFO] (embulk-output-executor-2): Loading 273,296 rows (16,816,496 bytes)
2024-08-09 10:48:54.767 +0900 [INFO] (embulk-output-executor-3): Loading 273,296 rows (16,818,972 bytes)
2024-08-09 10:48:55.500 +0900 [INFO] (embulk-output-executor-1): > 0.77 seconds (loaded 273,296 rows in total)
2024-08-09 10:48:55.500 +0900 [INFO] (embulk-output-executor-0): > 0.79 seconds (loaded 273,296 rows in total)
2024-08-09 10:48:55.500 +0900 [INFO] (embulk-output-executor-2): > 0.75 seconds (loaded 273,296 rows in total)
2024-08-09 10:48:55.753 +0900 [INFO] (embulk-output-executor-3): > 0.99 seconds (loaded 273,296 rows in total)

2024-08-09 16:51:42.571 +0900 [INFO] (0001:transaction): {done:  1 / 1, running: 0}
2024-08-09 16:51:42.571 +0900 [INFO] (0001:transaction): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-09 16:51:42.578 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2024-08-09 16:51:42.579 +0900 [INFO] (0001:transaction): > 0.00 seconds
2024-08-09 16:51:42.580 +0900 [INFO] (0001:transaction): SQL: BEGIN;
ALTER TABLE test_table rename to test_table_old;
ALTER INDEX test_table_pkey RENAME TO test_table_pkey_old;
ALTER TABLE test_table_temp ADD CONSTRAINT test_table_pkey primary key (col1, col2, col3);
ALTER INDEX idx_testtable_gid_agw RENAME TO idx_testtable_gid_agw_old;
CREATE INDEX idx_testtable_gid_agw ON test_table_temp USING btree (col2, col3);
ALTER TABLE test_table_temp rename to test_table;
DROP TABLE test_table_old;

2024-08-09 16:58:42.760 +0900 [INFO] (0001:transaction): > 420.18 seconds
2024-08-09 16:58:42.776 +0900 [INFO] (main): Committed.
2024-08-09 16:58:42.777 +0900 [INFO] (main): Next config diff: {"in":{},"out":{}}



2024-08-09 04:31:52.899 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4


In this run, max_threads is 8, but there are only 4 output tasks.
As a result, the output tasks run on fewer threads than are actually available.

The log confirms this: the only executor threads that appear are embulk-output-executor-0 through embulk-output-executor-3, so the output tasks ran on four threads.
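To check this for any run, the executor line can be parsed directly. The following is a minimal Python sketch, assuming only the two executor log formats that appear in this article ("output tasks N = input tasks M * K" and "tasks=N"). The function names are hypothetical, and treating "tasks=N" as one output task per input task is our assumption.

```python
import re
from typing import Optional

# The two executor log formats seen in this article's logs:
#   "max_threads=8 / output tasks 4 = input tasks 1 * 4"
#   "max_threads=8 / tasks=300"
SCATTER_RE = re.compile(
    r"max_threads=(\d+) / output tasks (\d+) = input tasks (\d+) \* (\d+)"
)
DIRECT_RE = re.compile(r"max_threads=(\d+) / tasks=(\d+)")


def parse_executor_line(line: str) -> Optional[dict]:
    """Extract max_threads and task counts from an executor log line (None if no match)."""
    m = SCATTER_RE.search(line)
    if m:
        max_threads, output_tasks, input_tasks, factor = map(int, m.groups())
        return {"max_threads": max_threads, "output_tasks": output_tasks,
                "input_tasks": input_tasks, "factor": factor}
    m = DIRECT_RE.search(line)
    if m:
        # Assumption: in this format every input task is also one output task.
        max_threads, tasks = map(int, m.groups())
        return {"max_threads": max_threads, "output_tasks": tasks,
                "input_tasks": tasks, "factor": 1}
    return None


def idle_threads(info: dict) -> int:
    """Thread slots that stay unused because there are fewer output tasks than max_threads."""
    return max(0, info["max_threads"] - info["output_tasks"])


line = ("2024-08-09 04:31:52.899 +0900 [INFO] (0001:transaction): Using local thread "
        "executor with max_threads=8 / output tasks 4 = input tasks 1 * 4")
info = parse_executor_line(line)
print(info, "idle thread slots:", idle_threads(info))
```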

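The number of output tasks is derived from the number of input tasks, and the number of input tasks depends on the kind of input plugin: for a FileInputPlugin it is the number of files, and for an InputPlugin it is 1.
Looking at embulk-input-bigquery, which we used initially, it is built on InputPlugin, so in this case input tasks = 1.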
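The relationship between input tasks, output tasks, and max_threads can be modeled roughly as below. This is a hypothetical sketch, not Embulk's actual code. It assumes that when there are fewer input tasks than a minimum output-task count (called min_output_tasks here; the value 4 is inferred from the log's "input tasks 1 * 4"), each input task is split across several output tasks, and that otherwise each input task becomes exactly one output task.

```python
import math
from dataclasses import dataclass


@dataclass
class ExecutorPlan:
    mode: str          # "scatter" (input tasks split) or "direct" (one output task per input task)
    output_tasks: int  # number of output tasks created
    concurrency: int   # threads that can actually run at the same time


def plan_executor(input_tasks: int, max_threads: int, min_output_tasks: int) -> ExecutorPlan:
    """Hypothetical model of how a local executor could size the output side."""
    if 0 < input_tasks < min_output_tasks:
        # Assumption: each input task is scattered over ceil(min/input) output tasks.
        factor = math.ceil(min_output_tasks / input_tasks)
        output_tasks = input_tasks * factor
        mode = "scatter"
    else:
        output_tasks = input_tasks
        mode = "direct"
    # At most max_threads tasks run at once, and never more tasks than exist.
    return ExecutorPlan(mode, output_tasks, min(output_tasks, max_threads))


# One input task (the embulk-input-bigquery case): only 4 of the 8 threads are used.
print(plan_executor(input_tasks=1, max_threads=8, min_output_tasks=4))
# 300 files with a FileInput plugin: all 8 threads can be kept busy.
print(plan_executor(input_tasks=300, max_threads=8, min_output_tasks=4))
```

Under this model, raising the input task count is what lets concurrency reach max_threads, which is the reasoning the article follows next.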


To make use of threads up to max_threads, we want more output tasks, which means we need more input tasks. With a FileInputPlugin, input tasks = number of files, so switching the input to a FileInputPlugin looks like a good approach.
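Because a FileInput plugin turns each matched file into one input task, the task count can be estimated in advance by listing the objects under the prefix. The sketch below assumes files are selected by a plain name-prefix match (as a path_prefix setting suggests) and processed in lexicographic order. select_input_files and list_gcs_objects are hypothetical helpers; the second one needs the google-cloud-storage package and credentials, and is not called here.

```python
from typing import Iterable, List


def select_input_files(object_names: Iterable[str], path_prefix: str) -> List[str]:
    """Names starting with path_prefix, sorted; each one would become one input task."""
    return sorted(name for name in object_names if name.startswith(path_prefix))


def list_gcs_objects(bucket: str, path_prefix: str) -> List[str]:
    """List object names under a prefix with google-cloud-storage (requires credentials)."""
    from google.cloud import storage  # lazy import so the helper above runs without it

    client = storage.Client()
    return [blob.name for blob in client.list_blobs(bucket, prefix=path_prefix)]


# Hypothetical listing shaped like the export used later: 300 shards plus an unrelated object.
names = [f"test_table/20240827_043058/1133830/{i:012d}.json" for i in range(300)]
names.append("other_table/000000000000.json")
files = select_input_files(names, "test_table/20240827")
print(len(files), "input tasks; last file:", files[-1])
```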


To switch the input to embulk-input-gcs, the table data must first be exported from BigQuery to Cloud Storage. For details on exporting, refer to the following.

Note that when the table data being exported is large, the export to Cloud Storage splits it into multiple files.
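BigQuery writes at most 1 GB per exported file, so a large table has to be exported to a destination URI containing a single * wildcard, which BigQuery replaces with a 12-digit, zero-padded shard number (which is why the file names in the later log end in 000000000299.json). A minimal sketch with the google-cloud-bigquery client, assuming newline-delimited JSON output; the table ID, bucket, prefix, and helper names are hypothetical, and export_table is not called here because it needs credentials.

```python
def export_uri(bucket: str, prefix: str) -> str:
    """Destination URI with one '*' wildcard so BigQuery can shard the export."""
    return f"gs://{bucket}/{prefix.rstrip('/')}/*.json"


def shard_name(index: int) -> str:
    """File name BigQuery substitutes for '*': the shard number padded to 12 digits."""
    return f"{index:012d}.json"


def export_table(table_id: str, bucket: str, prefix: str) -> None:
    """Export a table to Cloud Storage as newline-delimited JSON (requires credentials)."""
    from google.cloud import bigquery  # lazy import: the helpers above do not need it

    client = bigquery.Client()
    job_config = bigquery.ExtractJobConfig(
        destination_format=bigquery.DestinationFormat.NEWLINE_DELIMITED_JSON
    )
    job = client.extract_table(table_id, export_uri(bucket, prefix), job_config=job_config)
    job.result()  # block until the extract job finishes (raises on failure)


print(export_uri("test_bucket", "test_table/20240827_043058"))
print(shard_name(299))
```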


in:
  type: gcs
  bucket: test_bucket
  path_prefix: test_table/20240827
  auth_method: json_key
  json_keyfile:
    content: |
      ...
  parser:
    type: json
    columns:
      - { name: col1, type: long }
      - { name: col2, type: long }
      - { name: col3, type: timestamp, format: '%Y-%m-%d %H:%M:%S %Z' }
      - { name: col4, type: long }
      - { name: col5, type: long }
      - { name: created_at, type: string }




2024-08-27 04:31:28.854 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / tasks=300
2024-08-27 04:31:28.873 +0900 [INFO] (0001:transaction): JDBC Driver = /var/lib/embulk/vendor/bundle/jruby/2.3.0/gems/embulk-output-postgresql-0.8.2/default_jdbc_driver/postgresql-9.4-1205-jdbc41.jar
2024-08-27 04:31:28.878 +0900 [INFO] (0001:transaction): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:28.924 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2024-08-27 04:31:28.926 +0900 [INFO] (0001:transaction): > 0.00 seconds
2024-08-27 04:31:28.926 +0900 [INFO] (0001:transaction): Using JDBC Driver PostgreSQL 9.4 JDBC4.1 (build 1205)
2024-08-27 04:31:28.926 +0900 [INFO] (0001:transaction): Using insert_direct mode
2024-08-27 04:31:28.962 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS test_table_temp;
DROP TABLE IF EXISTS test_table_old;
CREATE TABLE test_table_temp AS TABLE test_table WITH NO DATA;

2024-08-27 04:31:28.992 +0900 [INFO] (0001:transaction): > 0.03 seconds
2024-08-27 04:31:29.722 +0900 [INFO] (0001:transaction): {done:  0 / 300, running: 0}
2024-08-27 04:31:29.753 +0900 [INFO] (0023:task-0003): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:29.753 +0900 [INFO] (0021:task-0001): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:29.754 +0900 [INFO] (0024:task-0004): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:29.754 +0900 [INFO] (0027:task-0007): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:29.754 +0900 [INFO] (0025:task-0005): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:29.754 +0900 [INFO] (0022:task-0002): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:29.754 +0900 [INFO] (0026:task-0006): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:29.754 +0900 [INFO] (0020:task-0000): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}


2024-08-27 04:32:13.773 +0900 [INFO] (0022:task-0010): Copy SQL: COPY "public"."test_table_temp" ("col1", "col2", "col3", "col4", "col5", "created_at") FROM STDIN
2024-08-27 04:32:13.774 +0900 [INFO] (0026:task-0011): > 0.00 seconds
2024-08-27 04:32:13.774 +0900 [INFO] (0026:task-0011): Copy SQL: COPY "public"."test_table_temp" ("col1", "col2", "col3", "col4", "col5", "created_at") FROM STDIN
2024-08-27 04:32:14.020 +0900 [INFO] (0021:task-0001): Loading 7,749 rows (479,791 bytes)
2024-08-27 04:32:14.028 +0900 [INFO] (0020:task-0000): Loading 9,333 rows (578,218 bytes)
2024-08-27 04:32:14.049 +0900 [INFO] (0027:task-0007): > 0.60 seconds (loaded 547,088 rows in total)
2024-08-27 04:32:14.190 +0900 [INFO] (0021:task-0001): > 0.17 seconds (loaded 554,837 rows in total)
2024-08-27 04:32:14.193 +0900 [INFO] (0021:task-0012): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:32:14.200 +0900 [INFO] (0021:task-0012): SQL: SET search_path TO "public"
2024-08-27 04:32:14.201 +0900 [INFO] (0021:task-0012): > 0.00 seconds
2024-08-27 04:32:14.202 +0900 [INFO] (0021:task-0012): Copy SQL: COPY "public"."test_table_temp" ("col1", "col2", "col3", "col4", "col5", "created_at") FROM STDIN
2024-08-27 04:32:14.213 +0900 [INFO] (0027:task-0007): Loading 8,445 rows (523,202 bytes)
2024-08-27 04:32:14.226 +0900 [INFO] (0025:task-0005): > 0.60 seconds (loaded 547,088 rows in total)
2024-08-27 04:32:14.226 +0900 [INFO] (0020:task-0000): > 0.20 seconds (loaded 556,421 rows in total)
2024-08-27 04:32:14.227 +0900 [INFO] (0001:transaction): {done:  6 / 300, running: 7}
2024-08-27 04:32:14.228 +0900 [INFO] (0001:transaction): {done:  6 / 300, running: 7}
2024-08-27 04:32:14.228 +0900 [INFO] (0001:transaction): {done:  6 / 300, running: 7}


2024-08-27 04:56:15.342 +0900 [INFO] (0001:transaction): {done:300 / 300, running: 0}
2024-08-27 04:56:15.343 +0900 [INFO] (0001:transaction): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:56:15.350 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2024-08-27 04:56:15.350 +0900 [INFO] (0001:transaction): > 0.00 seconds
2024-08-27 04:56:15.351 +0900 [INFO] (0001:transaction): SQL: BEGIN;
ALTER TABLE test_table rename to test_table_old;
ALTER INDEX test_table_pkey RENAME TO test_table_pkey_old;
ALTER TABLE test_table_temp ADD CONSTRAINT test_table_pkey primary key (col1, col2, col3);
ALTER INDEX idx_testtable_gid_agw RENAME TO idx_testtable_gid_agw_old;
CREATE INDEX idx_testtable_gid_agw ON test_table_temp USING btree (col2, col3);
ALTER TABLE test_table_temp rename to test_table;
DROP TABLE test_table_old;

2024-08-27 05:02:09.497 +0900 [INFO] (0001:transaction): > 354.15 seconds
2024-08-27 05:02:12.915 +0900 [INFO] (main): Committed.
2024-08-27 05:02:12.915 +0900 [INFO] (main): Next config diff: {"in":{"last_path":"test_table/20240827_043058/1133830/000000000299.json"},"out":{}}
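Comparing the first and last timestamps of the two runs gives a rough wall-clock time for each job. This sketch parses the leading timestamp of each log line (format taken from the logs above) and measures from the executor line to the final "Next config diff" line, so anything that happened before the first line shown is not included. The log messages are shortened; the timestamps are copied verbatim.

```python
from datetime import datetime

LOG_TS_FORMAT = "%Y-%m-%d %H:%M:%S.%f %z"  # e.g. "2024-08-27 04:31:28.854 +0900"


def log_timestamp(line: str) -> datetime:
    """Parse the timestamp made of the first three space-separated fields of a log line."""
    return datetime.strptime(" ".join(line.split(" ")[:3]), LOG_TS_FORMAT)


def elapsed_seconds(first_line: str, last_line: str) -> float:
    """Seconds between two log lines."""
    return (log_timestamp(last_line) - log_timestamp(first_line)).total_seconds()


run1 = elapsed_seconds(  # embulk-input-bigquery (1 input task)
    "2024-08-09 04:31:52.899 +0900 [INFO] (0001:transaction): Using local thread executor ...",
    "2024-08-09 16:58:42.777 +0900 [INFO] (main): Next config diff: ...",
)
run2 = elapsed_seconds(  # embulk-input-gcs (300 input tasks)
    "2024-08-27 04:31:28.854 +0900 [INFO] (0001:transaction): Using local thread executor ...",
    "2024-08-27 05:02:12.915 +0900 [INFO] (main): Next config diff: ...",
)
print(f"embulk-input-bigquery: {run1 / 3600:.2f} h, embulk-input-gcs: {run2 / 60:.1f} min")
```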



Using local thread executor with max_threads=8 / tasks=300

2024-08-27 04:31:29.753 +0900 [INFO] (0023:task-0003): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:29.753 +0900 [INFO] (0021:task-0001): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:29.754 +0900 [INFO] (0024:task-0004): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:29.754 +0900 [INFO] (0027:task-0007): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:29.754 +0900 [INFO] (0025:task-0005): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:29.754 +0900 [INFO] (0022:task-0002): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:29.754 +0900 [INFO] (0026:task-0006): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}
2024-08-27 04:31:29.754 +0900 [INFO] (0020:task-0000): Connecting to DB_ENDPOINT options {user=DB_USER, password=***, tcpKeepAlive=true, loginTimeout=300, socketTimeout=36000}


2024-08-27 04:32:14.227 +0900 [INFO] (0001:transaction): {done:  6 / 300, running: 7}
2024-08-27 04:32:14.228 +0900 [INFO] (0001:transaction): {done:  6 / 300, running: 7}
2024-08-27 04:32:14.228 +0900 [INFO] (0001:transaction): {done:  6 / 300, running: 7}
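In this excerpt, eight distinct threads (0020 to 0027) start task-0000 through task-0007 at almost the same moment, and later lines show threads being reused (for example, thread 0021 picks up task-0012), so the 300 tasks are spread over max_threads=8 threads. A small sketch that counts distinct thread IDs in task log lines, assuming the "(thread:task-N)" prefix format seen here; the sample lines keep the thread and task IDs from the log but drop the timestamps and shorten the messages.

```python
import re
from typing import Iterable, Set

# Matches the "(thread:task-N)" prefix, e.g. "(0023:task-0003)".
TASK_RE = re.compile(r"\((\d{4}):task-(\d{4})\)")


def task_threads(lines: Iterable[str]) -> Set[str]:
    """Return the distinct thread IDs that ran at least one task."""
    threads = set()
    for line in lines:
        m = TASK_RE.search(line)
        if m:
            threads.add(m.group(1))
    return threads


# (thread, task) pairs from the excerpt, plus the later line where thread 0021 runs task-0012.
pairs = [("0023", "0003"), ("0021", "0001"), ("0024", "0004"), ("0027", "0007"),
         ("0025", "0005"), ("0022", "0002"), ("0026", "0006"), ("0020", "0000"),
         ("0021", "0012")]
sample = [f"[INFO] ({t}:task-{k}): Connecting to DB_ENDPOINT" for t, k in pairs]
print(len(task_threads(sample)), "threads for", len(pairs), "task log lines")
```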




