Python
Elasticsearch
TwitterAPI
kibana
StreamingAPI

ElasticsearchとKibanaを使ってTwitterのトレンドワードを可視化してみた

More than 1 year has passed since last update.

大久保です。

最近、会社でElasticsearch+Kibana+Fluentdという定番の組み合わせを使ってログ解析する機会があったので、ついでにいろいろ勉強してみました。

触ってみておもしろかったのが、Elasticsearchがログ解析だけじゃなくてちょっとしたKVSのようにも振る舞えることです。

ElasticsearchはKibanaと組み合わせることで、もっといろいろおもしろいことできそう感あります!!
本記事では、その一例としてTwitterのトレンドワードをリアルタイムに集計するプログラムを組んでみました。

Kobito.ZqzBFh.png
完成形:トレンドワードごとのツイート数をグラフ化

#開発環境と各種ミドルウェアのバージョン

ローカルで確認できればよかったので開発環境はMacです。

Java

ElasticsearchはJava製なのでJavaのバージョンを確認。

$ java -version
java version "1.8.0_40"
Java(TM) SE Runtime Environment (build 1.8.0_40-b27)
Java HotSpot(TM) 64-Bit Server VM (build 25.40-b25, mixed mode)

Elasticsearch

Elasticsearchは最新版の2.2.1でよかったので、バージョン指定せずにそのままbrewでインストール。

$ brew install elasticsearch
==> Downloading https://download.elasticsearch.org/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.2.1/elasticsearch-2.2.1.tar.gz
######################################################################## 100.0%
==> Caveats
Data:    /usr/local/var/elasticsearch/elasticsearch_user_name/
Logs:    /usr/local/var/log/elasticsearch/elasticsearch_user_name.log
Plugins: /usr/local/Cellar/elasticsearch/2.2.1/libexec/plugins/
Config:  /usr/local/etc/elasticsearch/
plugin script: /usr/local/Cellar/elasticsearch/2.2.1/libexec/bin/plugin

To have launchd start elasticsearch at login:
  ln -sfv /usr/local/opt/elasticsearch/*.plist ~/Library/LaunchAgents
Then to load elasticsearch now:
  launchctl load ~/Library/LaunchAgents/homebrew.mxcl.elasticsearch.plist
Or, if you don't want/need launchctl, you can just run:
  elasticsearch
==> Summary
🍺  /usr/local/Cellar/elasticsearch/2.2.1: 57 files, 31.3M, built in 23 seconds

動作確認

Elasticsearchをフォアグラウンドで立ち上げる。

$ elasticsearch

別ターミナルでリクエストを投げて動作確認。下のような感じで返ってくればOKだと思います。

$ curl http://localhost:9200/
{
  "name" : "Astron",
  "cluster_name" : "elasticsearch_user_name",
  "version" : {
    "number" : "2.2.1",
    "build_hash" : "d045fc29d1932bce18b2e65ab8b297fbf6cd41a1",
    "build_timestamp" : "2016-03-09T09:38:54Z",
    "build_snapshot" : false,
    "lucene_version" : "5.4.1"
  },
  "tagline" : "You Know, for Search"
}

Notice:
brewを使ったElasticsearchのインストールでいろいろ怒られるときは

$ brew update

とかやるといいかもです。自分は結構ハマりました。。

kuromoji プラグイン

Elasticsearchの検索に日本語を対応させるためにkuromojiというプラグインを入れます。
Elasticsearchをbrewで入れた場合、そのままではpluginコマンドが使えないようなのでパスを通してあげる必要があります。

plugin script: /usr/local/Cellar/elasticsearch/2.2.0_1/libexec/bin/plugin

インストール時に丁寧にもplugin scriptを教えてくれるので、そこのbinのPATHを.bash_profileにでも追加してあげます。

# add for Elasticsearch 2.2
export PATH="/usr/local/Cellar/elasticsearch/2.2.1/libexec/bin:$PATH"

.bash_profileの再読み込み

source .bash_profile

analysis-kuromojiのインストール&確認。

$ plugin install analysis-kuromoji
$ curl -X GET 'http://localhost:9200/_nodes/plugins?pretty'
{
  "cluster_name" : "elasticsearch_user_name",
  "nodes" : {
      ...
      "plugins" : [ {
        "name" : "analysis-kuromoji",
        "version" : "2.2.1",
        "description" : "The Japanese (kuromoji) Analysis plugin integrates Lucene kuromoji analysis module into elasticsearch.",
        "jvm" : true,
        "classname" : "org.elasticsearch.plugin.analysis.kuromoji.AnalysisKuromojiPlugin",
        "isolated" : true,
        "site" : false
      } ],
      ...
  }
}

問題なくインストールされていることが確認できました。

Kibana

kibanaも最新バージョンでよかったので4.4を入れました。
Kibana4では内部でHTTPサーバを起動することでスタンドアローンに動かせるようになってるらしい。(ネット上にはKibana3の記事多くて最初混乱した。。)

適当なディレクトリで、以下を実行。

$ curl -O https://download.elastic.co/kibana/kibana/kibana-4.4.2-darwin-x64.tar.gz
$ tar zxvf kibana-4.4.2-darwin-x64.tar.gz
$ mv kibana-4.4.2-darwin-x64 kibana
$ ./kibana/bin/kibana 

http://localhost:5601にアクセスして以下の画面が表示できれば成功です。
Kobito.lYucZY.png

Python

今回はツイートの取得&整形をPythonで行いました。(ツイートの取得だけならElasticsearchのプラグインがあるのでそもそもコードを書く必要もなさそうです)

$ python -V
Python 3.5.1 :: Anaconda 2.4.1 (x86_64)
$ pip -V
pip 8.1.1 from /Users/user_name/anaconda/lib/python3.5/site-packages (python 3.5)

今回はTwitter Stream APIを使ってツイートの取得を行うので、それに対応したPythonのTwitterライブラリのを入れます。

$ pip install twitter

ツイートデータの保存先がElasticsearchになるので、PythonからElasticsearchにリクエストを送るためのライブラリを入れます。

$ pip install elasticsearch

#ツイートの収集

ツイートの収集にあたって、今回は下記のようなフローで動くプログラムを書いていきました。
1. TwitterのGET trends/placeを使って日本のトレンドワードを取得
2. 取得したトレンドワードをもとにPOST statuses/filterでストリーム接続して、ツイートをリアルタイム取得
3. 取得したツイートを整形してひたすらElasticsearchにぶち込む
4. 5分経ったらまた新しいトレンドワードを取得するために1.へ戻る

下記が実際のコードです。

treand_stream.py
from pytz import timezone
from dateutil import parser
from datetime import datetime
from elasticsearch import Elasticsearch
from twitter import Twitter, TwitterStream, OAuth
from threading import Timer, get_ident

# https://apps.twitter.com/ でTwitterアプリを登録して取得したOAuthキーを設定。
OAUTH_INFO = dict(
    token="123456789-XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
    token_secret="XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
    consumer_key="XXXXXXXXXXXXXXXXXX",
    consumer_secret="XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")

STREAM_INFO = dict(
    timeout=600,
    block=False,
    heartbeat_timeout=600) # デフォルトだと90秒でストリーム維持のためのheartbeatが飛んで来るので10分に設定

JST = timezone('Asia/Tokyo')
WOEID_JP = 23424856 # 日本のWOEID

class TwitterTrendStream():

    def __init__(self):
        self.__current_thread_ident = None
        self.__oauth = OAuth(**OAUTH_INFO)
        self.__es = Elasticsearch()

    def __fetch_trands(self, twitter):
        response = twitter.trends.place(_id=WOEID_JP)
        return [trend["name"] for trend in response[0]["trends"]]

    def __fetch_filter_stream(self, twitter_stream, track_list):
        track = ",".join(track_list)
        return twitter_stream.statuses.filter(track=track)

    def run(self):
        self.__current_thread_ident = get_ident() # 現在の実行スレッドIDを登録
        Timer(300, self.run).start()              # 5分後に新たなスレッドを開始

        twitter = Twitter(auth=self.__oauth)
        twitter_stream = TwitterStream(auth=self.__oauth, **STREAM_INFO)

        trend_list = self.__fetch_trands(twitter)
        tweet_iter = self.__fetch_filter_stream(twitter_stream, trend_list)

        for tweet in tweet_iter:
            if "limit" in tweet: # 取得上限超えた時にくるLimit Jsonは無視
                continue

            if self.__current_thread_ident != get_ident(): # 新たなスレッドが立ち上がったら現在のストリームを終了させる
                return True
            for trend in trend_list:
                if trend in tweet['text']:
                    doc = {
                        'track': trend,
                        'text': tweet['text'],
                        'created_at': str(parser.parse(tweet['created_at']).astimezone(JST).isoformat())
                    }
                    self.__es.index(index="testindex", doc_type='tweet', body=doc)

if __name__ == '__main__':
    TwitterTrendStream().run()

Elasticsearchが起動した状態で実行することで、取得したトレンドを含むツイートをひたすらElasticsearchに溜めていってくれます。

#データのマッピングの設定

ただ、上記コードをそのまま実行してしまうとうまくマッピングが生成されません。具体的には下記のようになります。

$ curl http://localhost:9200/testindex/_mapping?pretty
{
  "testindex" : {
    "mappings" : {
      "tweet" : {
        "properties" : {
          "created_at" : {
            "type" : "date",
            "format" : "strict_date_optional_time||epoch_millis"
          },
          "text" : {
            "type" : "string"
          },
          "track" : {
            "type" : "string"
          }
        }
      }
    }
  }
}

ElasticsearchのDynamic Mappingによって自動生成されたマッピングJSONでは、"text"と"track"のpropertiesがstringのみになってしまっています。
このままKibanaで表示するとトレンドワードごとに集計したいはずのtrackが、日本語一文字ずつに区切られてしまったり、そもそも検索がうまくできなかったりして使い物になりません。
そのため、"track" には "index" : "not_analyzed"を、"text" には "analyzer" : "japanese"を追加する必要があります。
参考:Elasticsearch + Kibanaで日本語検索の続き

また、一度生成されたマッピングは基本的に更新が難しいため、一度削除して新しく作りなおす必要があります。

具体的に、以下のコマンドで作りなおすことができました。

$ curl -X DELETE http://localhost:9200/testindex?pretty
{
  "acknowledged" : true
}
$ curl -XPUT localhost:9200/testindex/ -d '
{
  "settings":{
     "index":{
        "analysis":{
           "tokenizer" : {
               "kuromoji" : {
                  "type" : "kuromoji_tokenizer"
               }
           },
           "analyzer" : {
               "japanese" : {
                   "type" : "custom",
                   "tokenizer" : "kuromoji"
               }
           }
        }
     }
  },
  "mappings":{
    "tweet":{
      "properties":{
        "created_at" : {
          "type" : "date",
          "format" : "strict_date_optional_time||epoch_millis"
        },
        "text" : {
          "type" : "string",
          "analyzer": "japanese"
        },
        "track": {
          "type": "string",
          "index": "not_analyzed"
        }
      }
    }
  }
}'

ちなみに、Elasticsearchで日付をDynamic Mappingしたいのであれば、公式に載ってある対応する日付フォーマットに合わせる必要があります。今回は出力する時刻をISO表記にすることで対応しました。

#動作確認

Elasticsearch、Kibanaを実行させた状態で、Pythonを実行します。

http://localhost:5601 にアクセスして表示したいインデックスを登録。上記のコードそのままだと今回はtestindexです。
登録したインデックスを確認すると、trackのanalyzedのところにチェックが入っていないことが確認できます。これはtrackにnot_analyzedを設定してるためです。

skitched-20160321-155852.png

Discoverでツイートがどんどん流れてきているのが確認できるかと思います。

Kobito.nNbLwN.png

下が今つぶやかれてるツイートをトレンドワードごとにリアルタイムに集計してKibanaでグラフ化したものになります。

Kobito.ZqzBFh.png

昼の14時から15時までの1時間で収集していたのですが、この日がちょうどTwitterの10周年だったこともあって「#LoveTwitter」のツイート数がすごいことになってますね。
※対象のツイート数が多すぎるため、何度もTwitter Streamの出力制限にひっかかっています。そのため、つぶやかれているツイートを全て取得できているわけではありません。

ちなみに、3位になってる「#そこまで言って委員会NPは打ち切りに」のみで絞った時系列つぶやき数は下のようになってます。

Kobito.NFHQK1.png

おそらくTV番組の盛り上がりとかが関係してるんでしょうけど分単位ではいまいち拾える情報はなさそうですね。。

#まとめというか感想

以上、ElasticsearchとKibanaを使ってTwitterのトレンドワードを可視化してみました。
ElasticsearchとKibanaを活用することで、すごく身軽に素早くデータ解析が行えるのを実感しました!
もしこれらのツールを使わなかったとしたら、DBを用意してWebサーバ用意してフロントも作って、、ちょっとしたWebサービスを作るぐらい大掛かりになってたと思います。
今回書いたコード自体はコアとなるPythonの数十行のみでかなりスリムに実現できました。(Python自体は最近書き始めたばかりなので変なところあったらコメントください。。)
ただ、Elasticsearch自体の学習コストはなかなか高そうだなとも思いました。
マッピングやライブラリの導入に関してはなかなか日本語の情報が落ちてないこともあり、設定で詰まることがしばしば。また、Elasticsearchは集計のためにかなりのメモリを食うので、ちゃんとしたサービスとして活用するときはJSONの設計も含めたチューニングが必要そうだと感じました。。

参考:
[Python] Python3でTwitterの検索APIを利用する
threading — スレッドベースの並列処理
Python スレッドをやってみる
Elasticsearch + Kibanaで日本語検索の続き
ElasticSearchのアナライザの設定

本記事はSepteni Engineer's Blogの転載です。