はじめに
ゲームレビューを分析するにあたり、データ数が多くなるにつれてマニュアル確認することが難しく、またAPIによるLLMへのリクエストは有料となることから別の手法を調べたところ、無料で機械的にレビューのトピックを判断する方法としてLDA(Latent Dirichlet Allocation:潜在ディリクレ配分法)があることがわかった。備忘録として、LDAについて整理していく。
LDAとは
LDAとはベイズモデルの一種で、与えられた文章がどんなトピックなのか、また各単語はどのトピックと深い関わりがあるか判断する教師なし学習のモデルである。
LDAのイメージとしては、以下の3Stepで行われていく。
Step1:レビュー全体を単語単位で分解する
Step2:一緒に出現しやすい単語を学習し、トピックを作成する
Step3:各レビューについて、トピックごとに関連性を割合で示す
※全体のレビューから単語の組み合わせを抽出するため、該当レビューに関連しない単語がトピック内の単語群に含まれる。
人から見た視点は上記の流れだが、LDAは以下の3Stepで文章が作成されていることを前提としている。
Step1:文章に対して、各トピックがどの割合で利用されるか決める(トピックは決まっている前提)
Step2:各トピックの中から、どの単語を利用するか決める
Step3:利用を決めた単語を並べていく
実装環境
今回は Databricks Free Edition に保存したSteamレビューを対象にLDAを実装した。実装時には「は」「が」などの1文字語がトピックに含まれないよう、禁止ワードをまとめたテーブルを別途用意している。
Databricksを選んだ理由は以下の通りである。
①自分でPostgreSQLなどのDBを構築せずに、簡単にテーブルへデータ登録ができる
②ジョブ機能を使って定期実行を設定できるため、Steamレビューを定期的に自動収集できる
ソースコード
analyze_review_LDA.ipynb(クリックで展開)
%pip install gensim
# 必要ライブラリ
import re
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
# 形態素解析(fugashi)
from fugashi import Tagger
# gensim
from gensim import corpora, models
from gensim.models import Phrases
from gensim.models.phrases import Phraser
def analyze_steam_reviews_LDA(df_app_list):
"""
Spark上の `game_review_detail` を対象に日本語レビューをLDA(gensim)でトピック抽出し、
各レビューのトピック分布を Delta テーブル `game_review_topics_lda` に保存します。
- 形態素解析: fugashi
- ストップワード: Sparkテーブル `word_dictionary` (use_type='stop', del_flg='0')
- ngram: (1,2) 相当として、gensimのPhrasesでBi-gramを付与
"""
# 分析対象日付
today = datetime.today().date()
date = (today - timedelta(days=6))
# ストップワード取得(Spark)
df_stop_words = spark.sql("""
SELECT word
FROM `workspace`.`game_analysis_db`.`word_dictionary`
WHERE use_type = 'stop' AND del_flg = '0'
""").toPandas()
stopwords = set(df_stop_words['word'].astype(str).tolist())
# 形態素解析器
tagger = Tagger()
# 品詞フィルタ
target_pos = {'名詞', '形容詞', '動詞'}
exclude_pos = {'助動詞', '助詞', '記号', '接続詞'}
# トークナイズ関数
def tokenize(text: str):
# 記号等をスペースに置換(分割の手がかり)
clean = re.sub(r'[^\wぁ-んァ-ン一-龥]', ' ', str(text))
tokens = tagger(clean)
out = []
for token in tokens:
# fugashi: token.feature.pos1 が主要品詞
if not hasattr(token.feature, "pos1"):
continue
pos1 = token.feature.pos1
if pos1 in target_pos and pos1 not in exclude_pos:
surf = token.surface
# フィルタリング
if len(surf) <= 1:
continue
if surf in stopwords:
continue
if re.fullmatch(r'[a-zA-Z]', surf):
continue
out.append(surf)
return out
# 出力テーブル名
out_table = "`workspace`.`game_analysis_db`.`game_review_topics_lda`"
# 出力テーブル作成(存在しない場合)
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {out_table} (
appid INT,
recommendationid STRING,
dominant_topic INT,
dominant_prob DOUBLE,
-- 可変長(n_topics)列は実行時にDataFrameスキーマから付与するため固定列のみ先に定義
registration_date DATE
) USING DELTA
""")
# アプリごとに処理
for _, row in df_app_list.iterrows():
appid = int(row["id"])
# 0) レビュー本文とIDを取得(順序はそのまま保持)
df_reviews = spark.sql(f"""
SELECT review, recommendationid
FROM `workspace`.`game_analysis_db`.`game_review_detail`
WHERE timestamp_updated > '{date}'
AND id = {appid}
AND weighted_vote_score >= 0.5
""").toPandas()
if df_reviews.empty:
print(f"[{appid}] reviews=0 -> skip")
continue
# 1) レビューの文字列のみ抽出(順序維持)
reviews = df_reviews['review'].dropna().astype(str).tolist()
# 2) トークン化(文書ごと)
tokenized_docs = [tokenize(txt) for txt in reviews]
# 3) Bi-gram 生成(Phrases)
# min_count=2 くらいで低頻度を抑え、閾値(threshold)はデフォルト使用
phrases = Phrases(tokenized_docs, min_count=2)
bigram = Phraser(phrases)
tokenized_docs = [bigram[doc] for doc in tokenized_docs]
# 4) 辞書 & コーパス作成
# 低頻度語/高頻度語のフィルタ: no_below=2, no_above=0.5(調整可)
dictionary = corpora.Dictionary(tokenized_docs)
dictionary.filter_extremes(no_below=2, no_above=0.5, keep_n=20000)
# 辞書フィルタ後に再マッピング
corpus = [dictionary.doc2bow(doc) for doc in tokenized_docs]
# コーパスがスカスカならスキップ
if all(len(bow) == 0 for bow in corpus):
print(f"[{appid}] corpus empty after filtering -> skip")
continue
# 5) LDA(gensim)
n_topics = 8 # 必要に応じて調整
lda_model = models.LdaModel(
corpus=corpus,
id2word=dictionary,
num_topics=n_topics,
random_state=0,
chunksize=2000,
passes=5, # 学習反復(増やすと精度↑/時間↑)
iterations=100, # 反復
alpha='auto',
eta='auto',
eval_every=None
)
# 6) トピック上位語の表示(確認用)
topn = 10
print(f"\n[appid={appid}] トピック上位語")
for k in range(n_topics):
tops = [w for w, _ in lda_model.show_topic(k, topn=topn)]
print(f" Topic {k+1}: {', '.join(tops)}")
# 7) 各レビューのトピック分布(ID紐づけ)
# gensimのget_document_topicsで確率取得、疎→密に詰める
doc_topic_matrix = np.zeros((len(corpus), n_topics), dtype=float)
for i, bow in enumerate(corpus):
# minimum_probability=0で全トピック確率を要求
topics = lda_model.get_document_topics(bow, minimum_probability=0.0)
for tid, prob in topics:
doc_topic_matrix[i, tid] = prob
topic_cols = [f"topic_{i+1}" for i in range(n_topics)]
doc_topic_df = pd.DataFrame(doc_topic_matrix, columns=topic_cols)
# recommendationid(順序そのまま)とappid付与
doc_topic_df.insert(0, "recommendationid", df_reviews["recommendationid"].to_numpy())
doc_topic_df.insert(0, "appid", appid)
# 支配トピック(最大確率)
dom_idx = np.argmax(doc_topic_matrix, axis=1) # 0-based
dom_prob = doc_topic_matrix[np.arange(doc_topic_matrix.shape[0]), dom_idx]
doc_topic_df["dominant_topic"] = (dom_idx + 1) # 1-based
doc_topic_df["dominant_prob"] = dom_prob
# 登録日
doc_topic_df["registration_date"] = pd.to_datetime(today)
# 確認表示
print("\n各レビューのトピック分布(ID紐づけ):")
try:
display(doc_topic_df.head())
except:
# ノートブック以外でも落ちないように
print(doc_topic_df.head().to_string(index=False))
print(f"[appid={appid}] rows={len(doc_topic_df)}")
# # 8) Sparkへ保存(Delta, append)
# sdf = spark.createDataFrame(doc_topic_df)
# # 可変の topic_* 列を含めて保存(都度スキーマ拡張される)
# sdf.write.mode("append").saveAsTable(out_table)
print("\n✅ 完了: すべてのappidに対するLDA結果を保存しました。")
実務上どう利用していくか
LDAはレビュー全体からトピックを抽出する仕組みのため、「先月までのレビューで得られたトピック」と「今月分を追加して再計算したトピック」が必ずしも一致するとは限らない。この性質から、過去から現在にかけてのトピック変化をそのまま定量的に追跡する用途にはあまり向いていない。
実務的には、まず初回の実行で監視対象としたいトピックを定義する。その後、特定のトピックの出現確率に変動が見られた際にLDAを再実行し、生成されるトピックの内容を確認して監視対象を再定義する、といった運用が適していると考えられる。
参考資料
[1]杉山聡、「本質を捉えたデータ分析のための分析モデル入門 統計モデル、深層学習、強化学習等 用途・特徴から原理まで一気通貫」、ソシム株式会社、2024年5月
[2]Databricks, 「Databricks ノートブック」, https://docs.databricks.com/gcp/ja/notebooks/