概要
BigQueryに保存されたtweetを形態素解析したい。BigQuery単体では形態素解析出来ないし、mecabの新語辞書を使ったりも出来ないのでdataflowを利用することで形態素解析を行う。
twitterでは、常に最新の言葉が使われる。そのためmecabの辞書は neologd
を利用して最新の状態の言葉で形態素解析ができるようにする。
通常のdataflowのサンプルと異なるのはmecabで使う辞書ファイルをどのように配布するかという問題だ。今回は、パッケージ等は作らなず、インスタンスが生成された後、GCSにおいたmecabの辞書ファイルを読んでいる。
今回はtwitterで使われる人名は誰が多いのかをサンプルとして調べることにする。
shibacow@xxxx~$ mecab -d /usr/lib/x86_64-linux-gnu/mecab/dic/mecab-ipadic-neologd
初音ミクは歌う
初音ミク 名詞,固有名詞,人名,一般,*,*,初音ミク,ハツネミク,ハツネミク
は 助詞,係助詞,*,*,*,*,は,ハ,ワ
歌う 動詞,自立,*,*,五段・ワ行促音便,基本形,歌う,ウタウ,ウタウ
EOS
の
初音ミク 名詞,固有名詞, **人名** ,一般,*,*,初音ミク,ハツネミク,ハツネミク
に様に、mecabで人名と判断されるものを抜き出して人名を数える。neologdの人名辞書、例えば安倍晋三を人名として判断しないなど、ちょっと挙動がわからないところがある。
手順
- neologdの辞書を用意
- 生成したneologdの辞書をGCSに設置
- dataflowの実行
dataflowを利用して形態素解析を行う
今回作ったサンプルプログラムはこちらのgithubに公開している。
mecab-neologd の準備
こちらのサイトを参考に、mecab-neologdの用意をする。
見たところ、 pip でいれたmecab-python3
をそのまま使い、辞書の場所を、
mecab = MeCab.Tagger('-r/dev/null -d 辞書ファイルの場所')
で指定すれば良いようだ。
GCSにneologdの辞書を置く
GCSのバケットを生成し、そこ先程作った辞書を置く。
私は次のファイルをおいた。
char.bin
dicrc
left-id.def
matrix.bin
pos-id.def
rewrite.def
right-id.def
sys.dic
unk.dic
dataflowの実行
次のrun.shを作り実行する
#!/bin/bash
export PROJECT=xxxxxxx <- あなたのプロジェクトID
export BUCKET=xxxxxxxxxx <- 辞書ファルをおいたり実行結果を入れたりするバケット
export REGION=us-central1
export RUNNER=DataflowRunner
export DT=`date "+%s"`
#export MODE=COST_OPTIMIZED
export MODE=SPEED_OPTIMIZED
export INPUT=$PROJECT:user_bq_dataset.user_bq_table
export MACHINE_TYPE=n1-highmem-16
#export MACHINE_TYPE=n1-standard-2
python3 mecab_count.py \
--region $REGION \ <- リージョン BQのデータがUSの場合は、USのリージョンを指定する。
--input $INPUT \ <- BQのテーブル project_id:dataset.table_name のフォーマット
--output gs://$BUCKET/results_$DT/outputs \ <- GSのバケット
--runner $RUNNER \ <- DataflowRunner
--project $PROJECT \ <- Project name
--flexrs_goal $MODE \ <- COST優先かスピード優先
--dicpath gs://$BUCKET/dic/mecab-ipadic-neologd \ <- GCSに設置した辞書の場所
--machine_type $MACHINE_TYPE \ <- インスタンスのタイプ
--temp_location gs://$BUCKET/tmp/ \ <- tmpファイルの場所 GCSの場所を指定する
--setup_file ./setup.py <- setup.pyにロードするライブラリを指定(mecabとか)
コードの説明
エントリーポイントになる mecab_count.py
は単純に from mecab_word_count import mecab_word_count
を呼び出すためのもの(後述するように、main文で、superが使えなかったためモジュールを作った)。
コードはこちら
もともとのapache beam の サンプル word_count の word_count.py を拡張した。
ポイント
setup
DoFnでレコードのエレメントごとに呼び出されるのは、process
関数になる。 setup
は、インスタンスの生成時、もしくは少数しか呼ばれないためこの時点で、gcsから辞書のロードを行う。また、mecabのインスタンスの生成も行う。
processでやるとエレメントごとに呼ばれる為、大量に呼ばれてしまうが、setupであれば少数しか呼ばれない。
google cloud storage ライブラリではなく、beam付属のGcsIOを使う。
mecabの辞書ファイルをGCSからダウンロードする時に、ふつうであれば、 google-cloud-storage
ライブラリを使うがどうもbeamのライブラリと相性が悪いようで実行時にエラーになる。
そのため、
gcs = gcsio.GcsIO()
のように、 apache_beam.io.gcp.gcsio
をつかって、GCSから辞書ファイルをローカルにロードする。
gcs = gcsio.GcsIO()
src=gcs.open(gcs_path)
with open(dstpath,'wb') as outf:
outf.write(src.read())
のようにすれば、GCSからデータをロードして、ローカルに保存してくれる。
mecab前のインターバル
time.sleep(120)
path = os.getcwd()+os.sep+'dic/mecab-ipadic-neologd'
self.mecab = MeCab.Tagger('-r/dev/null -d {}'.format(path))
setupの中で、time.sleepを呼んでいるのは、GCSでのロードの直後に、
self.mecab = MeCab.Tagger('-r/dev/null -d {}'.format(path))
を呼ぶとインスタンスが大量に立ち上がったときに、エラーが起こったためのワークアラウンド(正しい処置ではない気がする)。
__parse
ストップワードとして、
ex_word=(u'RT',u'さん',u'様',u'ちゃん',u'くん',u'笑',u'氏',u'君',u'ツイ',u'さま')
を使った(人名としてmecabから出力されるが人名ではないため)、
実行結果
tweetされた年と、人名とペアにして次のような出力がGCSに保存される。
2018_初音ミク: 34508
コード部分はこちらになる。
# code
for e in names:
elm=u"{}_{}".format(created_at.year,e)
result.append(elm)
年ごとの多い順のランキングを作ると次のようになる。
ローソンと、前澤以外は人名ではないような気がする。ジュジュンは人名かな?SixTONESはグループ名かな。
人気人名を2018,2019,2020,2021年で500名までを列挙した。
人気人名.csv
rank | 2018 | 2019 | 2020 | 2021 |
---|---|---|---|---|
1 | …。 | 譲 | 譲 | ローソン |
2 | 譲 | …。 | 求 | 譲 |
3 | 求 | ローソン | …。 | 求 |
4 | ローソン | 求 | ローソン | 原 |
5 | LINE | 金 | 金 | …。 |
6 | な! | な! | な! | トク |
7 | 金 | LINE | リプ | 金 |
8 | ジェジュン | 万名 | LINE | 前澤 |
9 | チキ | ジェジュン | SixTONES | な! |
10 | かな | リプ | かな | LINE |
例えば、初音ミクだと次のような順位になる
shibacow@dataflow-twitter:~/prog/dataflow/mecab_result$ ./collect_name.py --limit 50000 --name 初音ミク
INFO:root:y=2018 rank=270 data=year=2018 body=初音ミク cnt=34508
INFO:root:y=2019 rank=201 data=year=2019 body=初音ミク cnt=45339
INFO:root:y=2020 rank=138 data=year=2020 body=初音ミク cnt=63762
INFO:root:y=2021 rank=122 data=year=2021 body=初音ミク cnt=3704
2021年は1月20日まで。徐々にランクが上がっている。
dataflowを使う利点
- PCollectionを使うことで、分散処理を透過的に実行可能
- インスタンスを増やせば、処理数も増える。
- 一台のマシンで不可能でも、複数台並べてスケールできる
javaを使った同等の処理との比較
完全に同一ではないが、こちらの記事のようにjava-dataflow使って、tweetを分析をmecab-neologd-kuromoji で行っている。
java-dataflowでのCPU利用率
javaのdataflowはCPUの利用率が安定している。
python-dataflowでのCPU利用率
python-dataflowではCPU利用率にゆらぎがある
効率的にCPUを使うという点ではjavaのほうが分がありそうだが、pythonでは処理のロジックがシンプルなので、コードの書きやすさ、効率性のどちらを選ぶかを決めれば良いと思う。
はまった点
requestments.txtを指定するとそこからワーカーが動かない
setup.pyに移行すると上手く動いた。
main文でsuperが上手く出来ないため、別モジュールにきりだした
例えば、word count examle
の WordExtractingDoFn
に対して、
class WordExtractingDoFn(beam.DoFn):
def __init__(self,foobar):
super(WordExtractingDoFn, self).__init__()
などの拡張しても上手く実行されない。
AttributeError: Can't get attribute 'WordExtractingDoFn
などとエラーが出る。
理由は、こちらにあるが、
Beam does not support super() calls in main module on Python 3.
dataflowのpython3でmainでのsuperの使用が出来ないからのようだ。
bigqueryのデータを使う場合は、dataflowのインスタンスを立ち上げるリージョンに気をつけよう
bigqueryのデータは、us
にあるそうだ。そのため、そのデータを、asiaでdataflowのインスタンスを立ち上げて利用すると、データの移動が発生するそのデータの移動でコストが結構かかる。dataflowをus-central1
で利用した方がコストが安いようだ。
下に添付した画像のように、転送コストの方がCPU利用コストより高かった。
Tips
FlexRS を活用するとコストカットできる
dataflowでは、最大6時間ほど持って実行する代わりに、コストが安くなるFlexRSのCOST_OPTIMIZED
が使える。
FlexRSに関してはこちらを参照
私の実験では、7億件のtweetの処理通常であれば、21$ かかっていたが、FlexRSの COST_OPTIMIZED
モードであれば、 4.72$ で済んだ。通常であれば、全てレギュラーインスタンスを使うが、COST_OPTIMAIZED
モードであれば、プリエンティティブVMを活用するため。