1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

[Autonomous Database] dbt からデータ変換処理を行う

Posted at

はじめに

Autonomous Databaseに対して、dbtが利用できるようになっていたので、試してみました。
現在はCLIインタフェースを提供するdbt-coreのみで利用可能です。(dbt Cloudは未サポート)
dbtの一般的なメリットである、SQLの拡張やテーブルの依存関係の解釈の他、Pythonモデルのサポートによる、DataFrameでの変換処理の記述が可能になっています。

接続手順

dbtのドキュメントに沿って行っていきます。

  1. dbt-oracleのインストール

    pip install dbt-oracle
    

    こちらでdbt-coreもインストールされます。

  2. dbt-oracleの設定
    Python接続ドライバの設定を行います。python-oracledbのThinモードが推奨されています。古いcx_oracleもサポートされています。環境変数として設定しておきます。

    export ORA_PYTHON_DRIVER_TYPE=thin
    
  3. ADBへの接続情報の設定
    profile.ymlという設定ファイルにADBの接続情報を記述していきます。デフォルトだと、~/.dbt下にprofile.ymlがある前提になっています。
    必須情報となるDBT_ORACLE_USERDBT_ORACLE_PASSWORDDBT_ORACLE_SCHEMAを環境変数として設定しておきます。

    export DBT_ORACLE_USER=<username>
    export DBT_ORACLE_PASSWORD=***
    export DBT_ORACLE_SCHEMA=<username>
    

    また、ADBにはウォレットを使用したmTLSと、使用しないTLSの2つの接続方式があります。今回はTLSで接続し、接続情報としては接続文字列を使用したいと思います。

    export DBT_ORACLE_CONNECT_STRING="(description= (retry_count=20)(retry_delay=3)(address=(protocol=tcps)(port=1521)(host=adb.ap-tokyo-1.oraclecloud.com))(connect_data=(service_name=xxxx_adw_low.adb.oraclecloud.com))(security=(ssl_server_dn_match=yes)))"
    

    接続文字列はOCIコンソールの[データベース接続]のTLS認証の項目からコピーします。
    image.png

    profile.yml
    adb_test:
    target: "{{ env_var('DBT_TARGET', 'dev') }}"
    outputs:
      dev:
         type: oracle
         user: "{{ env_var('DBT_ORACLE_USER') }}"
         pass: "{{ env_var('DBT_ORACLE_PASSWORD') }}"
         schema: "{{ env_var('DBT_ORACLE_SCHEMA') }}"
         connection_string: "{{ env_var('DBT_ORACLE_CONNECT_STRING') }}"
    
  4. dbtプロジェクトの作成
    dbtモデルの作成は、プロジェクトの中で行います。dbt initコマンドでadb_testというプロジェクトを作成します。このまま実行すると、先ほど作成したprofile.ymlを再度セットアップするウィザードが走ってしまうので、-sというオプションを付けます。

    dbt init adb_test -s
    

    実行すると、以下のようなadb_testというディレクトリが作成されます。

    [opc@devinsrd-889591 adb_test]$ ls -al
    total 12
    drwxrwxr-x. 9 opc opc  163 Nov  1 02:33 .
    drwxrwxr-x. 5 opc opc   83 Nov  1 02:32 ..
    drwxrwxr-x. 2 opc opc   22 Nov  1 01:54 analyses
    -rw-rw-r--. 1 opc opc 1256 Nov  1 02:32 dbt_project.yml
    -rw-rw-r--. 1 opc opc   29 Nov  1 01:54 .gitignore
    drwxrwxr-x. 2 opc opc   21 Nov  1 02:33 logs
    drwxrwxr-x. 2 opc opc   22 Nov  1 01:54 macros
    drwxrwxr-x. 3 opc opc   21 Nov  1 01:54 models
    -rw-rw-r--. 1 opc opc  571 Nov  1 01:54 README.md
    drwxrwxr-x. 2 opc opc   22 Nov  1 01:54 seeds
    drwxrwxr-x. 2 opc opc   22 Nov  1 01:54 snapshots
    drwxrwxr-x. 2 opc opc   22 Nov  1 01:54 tests
    

    各プロジェクトの設定ファイルであるdbt_project.ymlが作成されています。

    dbt_project.yml
    # Name your project! Project names should contain only lowercase characters
    # and underscores. A good package name should reflect your organization's
    # name or the intended use of these models
    name: 'adb_test'
    version: '1.0.0'
    config-version: 2
    
    # This setting configures which "profile" dbt uses for this project.
    profile: 'adb_test'
    
    # These configurations specify where dbt should look for different types of files.
    # The `model-paths` config, for example, states that models in this project can be
    # found in the "models/" directory. You probably won't need to change these!
    model-paths: ["models"]
    analysis-paths: ["analyses"]
    test-paths: ["tests"]
    seed-paths: ["seeds"]
    macro-paths: ["macros"]
    snapshot-paths: ["snapshots"]
    
    clean-targets:         # directories to be removed by `dbt clean`
      - "target"
      - "dbt_packages"
    
    
    # Configuring models
    # Full documentation: https://docs.getdbt.com/docs/configuring-models
    
    # In this example config, we tell dbt to build all models in the example/
    # directory as views. These settings can be overridden in the individual model
    # files using the `{{ config(...) }}` macro.
    models:
      adb_test:
        # Config indicated by + and applies to all files under models/example/
        example:
          +materialized: view
    
  5. 疎通確認
    各プロジェクトに移動し、dbtコマンドを実行できます。まずはADBに接続できるかdbt debugコマンドで確認します。

    [opc@devinsrd-889591 adb_test]$ dbt debug
    02:37:22  Running with dbt=1.6.6
    02:37:22  dbt version: 1.6.6
    02:37:22  python version: 3.9.16
    02:37:22  python path: /usr/bin/python3
    02:37:22  os info: Linux-5.4.17-2136.307.3.1.el8uek.x86_64-x86_64-with-glibc2.28
    02:37:22  oracle adapter: Running in thin mode
    02:37:22  Using profiles dir at /home/opc/.dbt
    02:37:22  Using profiles.yml file at /home/opc/.dbt/profiles.yml
    02:37:22  Using dbt_project.yml file at /home/opc/.dbt/adb_test/dbt_project.yml
    02:37:22  adapter type: oracle
    02:37:22  adapter version: 1.6.0
    02:37:22  Configuration:
    02:37:22    profiles.yml file [OK found and valid]
    02:37:22    dbt_project.yml file [OK found and valid]
    02:37:22  Required dependencies:
    02:37:22   - git [OK found]
    
    02:37:22  Connection:
    02:37:22    user: OMLUSER
    02:37:22    database: xxxx_ADW
    02:37:22    schema: OMLUSER
    02:37:22    protocol: None
    02:37:22    host: None
    02:37:22    port: None
    02:37:22    tns_name: None
    02:37:22    service: None
    02:37:22    connection_string: (description= (retry_count=20)(retry_delay=3)(address=(protocol=tcps)(port=1521)(host=adb.ap-tokyo-1.oraclecloud.com))(connect_data=(service_name=xxxx_adw_low.adb.oraclecloud.com))(security=(ssl_server_dn_match=yes)))
    02:37:22    shardingkey: []
    02:37:22    supershardingkey: []
    02:37:22    cclass: None
    02:37:22    purity: None
    02:37:22    retry_count: 1
    02:37:22    retry_delay: 3
    02:37:22    oml_cloud_service_url: None
    02:37:22  Registered adapter: oracle=1.6.0
    02:37:23    Connection test: [OK connection ok]
    
    02:37:23  All checks passed!
    

プロジェクトの実行

実際にADBにデータモデルを作成します。~/.dbt/adb_test/models/exampleにサンプルのモデルがあるので、それが実行されます。ただし、そのまま実行するとエラーになりました。ここで実際に実行されているのは、以下のようなSQLファイルです。

my_first_dbt_model.sql(デフォルト)
{{ config(materialized='table') }}

with source_data as (
    select 1 as id
    union all
    select null as id
)
select *
from source_data

from句がありません。Oracleではfrom句が必要なので(23cより不要)、以下のように修正します。

my_first_dbt_model.sql(修正後)
{{ config(materialized='table') }}

with source_data as (
    select 1 as id from dual
    union all
    select null as id from dual
)
select *
from source_data

ADB側にデータが作成されているか確認します。OMLUSERスキーマにMY_FIRST_DBT_MODELという表が作成されています。
{{ config(materialized='table') }}で生成するデータモデルの種類を表にしているためです。
image.png
また、MY_SECOND_DBT_MODELというビューも作成されています。こちらはdbt_project.yml内に+materialized: viewと設定されているため、ビューになります。
image.png

Pythonモデルの実行

SQLの他に、ADBに対してPythonモデルも実行できます。(OML4PYを使用)
これにより、ADBのデータに対するユーザー定義のPython関数の実行や、表、ビュー、アドホックSQLクエリをDataFrameとして読み込み、書き込みが可能です。

実行ユーザーはADMIN以外のユーザーで、OML_DEVELOPERロールが必要なので、あらかじめ付与しておきます。
今回は、新たにpy_dbt_testというプロジェクトを作成し、その中でPythonモデルを実行してみます。Pythonモデルを使うには、接続情報にoml_cloud_service_urlを追記します。

profiles.yml
py_dbt_test:
   target: dev
   outputs:
      dev:
         type: oracle
         user: "{{ env_var('DBT_ORACLE_USER') }}"
         pass: "{{ env_var('DBT_ORACLE_PASSWORD') }}"
         schema: "{{ env_var('DBT_ORACLE_SCHEMA') }}"
         connection_string: "{{ env_var('DBT_ORACLE_CONNECT_STRING') }}"
         oml_cloud_service_url: "https://xxxx-adw.adb.ap-tokyo-1.oraclecloudapps.com"

oml_cloud_service_urlは、Database Actions内のOracle Machine Learning RESTfulサービスから確認できます。
以下のPythonモデルを作成します。

my_first_python_model.py
def model(dbt, session):
    dbt.config(materialized="table")
    dbt.config(async_flag=True)
    dbt.config(timeout=1800)

    sql = f"""SELECT customer.cust_first_name,
       customer.cust_last_name,
       customer.cust_gender,
       customer.cust_marital_status,
       customer.cust_street_address,
       customer.cust_email,
       customer.cust_credit_limit,
       customer.cust_income_level
    FROM sh.customers customer, sh.countries country
    WHERE country.country_iso_code = ''US''
    AND customer.country_id = country.country_id"""

    # session.sync(query) will run the sql query and returns a oml.core.DataFrame
    us_potential_customers = session.sync(query=sql)

    # Compute an ad-hoc anomaly score on the credit limit
    median_credit_limit = us_potential_customers["CUST_CREDIT_LIMIT"].median()
    mean_credit_limit = us_potential_customers["CUST_CREDIT_LIMIT"].mean()
    anomaly_score = (us_potential_customers["CUST_CREDIT_LIMIT"] - median_credit_limit)/(median_credit_limit - mean_credit_limit)

    # Add a new column "CUST_CREDIT_ANOMALY_SCORE"
    us_potential_customers = us_potential_customers.concat({"CUST_CREDIT_ANOMALY_SCORE": anomaly_score.round(3)})

    # Return potential customers dataset as a oml.core.DataFrame
    return us_potential_customers

ADBにSQLを実行し、その結果をDataFrameとして、異常値列を追加する処理を行い、新たな表として返します。
正しくADB側に表が作成されていることが確認できました。
image.png

おわりに

今回は簡単にdbtでできる変換処理をSQLとPythonで確認しました。SQLでデータ変換を表現できない、もしくは膨大なコード量になる場合は、Pythonの最新のパッケージを使って実行できるのはかなり強力だと思います。
dbtで生のデータからtrainデータとtestデータを作成する変換処理を実行し、そのデータを使って、Oracle Machine Learningでデータベース内機械学習モデルを作成するパイプラインなども面白そうです。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?