0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

OCI Data Flowを使ってオブジェクト・ストレージからADBへデータをロードする -- Part 1

Last updated at Posted at 2023-04-27

本ブログは、オラクル・クラウドの個人シリーズ・ブログの1つです。

初めに

Oracle Cloud Infrastructure(OCI)Data Flowは、Apache Spark ™アプリケーションを実行するためのフルマネージド・サービスです。特に大規模なデータの処理に向きます。

OCIオブジェクト・ストレージ上のファイルを読み込み、Autnomous DBへのデータロード処理を例にして、OCI Data Flowでの実現方法を三回を分けて紹介したいと思います。

  • Part 1 (本文): Apache Sparkを使って、ローカル環境でPythonプログラムを実行させる方法。
  • Part 2: アプリケーションをOCIへデプロイし、実行させる方法。
  • Part 3: プライベート・エンドポイントを経由し、ADBへデータをロードする方法。

検証環境
VM: Oracle Linux 8, VM.Standard.E4.Flex (1 OCPU, 16GB)
ADB タイプ: Autonomous Data Warehouse (パブリック・アクセス)
Java バージョン: 11
Python バージョン: 3.8
Apache Spark バージョン: 3.3.2

ステップ

1. OCI側の準備

バケットとCSVファイル

バケット(一個)を用意して、読込対象のCSVファイルをその中に保存します。この例では、次のようなファイルを利用します(ヘッダー付き、郵便番号と住所を記載)。

zipcode,address1,address2,address3
0600000,北海道,札幌市 中央区,以下に掲載がない場合
0640941,北海道,札幌市 中央区,旭ケ丘
0600041,北海道,札幌市 中央区,大通東
0600042,北海道,札幌市 中央区,大通西(1~19丁目)
0640820,北海道,札幌市 中央区,大通西(20~28丁目)
...中略...

ADBインスタンスと専用ユーザー

ADBインスタンス(一個)を新規作成します。この例では、Autonomous Data Warehouse (ADW)を利用します。

ロード処理の専用ユーザー(例:DFUSER01)を作成しておきます(ロールDWROLEを付与)。

パスワードをOCI Vaultのシークレットに保存します。保存後、シークレットのOCIDをメモしてください(例: ocid1.vaultsecret.oc1.ap-tokyo-1.<secret_ocid>)。

ADB Walletファイル

OCIコンソールから、Walletファイルをダウンロードします。圧縮ファイルをそのままバケットに保存してください(解凍不要)。

2. ローカル環境のセットアップ

Java 11 のインストール

コマンド:sudo dnf install java-11-openjdk
バージョンチェック:java -version

[opc@linux8-java11-data-flow ~]$ sudo dnf install java-11-openjdk
...
Complete!
[opc@linux8-java11-data-flow ~]$ java -version
openjdk version "11.0.18" 2023-01-17 LTS
OpenJDK Runtime Environment (Red_Hat-11.0.18.0.10-2.el8_7) (build 11.0.18+10-LTS)
OpenJDK 64-Bit Server VM (Red_Hat-11.0.18.0.10-2.el8_7) (build 11.0.18+10-LTS, mixed mode, sharing)
[opc@linux8-java11-data-flow ~]$

Python のインストール

インストール:sudo dnf module install python38
バージョンチェック:python3.8 --version
実行環境をPython3.8に指定:sudo alternatives --set python3 /usr/bin/python3.8

Spark のインストール

ダウンロードURL:https://spark.apache.org/downloads.html
image.png
ファイル:spark-3.3.2-bin-hadoop3.tgz
ローカルの展開先:/home/opc/spark/ (事前に作成しておく)
コマンド例:tar xvf spark-3.3.2-bin-hadoop3.tgz -C /home/opc/spark

PATH変数の編集

export Spark_Home=/home/opc/spark/spark-3.3.2-bin-hadoop3
export PATH=${Spark_Home}/bin:$PATH

Spark Shell の起動
spark-shellで正常に起動できるのかを確認します。

[opc@linux8-java11-data-flow ~]$ spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/04/24 11:28:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://linux8-java11-data-flow.publicsubnet1.vcn1.oraclevcn.com:4040
Spark context available as 'sc' (master = local[*], app id = local-1682335738728).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.3.2
      /_/

Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 11.0.18)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

Quit コマンド: CTRL + D

JDBCドライバーのダウンロード

ダウンロードURL:https://www.oracle.com/database/technologies/appdev/jdbc-downloads.html
image.png
ファイル:ojdbc11-full.tar.gz
ローカルの展開先:/home/opc/download/ojdbc/ (事前に作成しておく)

OCI HDFS Connector のダウンロード

ダウンロードURL:https://github.com/oracle/oci-hdfs-connector/releases
ファイル名:oci-hdfs.zip
ローカルの展開先:/home/opc/download/oci-hdfs

展開後、下記ファイルをsparkのjarsの下に保存してださい。
保存先:/home/opc/spark/spark-3.3.2-bin-hadoop3/jars

  • oci-hdfs/lib/oci-hdfs-full-3.3.4.0.1.1.jar
  • oci-hdfs/third-party/lib/bcpkix-jdk15on-1.70.jar
  • oci-hdfs/third-party/lib/bcprov-jdk15on-1.70.jar

Python SDK のインストール

コマンド:sudo pip3 install -U oci

OCI構成ファイルの準備

オブジェクト・ストレージにアクセスするため、認証情報を記載するOCI構成ファイルAPIキーファイル(PEM)は必要です。

ファイル作成方法の詳細は、次の記事をご参照ください。
「OCIカスタム・メトリックでディスク使用率を監視」の「STEP1. 事前準備」

ファイルの保存先:/home/opc/.oci/config
(構成ファイルとPEMキーファイルの保存先を別々にしてもOKです。)

3. Pythonプログラムの作成

Githubに、Python、JavaとScalaの3種類のサンプルがあります。今回は、Pythonのサンプルコードを例にして検証します。

ソースの格納先:/home/opc/python/loadadw.py
編集箇所は、以下となります。

    # TODO: Set all these variables.
    INPUT_PATH = "oci://<bucket>@<object_storage_namespace>/<filename>.csv"
    PASSWORD_SECRET_OCID = "ocid1.vaultsecret.oc1.ap-tokyo-1.<secret_ocid>"
    TARGET_TABLE = "ZIPCODEJP"
    TNSNAME = "adwdf_high"
    USER = "DFUSER01"
    WALLET_PATH = "oci://<bucket>@<object_storage_namespace>/<wallet_file>.zip"

オブジェクト・ストレージ・ネームスペースは、OCIコンソールから確認できますが、OCI CLIのoci os ns getでも簡単に取得できます。(テナント名ではないので、ご注意を)

処理流れ
CSVファイルの読込 → ADB Walletファイルの取得と解凍 → シークレットからパスワードの取得 → ADBへの書込み

4. ローカル環境での実行

ターゲット・テーブルを予め作成しなくてもよいです。実行後自動に作成されます。列名は、CSVファイルのヘッダーと同様です。

実行コマンド:
spark-submit --jars /home/opc/download/ojdbc/ojdbc11.jar /home/opc/python/loadadw.py

出力ログの抜粋:

[opc@linux8-java11-data-flow ]$ spark-submit --jars /home/opc/download/ojdbc/ojdbc11.jar /home/opc/python/loadadw.py
23/04/24 12:46:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/04/24 12:46:17 INFO SparkContext: Running Spark version 3.3.2
23/04/24 12:46:17 INFO ResourceUtils: ==============================================================
23/04/24 12:46:17 INFO ResourceUtils: No custom resources configured for spark.driver.
23/04/24 12:46:17 INFO ResourceUtils: ==============================================================
23/04/24 12:46:17 INFO SparkContext: Submitted application: DataFlow
...中略...
Getting wallet password
Done getting wallet password
Saving processed data to jdbc:oracle:thin:@adwdf_high?TNS_ADMIN=/tmp
23/04/24 12:46:29 INFO BlockManagerInfo: Removed broadcast_0_piece0 on linux8-java11-data-flow.publicsubnet1.vcn1.oraclevcn.com:46195 in memory (size: 34.7 KiB, free: 434.4 MiB)
23/04/24 12:46:29 INFO BlockManagerInfo: Removed broadcast_2_piece0 on linux8-java11-data-flow.publicsubnet1.vcn1.oraclevcn.com:46195 in memory (size: 34.7 KiB, free: 434.4 MiB)
23/04/24 12:46:29 INFO BlockManagerInfo: Removed broadcast_1_piece0 on linux8-java11-data-flow.publicsubnet1.vcn1.oraclevcn.com:46195 in memory (size: 5.9 KiB, free: 434.4 MiB)
23/04/24 12:46:29 INFO FileSourceStrategy: Pushed Filters:
23/04/24 12:46:29 INFO FileSourceStrategy: Post-Scan Filters:
23/04/24 12:46:29 INFO FileSourceStrategy: Output Data Schema: struct<zipcode: string, address1: string, address2: string, address3: string ... 2 more fields>
...中略...
23/04/24 12:46:32 INFO DAGScheduler: ResultStage 1 (jdbc at NativeMethodAccessorImpl.java:0) finished in 3.120 s
23/04/24 12:46:32 INFO DAGScheduler: Job 1 is finished. Cancelling potential speculative or zombie tasks for this job
23/04/24 12:46:32 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
23/04/24 12:46:32 INFO TaskSchedulerImpl: Killing all running tasks in stage 1: Stage finished
23/04/24 12:46:32 INFO DAGScheduler: Job 1 finished: jdbc at NativeMethodAccessorImpl.java:0, took 3.130925 s
Done saving processed data to database
...中略...
23/04/24 12:46:33 INFO SparkContext: Successfully stopped SparkContext23/04/24 12:46:33 INFO ShutdownHookManager: Shutdown hook called
23/04/24 12:46:33 INFO ShutdownHookManager: Deleting directory /tmp/spark-eb0988f9-4df7-4154-aa7d-d7fdce1ec33e/pyspark-8995996a-7003-41f4-92ce-d087fa9351ec
23/04/24 12:46:33 INFO ShutdownHookManager: Deleting directory /tmp/spark-89bfb53f-4572-4d7d-be52-eee52e797b9c
23/04/24 12:46:33 INFO ShutdownHookManager: Deleting directory /tmp/spark-eb0988f9-4df7-4154-aa7d-d7fdce1ec33e
[opc@linux8-java11-data-flow ]$

全体の処理時間は、およそ18秒で、データ(12万件以上)の書込み時間は、4秒未満です。

テーブルの中身を確認
image.png

以上です。


関連記事
オラクル・クラウドの個人シリーズ・ブログ
OCI Data Flowを使ってオブジェクト・ストレージからADBへデータをロードする -- Part 2
OCI Data Flowを使ってオブジェクト・ストレージからADBへデータをロードする -- Part 3


OCI データ・フロー・ドキュメント
製品サイト
オフィシャル・ドキュメント
データ・フロー・チュートリアル

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?