LoginSignup
10
25

More than 5 years have passed since last update.

皆さんこんにちは。@best_not_bestです。
今回は担当している業務に沿った技術を紹介します。

概要

協調フィルタリングを用いて、あるユーザーがある商品を購入するスコアを算出します。計算量が多く、大規模なデータだと処理に時間がかかるため、PySparkで分散処理を行います。

環境

  • マシン/OS
    • MacBook Pro (Retina, 15-inch, Mid 2014)
    • OS X Yosemite 10.10.5
  • Python
    • Python 3.5.2 :: Anaconda 4.1.1 (x86_64)
  • Pythonパッケージ
    • 特になし
  • Spark
    • 2.0.0

Spark側でPython 3.xを呼び出せるよう、spark-env.shに以下を追記しておきます。

export PYSPARK_PYTHON=python3
PYSPARK_DRIVER_PYTHON=python3

手順

  1. 学習データを取得する
  2. (必要に応じて)学習データを加工する
  3. モデルを作成する
  4. スコアを予測する

学習データを取得する

データベース等から学習データを取得し、「user」「item」「rating」の3カラムを持つCSVファイルを作成します。例えば「過去1ヶ月分の商品購入データ」を用いる場合、「ユーザーID」「商品ID」「その商品を購入したかどうか(未購入:0/購入:1)」といったデータになります。以下、CSVファイルの例です。

user item rating
1xxxxxxxx2 3xxxxxxxx5 1
1xxxxxxxx9 3xxxxxxxx5 1
1xxxxxxxx8 3xxxxxxxx3 0

ratingの名前の通り、「ユーザーがその商品にどれだけ評価値を付けたかどうか」が本来の使い方になりますが、上記の通り「商品を購入したかどうか」、または「ページにアクセスしたかどうか」といったデータでも実装は可能です。前者の場合は「ユーザーがその商品を購入するスコアはどのくらいか」、後者は「ユーザーがそのページにアクセスするどのくらいか」を予測するモデルになります。

学習データを加工する

ユーザーIDや商品IDがint32の最大値(2,147,483,647)までしか扱えないため、それを超えるIDがある場合にIDを改めてナンバリングし直します。また整数値しか扱えないので、文字列含まれる場合も同様にナンバリングし直します。
IDが整数値かつint32の最大値を超えない場合は、この工程は飛ばしてください。

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

"""processing training data."""

from datetime import datetime
from pyspark.sql.dataframe import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

class ProcessTrainingData(object):
    """get training data from Redshift, and add sequence number to data."""

    def __get_action_log(
        self,
        sqlContext: SQLContext,
        unprocessed_data_file_path: str
    ) -> DataFrame:
        """get data."""
        df = sqlContext\
            .read\
            .format('csv')\
            .options(header='true')\
            .load(unprocessed_data_file_path)

        return df

    def run(
        self,
        unprocessed_data_file_path: str,
        training_data_dir_path: str
    ) -> bool:
        """execute."""
        # make spark context
        spark = SparkSession\
            .builder\
            .appName('process_training_data')\
            .config('spark.sql.crossJoin.enabled', 'true')\
            .config('spark.debug.maxToStringFields', 500)\
            .getOrCreate()
        sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)

        # get data
        df = self.__get_action_log(sqlContext, unprocessed_data_file_path)

        # make sequence number of users
        unique_users_rdd = df.rdd.map(lambda l: l[0]).distinct().zipWithIndex()
        unique_users_df = sqlContext.createDataFrame(
            unique_users_rdd,
            ('user', 'unique_user_id')
        )

        # make sequence number of items
        unique_items_rdd = df.rdd.map(lambda l: l[1]).distinct().zipWithIndex()
        unique_items_df = sqlContext.createDataFrame(
            unique_items_rdd,
            ('item', 'unique_item_id')
        )

        # add sequence number of users, sequence number of items to data
        df = df.join(
            unique_users_df,
            df['user'] == unique_users_df['user'],
            'inner'
        ).drop(unique_users_df['user'])
        df = df.join(
            unique_items_df,
            df['item'] == unique_items_df['item'],
            'inner'
        ).drop(unique_items_df['item'])

        # save
        saved_data_file_path = training_data_dir_path + 'cf_training_data.csv'
        df.write\
            .format('csv')\
            .mode('overwrite')\
            .options(header='true')\
            .save(saved_data_file_path)

        return True

学習データのCSVを読み込み、zipWithIndex()でナンバリングし直したユーザーIDと商品IDのカラムを追加し、別ファイルとして保存します。
以下の様に実行します。

ptd = ProcessTrainingData()
ptd.run(unprocessed_data_file_path, training_data_dir_path)

パラメータは以下の通りです。

  • unprocessed_data_file_path: 学習データファイルパス
  • training_data_dir_path : ナンバリングし直したデータファイルの保存先ディレクトリパス

前述の通り、unprocessed_data_file_pathには以下のカラムを持つCSVファイルを設定します。

user item rating
1xxxxxxxx2 3xxxxxxxx5 1
1xxxxxxxx9 3xxxxxxxx5 1
1xxxxxxxx8 3xxxxxxxx3 0

実行すると、training_data_dir_pathに以下のカラムを持つCSVファイルが出力されます。

user item rating unique_user_id unique_item_id
1xxxxxxxx7 3xxxxxxxx3 1 57704 32419
1xxxxxxxx8 3xxxxxxxx3 0 115460 32419
1xxxxxxxx6 3xxxxxxxx3 1 48853 32419

モデルを作成する

協調フィルタリングのモデルを作成し保存します。

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

"""create collaborative filtering model."""

from datetime import datetime
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StringType
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType

class CreateCfModel(object):
    """create collaborative filtering model."""

    def run(
        self,
        processed_training_data_file_path: str,
        model_dir_path: str,
        rank: int,
        max_iter: int,
        implicit_prefs: str,
        alpha: float,
        num_user_blocks: int,
        num_item_blocks: int
    ) -> bool:
        """execute."""
        # make spark context
        spark = SparkSession\
            .builder\
            .appName('create_cf_model')\
            .config('spark.sql.crossJoin.enabled', 'true')\
            .config('spark.debug.maxToStringFields', 500)\
            .getOrCreate()
        sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)

        # create model
        als = ALS(
            rank=int(rank),
            maxIter=int(max_iter),
            implicitPrefs=bool(implicit_prefs),
            alpha=float(alpha),
            numUserBlocks=int(num_user_blocks),
            numItemBlocks=int(num_item_blocks),
            userCol='unique_user_id',
            itemCol='unique_item_id'
        )

        # load training data
        custom_schema = StructType([
            StructField('user', StringType(), True),
            StructField('item', StringType(), True),
            StructField('rating', FloatType(), True),
            StructField('unique_user_id', IntegerType(), True),
            StructField('unique_item_id', IntegerType(), True),
        ])
        df = sqlContext\
            .read\
            .format('csv')\
            .options(header='true')\
            .load(processed_training_data_file_path, schema=custom_schema)

        # fitting
        model = als.fit(df)

        # save
        saved_data_dir_path = model_dir_path + 'als_model'
        model.write().overwrite().save(saved_data_dir_path)

        return True

前項でナンバリングし直す必要がなかった場合は、# create model# load training dataの箇所をCSVファイルのカラム名に合わせ、以下のように修正してください。

# create model
als = ALS(
    rank=int(rank),
    maxIter=int(max_iter),
    implicitPrefs=bool(implicit_prefs),
    alpha=float(alpha),
    numUserBlocks=int(num_user_blocks),
    numItemBlocks=int(num_item_blocks),
    userCol='user',
    itemCol='item'
)

# load training data
custom_schema = StructType([
    StructField('user', IntegerType(), True),
    StructField('item', IntegerType(), True),
    StructField('rating', FloatType(), True),
])

以下の様に実行します。

ccm = CreateCfModel()
ccm.run(
    processed_training_data_file_path,
    model_dir_path,
    rank,
    max_iter,
    implicit_prefs,
    alpha,
    num_user_blocks,
    num_item_blocks
)

パラメータは以下の通りです。rankmax_iterimplicit_prefsalphanum_user_blocksnum_item_blocksPySparkのALSのパラメータになります。

  • processed_training_data_file_path: 学習データファイルパス
  • model_dir_path : モデル保存先ディレクトリパス
  • rank : Marix Factorizationのランク
  • max_iter : 最大反復回数
  • implicit_prefs : 潜在的選好
  • alpha : アルファ値
  • num_user_blocks : ユーザーブロック数
  • num_item_blocks : アイテムブロック数

前項でナンバリングした場合、processed_training_data_file_pathに以下のカラムを持つCSVファイルを設定します。

user item rating unique_user_id unique_item_id
1xxxxxxxx7 3xxxxxxxx3 1 57704 32419
1xxxxxxxx8 3xxxxxxxx3 0 115460 32419
1xxxxxxxx6 3xxxxxxxx3 1 48853 32419

ナンバリングしなかった場合は、以下のカラムを持つCSVファイルを設定します。

user item rating
1xxxxxxxx2 3xxxxxxxx5 1
1xxxxxxxx9 3xxxxxxxx5 1
1xxxxxxxx8 3xxxxxxxx3 0

スコアを予測する

保存したモデルを読み込み、ユーザーIDと商品IDの組み合わせからスコアを予測します。学習データとは別に、ユーザーIDと商品IDそれぞれの一覧データを準備してください。予測結果は「全結果」と、「ユーザーごとにスコア上位N件の結果」をCSVファイルとして保存します。
今回は割り切って、元の学習データに存在しない( = 行動ログが存在しない)ユーザーIDと商品IDは予測対象としません。

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

"""predict score from collaborative filtering model."""

from datetime import datetime
from pyspark.ml.recommendation import ALSModel
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StringType
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType

class CreatePredictedScore(object):
    """predict score from collaborative filtering model."""

    def run(
        self,
        model_file_path: str,
        predict_data_dir_path: str,
        user_data_file_path: str,
        item_data_file_path: str,
        processed_training_data_file_path: str,
        data_limit: int=-1
    ) -> bool:
        """execute."""
        # make spark context
        spark = SparkSession\
            .builder\
            .appName('create_predicted_score')\
            .config('spark.sql.crossJoin.enabled', 'true')\
            .config('spark.debug.maxToStringFields', 500)\
            .getOrCreate()
        sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)

        # load user data
        users_df = sqlContext\
            .read\
            .format('csv')\
            .options(header='false')\
            .load(user_data_file_path)
        users_id_rdd = users_df.rdd.map(lambda l: Row(user_id=l[0]))
        users_id_df = sqlContext.createDataFrame(users_id_rdd)

        # load item data
        items_df = sqlContext\
            .read\
            .format('csv')\
            .options(header='false')\
            .load(item_data_file_path)
        items_id_rdd = items_df.rdd.map(lambda l: Row(item_id=l[0]))
        items_id_df = sqlContext.createDataFrame(items_id_rdd)

        # cross join user_id and item_id
        joined_df = users_id_df.join(items_id_df)
        joined_df.cache()

        # delete unnecessary variables
        del(users_df)
        del(users_id_rdd)
        del(users_id_df)
        del(items_df)
        del(items_id_rdd)
        del(items_id_df)

        # load training data
        custom_schema = StructType([
            StructField('user', StringType(), True),
            StructField('item', StringType(), True),
            StructField('rating', FloatType(), True),
            StructField('unique_user_id', IntegerType(), True),
            StructField('unique_item_id', IntegerType(), True),
        ])
        training_df = sqlContext\
            .read\
            .format('csv')\
            .options(header='true')\
            .load(processed_training_data_file_path, schema=custom_schema)
        # users
        unique_users_rdd = training_df.rdd.map(lambda l: [l[0], l[3]])
        unique_users_df = sqlContext.createDataFrame(
            unique_users_rdd,
            ('user', 'unique_user_id')
        ).dropDuplicates()
        unique_users_df.cache()
        # items
        unique_items_rdd = training_df.rdd.map(lambda l: [l[1], l[4]])
        unique_items_df = sqlContext.createDataFrame(
            unique_items_rdd,
            ('item', 'unique_item_id')
        ).dropDuplicates()
        unique_items_df.cache()

        # delete unnecessary variables
        del(training_df)
        del(unique_users_rdd)
        del(unique_items_rdd)

        # add unique user id
        joined_df = joined_df.join(
            unique_users_df,
            joined_df['user_id'] == unique_users_df['user'],
            'inner'
        ).drop(unique_users_df['user'])

        # add unique item id
        joined_df = joined_df.join(
            unique_items_df,
            joined_df['item_id'] == unique_items_df['item'],
            'inner'
        ).drop(unique_items_df['item'])

        # load model
        model = ALSModel.load(model_file_path)

        # predict score
        predictions = model.transform(joined_df)
        all_predict_data = predictions\
            .select('user_id', 'item_id', 'prediction')\
            .filter('prediction > 0')

        # save
        # all score
        saved_data_file_path = predict_data_dir_path + 'als_predict_data_all.csv'
        all_predict_data.write\
            .format('csv')\
            .mode('overwrite')\
            .options(header='true')\
            .save(saved_data_file_path)

        # limited score
        data_limit = int(data_limit)
        if data_limit > 0:
            all_predict_data.registerTempTable('predictions')
            sql = 'SELECT user_id, item_id, prediction ' \
                + 'FROM ( ' \
                + '  SELECT user_id, item_id, prediction, dense_rank() ' \
                + '  OVER (PARTITION BY user_id ORDER BY prediction DESC) AS rank ' \
                + '  FROM predictions ' \
                + ') tmp WHERE rank <= %d' % (data_limit)
            limited_predict_data = sqlContext.sql(sql)

            saved_data_file_path = predict_data_dir_path + 'als_predict_data_limit.csv'
            limited_predict_data.write\
                .format('csv')\
                .mode('overwrite')\
                .options(header='true')\
                .save(saved_data_file_path)

        return True

予測対象の組み合わせは以下の手順で作成しています。

  1. ユーザーIDと商品IDの全組み合わせを、それぞれの一覧データから作成
  2. 学習データを読み込み、1.の組み合わせにナンバリングし直したIDを追加、ナンバリング出来なかったユーザーIDと商品IDは削除

最初の項でナンバリングし直す必要がなかった場合は、runメソッドを以下のように修正してください。

def run(
    self,
    model_file_path: str,
    predict_data_dir_path: str,
    processed_training_data_file_path: str,
    data_limit: int=-1
) -> bool:
    """execute."""
    # make spark context
    spark = SparkSession\
        .builder\
        .appName('create_predicted_score')\
        .config('spark.sql.crossJoin.enabled', 'true')\
        .config('spark.debug.maxToStringFields', 500)\
        .getOrCreate()
    sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)

    # load training data
    custom_schema = StructType([
        StructField('user', IntegerType(), True),
        StructField('item', IntegerType(), True),
        StructField('rating', FloatType(), True),
    ])
    training_df = sqlContext\
        .read\
        .format('csv')\
        .options(header='true')\
        .load(processed_training_data_file_path, schema=custom_schema)

    # load user data
    users_id_rdd = training_df.rdd.map(lambda l: Row(user_id=l[0]))
    users_id_df = sqlContext.createDataFrame(users_id_rdd)

    # load item data
    items_id_rdd = training_df.rdd.map(lambda l: Row(item_id=l[1]))
    items_id_df = sqlContext.createDataFrame(items_id_rdd)

    # cross join user_id and item_id
    joined_df = users_id_df.join(items_id_df)
    joined_df.cache()

    # delete unnecessary variables
    del(training_df)
    del(users_id_rdd)
    del(users_id_df)
    del(items_id_rdd)
    del(items_id_df)

    # load model
    model = ALSModel.load(model_file_path)
    以下同じ

以下の様に実行します。

cps = CreatePredictedScore()
cps.run(
    model_file_path,
    predict_data_dir_path,
    user_data_file_path,
    item_data_file_path,
    processed_training_data_file_path,
    data_limit
)

パラメータは以下の通りです。data_limitでスコア上位N件のNを指定しています。0以下を指定した場合、上位N件のデータは作成しません。

  • model_file_path : モデルファイルパス
  • predict_data_dir_path : 予測データ保存先ディレクトリパス
  • user_data_file_path : ユーザーIDの一覧ファイルパス(ナンバリングし直した場合のみ)
  • item_data_file_path : 商品IDの一覧ファイルパス(ナンバリングし直した場合のみ)
  • processed_training_data_file_path: 学習データファイルパス
  • data_limit : ユーザごとのデータ保存数上限(default: 上限なし)

user_data_file_pathには1列目にユーザーIDを持つCSVファイルを設定します。
ヘッダなしのファイルを使用しています。
item_data_file_pathには1列目に商品IDを持つCSVファイルを設定します。同様にヘッダなしのファイルを使用しています。
processed_training_data_file_pathには以下のカラムを持つCSVファイルを設定します。

user item rating unique_user_id unique_item_id
1xxxxxxxx7 3xxxxxxxx3 1 57704 32419
1xxxxxxxx8 3xxxxxxxx3 0 115460 32419
1xxxxxxxx6 3xxxxxxxx3 1 48853 32419

predict_data_dir_pathには以下のカラムを持つCSVファイルが出力されます。

user_id user_id prediction
1xxxxxxxx3 3xxxxxxxx4 0.15594198
1xxxxxxxx3 3xxxxxxxx0 0.19135818
1xxxxxxxx3 3xxxxxxxx8 0.048197098

predictionが予測値となります。

まとめ

PySparkでALSを用いた協調フィルタリングを実装しました。メソッド分けしてなく、読み辛くてすいません・・・。
次回はモデルの評価方法について説明します。

参考リンク

10
25
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
10
25