LoginSignup
1
1

More than 1 year has passed since last update.

Spark NLPとSpark MLLib(LDA)を用いた分散トピックモデリング

Posted at

Distributed Topic Modelling using Spark NLP and Spark MLLib(LDA) | by Satish Silveri | Analytics Vidhya | Mediumの翻訳です。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

トピックモデリングは自然言語処理で最も一般的なタスクの一つです。数百万のドキュメントからトピックの分布を抽出することは、例えば特定の製品、あるいは、すべての製品に対する不満の理由の特定や、ニュース記事のトピックの特定のような古典的なサンプルなど多くの面で有用です。ここでは、トピックモデリングが何であるのかや、動作原理には踏み込みません。そのような目的に供する良い記事はインターネットに数多く存在しますが、Analytics Vidhyaのこの記事が非常に包括的であると思いました。ですので、トピックモデリングに馴染みがない、あるいは記憶を新たにしたいのであれば、上の記事をチェックしてみてください。

この記事目的は、トピックモデリングに対する分散アプローチに慣れることです。お使いのデータレイクに数十億のドキュメントがあり、それらに含まれるトピックに対してより良く理解をしたいと考えているとします。Pythonで数十億のドキュメントを処理しようとすると、計算能力の制限やボトルネックに遭遇します。幸運なことに、SparkのMLLibは特に分散環境で動作するように設計されたバージョンのLDAを提供します。ここでは、データの前処理にSpark NLP、データからのトピック抽出にSpark MLLibのLDAを用いたシンプルなモデリングパイプラインを構築します。

我々はニュース記事のデータを使います。こちらのリンクからデータセットをダウンロードすることができます。それでは、スタートしていくつかのコードを書いていきましょう。

Sparkセッションの初期化

最初に必要なパッケージをインポートしSparkセッションを初期化します。spark-submitを用いてSparkアプリケーションを起動する際にパラメーターを指定することも可能です。

Python
# Import Spark NLP
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.pretrained import PretrainedPipeline
import sparknlp
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
spark = SparkSession.builder \
    .appName("Spark NLP")\
    .config("spark.driver.memory","8G")\ #change accordingly
    .config("spark.driver.maxResultSize", "2G") \
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.11:2.4.5")\
    .config("spark.kryoserializer.buffer.max", "1000M")\
    .getOrCreate()

データの読み込み

3つのソース、ローカル、HDFS、S3からデータを読み込むことができます。データがS3にあるのであれば、分散コピーを行うS3DistCpを用いて、そのデータをHDFSに転送することをお勧めします。このアプローチによって、S3からデータを読み込む際に必要となるネットワークIOを削減し、データをすべてのワーカーノードに分配し、データをメモリーにロードします。

Python
# if you are reading file from local storage
file_location = r'path\to\abcnews_date_txt.csv'
# if you are reading file from hdfs
file_location = r'hdfs:\\\user\path\to\abcnews_date_txt.csv'
file_type = "csv"
# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)
# Verify the count
df.count()

データは2つのカラム、publish_dateheadline_textから構成されます。

Spark NLPを用いた前処理パイプライン

Python
# Spark NLP requires the input dataframe or column to be converted to document. 
document_assembler = DocumentAssembler() \
    .setInputCol("headline_text") \
    .setOutputCol("document") \
    .setCleanupMode("shrink")
# Split sentence to tokens(array)
tokenizer = Tokenizer() \
  .setInputCols(["document"]) \
  .setOutputCol("token")
# clean unwanted characters and garbage
normalizer = Normalizer() \
    .setInputCols(["token"]) \
    .setOutputCol("normalized")
# remove stopwords
stopwords_cleaner = StopWordsCleaner()\
      .setInputCols("normalized")\
      .setOutputCol("cleanTokens")\
      .setCaseSensitive(False)
# stem the words to bring them to the root form.
stemmer = Stemmer() \
    .setInputCols(["cleanTokens"]) \
    .setOutputCol("stem")
# Finisher is the most important annotator. Spark NLP adds its own structure when we convert each row in the dataframe to document. Finisher helps us to bring back the expected structure viz. array of tokens.
finisher = Finisher() \
    .setInputCols(["stem"]) \
    .setOutputCols(["tokens"]) \
    .setOutputAsArray(True) \
    .setCleanAnnotations(False)
# We build a ml pipeline so that each phase can be executed in sequence. This pipeline can also be used to test the model. 
nlp_pipeline = Pipeline(
    stages=[document_assembler, 
            tokenizer,
            normalizer,
            stopwords_cleaner, 
            stemmer, 
            finisher])
# train the pipeline
nlp_model = nlp_pipeline.fit(df)
# apply the pipeline to transform dataframe.
processed_df  = nlp_model.transform(df)
# nlp pipeline create intermediary columns that we dont need. So lets select the columns that we need
tokens_df = processed_df.select('publish_date','tokens').limit(10000)
tokens_df.show()

Spark NLPのパイプラインのアウトプットは、クレンジングされステミングされたトークンのリストとなります。

特徴量エンジニアリング

テキストデータから特徴量を作成するためにSpark MLLibのCountVectorizerを使います。Latent Dirichlet Allocationでは、トピックモデリングを行うためにデータ固有のボキャブラリーが必要となります。

Python
from pyspark.ml.feature import CountVectorizer
cv = CountVectorizer(inputCol="tokens", outputCol="features", vocabSize=500, minDF=3.0)
# train the model
cv_model = cv.fit(tokens_df)
# transform the data. Output column name will be features.
vectorized_tokens = cv_model.transform(tokens_df)

LDAモデルの構築

LDAモデルには、少なくとも2つのハイパーパラメーター、k(トピックの数)とmaxIter(イテレーションの数)が必要となります。お使いのデータに適したkmaxIterがどれであるのかを確認するために、異なる値で試してみてください。

Python
from pyspark.ml.clustering import LDA
num_topics = 3
lda = LDA(k=num_topics, maxIter=10)
model = lda.fit(vectorized_tokens)
ll = model.logLikelihood(vectorized_tokens)
lp = model.logPerplexity(vectorized_tokens)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))

トピックの可視化

トレーニングが完了したら、以下のコードを用いてそれぞれのトピックを表現する単語を確認します。

Python
# extract vocabulary from CountVectorizer
vocab = cv_model.vocabulary
topics = model.describeTopics()   
topics_rdd = topics.rdd
topics_words = topics_rdd\
       .map(lambda row: row['termIndices'])\
       .map(lambda idx_list: [vocab[idx] for idx in idx_list])\
       .collect()
for idx, topic in enumerate(topics_words):
    print("topic: {}".format(idx))
    print("*"*25)
    for word in topic:
       print(word)
    print("*"*25)

アウトプットは以下のようなものになります。

まとめ

機械学習の能力のみの観点では、Pythonで直接利用できる膨大な機械学習ライブラリと比較した場合、Spark MLlibは豊富な機能を提供しているとは言えません。Spark MLLibが価値を付加するのは、大規模データセットに対する基本的なMLタスクを分散処理する能力を提供する場面です。リリースごとの大幅な改善によって、Apache Sparkは徐々にビッグデータと機械学習のギャップを埋めています。

読んでいただきありがとうございます!

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1