本ブログは、オラクル・クラウドの個人シリーズ・ブログの1つです。
初めに
Oracle Cloud Infrastructure(OCI)Data Flowは、Apache Spark ™アプリケーションを実行するためのフルマネージド・サービスです。特に大規模なデータの処理に向きます。
OCIオブジェクト・ストレージ上のファイルを読み込み、Autnomous DBへのデータロード処理を例にして、OCI Data Flowでの実現方法を三回を分けて紹介したいと思います。
- Part 1 (本文): Apache Sparkを使って、ローカル環境でPythonプログラムを実行させる方法。
- Part 2: アプリケーションをOCIへデプロイし、実行させる方法。
- Part 3: プライベート・エンドポイントを経由し、ADBへデータをロードする方法。
検証環境
VM: Oracle Linux 8, VM.Standard.E4.Flex (1 OCPU, 16GB)
ADB タイプ: Autonomous Data Warehouse (パブリック・アクセス)
Java バージョン: 11
Python バージョン: 3.8
Apache Spark バージョン: 3.3.2
ステップ
1. OCI側の準備
バケットとCSVファイル
バケット(一個)を用意して、読込対象のCSVファイルをその中に保存します。この例では、次のようなファイルを利用します(ヘッダー付き、郵便番号と住所を記載)。
zipcode,address1,address2,address3
0600000,北海道,札幌市 中央区,以下に掲載がない場合
0640941,北海道,札幌市 中央区,旭ケ丘
0600041,北海道,札幌市 中央区,大通東
0600042,北海道,札幌市 中央区,大通西(1~19丁目)
0640820,北海道,札幌市 中央区,大通西(20~28丁目)
...中略...
ADBインスタンスと専用ユーザー
ADBインスタンス(一個)を新規作成します。この例では、Autonomous Data Warehouse (ADW)を利用します。
ロード処理の専用ユーザー(例:DFUSER01
)を作成しておきます(ロールDWROLE
を付与)。
パスワードをOCI Vaultのシークレットに保存します。保存後、シークレットのOCIDをメモしてください(例: ocid1.vaultsecret.oc1.ap-tokyo-1.<secret_ocid>
)。
ADB Walletファイル
OCIコンソールから、Walletファイルをダウンロードします。圧縮ファイルをそのままバケットに保存してください(解凍不要)。
2. ローカル環境のセットアップ
Java 11 のインストール
コマンド:sudo dnf install java-11-openjdk
バージョンチェック:java -version
[opc@linux8-java11-data-flow ~]$ sudo dnf install java-11-openjdk
...
Complete!
[opc@linux8-java11-data-flow ~]$ java -version
openjdk version "11.0.18" 2023-01-17 LTS
OpenJDK Runtime Environment (Red_Hat-11.0.18.0.10-2.el8_7) (build 11.0.18+10-LTS)
OpenJDK 64-Bit Server VM (Red_Hat-11.0.18.0.10-2.el8_7) (build 11.0.18+10-LTS, mixed mode, sharing)
[opc@linux8-java11-data-flow ~]$
Python のインストール
インストール:sudo dnf module install python38
バージョンチェック:python3.8 --version
実行環境をPython3.8に指定:sudo alternatives --set python3 /usr/bin/python3.8
Spark のインストール
ダウンロードURL:https://spark.apache.org/downloads.html
ファイル:spark-3.3.2-bin-hadoop3.tgz
ローカルの展開先:/home/opc/spark/
(事前に作成しておく)
コマンド例:tar xvf spark-3.3.2-bin-hadoop3.tgz -C /home/opc/spark
PATH変数の編集
export Spark_Home=/home/opc/spark/spark-3.3.2-bin-hadoop3
export PATH=${Spark_Home}/bin:$PATH
Spark Shell の起動
spark-shell
で正常に起動できるのかを確認します。
[opc@linux8-java11-data-flow ~]$ spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/04/24 11:28:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://linux8-java11-data-flow.publicsubnet1.vcn1.oraclevcn.com:4040
Spark context available as 'sc' (master = local[*], app id = local-1682335738728).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.3.2
/_/
Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 11.0.18)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
Quit コマンド: CTRL + D
JDBCドライバーのダウンロード
ダウンロードURL:https://www.oracle.com/database/technologies/appdev/jdbc-downloads.html
ファイル:ojdbc11-full.tar.gz
ローカルの展開先:/home/opc/download/ojdbc/
(事前に作成しておく)
OCI HDFS Connector のダウンロード
ダウンロードURL:https://github.com/oracle/oci-hdfs-connector/releases
ファイル名:oci-hdfs.zip
ローカルの展開先:/home/opc/download/oci-hdfs
展開後、下記ファイルをsparkのjarsの下に保存してださい。
保存先:/home/opc/spark/spark-3.3.2-bin-hadoop3/jars
- oci-hdfs/lib/oci-hdfs-full-3.3.4.0.1.1.jar
- oci-hdfs/third-party/lib/bcpkix-jdk15on-1.70.jar
- oci-hdfs/third-party/lib/bcprov-jdk15on-1.70.jar
Python SDK のインストール
コマンド:sudo pip3 install -U oci
OCI構成ファイルの準備
オブジェクト・ストレージにアクセスするため、認証情報を記載するOCI構成ファイルとAPIキーファイル(PEM)は必要です。
ファイル作成方法の詳細は、次の記事をご参照ください。
「OCIカスタム・メトリックでディスク使用率を監視」の「STEP1. 事前準備」
ファイルの保存先:/home/opc/.oci/config
(構成ファイルとPEMキーファイルの保存先を別々にしてもOKです。)
3. Pythonプログラムの作成
Githubに、Python、JavaとScalaの3種類のサンプルがあります。今回は、Pythonのサンプルコードを例にして検証します。
ソースの格納先:/home/opc/python/loadadw.py
編集箇所は、以下となります。
# TODO: Set all these variables.
INPUT_PATH = "oci://<bucket>@<object_storage_namespace>/<filename>.csv"
PASSWORD_SECRET_OCID = "ocid1.vaultsecret.oc1.ap-tokyo-1.<secret_ocid>"
TARGET_TABLE = "ZIPCODEJP"
TNSNAME = "adwdf_high"
USER = "DFUSER01"
WALLET_PATH = "oci://<bucket>@<object_storage_namespace>/<wallet_file>.zip"
オブジェクト・ストレージ・ネームスペースは、OCIコンソールから確認できますが、OCI CLIのoci os ns get
でも簡単に取得できます。(テナント名ではないので、ご注意を)
処理流れ
CSVファイルの読込 → ADB Walletファイルの取得と解凍 → シークレットからパスワードの取得 → ADBへの書込み
4. ローカル環境での実行
ターゲット・テーブルを予め作成しなくてもよいです。実行後自動に作成されます。列名は、CSVファイルのヘッダーと同様です。
実行コマンド:
spark-submit --jars /home/opc/download/ojdbc/ojdbc11.jar /home/opc/python/loadadw.py
出力ログの抜粋:
[opc@linux8-java11-data-flow ]$ spark-submit --jars /home/opc/download/ojdbc/ojdbc11.jar /home/opc/python/loadadw.py
23/04/24 12:46:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/04/24 12:46:17 INFO SparkContext: Running Spark version 3.3.2
23/04/24 12:46:17 INFO ResourceUtils: ==============================================================
23/04/24 12:46:17 INFO ResourceUtils: No custom resources configured for spark.driver.
23/04/24 12:46:17 INFO ResourceUtils: ==============================================================
23/04/24 12:46:17 INFO SparkContext: Submitted application: DataFlow
...中略...
Getting wallet password
Done getting wallet password
Saving processed data to jdbc:oracle:thin:@adwdf_high?TNS_ADMIN=/tmp
23/04/24 12:46:29 INFO BlockManagerInfo: Removed broadcast_0_piece0 on linux8-java11-data-flow.publicsubnet1.vcn1.oraclevcn.com:46195 in memory (size: 34.7 KiB, free: 434.4 MiB)
23/04/24 12:46:29 INFO BlockManagerInfo: Removed broadcast_2_piece0 on linux8-java11-data-flow.publicsubnet1.vcn1.oraclevcn.com:46195 in memory (size: 34.7 KiB, free: 434.4 MiB)
23/04/24 12:46:29 INFO BlockManagerInfo: Removed broadcast_1_piece0 on linux8-java11-data-flow.publicsubnet1.vcn1.oraclevcn.com:46195 in memory (size: 5.9 KiB, free: 434.4 MiB)
23/04/24 12:46:29 INFO FileSourceStrategy: Pushed Filters:
23/04/24 12:46:29 INFO FileSourceStrategy: Post-Scan Filters:
23/04/24 12:46:29 INFO FileSourceStrategy: Output Data Schema: struct<zipcode: string, address1: string, address2: string, address3: string ... 2 more fields>
...中略...
23/04/24 12:46:32 INFO DAGScheduler: ResultStage 1 (jdbc at NativeMethodAccessorImpl.java:0) finished in 3.120 s
23/04/24 12:46:32 INFO DAGScheduler: Job 1 is finished. Cancelling potential speculative or zombie tasks for this job
23/04/24 12:46:32 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
23/04/24 12:46:32 INFO TaskSchedulerImpl: Killing all running tasks in stage 1: Stage finished
23/04/24 12:46:32 INFO DAGScheduler: Job 1 finished: jdbc at NativeMethodAccessorImpl.java:0, took 3.130925 s
Done saving processed data to database
...中略...
23/04/24 12:46:33 INFO SparkContext: Successfully stopped SparkContext23/04/24 12:46:33 INFO ShutdownHookManager: Shutdown hook called
23/04/24 12:46:33 INFO ShutdownHookManager: Deleting directory /tmp/spark-eb0988f9-4df7-4154-aa7d-d7fdce1ec33e/pyspark-8995996a-7003-41f4-92ce-d087fa9351ec
23/04/24 12:46:33 INFO ShutdownHookManager: Deleting directory /tmp/spark-89bfb53f-4572-4d7d-be52-eee52e797b9c
23/04/24 12:46:33 INFO ShutdownHookManager: Deleting directory /tmp/spark-eb0988f9-4df7-4154-aa7d-d7fdce1ec33e
[opc@linux8-java11-data-flow ]$
全体の処理時間は、およそ18秒で、データ(12万件以上)の書込み時間は、4秒未満です。
以上です。
関連記事
オラクル・クラウドの個人シリーズ・ブログ
OCI Data Flowを使ってオブジェクト・ストレージからADBへデータをロードする -- Part 2
OCI Data Flowを使ってオブジェクト・ストレージからADBへデータをロードする -- Part 3
OCI データ・フロー・ドキュメント
製品サイト
オフィシャル・ドキュメント
データ・フロー・チュートリアル