はじめに
前回の記事 では、OCI Data Flow のクイックスタート その2 をやりました。今回は引き続き その3 をやっていきます。
Qiita 記事 Index
その1 Java
https://qiita.com/sugimount/items/d67f8aa02f0af89299de
その2 SQL
https://qiita.com/sugimount/items/1d8fe15ddbd58f0cdd76
その3 Python
https://qiita.com/sugimount/items/9a4c87493b569f174ba0
クイックスタート手順
以下のドキュメントに書かれている手順を参考にすすめていきます
その1 のクイックスタートで作成した、Parquet形式のファイルを使用します。このファイルを使って、シンプルな機械学習タスクを実行します。Spark の機械学習機能を使って、Airbnb の宿泊リストのなかから、最良の物件を探していきます。
線形回帰という統計手法を使って、Airbnbの宿泊リストから価格を予測します。予測した価格から、価格を差し引くことによって、最もお得だと考えられる宿泊施設を見つけ出します。
今回のモデルは非常にシンプルなモデルとなっており、実際の業務で使うときには様々ことがらを考慮することが大事です。今回はクイックスタートということで、シンプル進めて行きます。
アプリケーションの定義と実行
Data Flow のページで Create Application を押して、クイックスタート用のアプリケーションを作成します
以下のパラメータを入力します。ファイルURLには、チュートリアルとして用意されているSQLファイルを指定します。SQLファイルの中身は後程確認します。
- 言語 :
Python
- 名前 :
Tutorial Example 3
- ファイルURL :
oci://oow_2019_dataflow_lab@bigdatadatasciencelarge/usercontent/oow_lab_2019_pyspark_ml.py
- location :
oci://<bucket>@<namespace>/optimized_listings
-
oci://dataflow-logs@nryjxkqe0mhq/optimized_listings
(環境に合わせて、bucket名やnamespaceを変更してください)
-
パラメータを入力したら、作成を押します
実行します
実行します
受入れ済となります。実行完了まで待ちます
実行完了となったので、Application の STDOUT(標準出力)を確認します
上をクリックすると、view という名前のファイルがダウンロード
| id| name|features|price| prediction| value|
+--------+--------------------+--------+-----+------------------+-------------------+
| 690578| Quiet Central Cosy|[4639.0]| 35.0|313.70263597898196|-278.70263597898196|
| 789554|big familia house...|[2260.0]| 70.0|172.44759624397358|-102.44759624397358|
| 669330|modern and cosy 4...|[1722.0]| 50.0|140.50341323538404| -90.50341323538404|
| 230611|4 bedroom holiday...|[1722.0]| 65.0|140.50341323538404| -75.50341323538404|
|28706284|Catsitting in pre...|[1023.0]| 30.0| 98.99972564615713| -68.99972564615713|
| 1275721|Sonniger Altbau B...|[1539.0]| 65.0|129.63764094807573| -64.63764094807573|
| 1353286|Huge quiet conven...|[1399.0]| 60.0| 121.3250282692606| -61.3250282692606|
| 880130|Berlin is a nice ...|[1076.0]| 44.0|102.14664330313714| -58.14664330313714|
| 494492|Amazing Jewel clo...|[1076.0]| 45.0|102.14664330313714| -57.14664330313714|
| 658491|Lovely loft right...| [969.0]| 39.0| 95.79343218432844| -56.79343218432844|
| 37004|WONDERFUL ROOM fo...| [646.0]| 25.0| 76.61504721820498| -51.61504721820498|
| 681132|City flat for 2 p...| [915.0]| 44.0| 92.58713872249976| -48.58713872249976|
| 501735|Beautiful 3 room ...| [807.0]| 39.0| 86.17455179884237| -47.17455179884237|
| 1325593|The green oasis i...| [538.0]| 25.0| 70.2024602945476|-45.202460294547606|
| 2255195|COSY KREUZKOLLN/A...| [807.0]| 41.0| 86.17455179884237| -45.17455179884237|
| 1018784|Large bright 2Bd ...| [915.0]| 48.0| 92.58713872249976| -44.58713872249976|
| 617861|Cool Berlin Kreuz...| [861.0]| 45.0| 89.38084526067107|-44.380845260671066|
| 44083|Riverside View Ap...| [807.0]| 42.0| 86.17455179884237| -44.17455179884237|
| 245991|World stroller's ...| [484.0]| 23.0| 66.99616683271891| -43.99616683271891|
| 298056|3bedrooms in Kreu...| [753.0]| 40.0| 82.96825833701368| -42.96825833701368|
+--------+--------------------+--------+-----+------------------+-------------------+
only showing top 20 rows
Python の中身を確認
それでは、実際の Python コードを確認していきます。以下のURLで公開されています。機械学習は専門的ではないので、「なんとなくこんな感じの処理をしているのかなぁ」と調査した内容となっています。書いている内容が全て正しい保証はないので、ご参考程度でお読みください。
from __future__ import print_function
import sys
from pyspark import SparkConf, SparkContext
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.sql import SQLContext
def main():
spark = SparkContext()
sql_context = SQLContext(spark)
# Prep the data.
# We eliminate nulls and zero values as they would skew the model.
listings_df = sql_context.read.format('parquet').load(sys.argv[1])
listings_df.createGlobalTempView("listings")
clean_df = sql_context.sql("select id, name, square_feet, price from global_temp.listings where square_feet > 0 and price > 0")
# Prepare the data to feed into the model.
assembler = VectorAssembler(inputCols = ['square_feet'], outputCol = 'features')
assembled = assembler.transform(clean_df)
df = assembled.select(['id', 'name', 'features', 'price'])
# Train the model.
lr = LinearRegression(featuresCol = 'features', labelCol='price')
model = lr.fit(df)
# Make predictions.
predictions = model.transform(df)
# Sort by the gap between prediction and price.
predictions.createGlobalTempView("predictions")
values = sql_context.sql('select *, price - prediction as value from global_temp.predictions order by price - prediction')
values.show()
if __name__ == '__main__':
main()
main関数が定義されています。Object Storage の中に格納されている Parquet形式ファイルを取得しているのがここの箇所です。sys.argv[1] には、Data Flow のパラメータで定義した oci://dataflow-logs@nryjxkqe0mhq/optimized_listings
文字列が格納されています。
listings_df = sql_context.read.format('parquet').load(sys.argv[1])
Object Storage の、dataflow-logs バケットには指定された Dir に対応したオブジェクトが格納されています。optimized_listings
配下の Parquet ファイルが全てスキャンされて読み込まれるのでしょう。おそらく。
Parquet ファイルを読み込んだあとに、Global Temporary View を作成する箇所がここです。Spark アプリケーションが終了するまで保持して、全てのセッションで共有するテンポラリビューです。詳細を知りたい時は、ドキュメントを参照しましょう。
listing という名前の Global Temporary View を作成しています。
listings_df.createGlobalTempView("listings")
次に、Parquet ファイルの中から、Zero や Null データを除外した、クリーンなデータを取得します。SQL の WHERE 句で、square_feet
と price
が 0 以上のデータを取得しています。
clean_df = sql_context.sql("select id, name, square_feet, price from global_temp.listings where square_feet > 0 and price > 0")
pyspark.ml.feature
から import した VectorAssembler を使って、square_feet 値をベクトル化し、features列に格納しているようです。
# Prepare the data to feed into the model.
assembler = VectorAssembler(inputCols = ['square_feet'], outputCol = 'features')
assembled = assembler.transform(clean_df)
df = assembled.select(['id', 'name', 'features', 'price'])
pyspark.ml.regression
から Import した線形回帰(LinearRegression)をつかって、モデルを作っているようです。
# Train the model.
lr = LinearRegression(featuresCol = 'features', labelCol='price')
model = lr.fit(df)
価格の予測を作っているようです
# Make predictions.
predictions = model.transform(df)
予測価格と、実際の価格の差を使って、データをソートしているようです。実際の価格(price) から、機械学習で予測した価格(prediction) を引いたときに、マイナスの値が最も大きい宿泊施設をお勧め施設として扱っています。
# Sort by the gap between prediction and price.
predictions.createGlobalTempView("predictions")
values = sql_context.sql('select *, price - prediction as value from global_temp.predictions order by price - prediction')
values.show() で、SQLの結果を標準出力に出しています。その結果、Data Flow の log として確認が出来ます。
values.show()
参考URL
Apache Spark Website
https://spark.apache.org/
日本語 : Spark SQL, データフレーム および データセット ガイド
http://mogile.web.fc2.com/spark/spark210/sql-programming-guide.html