LoginSignup
4
0

More than 1 year has passed since last update.

[OCI]Autonomous Databaseのデータ・パイプライン機能(DBMS_CLOUD_PIPELINE)を試してみた(データ・ロード編)

Last updated at Posted at 2022-12-22

はじめに

Autonomous Databaseにデータ・パイプラインの機能が追加されました。
Autonomous Databaseのデータ・パイプラインには大きく分けて2つの機能があります。
・データ・ロード
 一定間隔で継続的にオブジェクト・ストア内のファイルを表にロード
・データ・エクスポート
 一定間隔で継続的に表の内容をオブジェクト・ストアにエクスポート

今回は、データ・パイプラインを用いた表へのデータ・ロードを検証してみました。

1. 事前準備

データ・パイプラインを用いたデータ・ロードには、オブジェクト・ストアのバケットにあるファイルと、そのデータのロード先となる表が必要です。

今回は、オブジェクト・ストレージのPipelineTestというバケットに以下のような内容のemployee1.csvをアップロードし、ロード先の表としてemployee表を用意しました。

employee1.csv
鈴木さん,38,1000000
田中さん,51,2500000

スクリーンショット 2022-12-22 19.22.36.png

SQL> CREATE TABLE EMPLOYEE(
  2  	 name	  VARCHAR2(128),
  3  	 age	  NUMBER,
  4  	 salary   NUMBER
  5  );

表が作成されました。

SQL> 

2. パイプラインの作成

準備が整ったので、早速パイプラインを作成します。
パイプラインの作成にはDBMS_CLOUD_PIPELINE.CREATE_PIPELINEプロシージャを使用します。
今回はMY_PIPELINEという名前のデータ・ロードのためのパイプラインを作成します。

SQL> BEGIN
  2  	  DBMS_CLOUD_PIPELINE.CREATE_PIPELINE(
  3  	      pipeline_name => 'MY_PIPELINE',
  4  	      pipeline_type => 'LOAD',
  5  	      description   => 'Load employee data from object store into a table'
  6  	  );
  7  END;
  8  /

PL/SQLプロシージャが正常に完了しました。

SQL> 

パイプラインの詳細設定にはDBMS_CLOUD_PIPELINE.SET_ATTRIBUTEプロシージャを使用します。
今回はMY_PIPELINEという名前のデータ・ロードのためのパイプラインを作成します。
各パラメータは以下のような値を設定します。

パラメータ
pipeline_name 設定するパイプライン名
credential_name クラウド・ストレージへのアクセスに使用するクレデンシャル
location クラウド・ストレージのURL
table_name データのロード先の表の名前
format ファイルのフォーマット
priority 処理の優先度(パラレル実行の並列度)
interval データ・ロードの実行間隔
SQL> BEGIN
  2  	  DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
  3  	      pipeline_name => 'MY_PIPELINE',
  4  	      attributes    => JSON_OBJECT(
  5  		  'credential_name' VALUE 'OCI$RESOURCE_PRINCIPAL',
  6  		  'location' VALUE 'https://objectstorage.ap-tokyo-1.oraclecloud.com/n/xxxxxxxxxxxx/b/PipelineTest/o/',
  7  		  'table_name' VALUE 'employee',
  8  		  'format' VALUE '{"type": "csv"}',
  9  		  'priority' VALUE 'MEDIUM',
 10  		  'interval' VALUE '15')
 11  	 );
 12  END;
 13  /

PL/SQLプロシージャが正常に完了しました。

SQL>

パイプラインMY_PIPELINEの詳細設定が完了しました。
パイプラインは作成して詳細設定をしただけでは動作しません。
そのため、employee表はまだ空です。

SQL> col name for a30
SQL> SELECT * FROM employee;

レコードが選択されませんでした。

SQL> 

3. パイプラインのテスト実行

クラウド・ストレージ内のファイルのファイル・フォーマット等が問題ないか確認するために、パイプラインのテスト実行を行うためのプロシージャDBMS_CLOUD_PIPELINE.RUN_PIPELINE_ONCEが用意されています。
pipeline_nameにテスト実行を行うパイプラインの名前を指定して、DBMS_CLOUD_PIPELINE.RUN_PIPELINE_ONCEを実行します。

SQL> BEGIN
  2  	 DBMS_CLOUD_PIPELINE.RUN_PIPELINE_ONCE(
  3  	     pipeline_name => 'MY_PIPELINE'
  4  );
  5  END;
  6  /

PL/SQLプロシージャが正常に完了しました。

SQL> 

テスト実行が正常に完了したので、employee表のデータを確認してみます。

SQL> SELECT * FROM employee;

NAME				                  AGE     SALARY
------------------------------ ---------- ----------
鈴木さん                               38    1000000
田中さん                               51    2500000

2行が選択されました。

SQL> 

クラウド・ストレージ内のemployee1.csvの内容がemployee表にロードされていることが確認できました。

テスト実行ですので、DBMS_CLOUD_PIPELINE.RESET_PIPELINEプロシージャを使用して、一度パイプラインをリセットし、employee表にロードされたデータを削除します。

SQL> BEGIN
  2  	DBMS_CLOUD_PIPELINE.RESET_PIPELINE(
  3  	    pipeline_name => 'MY_PIPELINE',
  4  	    purge_data => TRUE
  5  );
  6  END;
  7  /

PL/SQLプロシージャが正常に完了しました。

SQL> 

employee表のデータを確認してみます。

SQL> SELECT * FROM employee;

レコードが選択されませんでした。

SQL> 

employee表の内容が、パイプラインをテスト実行する前の状態に戻りました。

4. パイプラインの開始

テスト実行によって、データのフォーマットやオブジェクト・ストアへのアクセスに問題が無いことがわかったので、データ・ロードが継続的に行われるようにDBMS_CLOUD_PIPELINE.START_PIPELINEプロシージャを使用してパイプラインの実行を開始します。

SQL> set time on
10:21:50 SQL> BEGIN
10:21:50   2  	DBMS_CLOUD_PIPELINE.START_PIPELINE(
10:21:50   3  	    pipeline_name => 'MY_PIPELINE'
10:21:50   4  	);
10:21:50   5  END;
10:21:50   6  /

PL/SQLプロシージャが正常に完了しました。

10:21:50 SQL>

パイプラインが正常に開始されました。
employee表のデータを確認してみます。

10:21:57 SQL> SELECT * FROM employee;

NAME				                  AGE     SALARY
------------------------------ ---------- ----------
鈴木さん                               38    1000000
田中さん                               51    2500000

2行が選択されました。

10:21:58 SQL>

employee表にemployee1.csvのデータがロードされたことが確認できました。

パイプラインが正しく実行されるか確認するために、バケットにemployee2.csvとemployee3.csvをオブジェクト・ストレージにアップロードします。

employee2.csv
渡辺さん,22,1000000
松本さん,41,3000000
employee3.csv
高橋さん,28,1800000
山口さん,33,3300000

スクリーンショット 2022-12-22 19.23.23.png

15分間隔でパイプラインが動作するように設定したので、15分以上待ちます。

再度、employee表のデータを確認します。

10:38:51 SQL> SELECT * FROM employee;

NAME				             AGE     SALARY
------------------------------ ---------- ----------
鈴木さん                               38    1000000
田中さん                               51    2500000
渡辺さん                               22    1000000
松本さん                               41    3000000
高橋さん                               28    1800000
山口さん                               33    3300000

6行が選択されました。

10:38:51 SQL>

バケットに追加したemployee2.csvとemployee3.csvの内容がemployee表に追加されていることがわかりました。

5. パイプライン関連のディクショナリ

・USER_CLOUD_PIPELINES/DBA_CLOUD_PIPELINES
こちらのディクショナリ・ビューで、パイプラインの状態や各ファイルのロードの状態を格納している表の名前等を確認できます。

SQL> SELECT pipeline_name, pipeline_type, status, status_table
  2  FROM USER_CLOUD_PIPELINES
  3  WHERE pipeline_name = 'MY_PIPELINE';

PIPELINE_NAME	PIPELINE_TYPE	STATUS	   STATUS_TABLE
--------------- --------------- ---------- ------------------------------
MY_PIPELINE	    LOAD		    STARTED    PIPELINE$18$61_STATUS

1行が選択されました。

SQL>

status_table列の表(ここではPIPELINE$18$61_STATUS)から、オブジェクト・ストアから各ファイルをロードした時の状態(COMPLETED/FAILD)とFAILDの場合のエラーの情報が確認できます。

SQL> SELECT id, name, status, error_code, error_message
  2  FROM PIPELINE$18$61_STATUS;

	    ID NAME 			              STATUS     ERROR_CODE ERROR_MESSAGE
---------- ------------------------------ ---------- ---------- --------------------
	     1 employee1.csv		          COMPLETED
	     2 employee2.csv		          COMPLETED
	     3 employee3.csv		          COMPLETED

3行が選択されました。

SQL>

・USER_CLOUD_PIPELINE_HISTORY/DBA_CLOUD_PIPELINE_HISTORY
こちらのディクショナリ・ビューで、パイプラインの実行ログが確認できます。

SQL> SELECT start_date, pipeline_id, pipeline_name, status, error_message
  2  FROM user_cloud_pipeline_history
  3  WHERE pipeline_name = 'MY_PIPELINE'
  4  ORDER BY start_date;

START_DATE			                PIPELINE_ID PIPELINE_NAME	STATUS	   ERROR_MESSAGE
----------------------------------- ----------- --------------- ---------- --------------------
22-12-22 19:21:55.880970 +09:00 	         18 MY_PIPELINE	    SUCCEEDED
22-12-22 19:36:55.952198 +09:00 	         18 MY_PIPELINE	    SUCCEEDED
22-12-22 19:51:55.144443 +09:00 	         18 MY_PIPELINE	    SUCCEEDED
22-12-22 20:06:55.144830 +09:00 	         18 MY_PIPELINE	    SUCCEEDED
22-12-22 20:21:55.057670 +09:00 	         18 MY_PIPELINE	    SUCCEEDED
22-12-22 20:36:55.208550 +09:00 	         18 MY_PIPELINE	    SUCCEEDED

6行が選択されました。

SQL>

・USER_CLOUD_PIPELINE_ATTRIBUTES/USER_CLOUD_PIPELINE_ATTRIBUTES
こちらのディクショナリ・ビューで、パイプラインの設定内容が確認できます。

SQL> SELECT pipeline_name, attribute_name, attribute_value
  2  FROM user_cloud_pipeline_attributes
  3  WHERE pipeline_name = 'MY_PIPELINE';

PIPELINE_NAME	ATTRIBUTE_NAME	     ATTRIBUTE_VALUE
--------------- -------------------- --------------------------------------------------------------------------------
MY_PIPELINE	    credential_name      OCI$RESOURCE_PRINCIPAL
MY_PIPELINE	    format		         {"type": "csv"}
MY_PIPELINE	    interval	         15
MY_PIPELINE	    location	         https://objectstorage.ap-tokyo-1.oraclecloud.com/n/xxxxx/b/PipelineTest/o
MY_PIPELINE	    priority	         medium
MY_PIPELINE	    table_name	         employee

6行が選択されました。

SQL>

6. パイプラインの停止

パイプラインを停止するには、DBMS_CLOUD_PIPELINE.STOP_PIPELINEを使用します。

SQL> BEGIN
  2      DBMS_CLOUD_PIPELINE.STOP_PIPELINE(
  3          pipeline_name => 'MY_PIPELINE'
  4      );
  5  END;
  6  /

PL/SQLプロシージャが正常に完了しました。

SQL> 

パイプラインの状態を確認します。

SQL> SELECT pipeline_name, pipeline_type, status, status_table
  2  FROM USER_CLOUD_PIPELINES
  3  WHERE pipeline_name = 'MY_PIPELINE';

PIPELINE_NAME	PIPELINE_TYPE	STATUS	   STATUS_TABLE
--------------- --------------- ---------- ------------------------------
MY_PIPELINE	    LOAD		    STOPPED    PIPELINE$18$61_STATUS

1行が選択されました。

SQL>

パイプラインが停止状態(STOPPED)になりました。

7. パイプラインの削除

パイプラインを削除するには、DBMS_CLOUD_PIPELINE.DROP_PIPELINEを使用します。

SQL> BEGIN
  2      DBMS_CLOUD_PIPELINE.DROP_PIPELINE(
  3          pipeline_name => 'MY_PIPELINE'
  4      );
  5  END;
  6  /

PL/SQLプロシージャが正常に完了しました。

SQL> 

パイプラインが削除されたか確認します。

SQL> SELECT pipeline_name, pipeline_type, status, status_table
  2  FROM USER_CLOUD_PIPELINES
  3  WHERE pipeline_name = 'MY_PIPELINE';

レコードが選択されませんでした。

SQL> 

パイプラインが削除されたことが確認できました。

まとめ

Autonomous Databaseのデータ・パイプライン機能(DBMS_CLOUD_PIPELINEパッケージ)を使用して、オブジェクト・ストア内のファイルを継続的に差分データ・ロードできることが確認できました。

参考資料

About Data Pipelines on Autonomous Database

4
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
0