26
34

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

PySpark, NetworkXを利用した単語共起ネットワークの並列分散処理と可視化

Last updated at Posted at 2017-11-22

はじめに

データマイニング手法の一つに アソシエーション分析 という、何かと何かの関連を定量的に分析する手法があります。

  • ECサイトで同時に買われている商品と商品から購買行動を分析する
  • ペアでダウンロードされているアプリを探す

等々、応用例には枚挙に暇がありませんが、特にデータが膨大になってくると、
直観的に全体像を把握するためには何かしらの可視化を行う必要があるでしょう。
そんな時、一つの手段として、共起関係にあるモノ同士をつなげた ネットワーク・グラフ に表現する事ができます。

あれこれと説明するよりは、図を見て頂いた方が早いかと思いますので、一つ分析の例を用意しました。
下記は、経済ニュースに頻繁に同時に表れる、単語と単語の関係を可視化したものです。
 networkgraph.png

図の考察は最後に行うとして、まずは作り方を説明させて頂きたいと思います。

尚、アソシエーション分析の具体例や他の分析との関連など、用例と手法に関する包括的な説明はALBERT様の解説ページに詳しいため、そちらをご参照ください。
商品分析の手法(ABC分析、アソシエーション分析)

余談ですが、直近構築系の業務から離れていたため、QBK(急に、分散処理が、書きたくなったので)という個人的事情が多分にあります。

設計

そもそも、このような共起ネットワーク分析を行う際には、
以下の3点について設計を行う必要があるでしょう。

  • 共起関係にあるモノが何であるか、分析対象の設計
  • 共起性の定量化方法の設計
  • 何を以って、共起したとみなすかどうか、共起性の境界の設計

分析対象の設計

グラフのNodeの設計です。
自然言語においては単語単語の共起関係が定番ですが、
マーケティングデータにおいては同時に買われている商品の共起や、
よく一緒に働いている同僚や、共演した俳優・女優の人物相関図を作る事も可能でしょう。

今回は単語と単語の共起関係を可視化します。

共起性の定量化方法の設計

こちらは、グラフのEdgeの設計に相当します。

代表的な手法がいくつかありますが、今回はJaccard係数で計算を行います。
Jaccard係数に関しては他に解説が多いためここでは説明を省きますが、
ここではある単語aが出現する文書の集合Aと、ある単語bが出現する文書の集合Bを比較して類似度を計算します。

  • 0 ≦ Jaccard(A,B) ≦ 1
  • 2つの単語が、必ずペアで出現する場合には Jaccard(A,B) = 1
  • 2つの単語が、一度もペアで出現しない場合には Jaccard(A,B) = 0

という性質があります。

集合同士の類似度を計算する方法として、よく引き合いに出されるその他の手法に
Dice係数Simpson係数がありますが、詳細は下記を参照ください。

Jaccard係数、Dice係数、Simpson係数

共起性の境界の設計

見落としがちな点に、何を以って共起したとみなすかを設計する、という問題があります。
今回の単語共起の例で言えば

  1. ある2つの単語のペアが、1つの文書の中に含まれていれば共起である
  2. ある2つの単語のペアが、1つのの中に含まれていれば共起である

もう少しmezzoには
3. ある2つの単語のペアが、1つのパラグラフの中に含まれていれば共起である

かなり範囲を狭めて、
4. ある2つの単語のペアが、隣り合って出現していれば共起である

などです。

今回は、
2.ある2つの単語のペアが、1つのの中に含まれていれば共起である
とします。

私見ですが、文書に混合ユニグラムモデルを仮定した場合は 文書の単位、トピックモデルを仮定した場合はパラグラフ単位、係り受け解析等に利用する場合はもしくは隣り合いに境界を設定すると良いのではないかと思います。

実装

ここからが実装の解説です。

利用ライブラリ

下記に記載したライブラリを利用しています。
勿論今回主となるのは Spark および NetworkX です。

import pandas as pd
import numpy as np
import networkx as nx
import matplotlib.pyplot as plt

from natto import MeCab
from sklearn.feature_extraction.text import CountVectorizer

import pyspark
from pyspark import SQLContext, sql
from pyspark.sql.types import *
from pyspark.ml.linalg import Vectors

単語文書行列の生成

scikit-learnのVectorizerを利用しています。
泥臭い前処理をしているため、省略しますが、
書きなぐったサンプルコードは記事の末尾に記載してあるため、そちらをご参照ください。

vectorizer = CountVectorizer(max_df=0.05, min_df=2, max_features=100)
X = vectorizer.fit_transform(documents)
features = vectorizer.get_feature_names()
# (略)

共起度計算

ここから、今回抽出した単語の、考え得るすべての組み合わせについてJaccard係数を計算し、重み付けを行います。

デカルト積を利用した組み合わせ列挙

まずは単語A単語B単語A単語Cなど、考え得る組み合わせを全て列挙します。

SQLに慣れている方にはどうという事のない操作かと思いますが、
単語文書行列の自身とのデカルト積を取ってから、不要なものを排除します。
結合条件にPKにあたる単語の文字列比較、sdf1.word_1 < sdf2.word_2 を行っているのがミソで、
これにより 単語A単語B という行のみ残し、組み合わせが重複する 単語B単語A という行を省いています。また、ついでに 単語A単語A という同じ要素の組み合わせも排除しています。

joined = sdf1.join(sdf2, sdf1.word_1 < sdf2.word_2)

PySparkによるJaccard係数の並列計算

前節で作成した単語の全組み合わせに対して、Jaccard係数を計算します。
作法としてもっと良い書き方もあるかもしれませんが、Jaccard係数の計算に必要な情報が1行に全て含まれているため、各行に対して並列で計算を行います。

前述の通り、今回はある単語aが出現する文書の集合Aと、ある単語bが出現する文書の集合Bに対してJaccard(A, B) を計算します。

Intersect = sum([min(x[c +'_1'], x[c + '_2']) for c in colnames])
Union = sum([min(x[c +'_1'], x[c + '_2']) for c in colnames])
を計算し、 Intersect / Union します。

最後に filter しているのは、
データの増加に比例して計算結果を集約するReducerに負荷が掛かりがちなので、
Mapperのノード(グラフのノードとは全く関係ありません)の中で、共起性が極めて低いものを省いています。

result = joined.rdd.map(lambda x: (
	x["word_1"],
	x["word_2"],
	float(sum([min(x[c +'_1'], x[c + '_2']) for c in colnames])) /
	float(sum([max(x[c +'_1'], x[c + '_2']) for c in colnames]))
	)).filter(lambda x: x[2] > 0.01).collect()

ネットワーク構築

さて、Jaccard係数の計算が完了したため、
ようやくグラフの構築と可視化を行います。

グラフ操作

下記の3stepでグラフを構築します

  1. 単語をグラフのノードとして追加
  2. Jaccard係数が閾値を超えたエッジのみ追加
  3. 2の結果として、どのノードともつながらず孤立してしまったノードを削除
# build network 
G = nx.Graph()
G.add_nodes_from(features, size=10)

edge_threshold = 0.1
for i, j, w in result:
	if w > edge_threshold:
		G.add_edge(i, j, weight=w)

isolated = [n for n in G.nodes if len([ i for i in nx.all_neighbors(G, n)]) == 0]
for n in isolated:
	G.remove_node(n)

グラフ描画

このあたりはお好みで変更してください。

plt.figure(figsize=(15,15))
pos = nx.spring_layout(G) # k = node間反発力

# nodeの大きさ
node_size = [d["size"] * 20 for (n,d) in G.nodes(data=True)]
nx.draw_networkx_nodes(G, pos, node_color="b",alpha=0.3, node_size=node_size)

# 日本語ラベル
nx.draw_networkx_labels(G, pos, fontsize=14, font_family="Hiragino Kaku Gothic Pro", font_weight="bold")

# エッジの太さ調節
edge_width = [ d["weight"]*20 for (u,v,d) in G.edges(data=True)]
nx.draw_networkx_edges(G, pos, alpha=0.4, edge_color="c", width=edge_width)

plt.axis('off')
plt.show()

総括

今回は解説記事を書くために可視化を行いました。
分析そのものが自己目的化しており、実務上は大変よろしくないですが、それでもいくつかの考察・仮説が得られます。

  • データ分析はもっぱら過去に起こった事象の原因や理由の把握に使われている?

人工知能が世間を賑わせつつも、forecastに応用した事例は相対的には少数派という事かもしれません。

  • 何らかの組織の代表は利害調整のための条件交渉をしばしば行っている?
  • 記者会見では目標達成などポジティブな内容が多い?

あくまで憶測交じりの仮説でしかないため、このような可視化の行い得られたヒントに基いて、
適宜元のデータや、別の参考資料をあたって裏付けを行っていくのが良いでしょう。

また、一般的な経済用語も多いため、サンプル文書集合に広く分布する単語の刈り込みを行う事も必要でしょう。

非構造データの大雑把な要約や、調査で行き詰った際の一つの手段として利用するのが良いかもしれません。

スクリプト

association.py
import pandas as pd
import numpy as np
import networkx as nx
import matplotlib.pyplot as plt

import mojimoji as mj
from natto import MeCab
from sklearn.feature_extraction.text import CountVectorizer

import pyspark
from pyspark import SQLContext, sql
from pyspark.sql.types import *
from pyspark.ml.linalg import Vectors

tagger = MeCab('-d /usr/local/lib/mecab/dic/mecab-ipadic-neologd')

ng_noun = ["これ", "よう",  "こと", "", "もの", "それ", "とき"] # お好みで

appName = 'association'
conf = pyspark.SparkConf().setAppName(appName).setMaster('local[4]').set("spark.executor.cores", "2")
sc = pyspark.SparkContext(conf=conf)
sqlContext = sql.SQLContext(sc)


def _sentence2bow(sentence):
  """
  文を形態素解析してBagOfWordsに変換
  @param sentence: text
    自然言語の文
  @return bag: list
    語形変化が修正された単語のリスト
  """
  bag = []
  # e.g. 動詞:surface="滑れ", feature="動詞,自立,*,*,一段,未然形,滑れる,スベレ,スベレ"
  for node in tagger.parse(sentence, as_nodes=True):
    features = node.feature.split(",")
    if features[0] == "名詞":
      noun = mj.zen_to_han(node.surface.decode('utf-8')).encode('utf-8')
      if noun not in ng_noun:
        bag.append(node.surface)

  # 文書中の重複はまとめてしまう
  return list(set(bag))


file = "sample.txt"
df = pd.read_csv(file, delimiter='\t', names=["URL", "Text"],
  dtype = {'URL':'object', 'Text':'object'})

documents = []
for i, row in df.iterrows():
  documents.append(' '.join(_sentence2bow(row['Text'])))

vectorizer = CountVectorizer(max_df=0.5, min_df=2, max_features=100)
X = vectorizer.fit_transform(documents)
features = vectorizer.get_feature_names()

colnames = ['doc' + str(i) for i in range(0, X.shape[0])]

index = 'word'
pdf = pd.DataFrame(X.T.toarray())
pdf[index] = features

def _createDataFrame(df, colnames, index):
  idx = str(index)
  col = [col + '_' + idx for col in colnames]
  fields = [StructField(field_name, IntegerType(), True) for field_name in col]
  fields.append(StructField("word" + "_" + idx , StringType(), True))
  sdf = sqlContext.createDataFrame(pdf, StructType(fields))
  return sdf

sdf1 = _createDataFrame(pdf, colnames, 1)
sdf2 = _createDataFrame(pdf, colnames, 2)

joined = sdf1.join(sdf2, sdf1.word_1 < sdf2.word_2)

result = joined.rdd.map(lambda x: (
	x["word_1"],
	x["word_2"],
	float(sum([min(x[c +'_1'], x[c + '_2']) for c in colnames])) /
	float(sum([max(x[c +'_1'], x[c + '_2']) for c in colnames]))
	)).filter(lambda x: x[2] > 0.01).collect()

# build network 
G = nx.Graph()
G.add_nodes_from(features, size=10)

# edgeの追加
edge_threshold = 0.15
for i, j, w in result:
	if w > edge_threshold:
		G.add_edge(i, j, weight=w)

# 孤立したnodeを削除
isolated = [n for n in G.nodes if len([ i for i in nx.all_neighbors(G, n)]) == 0]
for n in isolated:
	G.remove_node(n)

plt.figure(figsize=(20,20))
pos = nx.spring_layout(G, k=0.3) # k = node間反発係数

# nodeの大きさ
node_size = [d["size"]*50 for (n,d) in G.nodes(data=True)]
nx.draw_networkx_nodes(G, pos, node_color="b",alpha=0.3, node_size=node_size)

# 日本語ラベル
nx.draw_networkx_labels(G, pos, fontsize=14, font_family="Hiragino Kaku Gothic Pro", font_weight="bold")

# エッジの太さ調節
edge_width = [ d["weight"]*20 for (u,v,d) in G.edges(data=True)]
nx.draw_networkx_edges(G, pos, alpha=0.4, edge_color="c", width=edge_width)

plt.axis('off')
plt.show()

26
34
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
26
34

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?