はじめに
前回の Advent Calendar の投稿では, 簡単に Dataproc の特徴について説明を行いましたが、今回は, 実際に Dataproc を使ったレコメンドシステムの実装について解説したいと思います。
実装するシステムの概要
【追記】
下記では、BigQuery から直接読み込む実装にしていますが、データ量が大きい場合(数百MB以上)は、テキストデータに比べて読み込みに大幅に時間がかかるため、BigQuery から GCS へ一旦エクスポートし、CSVファイルとして読み込むことをお勧めします。
本投稿では、ユーザーのアイテムに対するLikeや閲覧履歴から、ユーザーに対してアイテムをレコメンドするシステムを実装します。ライブラリとしては、Matrix Factorization を Alternating Least Square という手法で計算するライブラリを使っています。Matrix Factorization に関しては下記の記事が詳しいです。
基本的には、下記のリンク先にある、MLlib の Collaborative Filtering の通りです。
[MLlib - Collaborative Filtering]
(http://spark.apache.org/docs/latest/mllib-collaborative-filtering.html#collaborative-filtering)
ただし、通常 Spark でデータを読み込む際には、データベースに入っているデータをテキストデータとしてエクスポートし、HDFS へ格納してから Spark で読み込むことが多いと思いますが、Dataproc では BigQuery から直接データを読み書きすることが可能なため、簡単にデータのやり取りが可能です。
従って、本投稿においても、直接 BigQuery からデータを読み込み、レコメンドモデルを構築してアイテムスコアを計算した後に、再び結果を直接 BigQuery へ書き込むという実装を行いたいと思います。
BigQuery の設定に関しては、下記のリンクの通りですが、SBTの設定やブロック数の設定等いくつかハマったポイントがあったので、その辺りを重点的に説明できればと思います。
[Using the BigQuery Connector with Spark]
(https://cloud.google.com/hadoop/examples/bigquery-connector-spark-example)
事前準備
プログラムのインストール
Dataproc を利用するにあたり, Google Cloud SDK と Scala が必要になりますので, 下記のリンク等を参考にインストールを行ってください。
フォルダ構成
プログラムのインストールが終了したら, 下記のフォルダ構成でファイルを作成してください。Scala のプログラムは、src/main/scala/calc_recommend_items.scala に記載し、built.sbt の情報を元に SBT でビルドを行い、exec.sh でクラスタ立ち上げて、ジョブを Dataproc の Spark へ送るという構成になっています。
calc_recommend_items
├ exec.sh
├ built.sbt
└ src/
└ main/
└ scala/
└ calc_recommend_items.scala
SBT の設定
Scala のビルドツールである Scala Built Tool (SBT) の設定ファイルである built.sbt に関しては、下記の通り記載してください。ただし、ライブラリのバージョン等は必要に応じて適宜書き換えてください。
name := "Calc Recommend Items"
version := "0.1"
scalaVersion := "2.10.6"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "1.5.2",
"org.apache.spark" %% "spark-mllib" % "1.5.2",
"com.google.cloud.bigdataoss" % "bigquery-connector" % "0.7.3-hadoop2")
参考リンク
Bigquery Connector.jar » 0.7.3-hadoop2
データの準備
データに関しては、下記のデータセット、テーブル名、スキーマで BigQuery へ登録されているものとします。
データセット: work
テーブル: user_item_matrix
スキーマ
name | type |
---|---|
user_id | STRING |
item_id | STRING |
score | INTEGER |
注意点としては、user_id と item_id に関しては、STRING でデータが入っていますが、計算時はこれらをスパース行列として扱うため、INTEGERに変換できる形で入っている必要があります。
データ例
user_id | item_id | score |
---|---|---|
1 | 3 | 1.0 |
1 | 5 | 2.0 |
5 | 10 | 1.0 |
実行コードの解説
上記の用意ができたら、calc_recommend_items.scala へ下記のコードを入力します。基本的にそれほど難しいところはないと思いますが、いくつかハマるポイントがあったので、そちらに関しては下記で個別に説明します。
import org.apache.spark.SparkContext
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration
import com.google.cloud.hadoop.io.bigquery.BigQueryOutputFormat
import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat
import com.google.gson.JsonObject
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.LongWritable
object CalcRecommendItems{
// 入力パラメータ
val fullyQualifiedInputTableId = "work.user_item_matrix"
// 出力パラメータ
val projectId = "project_id"
val outputDatasetId = "dataset"
val outputTableId = "table"
val outputTableSchema =
"[{'name':'user_id','type':'STRING'},{'name':'item_id','type': 'STRING'},{'name': 'rating','type': 'FLOAT'}]"
// JSONをListに変換するヘルパー
def convertToList(record: JsonObject) : Array[String] = {
val user_id = record.get("user_id").getAsString
val item_id = record.get("item_id").getAsString
val score = record.get("score").getAsString
return Array(user_id, item_id, score)
}
// RatingオブジェクトをJSONに変換するヘルパー
def convertToJson(rate: Rating) : JsonObject = {
val user_id = rate.user.toString
val item_id = rate.product.toString
val rating = rate.rating.toDouble
val jsonObject = new JsonObject()
jsonObject.addProperty("user_id", user_id)
jsonObject.addProperty("item_id", item_id)
jsonObject.addProperty("rating", rating)
return jsonObject
}
def calc_recommend_items(rank: Int=300, numBlocks: Int=15, iterations: Int=10, numItems: Int=50, lambda: Double = 0.01, alpha: Double = 0.01) {
// SparkContext インスタンスを作成して必要情報を取得
val sc = new SparkContext()
val conf = sc.hadoopConfiguration
val bucket = conf.get("fs.gs.system.bucket")
// インプットの設定
conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId)
conf.set(BigQueryConfiguration.GCS_BUCKET_KEY, bucket)
BigQueryConfiguration.configureBigQueryInput(conf, fullyQualifiedInputTableId)
val inputTmpDir = s"gs://${bucket}/hadoop/tmp/bigquery/tmp_user_item_matrix_export"
conf.set(BigQueryConfiguration.TEMP_GCS_PATH_KEY, inputTmpDir)
// アウトプットの設定
BigQueryConfiguration.configureBigQueryOutput(
conf, projectId, outputDatasetId, outputTableId, outputTableSchema)
conf.set(
"mapreduce.job.outputformat.class",
classOf[BigQueryOutputFormat[_,_]].getName)
// BigQueryからデータを読み込む
val tableData = sc.newAPIHadoopRDD(
conf,
classOf[GsonBigQueryInputFormat],
classOf[LongWritable],
classOf[JsonObject]).cache
// BigQueryから読み込んだデータをRatingに変換
val ratings = tableData.map(x => convertToList(x._2)).map(_ match { case Array(user, product, rating) =>
Rating(user.toInt, product.toInt, rating.toDouble)
})
// ALS で レコメンドモデルを構築
val model = ALS.trainImplicit(ratings, rank, iterations, alpha, numBlocks, lambda)
// ユーザーごとの推薦アイテムを計算
val recommendItemsList = model.recommendProductsForUsers(numItems)
// BigQuery へ書き込みできる形式に変換
val outRDD = recommendItemsList.map(x => x._2).flatMap(x => x).map(x => (null,convertToJson(x)))
// BigQuery へ書き込み
outRDD.saveAsNewAPIHadoopDataset(conf)
val inputTmpDirPath = new Path(inputTmpDir)
inputTmpDirPath.getFileSystem(conf).delete(inputTmpDirPath, true)
}
def main(args: Array[String]){
calc_recommend_items()
}
}
BigQuery から読み込んだデータを変換
BigQuery から読み込んだレコードは、下記のようなレコード形式で帰ってくるので、下記の様に変換するヘルパーを作って対応し、それを Rating に渡しています。
変換例:(1, {"user_id":"1","item_id":"3","rating":1.0}) ⇒ Array(1, 3, 1.0)
// BigQueryから読み込んだデータをRatingに変換
val ratings = tableData.map(x => convertToList(x._2)).map(_ match { case Array(user, product, rating) =>
Rating(user.toInt, product.toInt, rating.toDouble)
})
numBlocks を明示的に指定
Dataproc で ALS を使ってモデルをトレーニングする際は、必ず numBlock でブロック数を明示的にする必要があります。ドキュメントでは、デフォルトは -1 となっていて、いい感じにブロック数を指定してくれるとありますが、少なくとも Dataproc の YARN 環境ではブロック数が2つとかになってしまい、リソースを使い切れないという問題がありました。
また、データが明示的なレーティングでなく、行動履歴等の明示的な評価データでない場合 trainImplicit という方を使う必要があります。詳しくは下記の論文が詳しいです。
Collaborative Filtering for Implicit Feedback Datasets
// ALS で レコメンドモデルを構築
val model = ALS.trainImplicit(ratings, rank, iterations, alpha, numBlocks, lambda)
ユーザーごとのアイテムを変換
ユーザーごとに上位のアイテムを返す、recommendProductsForUsers というメソッドを使います。この返り値が、下記のような形式で返ってくるので、この convertToJson というヘルパーを作成して Rating を JSON 形式に変換することで対応しています。
// ユーザーごとの推薦アイテムを計算
val recommendItemsList = model.recommendProductsForUsers(numItems)
// BigQuery へ書き込みできる形式に変換
val outRDD = recommendItemsList.map(x => x._2).flatMap(x => x).map(x => (null,convertToJson(x)))
バッチの実行
上記のコードが作成できたら、下記のようなシェルスクリプトを作成して実行することで、一時的にクラスタを立ち上げてレコメンドの計算を実行することができます。
#!/bin/bash
# Scala プログラムをビルド
sbt package
# クラスタを作成
gcloud beta dataproc clusters create dataproc01 --bucket bucket \
--zone asia-east1-a \
--master-machine-type n1-standard-16\
--worker-machine-type n1-standard-16 \
--num-workers 5
# Spark ジョブを実行
gcloud beta dataproc jobs submit spark \
--cluster dataproc01 \
--class CalcRecommendItems \
--properties spark.dynamicAllocation.enabled=false,spark.executor.cores=5,spark.executor.memory=8g,spark.executor.instances=15 \
--jars ./target/scala-2.10/calc-recommend-items_2.10-0.1.jar
# クラスタを削除
gcloud beta dataproc clusters delete -q dataproc02
参考
[Using the BigQuery Connector with Spark]
(https://cloud.google.com/hadoop/examples/bigquery-connector-spark-example)
[MLlib - Collaborative Filtering]
(http://spark.apache.org/docs/latest/mllib-collaborative-filtering.html#collaborative-filtering)