はじめに
注意: GCPを利用するため、費用が発生します。この記事を再現される場合は「お支払い」>「予算のアラート」から請求額に対するアラート通知を設定することをお勧めします。
概要
BigQuery ML Matrix Factorizationでレコメンドエンジを実装し、レコメンドリスト(ユーザ x アイテム)を作成してDatastoreに登録します。K8s上に実装したAPIサーバーからユーザIDをキーにレコメンドリストを取得するレコメンドAPIを実装します。
GCPの学習のため、Cloud Buildを使ってCI/CDのパイプラインを構築してBuildとDeployプロセスの自動化も試します。
システム構成
目次
- レコメンドエンジンを実装してDatastoreにレコメンドリストを登録
- BigQuery ML でレコメンドエンジンを実装
- Datastoreにレコメンドリストを登録
- GolangでAPI serverを実装
- ディレクトリの構成
- ソースコード
- CI/CD パイプラインを構築して Kubernetes 上に API sever をデプロイ
- Google Kubernetes Engine (GKE) の設定
- GKEのクラスターを作成
- Cloud Build の設定
- Postmanを使ってレコメンドAPIをテスト
実装
レコメンドエンジンを実装してDatastoreにレコメンドリストを登録
BigQuery ML でレコメンドエンジンを実装
レコメンドのアルゴリズムについて
Matrix Factorization でユーザーベースの協調フィルタを実装します。Matrix Factorization のアルゴリズムの紹介は本記事の主旨ではないので省きますが、こちらの記事がわかりやすいと思います。
TensorFlow でのレコメンデーション システムの構築: 概要
利用したデータ
MovieLens 100Kのデータセットを使用しました。
実装方法
以下の記事を参考にさせていただきました。
BigQuery ML を使用して映画の評価に基づきレコメンデーションを行う
BigQuery ML の Matrix Factorization で映画の推薦を行ってみる
注意: 2021/01 時点ではBigQuery ML の Matrix Factorizationは、Reservations(予約)からスロットを購入する必要があります。Flex Slotsを購入されると思いますが、使用が済んだら削除することを気をつけてください。-> Reservations の概要
レコメンドエンジン作成クエリ(モデルの学習)
CREATE OR REPLACE MODEL
recommendapi_ml_100k.mf_model OPTIONS (model_type="matrix_factorization",
user_col="user_id",
item_col="item_id",
l2_reg=9.83) AS
SELECT
user_id,
item_id,
rating
FROM
recommendapi_ml_100k.ratings
レコメンドリストの作成(予測)
CREATE OR REPLACE TABLE
recommendapi_ml_100k.predicted_ratings OPTIONS() AS
SELECT
*
FROM
ML.RECOMMEND(MODEL recommendapi_ml_100k.mf_model)
予測結果
学習したモデルでユーザーごとの各アイテム(映画)に対するレビュースコアの予測をします。
レコメンドリスト(ユーザーxアイテム)を作成
BigQueryで datstore に登録する形式にします。
ここでは、ユーザIDに対してお勧め順(スコアの降順)に10本の映画のタイトルを紐づけています。
DataflowでBigQueryのレコメンドリストをDatastoreへ登録
Dataflowで実装した場合にgoogle-cloud-datastoreのパッケージがimportできない問題があり、、ApacheBeamでの実装方法をご紹介します。(奮闘中です、。)
ApacheBeamでレコメンドリストをDatastoreへ登録
BigQueryで作成したレコメンドリストをCSVでエクスポートして以下のApacheBeamのプログラムでdatastoreに登録します。
Cloud shellで実行しました。
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from google.cloud import datastore
import re
class MyOptions(PipelineOptions):
"""カスタムオプション."""
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
"--input",
default="./recommendapi_recommend_list.csv",
help="Input path for the pipeline",
)
class RegistDatastore(beam.DoFn):
"""CSVのレコメンドリストをDatastoreに登録"""
def __init__(self):
pass
def clientDatastore(self, line):
"""クライアントの設定"""
client = datastore.Client()
kind = "Recommend"
# ユーザ-ID
name = line.split(",")[0]
key = client.key(kind, name)
entity = datastore.Entity(key)
# アイテムID
entity["items"] = re.search('"(.*)"', line).group(1)
client.put(entity)
def process(self, line):
self.clientDatastore(line)
def run():
options = MyOptions()
options.view_as(StandardOptions).runner = "DirectRunner"
p = beam.Pipeline(options=options)
(
p
| "ReadFromText" >> beam.io.ReadFromText(options.input, skip_header_lines=1)
| "RegistDatastore" >> beam.ParDo(RegistDatastore())
)
p.run()
if __name__ == "__main__":
run()
GolangでAPI serverを実装
ディレクトリの構成
.
├── Dockerfile
└── src
└── recommendAPI
└── httpd
├── handler
│ └── recommendFunc.go
├── main.go
└── recommend
└── recommend.go
ソースコード
Golangは今回初めて使用するため至らないところは目を瞑ってください。。><
main.go
package main
import (
"github.com/gin-gonic/gin"
"recommendAPI/httpd/handler"
)
func main() {
r := gin.Default()
r.GET("/recommend", handler.RecommendGet())
r.Run()
}
recommendFunc.go
package handler
import (
"net/http"
"recommendAPI/httpd/recommend"
"github.com/gin-gonic/gin"
)
func RecommendGet() gin.HandlerFunc {
return func(c *gin.Context) {
result := recommend.GetUserData(c)
c.JSON(http.StatusOK, result)
}
}
recommend.go
データストアのclientの実装はこちらを参照しました。
Datastore mode Client Libraries
package recommend
import (
"fmt"
"log"
"github.com/gin-gonic/gin"
"cloud.google.com/go/datastore"
)
// Creates a instance.
type Recommend struct{
// UserId *datastore.Key `datastore:"__key__.id"`
Items string
}
func GetUserData(ctx *gin.Context) Recommend{
// Set your Google Cloud Platform project ID.
projectID := "xxxxxxxxxxxx"
// Creates a client.
client, err := datastore.NewClient(ctx, projectID)
if err != nil {
log.Fatalf("Failed to create client: %v", err)
}
// Sets the kind for the new entity.
kind := "Recommend"
// Sets the name/ID for the new entity.
name := ctx.Query("userId")
// Creates a Key instance.
recommendKey := datastore.NameKey(kind, name, nil)
// Creates a Recommend instance.
var recommend Recommend
// e := new(Entity)
if err := client.Get(ctx, recommendKey, &recommend); err != nil {
log.Fatalf("Failed to get recommend: %v", err)
}
return recommend
}
Dockerfile
FROM golang:1.13-alpine3.10
# FROM google/cloud-sdk:alpine
COPY src/recommendAPI /go/src/recommendAPI
WORKDIR /go/src/recommendAPI
# Install required packages
RUN apk update \
&& apk add --no-cache git \
&& go mod init \
&& go get -u github.com/gin-gonic/gin \
&& go get -u cloud.google.com/go/datastore
CMD go run httpd/main.go
CI/CD パイプラインを構築して Kubernetes 上に API sever をデプロイ
Google Kubernetes Engine (GKE) の設定
GKEのマニュフェストファイルを作成
以下を参考にしてDeploymentを作成します。
https://kubernetes.io/ja/docs/concepts/workloads/controllers/deployment/
k8s.yml
apiVersion: apps/v1
kind: Deployment
metadata:
name: recommend-api
spec:
selector:
matchLabels:
app: recommend-api
replicas: 1
template:
metadata:
labels:
app: recommend-api
spec:
containers:
- name: recommend-api
image: gcr.io/<project id>/recommend-api:latest
resources:
requests:
cpu: 100m
memory: 100Mi
ports:
- containerPort: 8080
---
kind: Service
apiVersion: v1
metadata:
name: recommend-api
spec:
type: LoadBalancer
selector:
app: recommend-api
ports:
- name: http
port: 8080
targetPort: 8080
GKEのクラスターを作成
クラスタを作成する際に「各APIにアクセス権を設定」から「Cloud Datastore」を有効にします。
Cloud Build の設定
Cloud Buildの構成ファイルを作成
CI/CD環境としてCloud Buildを利用いてます。
開発環境で作成したdocker imageをビルドしてcontainer registryにpushして、GKE上にデプロイされるように一連の設定を記載します。
cloudbuild.yaml
steps:
- name: 'gcr.io/cloud-builders/docker'
args: ['build', '-t', 'gcr.io/<project id>/recommend-api:latest', '.']
id: docker build
- name: 'gcr.io/cloud-builders/docker'
args: ['push', 'gcr.io/<project id>/recommend-api:latest']
id: docker push
- name: 'gcr.io/cloud-builders/gcloud'
args: ['container', 'clusters', 'get-credentials', 'recommend-api', '--zone', 'us-central1-c', '--project', '<project id>']
id: gcloud container clusters get-credentials
- name: 'gcr.io/cloud-builders/kubectl'
args: ['apply', '-f', 'k8s.yml']
env :
- "CLOUDSDK_COMPUTE_REGION=us-central1-c"
- "CLOUDSDK_COMPUTE_ZONE=us-central1-c"
- "CLOUDSDK_CONTAINER_CLUSTER=recommend-api"
id: kubectl apply
Cloud Buildにトリガーを設定
GitHubのリポジトリを連携して、デフォルト設定のmasterブランチへのpushをトリガーとしました。
Postmanを使ってレコメンドAPIをテスト
Cloud Buildに登録したrepositoryに対してpushをします。GKEにAPI serverがデプロイされます。
デプロイされたら、GKEの管理画面からエンドポイントを調べて、以下のようにパラメータにuserIdを加えてリクエストを投げます。。
レコメンド結果の確認
ユーザーIDに対してレコメンド結果(映画のリスト)が返ってきましたらテストは終わりです。
参考
TensorFlow でのレコメンデーション システムの構築: 概要
BigQuery ML を使用して映画の評価に基づきレコメンデーションを行う
BigQuery ML の Matrix Factorization で映画の推薦を行ってみる
GCPの教科書II 【コンテナ開発編】