皆さんこんにちは。@best_not_bestです。
今回は担当している業務に沿った技術を紹介します。
概要
協調フィルタリングを用いて、あるユーザーがある商品を購入するスコアを算出します。計算量が多く、大規模なデータだと処理に時間がかかるため、PySparkで分散処理を行います。
環境
- マシン/OS
- MacBook Pro (Retina, 15-inch, Mid 2014)
- OS X Yosemite 10.10.5
- Python
- Python 3.5.2 :: Anaconda 4.1.1 (x86_64)
- Pythonパッケージ
- 特になし
- Spark
- 2.0.0
Spark側でPython 3.xを呼び出せるよう、spark-env.shに以下を追記しておきます。
export PYSPARK_PYTHON=python3
PYSPARK_DRIVER_PYTHON=python3
手順
- 学習データを取得する
- (必要に応じて)学習データを加工する
- モデルを作成する
- スコアを予測する
学習データを取得する
データベース等から学習データを取得し、「user」「item」「rating」の3カラムを持つCSVファイルを作成します。例えば「過去1ヶ月分の商品購入データ」を用いる場合、「ユーザーID」「商品ID」「その商品を購入したかどうか(未購入:0/購入:1)」といったデータになります。以下、CSVファイルの例です。
user | item | rating |
---|---|---|
1xxxxxxxx2 | 3xxxxxxxx5 | 1 |
1xxxxxxxx9 | 3xxxxxxxx5 | 1 |
1xxxxxxxx8 | 3xxxxxxxx3 | 0 |
rating
の名前の通り、「ユーザーがその商品にどれだけ評価値を付けたかどうか」が本来の使い方になりますが、上記の通り「商品を購入したかどうか」、または「ページにアクセスしたかどうか」といったデータでも実装は可能です。前者の場合は「ユーザーがその商品を購入するスコアはどのくらいか」、後者は「ユーザーがそのページにアクセスするどのくらいか」を予測するモデルになります。
学習データを加工する
ユーザーIDや商品IDがint32
の最大値(2,147,483,647)までしか扱えないため、それを超えるIDがある場合にIDを改めてナンバリングし直します。また整数値しか扱えないので、文字列含まれる場合も同様にナンバリングし直します。
IDが整数値かつint32
の最大値を超えない場合は、この工程は飛ばしてください。
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""processing training data."""
from datetime import datetime
from pyspark.sql.dataframe import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
class ProcessTrainingData(object):
"""get training data from Redshift, and add sequence number to data."""
def __get_action_log(
self,
sqlContext: SQLContext,
unprocessed_data_file_path: str
) -> DataFrame:
"""get data."""
df = sqlContext\
.read\
.format('csv')\
.options(header='true')\
.load(unprocessed_data_file_path)
return df
def run(
self,
unprocessed_data_file_path: str,
training_data_dir_path: str
) -> bool:
"""execute."""
# make spark context
spark = SparkSession\
.builder\
.appName('process_training_data')\
.config('spark.sql.crossJoin.enabled', 'true')\
.config('spark.debug.maxToStringFields', 500)\
.getOrCreate()
sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)
# get data
df = self.__get_action_log(sqlContext, unprocessed_data_file_path)
# make sequence number of users
unique_users_rdd = df.rdd.map(lambda l: l[0]).distinct().zipWithIndex()
unique_users_df = sqlContext.createDataFrame(
unique_users_rdd,
('user', 'unique_user_id')
)
# make sequence number of items
unique_items_rdd = df.rdd.map(lambda l: l[1]).distinct().zipWithIndex()
unique_items_df = sqlContext.createDataFrame(
unique_items_rdd,
('item', 'unique_item_id')
)
# add sequence number of users, sequence number of items to data
df = df.join(
unique_users_df,
df['user'] == unique_users_df['user'],
'inner'
).drop(unique_users_df['user'])
df = df.join(
unique_items_df,
df['item'] == unique_items_df['item'],
'inner'
).drop(unique_items_df['item'])
# save
saved_data_file_path = training_data_dir_path + 'cf_training_data.csv'
df.write\
.format('csv')\
.mode('overwrite')\
.options(header='true')\
.save(saved_data_file_path)
return True
学習データのCSVを読み込み、zipWithIndex()
でナンバリングし直したユーザーIDと商品IDのカラムを追加し、別ファイルとして保存します。
以下の様に実行します。
ptd = ProcessTrainingData()
ptd.run(unprocessed_data_file_path, training_data_dir_path)
パラメータは以下の通りです。
- unprocessed_data_file_path: 学習データファイルパス
- training_data_dir_path : ナンバリングし直したデータファイルの保存先ディレクトリパス
前述の通り、unprocessed_data_file_path
には以下のカラムを持つCSVファイルを設定します。
user | item | rating |
---|---|---|
1xxxxxxxx2 | 3xxxxxxxx5 | 1 |
1xxxxxxxx9 | 3xxxxxxxx5 | 1 |
1xxxxxxxx8 | 3xxxxxxxx3 | 0 |
実行すると、training_data_dir_path
に以下のカラムを持つCSVファイルが出力されます。
user | item | rating | unique_user_id | unique_item_id |
---|---|---|---|---|
1xxxxxxxx7 | 3xxxxxxxx3 | 1 | 57704 | 32419 |
1xxxxxxxx8 | 3xxxxxxxx3 | 0 | 115460 | 32419 |
1xxxxxxxx6 | 3xxxxxxxx3 | 1 | 48853 | 32419 |
モデルを作成する
協調フィルタリングのモデルを作成し保存します。
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""create collaborative filtering model."""
from datetime import datetime
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StringType
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType
class CreateCfModel(object):
"""create collaborative filtering model."""
def run(
self,
processed_training_data_file_path: str,
model_dir_path: str,
rank: int,
max_iter: int,
implicit_prefs: str,
alpha: float,
num_user_blocks: int,
num_item_blocks: int
) -> bool:
"""execute."""
# make spark context
spark = SparkSession\
.builder\
.appName('create_cf_model')\
.config('spark.sql.crossJoin.enabled', 'true')\
.config('spark.debug.maxToStringFields', 500)\
.getOrCreate()
sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)
# create model
als = ALS(
rank=int(rank),
maxIter=int(max_iter),
implicitPrefs=bool(implicit_prefs),
alpha=float(alpha),
numUserBlocks=int(num_user_blocks),
numItemBlocks=int(num_item_blocks),
userCol='unique_user_id',
itemCol='unique_item_id'
)
# load training data
custom_schema = StructType([
StructField('user', StringType(), True),
StructField('item', StringType(), True),
StructField('rating', FloatType(), True),
StructField('unique_user_id', IntegerType(), True),
StructField('unique_item_id', IntegerType(), True),
])
df = sqlContext\
.read\
.format('csv')\
.options(header='true')\
.load(processed_training_data_file_path, schema=custom_schema)
# fitting
model = als.fit(df)
# save
saved_data_dir_path = model_dir_path + 'als_model'
model.write().overwrite().save(saved_data_dir_path)
return True
前項でナンバリングし直す必要がなかった場合は、# create model
、# load training data
の箇所をCSVファイルのカラム名に合わせ、以下のように修正してください。
# create model
als = ALS(
rank=int(rank),
maxIter=int(max_iter),
implicitPrefs=bool(implicit_prefs),
alpha=float(alpha),
numUserBlocks=int(num_user_blocks),
numItemBlocks=int(num_item_blocks),
userCol='user',
itemCol='item'
)
# load training data
custom_schema = StructType([
StructField('user', IntegerType(), True),
StructField('item', IntegerType(), True),
StructField('rating', FloatType(), True),
])
以下の様に実行します。
ccm = CreateCfModel()
ccm.run(
processed_training_data_file_path,
model_dir_path,
rank,
max_iter,
implicit_prefs,
alpha,
num_user_blocks,
num_item_blocks
)
パラメータは以下の通りです。rank
、max_iter
、implicit_prefs
、alpha
、num_user_blocks
、num_item_blocks
はPySparkのALSのパラメータになります。
- processed_training_data_file_path: 学習データファイルパス
- model_dir_path : モデル保存先ディレクトリパス
- rank : Marix Factorizationのランク
- max_iter : 最大反復回数
- implicit_prefs : 潜在的選好
- alpha : アルファ値
- num_user_blocks : ユーザーブロック数
- num_item_blocks : アイテムブロック数
前項でナンバリングした場合、processed_training_data_file_path
に以下のカラムを持つCSVファイルを設定します。
user | item | rating | unique_user_id | unique_item_id |
---|---|---|---|---|
1xxxxxxxx7 | 3xxxxxxxx3 | 1 | 57704 | 32419 |
1xxxxxxxx8 | 3xxxxxxxx3 | 0 | 115460 | 32419 |
1xxxxxxxx6 | 3xxxxxxxx3 | 1 | 48853 | 32419 |
ナンバリングしなかった場合は、以下のカラムを持つCSVファイルを設定します。
user | item | rating |
---|---|---|
1xxxxxxxx2 | 3xxxxxxxx5 | 1 |
1xxxxxxxx9 | 3xxxxxxxx5 | 1 |
1xxxxxxxx8 | 3xxxxxxxx3 | 0 |
スコアを予測する
保存したモデルを読み込み、ユーザーIDと商品IDの組み合わせからスコアを予測します。学習データとは別に、ユーザーIDと商品IDそれぞれの一覧データを準備してください。予測結果は「全結果」と、「ユーザーごとにスコア上位N件の結果」をCSVファイルとして保存します。
今回は割り切って、元の学習データに存在しない( = 行動ログが存在しない)ユーザーIDと商品IDは予測対象としません。
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""predict score from collaborative filtering model."""
from datetime import datetime
from pyspark.ml.recommendation import ALSModel
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StringType
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType
class CreatePredictedScore(object):
"""predict score from collaborative filtering model."""
def run(
self,
model_file_path: str,
predict_data_dir_path: str,
user_data_file_path: str,
item_data_file_path: str,
processed_training_data_file_path: str,
data_limit: int=-1
) -> bool:
"""execute."""
# make spark context
spark = SparkSession\
.builder\
.appName('create_predicted_score')\
.config('spark.sql.crossJoin.enabled', 'true')\
.config('spark.debug.maxToStringFields', 500)\
.getOrCreate()
sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)
# load user data
users_df = sqlContext\
.read\
.format('csv')\
.options(header='false')\
.load(user_data_file_path)
users_id_rdd = users_df.rdd.map(lambda l: Row(user_id=l[0]))
users_id_df = sqlContext.createDataFrame(users_id_rdd)
# load item data
items_df = sqlContext\
.read\
.format('csv')\
.options(header='false')\
.load(item_data_file_path)
items_id_rdd = items_df.rdd.map(lambda l: Row(item_id=l[0]))
items_id_df = sqlContext.createDataFrame(items_id_rdd)
# cross join user_id and item_id
joined_df = users_id_df.join(items_id_df)
joined_df.cache()
# delete unnecessary variables
del(users_df)
del(users_id_rdd)
del(users_id_df)
del(items_df)
del(items_id_rdd)
del(items_id_df)
# load training data
custom_schema = StructType([
StructField('user', StringType(), True),
StructField('item', StringType(), True),
StructField('rating', FloatType(), True),
StructField('unique_user_id', IntegerType(), True),
StructField('unique_item_id', IntegerType(), True),
])
training_df = sqlContext\
.read\
.format('csv')\
.options(header='true')\
.load(processed_training_data_file_path, schema=custom_schema)
# users
unique_users_rdd = training_df.rdd.map(lambda l: [l[0], l[3]])
unique_users_df = sqlContext.createDataFrame(
unique_users_rdd,
('user', 'unique_user_id')
).dropDuplicates()
unique_users_df.cache()
# items
unique_items_rdd = training_df.rdd.map(lambda l: [l[1], l[4]])
unique_items_df = sqlContext.createDataFrame(
unique_items_rdd,
('item', 'unique_item_id')
).dropDuplicates()
unique_items_df.cache()
# delete unnecessary variables
del(training_df)
del(unique_users_rdd)
del(unique_items_rdd)
# add unique user id
joined_df = joined_df.join(
unique_users_df,
joined_df['user_id'] == unique_users_df['user'],
'inner'
).drop(unique_users_df['user'])
# add unique item id
joined_df = joined_df.join(
unique_items_df,
joined_df['item_id'] == unique_items_df['item'],
'inner'
).drop(unique_items_df['item'])
# load model
model = ALSModel.load(model_file_path)
# predict score
predictions = model.transform(joined_df)
all_predict_data = predictions\
.select('user_id', 'item_id', 'prediction')\
.filter('prediction > 0')
# save
# all score
saved_data_file_path = predict_data_dir_path + 'als_predict_data_all.csv'
all_predict_data.write\
.format('csv')\
.mode('overwrite')\
.options(header='true')\
.save(saved_data_file_path)
# limited score
data_limit = int(data_limit)
if data_limit > 0:
all_predict_data.registerTempTable('predictions')
sql = 'SELECT user_id, item_id, prediction ' \
+ 'FROM ( ' \
+ ' SELECT user_id, item_id, prediction, dense_rank() ' \
+ ' OVER (PARTITION BY user_id ORDER BY prediction DESC) AS rank ' \
+ ' FROM predictions ' \
+ ') tmp WHERE rank <= %d' % (data_limit)
limited_predict_data = sqlContext.sql(sql)
saved_data_file_path = predict_data_dir_path + 'als_predict_data_limit.csv'
limited_predict_data.write\
.format('csv')\
.mode('overwrite')\
.options(header='true')\
.save(saved_data_file_path)
return True
予測対象の組み合わせは以下の手順で作成しています。
- ユーザーIDと商品IDの全組み合わせを、それぞれの一覧データから作成
- 学習データを読み込み、1.の組み合わせにナンバリングし直したIDを追加、ナンバリング出来なかったユーザーIDと商品IDは削除
最初の項でナンバリングし直す必要がなかった場合は、run
メソッドを以下のように修正してください。
def run(
self,
model_file_path: str,
predict_data_dir_path: str,
processed_training_data_file_path: str,
data_limit: int=-1
) -> bool:
"""execute."""
# make spark context
spark = SparkSession\
.builder\
.appName('create_predicted_score')\
.config('spark.sql.crossJoin.enabled', 'true')\
.config('spark.debug.maxToStringFields', 500)\
.getOrCreate()
sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)
# load training data
custom_schema = StructType([
StructField('user', IntegerType(), True),
StructField('item', IntegerType(), True),
StructField('rating', FloatType(), True),
])
training_df = sqlContext\
.read\
.format('csv')\
.options(header='true')\
.load(processed_training_data_file_path, schema=custom_schema)
# load user data
users_id_rdd = training_df.rdd.map(lambda l: Row(user_id=l[0]))
users_id_df = sqlContext.createDataFrame(users_id_rdd)
# load item data
items_id_rdd = training_df.rdd.map(lambda l: Row(item_id=l[1]))
items_id_df = sqlContext.createDataFrame(items_id_rdd)
# cross join user_id and item_id
joined_df = users_id_df.join(items_id_df)
joined_df.cache()
# delete unnecessary variables
del(training_df)
del(users_id_rdd)
del(users_id_df)
del(items_id_rdd)
del(items_id_df)
# load model
model = ALSModel.load(model_file_path)
(以下同じ)
以下の様に実行します。
cps = CreatePredictedScore()
cps.run(
model_file_path,
predict_data_dir_path,
user_data_file_path,
item_data_file_path,
processed_training_data_file_path,
data_limit
)
パラメータは以下の通りです。data_limit
でスコア上位N件のNを指定しています。0以下を指定した場合、上位N件のデータは作成しません。
- model_file_path : モデルファイルパス
- predict_data_dir_path : 予測データ保存先ディレクトリパス
- user_data_file_path : ユーザーIDの一覧ファイルパス(ナンバリングし直した場合のみ)
- item_data_file_path : 商品IDの一覧ファイルパス(ナンバリングし直した場合のみ)
- processed_training_data_file_path: 学習データファイルパス
- data_limit : ユーザごとのデータ保存数上限(default: 上限なし)
user_data_file_path
には1列目にユーザーIDを持つCSVファイルを設定します。
ヘッダなしのファイルを使用しています。
item_data_file_path
には1列目に商品IDを持つCSVファイルを設定します。同様にヘッダなしのファイルを使用しています。
processed_training_data_file_path
には以下のカラムを持つCSVファイルを設定します。
user | item | rating | unique_user_id | unique_item_id |
---|---|---|---|---|
1xxxxxxxx7 | 3xxxxxxxx3 | 1 | 57704 | 32419 |
1xxxxxxxx8 | 3xxxxxxxx3 | 0 | 115460 | 32419 |
1xxxxxxxx6 | 3xxxxxxxx3 | 1 | 48853 | 32419 |
predict_data_dir_path
には以下のカラムを持つCSVファイルが出力されます。
user_id | user_id | prediction |
---|---|---|
1xxxxxxxx3 | 3xxxxxxxx4 | 0.15594198 |
1xxxxxxxx3 | 3xxxxxxxx0 | 0.19135818 |
1xxxxxxxx3 | 3xxxxxxxx8 | 0.048197098 |
prediction
が予測値となります。
まとめ
PySparkでALSを用いた協調フィルタリングを実装しました。メソッド分けしてなく、読み辛くてすいません・・・。
次回はモデルの評価方法について説明します。