マネージドなApache Spark環境「OCI Data Flow(データ・フロー)」サービスを用いて、クラウドストレージ上のデータを効率よく処理・分析する一連の仕組みを構築してみます。
本記事では、OCI Data Flowを使って、まずSpark環境を「動かして結果を見る」 ことで、ETL や分析、機械学習の処理がどのように流れていくのかを確認していきます。
データサイエンスやビッグデータ処理に興味はあるものの、環境構築にハードルを感じている人が、処理の流れを一通り体験するための入門的な内容です。
サンプルのデータやプログラムを利用して実施することができるので、ぜひ試してみてください!
*OCIドキュメントとして提供されているデータ・フローのハンズオン内容に沿って実施していきます(参考)。
OCI Data Flowとは
例えば、数百万行、数千万行といった、Excelでは開けないほど巨大なデータを効率よく集計・加工したいとします。
こうした処理には、一般的に「Apache Spark」のような強力なシステムを動かすためのサーバー群を自前で構築・管理する必要がありますが、特に最初の段階では大きなハードルになります。
OCI Data Flow は、そうした クラスタ管理や実行基盤の準備を意識せずに Spark ジョブを実行できるサービスです。
サーバーの準備や設定をすべてクラウド側にお任せして、「自分はやりたい処理(プログラム)を放り込むだけ」 でSparkのパワーを活用できます。また、処理が終わったサーバーは自動で消えるので(サーバーレス)、実行時間分のみの課金で利用できます。
やってみること
ベルリンのAirbnb物件データ(Berlin Airbnb dataset)を使用し、大量の物件情報の中から 「価格に対して価値が高い(お得な)物件を予測して特定するパイプラインを作成します。
本記事では、以下の3つのステップを通して、データエンジニアリングとデータサイエンスの標準的な流れを体験します。
| フェーズ | 手法 | 役割と目的 |
|---|---|---|
| 1. データの最適化 | ETL(Java) | 読み込み効率の悪い CSV データを、分析に最適な 高速・軽量な Parquet 形式へ変換します。 |
| 2. 現状の把握 | Analysis(SQL) | 変換したデータに対し、使い慣れた SQL を用いて素早く統計(平均価格など)を確認します。 |
| 3. 妥当な価格の予測 | ML(Python) | 機械学習(線形回帰)を用いて物件の適正価格を算出し、相場よりもお買い得な物件を自動で特定します。 |
では、早速事前準備から始めていきましょう!
0. 事前準備
OCI Data Flow を使用するための Object Storage の作成やデータの格納、ポリシーの設定を行います。
*このパートで実施する内容は、データ・フローのアプリケーション作成時に前提条件としてコンソール画面に表示される項目を含みます。
0-1. ネームスペース名の確認
ネームスペースを確認します。
右上のアイコンからプロファイル > テナンシをクリックします。
テナンシ詳細画面でオブジェクト・ストレージ・ネームスペースを確認します。
この後利用していくので、メモとして控えておきます。
0-2. サンプルデータとプログラムの用意
以下のサンプルデータセットとプログラムを用意します。全部で4つダウンロードしておきます。
① Berlin Airbnbデータ・データセット(.csvファイル)
👉 こちらからダウンロード
※クリエイティブ・コモンズCC0 1.0ユニバーサル(CC0 1.0)の「Public Domain Dedication」ライセンスに基づいてKaggle Webサイトからダウンロードされています。
② ETL 処理を行う Java アプリケーションコード(.jarファイル)
* 1. Java を利用したETLで利用します
👉 こちらからダウンロード
③ データのプロファイリングを行うSparkSQL(.sql)
* 2. SparkSQL によるデータの簡易的なプロファイリングで利用します
👉 こちらからダウンロード
④ 機械学習を行うPythonスクリプト(.pyファイル)
* 3. PySpark を使用した機械学習で利用します
👉 こちらからダウンロード
0-3. Object Storage バケットの作成
計4つのバケットを作成します。
まず、Data Flow でアプリケーションを作成する前に、以下の 2 つのバケットを作成しておく必要があります。バケット名は指定されているものを使います。
-
dataflow-logs:Data Flow がアプリケーションの実行毎にログ(標準出力と標準エラー出力の両方)を格納するためのバケット -
dataflow-warehouse:SparkSQL アプリケーションのデータウェアハウス用のバケット
さらに、上記とは別に今回の検証用に以下のバケットを作成しておきます。(バケット名は任意です。)
-
bucket-dataflow-samples:サンプルのデータセットを格納するバケット -
dataflow-tutorial-result:ETL 処理で変換したParquetファイルを格納するためのバケット
0-4. バケットへのファイルアップロード
0-2. サンプルデータとプログラムの用意 でダウンロードしたファイルを、
OCI Object Storage のバケットへアップロードします。
-
① データファイル
kaggle_berlin_airbnb_listings_summary.csv- アップロード先:
dataflow-warehouse
-
②③④ プログラムファイル
- 上記 CSV 以外の 3 ファイル
- アップロード先:
bucket-dataflow-samples
0-5. ポリシーの設定
Data Flow でアプリケーションを管理・実行するためにいくつかのポリシーを設定する必要があります。
OCI コンソール画面左上のハンバーガーメニューから、アイデンティティとセキュリティ > ポリシー を選択します。
「ポリシーの作成」をクリックし、以下のように入力した上で Data Flow 用のポリシーを作成します。
-
名前:
data_flow_policy -
ポリシーユースケース:
データ・フロー -
共通ポリシー・テンプレート:
データ・フロー管理者がすべてのアプリケーションと実行を管理
- グループ:Data Flow を実行するユーザーが含まれているグループ
- 場所:Data Flow 関連のリソース(Data Flow, Object Storage etc.)が配置されるコンパートメント
上記のとおり作成することで、以下のようなポリシーが設定されます。
Allow group 'undefined'/'{group name}' to read buckets in compartment dataflow
Allow group 'undefined'/'{group name}' to manage dataflow-family in compartment dataflow
Allow group 'undefined'/'{group name}' to manage objects in compartment dataflow where ALL {target.bucket.name='dataflow-logs', any {request.permission='OBJECT_CREATE', request.permission='OBJECT_INSPECT'}}
ここまでで準備は整いました!
1. Java を利用したETL
このパートでは、データエンジニアリングの基礎となる 「データパイプライン」 の構築を実施していきます。
■ このパートの目的
提供されている「Berlin Airbnb」データセットlistings_summary.csv の素のデータ(CSV形式)を、後の分析処理で扱いやすい Parquet形式 へ変換し、Object Storageへ格納します。
これは、データエンジニアリングにおける ETL の初期段階にあたります。
-
Extract(抽出):
listings_summary.csvを読み込む。 - Transform(変換): CSVからParquet形式へ構造を変換する(データクレンジングの第一歩)。
- Load(格納): 変換したデータを Object Storage へ保存する。
1-1. Javaアプリケーションを作成する
それでは、実際にアプリケーションを作成して、変換プログラムを動かしてみましょう。
「アナリティクスとAI」から「データ・フロー」を選択します。

「アプリケーションの作成」をクリックします。
(データ・フローを設定するには前提条件が必要です。と警告が出ていますが、事前準備で行った内容です。)
ウィザードに従って入力していきます。
今回は名前をTutorial 1、それ以外はでデフォルトの値で作成します。
アプリケーション構成までスクロールし、以下のように入力します。
-
アプリケーション構成
-
言語:
Java
-
言語:
-
ファイルの選択
-
ファイル・バケット:
bucket-dataflow-samples -
ファイル名:
oow-lab-2019-java-etl-1.0-SNAPSHOT.jar
-
ファイル・バケット:
ファイルURL(URI)手動指定
自身のテナント外にある公開バケットなどを指定する場合、「ファイル・パスを手動で入力します」を有効にして、URI形式で入力する必要があります。
Data Flow は 内部で Hadoop Distributed File System(HDFS) コネクタ を使用してObject Storage へアクセスします。
その際、oci:// から始まる特定のURIを使用して指定します。
URLとURIの差異
-
URL:
https://.../n/ネームスペース/b/バケット/o/(パス+)オブジェクト名 -
URI:
oci://バケット@ネームスペース/(パス+)オブジェクト名
例:
oci://<your_bucket_name>@<your_namespace>/oow_2019_dataflow_lab/usercontent/oow-lab-2019-java-etl-1.0-SNAPSHOT.jar
-
アプリケーション構成(続き)
- メイン・クラス名:
convert.Convert - 引数:
${input} ${output}
- メイン・クラス名:
-
アーカイブURI
-
アーカイブ・バケット:
dataflow-logs
-
アーカイブ・バケット:
-
アプリケーション・ログの場所
-
オブジェクト・ストレージ・バケット:
oci://dataflow-logs@<your_namespace>(デフォルトのまま)
-
オブジェクト・ストレージ・バケット:
データ・フローのアプリケーションが作成できました。
1-2. Javaアプリケーションの実行
作成したアプリケーション「Tutorial 1」の詳細画面を開きます。
「実行」をクリックします。
パラメータの値を以下のとおり入力します。
確認した上で「実行」をクリックします。
-
パラメータ
-
input:
oci://dataflow-warehouse@<your_namespace>/kaggle_berlin_airbnb_listings_summary.csv -
output:
oci://dataflow-tutorial-result@<your_namespace>/optimized_listings
-
input:
実行すると、「実行」画面でステータスが「受け入れ済」であることを確認できます。
数分するとステータスが「進行中」に変更されました。指定された値などに問題があると、ここで「失敗」となることもあります。
ステータスが「成功」になりました!
1-3. Spark UIで進行状況を確認する
ステータスが「進行中」もしくは「成功」になった「実行」の 「アクション」メニュー(︙) から「Spark UI」を選択します。
Apache Spark UI に自動的にリダイレクトされます。これは、デバッグやパフォーマンス・チューニングに便利です。
1-4. 実行結果を確認する
「実行」の「モニタリング」からログを確認することができます。
spark_application_stdout.log.gzファイルを選択すると、ログ出力「Conversion was successful」が表示されます。
出力オブジェクト・ストレージ・バケット dataflow-tutorial-resultを確認してみます。
変換後のParquetファイルが作成されています。
これで、Java を使用した ETL は完了です。
【参考】 ETL 処理を行う Java アプリケーションについて
実行されているJavaアプリケーションのコードは以下のような流れで行われています。
- エントリーポイントの起動:Sparkアプリケーションとして処理を開始します。
- スキーマ(形式)の定義:読み込むCSVデータの各列(名前、価格、地区など)のデータ型を定義します。
- データの読み込み:Object Storage上にあるCSVファイルを、Sparkのデータセットとして読み込みます。
- フォーマット変換と書き出し:読み込んだデータをParquet形式に変換し、指定されたバケットへ書き出します。
2. SparkSQL によるデータの簡易的なプロファイリング
前パートで「下ごしらえ(ETL)」が終わったデータを使って、いよいよ中身の分析を行っていきます。ここでは、SparkSQL という機能を使って、クラウド上のデータに対して直接SQLクエリを実行します。
■ このパートの目的
SQLスクリプトを実行して、データセットの基本的なプロファイリングを実行します。
データのプロファイリングとは、持っているデータがどのような中身なのか、特徴や傾向を素早く把握する作業のことです。
本格的な分析や機械学習を始める前に、まずは以下の統計数値を算出してデータの全体像を把握します。
- 宿泊施設の数
- 価格の範囲(最小/最大価格)
- 相場(平均値)
2-1. SQLアプリケーションの作成
OCI コンソール画面左上のハンバーガーメニューを展開し、アナリティクスと AI > データ・フローと選択します。「アプリケーションの作成」を押し、以下のように入力して Spark アプリケーションを作成します。
-
一般情報
-
名前:
Tutorial 2 - それ以外の項目はデフォルト
-
名前:
-
アプリケーション構成
-
言語:
SQL -
パラメータ・オプション
-
名前:
location -
値:
oci://dataflow-tutorial-result@<your_namespace>/optimized_listings
-
名前:
-
言語:
-
アーカイブURI
-
アーカイブ・バケット:
dataflow-logs
-
アーカイブ・バケット:
-
アプリケーション・ログの場所
-
オブジェクト・ストレージ・バケット:
oci://dataflow-logs@<your_namespace>(デフォルトのまま)
-
オブジェクト・ストレージ・バケット:
アプリケーションが作成できました。
2-2. SQLアプリケーションの実行
作成した「Tutorial 2」を開きます。
「実行」をクリックします。
パラメータの値を確認し、「実行」します。
(アプリケーションの作成でデフォルトとして指定した値が表示されているはずです。)
「実行」のステータスが「受け入れ済」になって、その後「成功」になるのを待ちます。
2-3. 実行結果を確認する
「実行」の完了後、実行詳細画面から「モニタリング」を選択し実行ログのリストを表示します。spark_application_stdout.log.gzの内容を確認してみます。
出力が次の値と同じ結果になっているか確認します。
異なる順序で行が表示されることがありますが、値は以下のものと一致しているか確認します。
このデータセットでは、Neukollnの平均定価は最低の$46.57で、Charlottenburg-Wilmersdorfは最大の$114.27であることが分かりました(※ソース・データセットの価格はEURではなくUSDです)。
【参考】 実行した SparkSQL について
ここで 実行した SparkSQLスクリプトでは以下のような処理を行っています。
- 外部テーブルの定義と読み込み:Step 1で作成したParquetデータを、SQLで操作できる「テーブル」として定義し、直接かつ高速に読み取れるようにします。
- 統計情報の集計処理:SQLの GROUP BY を用いて、地区ごとの物件数や価格の統計(最小・最大・平均)を一括で算出し、データの全体像を把握します。
この集計により、どの地区が最も物件が多く、価格帯がどうなっているかというマーケットの全体像を把握できました。
3. PySparkを使用した機械学習
これまでに「下ごしらえ(ETL)」し、「現状把握(プロファイリング)」したデータを使って、機械学習(Machine Learning) を行っていきます。
■ このパートの目的
Step 1 で作成したParquet 形式のデータセット を読み込み、Spark の機械学習アルゴリズムを用いて、多数の Airbnb リストの中から 「価格に対して価値が高い(お得な)物件」 を識別する予測モデルを実行します。
3-1. PySparkアプリケーションの作成
- 名前:
Tutorial 3 - リソース構成:
Spark 3.0.2 (Scala 2.12, Python 3.6, Java 8, Hadoop 3.2.0, Oracle Linux 7)※ここまではデフォルトの最新バージョンで作成していましたが、サンプルで提供している機械学習プログラムoow_lab_2019_pyspark_ml.pyが対応していないので、ここでは以前のバージョンで作成します。 - 言語:
Python - ファイル名:
oow_lab_2019_pyspark_ml.py -
- パラメータ
- 引数:
${location} - 値:
oci://dataflow-tutorial-result@<your_namespace>/optimized_listings
- 引数:
- パラメータ
3-2. PySparkアプリケーションの実行
「実行」をクリックします。
パラメータの値を確認し、「実行」します。
(アプリケーションの作成でデフォルトとして指定した値が表示されているはずです。)
3-3. 実行結果を確認する
実行の完了後、spark_application_stdout.log.gzファイルを開き、メモ帳などで結果を確認してみます。
この結果から、リスト ID: 690578が 面積が 4639 平方フィートで定価$35.00 に比べて、予測価格$313.70であり、最適な価格であることが分かりました。
【参考】 実行されている PySpark コードについて
このステップで実行した Python スクリプト(oow_lab_2019_pyspark_ml.py)の中身を詳しく見ていきましょう。このコードでは、Apache Spark の機械学習ライブラリ(MLlib)を使用して、データのクリーニングから予測までを一気に行っています。
| 番号 | 処理の内容 | 役割 |
|---|---|---|
| 1 | データのロード | Step1で出力したParquetファイルを読み込みます。 |
| 2 | ビューの作成 | DataFrameをSQLで操作できるように、Spark アプリケーションが終了するまで保持する一時的なビューを作成します。 |
| 3 | クレンジング | 面積(square_feet)や価格が0のデータは学習の邪魔になるため、除外してデータの精度を高めます。 |
| 4 | ベクトル化 | 機械学習アルゴリズムが扱えるように、数値を「ベクトル(束)」形式にまとめます。 |
| 5 | モデル学習 | 「広さと価格の関係性」を線形回帰アルゴリズムで学習させ、数式化します。 |
| 6 | 価格予測 | 作成したモデルを使い、実際のデータに対して「本来の適正価格」を算出させます。 |
| 7 | 最適物件の特定 | SQLで「実際の価格 − 予測価格」を計算し、マイナスが大きい(=相場より安い)順に並べて結果を表示します。 |
線形回帰により、面積に対する適正価格が算出され、本来より300ドル近く安く出品されている超お得物件を見つけ出すことができました!
まとめ
今回は OCI Data Flow を使って、Object Storage 上のデータを Apache Spark で処理する一連の流れを実際に動かしてみました。
- Java(ETL) で CSV を Parquet に変換し、分析しやすい形に整える
- SQL(SparkSQL) でデータの傾向を把握する
- Python(PySpark ML) で予測モデルを動かし、条件に合うデータを抽出する
このように、ETL・分析・機械学習という異なる処理を段階的につなげることで、
データがどのように加工され、最終的な結果につながっていくのかを一通り確認することができました。
Spark の仕組みやチューニングを深く理解する前に、OCI Data Flowを利用してまずは「動かして結果を見る」ことで、データ処理全体の流れを掴むことができそうです。
参考





































