概要
今回は、顧客ロイヤリティをユースケースにして、ksqlDBを利用したプログラムを試してみました。
顧客ロイヤリティプログラムを考える
「 10個のスタンプを集めると無料のコーヒーが1杯飲める」という単純なものはいたるところで目にします。こうしたものを顧客ロイヤリティプログラムと呼びます。
今回は、このようなプログラムを進化させて、適切な場所と時間に顧客を引き付けるより洗練されたリワードプログラムを作成してみようと思います。
そのためには適切なプロモーションを適切なタイミングで適用するために複数のデータストリームを集約する必要があります。
ksqlDBを使う
ksqlDBは、クラウド内の一般的なデータソースやエンドシステムから直接データをインポートおよびエクスポートするができます。ここでは、ksqlDBのINSERT INTO機能で模擬データを使用してコードを実行します。
この記事では、ユーザーの行動を分析し顧客に提供するインセンティブを決定するさまざまな方法の検討に役立てばと思っております。
手順1:Streamを作成する
以下のksqlDB構文を実行します。
ここでは、users,products,purchasesの3つのSTREAMを作成します。
CREATE STREAM users (
user_id VARCHAR KEY,
name VARCHAR
) WITH (
KAFKA_TOPIC = 'USERS',
VALUE_FORMAT = 'JSON',
PARTITIONS = 6
);
CREATE STREAM products (
product_id VARCHAR KEY,
category VARCHAR,
price DECIMAL(10,2)
) WITH (
KAFKA_TOPIC = 'products',
VALUE_FORMAT = 'JSON',
PARTITIONS = 6
);
CREATE STREAM purchases (
user_id VARCHAR KEY,
product_id VARCHAR
) WITH (
KAFKA_TOPIC = 'purchases',
VALUE_FORMAT = 'JSON',
PARTITIONS = 6
);
手順2:マテリアライズドビューを作成する
CREATE TABLE all_products AS
SELECT
product_id,
LATEST_BY_OFFSET(category) AS category,
LATEST_BY_OFFSET(CAST(price AS DOUBLE)) AS price
FROM products
GROUP BY product_id;
CREATE STREAM enriched_purchases AS
SELECT
purchases.user_id,
purchases.product_id AS product_id,
all_products.category,
all_products.price
FROM purchases
LEFT JOIN all_products ON purchases.product_id = all_products.product_id;
CREATE TABLE sales_totals AS
SELECT
user_id,
SUM(price) AS total,
CASE
WHEN SUM(price) > 400 THEN 'GOLD'
WHEN SUM(price) > 300 THEN 'SILVER'
WHEN SUM(price) > 200 THEN 'BRONZE'
ELSE 'CLIMBING'
END AS reward_level
FROM enriched_purchases
GROUP BY user_id;
CREATE TABLE caffeine_index AS
SELECT
user_id,
COUNT(*) AS total,
(COUNT(*) % 6) AS sequence,
(COUNT(*) % 6) = 5 AS next_one_free
FROM purchases
WHERE product_id = 'coffee'
GROUP BY user_id;
CREATE TABLE promotion_french_poodle
AS
SELECT
user_id,
collect_set(product_id) AS products,
'french_poodle' AS promotion_name
FROM purchases
WHERE product_id IN ('dog', 'beret')
GROUP BY user_id
HAVING ARRAY_CONTAINS( collect_set(product_id), 'dog' )
AND ARRAY_CONTAINS( collect_set(product_id), 'beret' )
EMIT changes;
CREATE TABLE promotion_loose_leaf AS
SELECT
user_id,
collect_set(product_id) AS products,
'loose_leaf' AS promotion_name
FROM enriched_purchases
WHERE product_id IN ('coffee', 'tea')
GROUP BY user_id
HAVING ARRAY_CONTAINS( collect_set(product_id), 'coffee' )
AND NOT ARRAY_CONTAINS( collect_set(product_id), 'tea' )
AND sum(price) > 20;
手順3:モックデータを投入する
ここでは、ksqlDBを使って模擬データを投入します。
user,products,purchasesそれぞれに模擬データをセットします。
-- Some users.
INSERT INTO users ( user_id, name ) VALUES ( 'u2001', 'kris' );
INSERT INTO users ( user_id, name ) VALUES ( 'u2002', 'dave' );
INSERT INTO users ( user_id, name ) VALUES ( 'u2003', 'yeva' );
INSERT INTO users ( user_id, name ) VALUES ( 'u2004', 'rick' );
-- Some products.
INSERT INTO products ( product_id, category, price ) VALUES ( 'tea', 'beverages', 2.55 );
INSERT INTO products ( product_id, category, price ) VALUES ( 'coffee', 'beverages', 2.99 );
INSERT INTO products ( product_id, category, price ) VALUES ( 'dog', 'pets', 249.99 );
INSERT INTO products ( product_id, category, price ) VALUES ( 'cat', 'pets', 195.00 );
INSERT INTO products ( product_id, category, price ) VALUES ( 'beret', 'fashion', 34.99 );
INSERT INTO products ( product_id, category, price ) VALUES ( 'handbag', 'fashion', 126.00 );
-- Some purchases.
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'yeva', 'beret' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'yeva', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'yeva', 'cat' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'rick', 'tea' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'yeva', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'yeva', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'dave', 'dog' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'dave', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'beret' );
-- A price increase!
INSERT INTO products ( product_id, category, price ) VALUES ( 'coffee', 'beverages', 3.05 );
-- Some more purchases.
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'rick', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'yeva', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'rick', 'dog' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'rick', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'yeva', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'rick', 'cat' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'yeva', 'handbag' );
手順4:検証する
ここでは、模擬データが正しく投入されたか以下のクエリを実行して検証します。
SELECT * FROM promotion_loose_leaf;
以下の出力が得られたらここまでのプロセスは正しく動作しています。
+-----------------------------+----------------------------------+--------------------------------------+
|USER_ID |PRODUCTS |PROMOTION_NAME |
+-----------------------------+----------------------------------+--------------------------------------+
|kris |[coffee] |loose_leaf |
Query terminated
検証の説明
手順4で実行したSQLについて見ておきたいと思います。
このSQLでは"promotion_loose_leaf"に対してクエリを実行していますので、その内容を確認します。
コーヒーを飲んでいるが紅茶を飲んだことのないすべてのお客様を探します。具体的にはenriched purchasesストリームをスキャンし、コーヒーと紅茶を選び出し、そのうえで紅茶ではなくコーヒーを購入したユーザーを見つけ、少なくとも20ドルを使用したお客様に限定したマテリアライズドビューを作成していますね。
ユースケースのイメージとしては、例えば該当するお客様に対して、紅茶の割引クーポンを提供しその効果を測定することができるかもしれませんね。
上記の他にもマテリアライズドビューがありますので、続きは次回試してみたいと思います。
最後までお読み頂きましてありがとうございました。