LoginSignup
11
9

More than 1 year has passed since last update.

apache-beam-pythonを用いてGCP dataflowでmecabを使い形態素解析をする話

Last updated at Posted at 2021-01-24

概要

BigQueryに保存されたtweetを形態素解析したい。BigQuery単体では形態素解析出来ないし、mecabの新語辞書を使ったりも出来ないのでdataflowを利用することで形態素解析を行う。
twitterでは、常に最新の言葉が使われる。そのためmecabの辞書は neologd を利用して最新の状態の言葉で形態素解析ができるようにする。

通常のdataflowのサンプルと異なるのはmecabで使う辞書ファイルをどのように配布するかという問題だ。今回は、パッケージ等は作らなず、インスタンスが生成された後、GCSにおいたmecabの辞書ファイルを読んでいる。

今回はtwitterで使われる人名は誰が多いのかをサンプルとして調べることにする。

shibacow@xxxx~$ mecab -d /usr/lib/x86_64-linux-gnu/mecab/dic/mecab-ipadic-neologd
初音ミクは歌う
初音ミク        名詞,固有名詞,人名,一般,*,*,初音ミク,ハツネミク,ハツネミク
は      助詞,係助詞,*,*,*,*,は,ハ,ワ
歌う    動詞,自立,*,*,五段・ワ行促音便,基本形,歌う,ウタウ,ウタウ
EOS

初音ミク        名詞,固有名詞, **人名** ,一般,*,*,初音ミク,ハツネミク,ハツネミク

に様に、mecabで人名と判断されるものを抜き出して人名を数える。neologdの人名辞書、例えば安倍晋三を人名として判断しないなど、ちょっと挙動がわからないところがある。

手順

  1. neologdの辞書を用意
  2. 生成したneologdの辞書をGCSに設置
  3. dataflowの実行

dataflowを利用して形態素解析を行う

今回作ったサンプルプログラムはこちらのgithubに公開している。

mecab-neologd の準備

こちらのサイトを参考に、mecab-neologdの用意をする。
見たところ、 pip でいれたmecab-python3をそのまま使い、辞書の場所を、

mecab = MeCab.Tagger('-r/dev/null -d 辞書ファイルの場所')

で指定すれば良いようだ。

GCSにneologdの辞書を置く

GCSのバケットを生成し、そこ先程作った辞書を置く。

私は次のファイルをおいた。

char.bin
dicrc
left-id.def
matrix.bin
pos-id.def
rewrite.def
right-id.def
sys.dic
unk.dic

dataflowの実行

次のrun.shを作り実行する

#!/bin/bash
export PROJECT=xxxxxxx <- あなたのプロジェクトID
export BUCKET=xxxxxxxxxx <- 辞書ファルをおいたり実行結果を入れたりするバケット

export REGION=us-central1
export RUNNER=DataflowRunner
export DT=`date "+%s"`
#export MODE=COST_OPTIMIZED
export MODE=SPEED_OPTIMIZED
export INPUT=$PROJECT:user_bq_dataset.user_bq_table
export MACHINE_TYPE=n1-highmem-16
#export MACHINE_TYPE=n1-standard-2
python3 mecab_count.py \
        --region $REGION \ <- リージョン BQのデータがUSの場合は、USのリージョンを指定する。
        --input  $INPUT \ <- BQのテーブル project_id:dataset.table_name のフォーマット 
        --output gs://$BUCKET/results_$DT/outputs \ <- GSのバケット
        --runner $RUNNER \ <- DataflowRunner
        --project $PROJECT \ <- Project name
        --flexrs_goal $MODE \ <- COST優先かスピード優先 
        --dicpath gs://$BUCKET/dic/mecab-ipadic-neologd \ <- GCSに設置した辞書の場所
        --machine_type  $MACHINE_TYPE \ <- インスタンスのタイプ
        --temp_location gs://$BUCKET/tmp/ \ <- tmpファイルの場所 GCSの場所を指定する
        --setup_file ./setup.py <- setup.pyにロードするライブラリを指定(mecabとか)

コードの説明

エントリーポイントになる mecab_count.py は単純に from mecab_word_count import mecab_word_count を呼び出すためのもの(後述するように、main文で、superが使えなかったためモジュールを作った)。

コードはこちら

もともとのapache beam の サンプル word_count の word_count.py を拡張した。

ポイント

setup

DoFnでレコードのエレメントごとに呼び出されるのは、process 関数になる。 setup は、インスタンスの生成時、もしくは少数しか呼ばれないためこの時点で、gcsから辞書のロードを行う。また、mecabのインスタンスの生成も行う。

processでやるとエレメントごとに呼ばれる為、大量に呼ばれてしまうが、setupであれば少数しか呼ばれない。

google cloud storage ライブラリではなく、beam付属のGcsIOを使う。

mecabの辞書ファイルをGCSからダウンロードする時に、ふつうであれば、 google-cloud-storage ライブラリを使うがどうもbeamのライブラリと相性が悪いようで実行時にエラーになる。
そのため、

gcs = gcsio.GcsIO()

のように、 apache_beam.io.gcp.gcsio をつかって、GCSから辞書ファイルをローカルにロードする。

        gcs = gcsio.GcsIO()
        src=gcs.open(gcs_path)
        with open(dstpath,'wb') as outf:
          outf.write(src.read())

のようにすれば、GCSからデータをロードして、ローカルに保存してくれる。

mecab前のインターバル

    time.sleep(120)
    path = os.getcwd()+os.sep+'dic/mecab-ipadic-neologd'
    self.mecab = MeCab.Tagger('-r/dev/null -d {}'.format(path))

setupの中で、time.sleepを呼んでいるのは、GCSでのロードの直後に、

    self.mecab = MeCab.Tagger('-r/dev/null -d {}'.format(path))

を呼ぶとインスタンスが大量に立ち上がったときに、エラーが起こったためのワークアラウンド(正しい処置ではない気がする)。

__parse

ストップワードとして、

ex_word=(u'RT',u'さん',u'様',u'ちゃん',u'くん',u'笑',u'氏',u'君',u'ツイ',u'さま')

を使った(人名としてmecabから出力されるが人名ではないため)、

実行結果

tweetされた年と、人名とペアにして次のような出力がGCSに保存される。

2018_初音ミク: 34508

コード部分はこちらになる。

# code
   for e in names:
      elm=u"{}_{}".format(created_at.year,e)
      result.append(elm)

年ごとの多い順のランキングを作ると次のようになる。
ローソンと、前澤以外は人名ではないような気がする。ジュジュンは人名かな?SixTONESはグループ名かな。

人気人名を2018,2019,2020,2021年で500名までを列挙した。
人気人名.csv

rank 2018 2019 2020 2021
1 …。 ローソン
2 …。
3 ローソン …。
4 ローソン ローソン
5 LINE …。
6 な! な! な! トク
7 LINE リプ
8 ジェジュン 万名 LINE 前澤
9 チキ ジェジュン SixTONES な!
10 かな リプ かな LINE

例えば、初音ミクだと次のような順位になる

shibacow@dataflow-twitter:~/prog/dataflow/mecab_result$ ./collect_name.py --limit 50000 --name 初音ミク
INFO:root:y=2018 rank=270 data=year=2018 body=初音ミク cnt=34508
INFO:root:y=2019 rank=201 data=year=2019 body=初音ミク cnt=45339
INFO:root:y=2020 rank=138 data=year=2020 body=初音ミク cnt=63762
INFO:root:y=2021 rank=122 data=year=2021 body=初音ミク cnt=3704

2021年は1月20日まで。徐々にランクが上がっている。

dataflowを使う利点

  • PCollectionを使うことで、分散処理を透過的に実行可能
  • インスタンスを増やせば、処理数も増える。
  • 一台のマシンで不可能でも、複数台並べてスケールできる

javaを使った同等の処理との比較

完全に同一ではないが、こちらの記事のようにjava-dataflow使って、tweetを分析をmecab-neologd-kuromoji で行っている。

image.png

java-dataflowでのCPU利用率

javaのdataflowはCPUの利用率が安定している。

image.png

python-dataflowでのCPU利用率

python-dataflowではCPU利用率にゆらぎがある

image.png

効率的にCPUを使うという点ではjavaのほうが分がありそうだが、pythonでは処理のロジックがシンプルなので、コードの書きやすさ、効率性のどちらを選ぶかを決めれば良いと思う。

はまった点

requestments.txtを指定するとそこからワーカーが動かない

setup.pyに移行すると上手く動いた。

main文でsuperが上手く出来ないため、別モジュールにきりだした

例えば、word count examleWordExtractingDoFn に対して、


class WordExtractingDoFn(beam.DoFn):
   def __init__(self,foobar):
       super(WordExtractingDoFn, self).__init__()

などの拡張しても上手く実行されない。

AttributeError: Can't get attribute 'WordExtractingDoFn

などとエラーが出る。
理由は、こちらにあるが、

Beam does not support super() calls in main module on Python 3.

dataflowのpython3でmainでのsuperの使用が出来ないからのようだ。

bigqueryのデータを使う場合は、dataflowのインスタンスを立ち上げるリージョンに気をつけよう

bigqueryのデータは、us にあるそうだ。そのため、そのデータを、asiaでdataflowのインスタンスを立ち上げて利用すると、データの移動が発生するそのデータの移動でコストが結構かかる。dataflowをus-central1 で利用した方がコストが安いようだ。

下に添付した画像のように、転送コストの方がCPU利用コストより高かった。

image.png

Tips

FlexRS を活用するとコストカットできる

dataflowでは、最大6時間ほど持って実行する代わりに、コストが安くなるFlexRSのCOST_OPTIMIZED が使える。

FlexRSに関してはこちらを参照

私の実験では、7億件のtweetの処理通常であれば、21$ かかっていたが、FlexRSの COST_OPTIMIZEDモードであれば、 4.72$ で済んだ。通常であれば、全てレギュラーインスタンスを使うが、COST_OPTIMAIZED モードであれば、プリエンティティブVMを活用するため。

11
9
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
9