LoginSignup
0

[Oracle Cloud] Autonomous Databaseのデータ・パイプライン機能(DBMS_CLOUD_PIPELINE)を試してみた(データ・エクスポート編) (2023/01/08)

Last updated at Posted at 2023-01-08

はじめに

Autonomous Databaseにデータ・パイプラインの機能が追加されました。
Autonomous Databaseのデータ・パイプラインには大きく分けて2つの機能があります。

  • データ・ロード:一定間隔で継続的にオブジェクト・ストア内のファイルを表にロード
  • データ・エクスポート:一定間隔で継続的に表の内容をオブジェクト・ストアにエクスポート

今回は、データ・パイプラインを用いた表へのデータ・エクスポートを検証してみました。

データ・ロードについては @500InternalServerError さんの [OCI]Autonomous Databaseのデータ・パイプライン機能(DBMS_CLOUD_PIPELINE)を試してみた(データ・ロード編)にあります。

エクスポートデータ・パイプラインはデータベースのテーブルやクエリーの結果から、定期的に新しいデータをオブジェクトストアにエクスポートするためのデータ・パイプラインです。エクスポートパイプラインの使用例としては、以下のようなものがあります。

  • アプリケーションで生成された新しい時系列データを、定期的にデータベースからオブジェクトストアにエクスポート。

エクスポートパイプラインには3つのオプションがあります。

  • 新しいデータを追跡するために、日付またはタイムスタンプ列をキーとして、表の増分データをオブジェクトストアにエクスポートする。
  • 新しいデータを追跡するために、日付またはタイムスタンプ列をキーとして、クエリ結果の増分データをオブジェクトストアにエクスポートする。
  • 日付またはタイムスタンプ列を参照せずにデータを選択するクエリを使用してまたは表全体のデータをオブジェクトストアにエクスポートする(パイプラインは、スケジューラの実行ごとにクエリが選択したすべてのデータをエクスポートするようにする)。

事前準備

データ・パイプラインの作成

パイプラインの作成にはDBMS_CLOUD_PIPELINE.CREATE_PIPELINEプロシージャを使用します。
今回はMY_PIPELINE1という名前のデータ・エクスポートのためのパイプラインを作成します。

BEGIN
     DBMS_CLOUD_PIPELINE.CREATE_PIPELINE(
        pipeline_name => 'MY_PIPELINE1',
        pipeline_type => 'EXPORT',
        description   => 'Export order data from table into object store file'
  );
END;
/

PL/SQLプロシージャが正常に完了しました。

データ・パイプラインの属性を設定

DBMS_CLOUD_PIPELINE.SET_ATTRIBUTEプロシージャを使用して、パイプラインの属性(詳細)設定します。

例えば、エクスポート対象を表またはクエリ結果を指定するパラメータ

  • table_nameパラメータを指定すると、表の行がオブジェクトストアにエクスポートされます。
  • queryパラメータを指定すると、クエリはSELECT文を指定し、必要なデータのみがオブジェクトストアにエクスポートされるようにします。
    (table_nameパラメータまたはqueryパラメータのいずれかが必要です)
BEGIN
     DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
       pipeline_name => 'MY_PIPELINE1',
       attributes    => JSON_OBJECT('credential_name' VALUE 'DEF_CRED_NAME',
          'location' VALUE 'https://objectstorage.ap-tokyo-1.oraclecloud.com/n/<namespace>/b/<bucket名>/o/',
          'table_name' VALUE '<table名>',
          'format' VALUE '{"type": "csv"}',
          'priority' VALUE 'LOW',
          'interval' VALUE '2')
  );
END;
/

PL/SQLプロシージャが正常に完了しました。
  • location:エクスポート先オブジェクトストアの場所を指定します。

  • table_name:エクスポートするデータを含むデータベース内の表を指定します。

  • format:エクスポートするデータの形式を記述します。(csv,json,xml)

  • priority:データベースからデータを取得する際の並列度を決定します。

    • HIGH:データベースのOCPU数を使用して処理される並列ファイル数を決定します。
    • MEDIUM:Mediumサービスの同時処理数制限を使用して、同時処理数を決定します。デフォルト値は4です。
    • LOW:パイプライン・ジョブをシリアル実行します。
  • interval:パイプライン・ジョブの連続した実行間の時間間隔を分単位で指定します。デフォルトの間隔は15分です。

  • key_column:新しいデータをエクスポートする場合は、識別できる日付またはタイムスタンプの列をに指定します。
    最後の実行タイムスタンプまたは日付は、エクスポート・パイプラインによって追跡され、key_columnの値と比較されて、オブジェクトストアにエクスポートする新しいデータを識別します。

BEGIN
     DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
       pipeline_name => 'MY_PIPELINE1',
       attributes    => JSON_OBJECT('credential_name' VALUE 'DEF_CRED_NAME',
          'location' VALUE 'https://objectstorage.ap-tokyo-1.oraclecloud.com/n/<namespace>/b/<bucket名>/o/',
          'table_name' VALUE '<table名>',
          'key_column' VALUE '<timestamp列名>',
          'format' VALUE '{"type": "csv"}',
          'priority' VALUE 'LOW',
          'interval' VALUE '2')
  );
END;
/

PL/SQLプロシージャが正常に完了しました。

データ・パイプラインのテスト

データ・パイプラインのテストとしてDBMS_CLOUD_PIPELINE.RUN_PIPELINE_ONCE プロシージャを実行して、パイプラインを一度だけ、オンデマンドで実行させます。 これは、繰り返されるスケジュールジョブを作成するものではありません。

BEGIN
DBMS_CLOUD_PIPELINE.RUN_PIPELINE_ONCE( 
        pipeline_name => 'MY_PIPELINE1'
);
END;
/

PL/SQLプロシージャが正常に完了しました。

Object Storageにファイルが出力されていることを確認します。
image.png

user_cloud_pipeline_historyビューや、user_cloud_pipelinesビューで、パイプラインの実行中ジョブの監視やトラブルシューティングを行うことが可能です。

select * from user_cloud_pipelines;

PIPELINE_ID PIPELINE_NAME            PIPELINE_TYPE STATUS  DESCRIPTION                                         CREATED                  LAST_MODIFIED            LAST_EXECUTION           OPERATION_ID STATUS_TABLE ORACLE_MAINTAINED 
----------- ------------------------ ------------- ------- --------------------------------------------------- ------------------------ ------------------------ ------------------------ ------------ ------------ ----------------- 
          3 MY_PIPELINE1             EXPORT        STOPPED Export order data from table into object store file 2023-01-08T08:09:13.151Z 2023-01-08T08:09:13.151Z 2023-01-08T08:13:37.112Z                           N                 

(オプション) パイプラインのリセット

パイプラインを開始する前に、DBMS_CLOUD_PIPELINE.RESET_PIPELINEプロシージャを使用して、パイプラインの状態をリセットすることができます。また、以下のように、PURGE_DATAオプションをTRUEに設定することでエクスポートしたオブジェクトストアのデータをパージすることもできます。

BEGIN  
DBMS_CLOUD_PIPELINE.RESET_PIPELINE(
     pipeline_name => 'MY_PIPELINE1',
     purge_data => TRUE
);
END;
/

PL/SQLプロシージャが正常に完了しました。

データ・パイプラインを開始

DBMS_CLOUD_PIPELINE.START_PIPELINEプロシージャを使用して、パイプラインを起動します。
パイプラインが起動すると、エクスポートデータ・パイプラインとして、データがファイルとしてObject Storageバケットに出力されます。

BEGIN
      DBMS_CLOUD_PIPELINE.START_PIPELINE(
      pipeline_name => 'MY_PIPELINE1'
  );
END;
/

PL/SQLプロシージャが正常に完了しました。

毎回すべてのデータをエクスポートした結果

image.png

毎実行エクスポートされています。

増分データをエクスポート

image.png

増分データがない時間はエクスポートファイルが出力されていません。

データ・パイプラインのジョブはDatabase Actionsからも確認可能

image.png

パイプライン関連のディクショナリ

パイプラインの状態の確認

SELECT pipeline_name, pipeline_type, status, status_table
FROM USER_CLOUD_PIPELINES
WHERE pipeline_name = 'MY_PIPELINE1';

PIPELINE_NAME PIPELINE_TYPE STATUS  STATUS_TABLE 
------------- ------------- ------- ------------ 
MY_PIPELINE1  EXPORT        STARTED              


1行が選択されました。

パイプラインの実行ログの確認

SELECT start_date, pipeline_id, pipeline_name, status, error_message
FROM user_cloud_pipeline_history
WHERE pipeline_name = 'MY_PIPELINE1'
ORDER BY start_date;


START_DATE               PIPELINE_ID PIPELINE_NAME STATUS    ERROR_MESSAGE 
------------------------ ----------- ------------- --------- ------------- 
2023-01-08T08:45:12.435Z           5 MY_PIPELINE1  SUCCEEDED               
2023-01-08T08:46:12.619Z           5 MY_PIPELINE1  SUCCEEDED               
2023-01-08T08:47:12.877Z           5 MY_PIPELINE1  SUCCEEDED               
2023-01-08T08:48:12.970Z           5 MY_PIPELINE1  SUCCEEDED               
2023-01-08T08:49:13.122Z           5 MY_PIPELINE1  SUCCEEDED      

パイプラインの設定内容の確認

SELECT pipeline_name, attribute_name, attribute_value FROM user_cloud_pipeline_attributes
     WHERE pipeline_name = 'MY_PIPELINE1';

PIPELINE_NAME ATTRIBUTE_NAME  ATTRIBUTE_VALUE                                                               
------------- --------------- ----------------------------------------------------------------------------- 
MY_PIPELINE1  credential_name DEF_CRED_NAME                                                                 
MY_PIPELINE1  format          {"type": "csv"}                                                               
MY_PIPELINE1  interval        1                                                                             
MY_PIPELINE1  key_column      metrics_time                                                                  
MY_PIPELINE1  location        https://objectstorage.ap-tokyo-1.oraclecloud.com/n/<namespace>/b/<bucket名>/o/ 
MY_PIPELINE1  priority        low                                                                           
MY_PIPELINE1  table_name      <table名>

7行が選択されました。

パイプラインの停止

DBMS_CLOUD_PIPELINE.STOP_PIPELINEプロシージャを使用して、パイプラインを停止できます。

BEGIN
  DBMS_CLOUD_PIPELINE.STOP_PIPELINE(
      pipeline_name => 'MY_PIPELINE1'
  );
END;
/

PL/SQLプロシージャが正常に完了しました。

SELECT pipeline_name, status from USER_CLOUD_PIPELINES
   WHERE pipeline_name = 'MY_PIPELINE1';

PIPELINE_NAME STATUS  
------------- ------- 
MY_PIPELINE1  STOPPED 

1行が選択されました。

パイプラインの削除

パイプラインを削除するには、DBMS_CLOUD_PIPELINE.DROP_PIPELINEを使用します。

BEGIN
  DBMS_CLOUD_PIPELINE.DROP_PIPELINE(
      pipeline_name => 'MY_PIPELINE1'
  );
END;
/

PL/SQLプロシージャが正常に完了しました。


SELECT pipeline_name, status from USER_CLOUD_PIPELINES
   WHERE pipeline_name = 'MY_PIPELINE1';

行が選択されていません0行が選択されました。

おわりに

Autonomous Databaseのデータ・パイプライン機能(DBMS_CLOUD_PIPELINEパッケージ)を使用して、継続的に表データの差分をObject Storageのファイルとしてをエクスポートできることが確認できました。

参考情報

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
What you can do with signing up
0