2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

DatabricksでSparkNLPとMLLibを使って分散トピックモデリングをやってみる

Last updated at Posted at 2022-05-17

こちらで紹介しているノートブックはこちらです。

絶賛Spark NLPの勉強中なのですが以下の記事が非常にわかりやすかったので、Databricksで試してみました。

クラスターの設定

MLLibを使うので機械学習ランタイムを使用します。また、Spark NLPを使う際には、pypiからのライブラリのインストールとMaven経由でのライブラリのインストールが必要です。pypiからはノートブック上でインストールしているのですが、Maven経由ではクラスターライブラリとしてインストールしています。両方ともクラスターライブラリとしてインストールすることも可能です。

ライブラリのバージョンはクラスターのSparkバージョンに依存するので、こちらで確認してください。

例えば、Sparkのバージョンが3.2.xである場合には、com.johnsnowlabs.nlp:spark-nlp-spark32_2.12:3.4.4をインストールします。

ライブラリをインストール後に、以下のように「インストール済み」になっていることを確認します。
Screen Shot 2022-05-17 at 10.22.18.png

ライブラリのインストール&インポート

Python
# Install PySpark and Spark NLP
%pip install -q pyspark==3.1.2 spark-nlp

# Install Spark NLP Display lib
%pip install --upgrade -q spark-nlp-display
Python
# Spark NLPのインポート
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.pretrained import PretrainedPipeline
import sparknlp
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline

データのロード

  1. こちらからCSVファイルをダウンロードします。
  2. サイドメニューのデータをクリックし、アップロードボタンをクリックします。
  3. アップロードするパスを選択して、CSVファイルをドラッグ&ドロップします。

以下の例では、下のパスにアップロードしています。

dbfs:/FileStore/shared_uploads/takaaki.yayoi@databricks.com/news/abcnews_date_text.csv

Screen Shot 2022-05-17 at 9.58.53.png
Screen Shot 2022-05-17 at 10.01.30.png

Python
file_location = "dbfs:/FileStore/shared_uploads/takaaki.yayoi@databricks.com/news/abcnews_date_text.csv"
file_type = "csv"

# CSVのオプション
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

# レコード数の確認
df.count()

Screen Shot 2022-05-17 at 10.43.08.png

Python
display(df)

データは記事の発行日とヘッドラインから構成されています。
Screen Shot 2022-05-17 at 10.43.49.png

Spark NLPを用いた前処理パイプライン

Spark NLPでもMLLibと同じように処理のパイプラインを構築することができます。

Python
# Spark NLPはドキュメントに変換する入力データフレームあるいはカラムが必要です
document_assembler = DocumentAssembler() \
    .setInputCol("headline_text") \
    .setOutputCol("document") \
    .setCleanupMode("shrink")

# 文をトークンに分割(array)
tokenizer = Tokenizer() \
  .setInputCols(["document"]) \
  .setOutputCol("token")

# 不要な文字やゴミを除外
normalizer = Normalizer() \
    .setInputCols(["token"]) \
    .setOutputCol("normalized")

# ストップワードの除外
stopwords_cleaner = StopWordsCleaner()\
      .setInputCols("normalized")\
      .setOutputCol("cleanTokens")\
      .setCaseSensitive(False)

# 原型にするための単語のステミング
stemmer = Stemmer() \
    .setInputCols(["cleanTokens"]) \
    .setOutputCol("stem")

# Finisherは最も重要なアノテーターです。Spark NLPはデータフレームの各行をドキュメントに変換する際に自身の構造を追加します。Finisherは期待される構造、すなわち、トークンの配列に戻す助けをしてくれます。 
finisher = Finisher() \
    .setInputCols(["stem"]) \
    .setOutputCols(["tokens"]) \
    .setOutputAsArray(True) \
    .setCleanAnnotations(False)

# それぞれのフェーズが順番に実行されるようにパイプラインを構築します。このパイプラインはモデルのテストにも使うことができます。
nlp_pipeline = Pipeline(
    stages=[document_assembler, 
            tokenizer,
            normalizer,
            stopwords_cleaner, 
            stemmer, 
            finisher])

# パイプラインのトレーニング
nlp_model = nlp_pipeline.fit(df)

# データフレームを変換するためにパイプラインを適用します。
processed_df  = nlp_model.transform(df)

# NLPパイプラインは我々にとって不要な中間カラムを作成します。なので、必要なカラムのみを選択します。
tokens_df = processed_df.select('publish_date','tokens').limit(10000)

display(tokens_df)

これで文がトークンに分割されましたので、特徴量エンジニアリングに進むことができます。

Screen Shot 2022-05-17 at 10.45.15.png

特徴量エンジニアリング

単語の出現頻度を特徴量にするCountVectorizerを使います。要件に応じてTF/IDFなどを使うことも可能です。

Python
from pyspark.ml.feature import CountVectorizer

cv = CountVectorizer(inputCol="tokens", outputCol="features", vocabSize=500, minDF=3.0)

# モデルのトレーニング
cv_model = cv.fit(tokens_df)

# データを変換します。出力カラムが特徴量となります。
vectorized_tokens = cv_model.transform(tokens_df)

Screen Shot 2022-05-17 at 10.47.14.png

LDAモデルの構築

Python
from pyspark.ml.clustering import LDA

num_topics = 3

lda = LDA(k=num_topics, maxIter=10)
model = lda.fit(vectorized_tokens)

ll = model.logLikelihood(vectorized_tokens)
lp = model.logPerplexity(vectorized_tokens)

print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))

トピックの可視化

Python
# CountVectorizerからボキャブラリーを抽出
vocab = cv_model.vocabulary

topics = model.describeTopics()   
topics_rdd = topics.rdd

topics_words = topics_rdd\
       .map(lambda row: row['termIndices'])\
       .map(lambda idx_list: [vocab[idx] for idx in idx_list])\
       .collect()

for idx, topic in enumerate(topics_words):
    print("topic: {}".format(idx))
    print("*"*25)
    for word in topic:
       print(word)
    print("*"*25)

Screen Shot 2022-05-17 at 10.48.53.png

次は日本語にトライしてみます。

Databricks 無料トライアル

Databricks 無料トライアル

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?