37
43

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Fluentd + BigQuery による Tweet データ収集

Posted at

はじめに

今回は、Twitter Streaming API で得られる全ツイート中の 1% の
日本語 Tweet のデータをログとして収集し、
Fluentd を用いて、BigQuery に格納するシステムを構築する。

以下に BigQuery と Fluentd を説明を記載する。

BigQuery とは

Google BigQuery とは、クラウド上で動作するデータウェアハウス。
以下の様な特徴を持っている。

  • テラバイト級のデータに対しても、高速に集計が可能。
  • ストレージ料金が安い (容量あたりの保存単価だとAWS S3 よりも安価)
  • データのインポートが容易 (バルクアップロード / ストリーム)。

細かい話は次のリンクを参照。
Googleの虎の子「BigQuery」をFluentdユーザーが使わない理由がなくなった理由 #gcpja

ありとあらゆるログデータを格納する先として非常に優秀。

Fluentd とは

Fluentd とは、ログ集計処理の一元化を目的としたツール。
性能や安定性にも優れている。

様々なログデータをリアルタイムに Fluentd に集め、
フィルタリング、一時貯蔵、ルーティング、簡易集計等の適切な処理を行った後、
各種ストレージ、DBなどに格納する処理などを一元化して行うことが可能。

各種プラグインで容易に拡張可能。
今回はTweetデータ収集用のプラグインと、BigQueryへのアウトプット用プラグインを利用する。

システム構築

作業環境

今回使用した環境は以下のとおり。

  • クライアントマシン:
    PC: MacBookPro Retina 13-inch Late 2013
    OS: Yosemite 10.10.3
    CPU: Core i5 2.6GHz
    Mem: 16GB

  • Fluentd 用サーバ
    AWS EC2
    AMI: Amazon Linux AMI 2015.03
    インスタンスタイプ: t2.micro
    ストレージ: General Purpose (SSD) 30 GB

なお、AWSの登録等は今回割愛する。

BigQuery 周りの設定

Google Cloud Platform のアカウント登録

Google Cloud Platform: BigQuery から無償体験版に登録する。
新規ユーザーの場合 $300 分の無償枠がおまけでついてくる。
登録にクレカ必須なので注意。

プロジェクト作成

プロジェクトを作成 をクリックする。
プロジェクト名に tweet-streaming-api-test を入力し、作成をクリック。

左側のメニューから ビッグデータ -> BigQuery をクリックし、
BigQueryの管理画面にアクセスする。

動作確認

COMPOSE QUERY をクリックして、下記のクエリをフォームに入力する。
サンプルデータとして入っている samples.shakespeare のデータを使う。

SELECT
    corpus
FROM
    publicdata:samples.shakespeare
GROUP BY
    corpus

RUN QUERY をクリックし、結果が出力されれば動作確認はOK。

50B25DAD-04F0-406A-B67E-D39EA077447A.png

CLIツールのインストール (bq コマンド)

新規テーブルの作成等、基本的な操作は CLI ツール (bq コマンド) から行うことができる。
できること一覧は bq-command-line-tool を参照。

Google Cloud SDK をインストールすれば、
CLIツール(bq コマンド) もまとめてインストールされる。

クライアントマシンで下記コマンドを実行する。

$ curl https://sdk.cloud.google.com | bash

途中で、展開場所を聞かれるので指定する (デフォルトだと実行ユーザの home 直下)。
また改善データをGoogleに送信するかを問われるので、Y/n 好きな方を選択する。
また、実行ファイルのパスを追加するかどうか問われるので、基本的に Y を選ぶ
(.bashrc の場所を問われるがデフォルトのままでOK)。

以下のコマンドを実行して、.bashrc を再読み込み。

$ source ~/.bashrc

次に、コンソールからGoogleアカウントの接続情報を登録する。
下記コマンドを実行する。

$ gcloud auth login

https://accounts.google.com/o/oauth2/auth?redirect_uri= …
というリンクが表示されるので、クライアントマシンのブラウザに入力し、
管理用アカウントでログインする。
アプリケーションの認証を行うと、認証コードが発行されるので、
コンソールに入力する。

次に、対象のプロジェクトを指定する。
tweet-streaming-api-test プロジェクトのプロジェクトIDを入力する。

$ gcloud config set project [PROJECT_ID]

bq コマンドの実行テスト。

$ bq show publicdata:samples.shakespeare
Table publicdata:samples.shakespeare

 Last modified                  Schema                  Total Rows   Total Bytes   Expiration
----------------- ------------------------------------ ------------ ------------- ------------
 21 Oct 21:27:07  |- word: string (required)              164656       6432064
                  |- word_count: integer (required)
                  |- corpus: string (required)
                  |- corpus_date: integer (required)

$ bq query "SELECT corpus FROM publicdata:samples.shakespeare GROUP BY corpus LIMIT 5;"
Waiting on bqjob_r6fb53bf9abfc8c9c_0000014d2dd82009_1 ... (0s) Current status: DONE
+---------------------+
| corpus              |
+---------------------+
| loverscomplaint     |
| merrywivesofwindsor |
| 3kinghenryvi        |
| coriolanus          |
| kinghenryviii       |
+---------------------+

Fluentd によるログ収集サーバの構築

新規AWS EC2 サーバの準備

以下の内容で新規インスタンスを作成する。

AMI: Amazon Linux AMI 2015.03
インスタンスタイプ: t2.micro
ストレージ: General Purpose (SSD) 30 GB

作成手順の詳細は今回は割愛する。
以下、作成したインスタンスにログインして作業。

ファイルディスクリプタの上限値の変更

1プロセスが同時に開くことが出来るファイルディスクリプタの数の上限を増やす。
下記ファイルに以下の内容を追記する (sudo 必要)。

/etc/security/limits.conf
# これらを追記
root soft nofile 65536
root hard nofile 65536
* soft nofile 65536
* hard nofile 65536

その後、仮想マシンをリブートし、ulimit -n コマンドを
実行した際の値を確認し下記のようになっていれば良い。

$ ulimit -n
65536

Fluentd のインストール

TresureData のリポジトリを利用してインストールする。
下記コマンドを実行してインストール。

$ curl -L http://toolbelt.treasuredata.com/sh/install-redhat-td-agent2.sh | sh

下記コマンドで Fluentd を起動確認。

$ sudo service td-agent start
Starting td-agent: [ OK ]

問題なく起動できることが確認できたら、一度サービスを止めておく。

$ sudo service td-agent stop
Shutting down td-agent: [ OK ]

Twitter Streaming API + Fluentd によるデータ取得

Twitter OAuth 認証キーの取得

まずは使用する Twitter のアカウントを新規作成する。
メール認証と携帯電話番号の登録を済ませ、ログイン状態を維持したまま
Twitter Developers ページ にアクセスする。

画面下部にあるメニューの中から Manage Your Apps をクリックし、
リンク先のページの Create New App をクリックする。

Application Details に下記内容を入力する。

  • Name: 適当にクローラーの名前をつける
  • Description: 適当にクローラーの説明を書く。
  • Website: 自分のサイトのURLを入力。仮でも可。
  • Callback URL: 空欄のままで可

Developer Agreement を読み、Yes, I agree にチェックを入れ、
Create your Twitter application をクリック。
なお、電話番号登録が終わっていないと、次に進めないので注意。

アプリケーションが作成できたら、画面上部のタブの
Keys and Access Tokens をクリックし、
画面遷移後、画面下部の Create my access token をクリックする。

これで Twitter Streaming API に必要な以下の4つの情報を確認できる。

  • consumer_key
  • consumer_secret
  • Access Token
  • Access Token Secret

fluentd の設定ファイルにこれらを記述する必要があるので、
どこかにメモしておく。

fluent-plugin-twitter のインストール

下記コマンドを実行すれば、インストールできる。

$ sudo yum -y install openssl-devel libcurl libcurl-devel gcc-c++
$ sudo td-agent-gem install install eventmachine
$ sudo td-agent-gem install fluent-plugin-twitter

td-agent.conf の設定

/etc/td-agent/td-agent.conf を編集する。

/etc/td-agent/td-agent.conf
<source>
  type twitter
  consumer_key YOUR_CONSUMER_KEY
  consumer_secret YOUR_CONSUMER_SECRET
  oauth_token YOUR_ACCESS_TOKEN
  oauth_token_secret YOUR_ACCESS_TOKEN_SECRET
  tag input.twitter.sampling
  timeline sampling
  lang ja
  output_format nest
</source>

<match input.twitter.sampling>
  type stdout
</match>

動作確認

以下のコマンドを実行する。
この状態だと、ログファイルにTweetデータが次々と書き込まれていくので、
入力部分の動作が確認できたら、すぐに Fluentd のサービスを止める。

$ sudo /etc/init.d/td-agent start
# 数秒程度待ってから
$ sudo /etc/init.d/td-agent stop
$ tail /var/log/td-agent/td-agent.log
...
2015-05-08 06:10:24 +0000 input.twitter.sampling: {"created_at":"Fri May 08 06:10:24 +0000 2015","id":596557745366536193,"id_str":"596557745366536193","text":"たくさん笑って。\n笑うことは私たちの一番の財産で、\n見た目や雰囲気を変えてくれるだけでなく、気持ちも変えてくれる。\n\n(画像 三浦春馬) http://t.co/LNZE5UTdvT","source":"<a href=\"http://twittbot.net/\" rel=\"nofollow\">twittbot.net</a>","truncated":false,"in_reply_to_status_id":null,"in_reply_to_status_id_str":null,"in_reply_to_user_id":null,"in_reply_to_user_id_str":null,"in_reply_to_screen_name":null,"user":{"id":1456165825,"id_str":"1456165825","name":"癒しのイケメン達","screen_name":"iyasaretaizo","location":"","url":null,"description":"ストレス社会…癒し必要ですよね。明日もがんばろうと思える言葉、元気が出る言葉をイケメン画像と一緒にお届けします♡Wの力で癒し度UP間違いなし!!\r\n気に入ったらフォロー&リツイートして広めてね♡\r\n※言葉と画像は全く関係はありません。","protected":false,"verified":false,"followers_count":233,"friends_count":0,"listed_count":1,"favourites_count":10,"statuses_count":10235,"created_at":"Sat May 25 06:16:40 +0000 2013","utc_offset":null,"time_zone":null,"geo_enabled":false,"lang":"ja","contributors_enabled":false,"is_translator":false,"profile_background_color":"C0DEED","profile_background_image_url":"http://abs.twimg.com/images/themes/theme1/bg.png","profile_background_image_url_https":"https://abs.twimg.com/images/themes/theme1/bg.png","profile_background_tile":false,"profile_link_color":"0084B4","profile_sidebar_border_color":"C0DEED","profile_sidebar_fill_color":"DDEEF6","profile_text_color":"333333","profile_use_background_image":true,"profile_image_url":"http://pbs.twimg.com/profile_images/3707077480/d1a40337ecfaf4b99309bc44e2819032_normal.jpeg","profile_image_url_https":"https://pbs.twimg.com/profile_images/3707077480/d1a40337ecfaf4b99309bc44e2819032_normal.jpeg","default_profile":true,"default_profile_image":false,"following":null,"follow_request_sent":null,"notifications":null},"geo":null,"coordinates":null,"place":null,"contributors":null,"retweet_count":0,"favorite_count":0,"entities":{"hashtags":[],"trends":[],"urls":[],"user_mentions":[],"symbols":[],"media":[{"id":339351879110782976,"id_str":"339351879110782976","indices":[69,91],"media_url":"http://pbs.twimg.com/media/BLWevHBCcAAD9MM.jpg","media_url_https":"https://pbs.twimg.com/media/BLWevHBCcAAD9MM.jpg","url":"http://t.co/LNZE5UTdvT","display_url":"pic.twitter.com/LNZE5UTdvT","expanded_url":"http://twitter.com/iyasaretaizo/status/339351879102394368/photo/1","type":"photo","sizes":{"small":{"w":276,"h":320,"resize":"fit"},"medium":{"w":276,"h":320,"resize":"fit"},"thumb":{"w":150,"h":150,"resize":"crop"},"large":{"w":276,"h":320,"resize":"fit"}},"source_status_id":339351879102394368,"source_status_id_str":"339351879102394368"}]},"extended_entities":{"media":[{"id":339351879110782976,"id_str":"339351879110782976","indices":[69,91],"media_url":"http://pbs.twimg.com/media/BLWevHBCcAAD9MM.jpg","media_url_https":"https://pbs.twimg.com/media/BLWevHBCcAAD9MM.jpg","url":"http://t.co/LNZE5UTdvT","display_url":"pic.twitter.com/LNZE5UTdvT","expanded_url":"http://twitter.com/iyasaretaizo/status/339351879102394368/photo/1","type":"photo","sizes":{"small":{"w":276,"h":320,"resize":"fit"},"medium":{"w":276,"h":320,"resize":"fit"},"thumb":{"w":150,"h":150,"resize":"crop"},"large":{"w":276,"h":320,"resize":"fit"}},"source_status_id":339351879102394368,"source_status_id_str":"339351879102394368"}]},"favorited":false,"retweeted":false,"possibly_sensitive":false,"filter_level":"low","lang":"ja","timestamp_ms":"1431065424659"}
2015-05-08 06:10:25 +0000 [info]: shutting down fluentd
2015-05-08 06:10:25 +0000 [info]: process finished code=0

このような形でTweetデータの収集が行えていることが確認できればOK。

BigQuery へのデータアウトプット設定

fluent-plugin-bigquery のインストール

下記コマンドを実行すれば、インストールできる。

$ sudo td-agent-gem install fluent-plugin-bigquery

サービスアカウントの発行

以下の作業はクライアントマシンで行う。

作成した tweet-streaming-api-test プロジェクトの
プロジェクトダッシュボードを表示し、画面左部分から
API と認証 -> 認証情報 -> OAuth -> 新しいクライアントIDを作成
をクリックする。

サービスアカウントを選択し、P12キーを選択し鍵ファイルを作成する。
ダウンロードが始まるので、ローカルマシンにダウンロードし、
scp コマンド等で fluentd 用のサーバに転送しておく。
転送した鍵ファイルは /opt/td-agent/google-keys/ 以下に設置した。

また、このサービスアカウントの項目で確認できるメールアドレス
fluentd の設定でも利用するのでメモっておく。

データセットの作成

下記コマンドで、収集した Tweet を記録するためのデータセットを作成する。

$ bq mk [PROJECT_ID]:tweets
Dataset ‘XXXX-XXXX:tweets' successfully created.

テーブルのスキーマ定義と作成

Tweet 情報のスキーマをこのリンク先のページのように定義し、
/opt/td-agent/bq_schema/ を作成、 bq_tweet.json として保存する。

users.status は今回カットした。
また、place.coordinates が3重入れ子のリストになっていたが、今回は文字列として格納している。

$ wget https://gist.github.com/Salinger/ef39b81ad2c48516b596
$ mkdir -p /opt/td-agent/bq_schema
$ sudo mv bq_tweet.json /opt/td-agent/bq_schema/
$ sudo chmod 666 /opt/td-agent/bq_schema/bq_tweet.json

下記コマンドで事前にデータの格納先テーブルを作成しておく。

$ bq mk -t [PROJECT_ID]:tweets.streaming_test /opt/td-agent/bq_schema/bq_tweet.json

td-agent.conf の設定

Twitter Streaming API から情報を取得する際に記述した設定のうち
<match input.twitter.sampling> ~ </match>
の部分を書き換える。

設定ファイルの email の部分は、OAuth 認証のサービスアカウントで確認できる
XXXXXXX@developer.gserviceaccount.com を入力する。

設定は下記ページを参考にした。
fluent-plugin-bigquery の設定

<match input.twitter.sampling>
  type copy                                                                                                                                                    
  <store>
    type bigquery
    buffer_type file
    buffer_path /var/log/td-agent/buffer/twi.*.buf
    method insert

    auth_method private_key
    email XXXXXXXXXX-XXXXXXXXX@developer.gserviceaccount.com
    private_key_path /opt/td-agent/google-keys/tweet-streaming-api-test-XXXXXXXXXX.p12

    project XXXXXX-XXXXX-XXXXX
    dataset tweets                                                                                                                                  
    table streaming_test
    flush_interval 1
    buffer_chunk_limit           1000000
    buffer_queue_limit           5000                                                      
    flush_interval               1 
    try_flush_interval           0.05
    num_threads                  4
    queued_chunk_flush_interval  0.01

    time_format %s
    time_field log_time

    schema_path /opt/td-agent/bq_schema/bq_tweet.json
    log_level error
  </store>
</match>

下記コマンドで実行結果の確認。

$ sudo /etc/init.d/td-agent start

これでBigQueryのWebコンソール上で無事Tweetデータが格納され始めているのが確認できればOK。

動作が怪しい場合は、/var/log/td-agent/td-agent.log を見てエラーがないかどうかを確認する。

また、Google Developers Console でAPIのログも確認し、エラーが出てないかどうか確認する。
InsertErrors になっていても、HTTPのステータスは2xx が返り正常に見える点には注意。

さいごに

今回は可能な限り元ツイートデータの構造を保ったまま格納しているが、
必要なデータのみ格納したい場合は bq_tweet.json を書き換えればOK。

また、Tweet データバックアップのために、BigQuery と同時に
AWS S3 への同時格納も検討すべきである。
※ 長期間BigQueryのサービスがダウンし、fluentd 用マシンのバッファからあふれた場合Tweetデータが消失するのを防ぐため

Tweetデータの要素が一部なかった場合 (例えば、entities.urls 等)
BigQuery へどのように格納されるかは下記ページが参考になる。
BigQueryでデータロード時に欠損値の読み込みルールまとめ

参考資料

37
43
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
37
43

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?