はじめに
今回は、Twitter Streaming API で得られる全ツイート中の 1% の
日本語 Tweet のデータをログとして収集し、
Fluentd を用いて、BigQuery に格納するシステムを構築する。
以下に BigQuery と Fluentd を説明を記載する。
BigQuery とは
Google BigQuery とは、クラウド上で動作するデータウェアハウス。
以下の様な特徴を持っている。
- テラバイト級のデータに対しても、高速に集計が可能。
- ストレージ料金が安い (容量あたりの保存単価だとAWS S3 よりも安価)
- データのインポートが容易 (バルクアップロード / ストリーム)。
細かい話は次のリンクを参照。
Googleの虎の子「BigQuery」をFluentdユーザーが使わない理由がなくなった理由 #gcpja
ありとあらゆるログデータを格納する先として非常に優秀。
Fluentd とは
Fluentd とは、ログ集計処理の一元化を目的としたツール。
性能や安定性にも優れている。
様々なログデータをリアルタイムに Fluentd に集め、
フィルタリング、一時貯蔵、ルーティング、簡易集計等の適切な処理を行った後、
各種ストレージ、DBなどに格納する処理などを一元化して行うことが可能。
各種プラグインで容易に拡張可能。
今回はTweetデータ収集用のプラグインと、BigQueryへのアウトプット用プラグインを利用する。
システム構築
作業環境
今回使用した環境は以下のとおり。
-
クライアントマシン:
PC: MacBookPro Retina 13-inch Late 2013
OS: Yosemite 10.10.3
CPU: Core i5 2.6GHz
Mem: 16GB -
Fluentd 用サーバ
AWS EC2
AMI: Amazon Linux AMI 2015.03
インスタンスタイプ: t2.micro
ストレージ: General Purpose (SSD) 30 GB
なお、AWSの登録等は今回割愛する。
BigQuery 周りの設定
Google Cloud Platform のアカウント登録
Google Cloud Platform: BigQuery から無償体験版に登録する。
新規ユーザーの場合 $300 分の無償枠がおまけでついてくる。
登録にクレカ必須なので注意。
プロジェクト作成
プロジェクトを作成
をクリックする。
プロジェクト名に tweet-streaming-api-test
を入力し、作成をクリック。
左側のメニューから ビッグデータ
-> BigQuery
をクリックし、
BigQueryの管理画面にアクセスする。
動作確認
COMPOSE QUERY
をクリックして、下記のクエリをフォームに入力する。
サンプルデータとして入っている samples.shakespeare
のデータを使う。
SELECT
corpus
FROM
publicdata:samples.shakespeare
GROUP BY
corpus
RUN QUERY
をクリックし、結果が出力されれば動作確認はOK。
CLIツールのインストール (bq コマンド)
新規テーブルの作成等、基本的な操作は CLI ツール (bq コマンド) から行うことができる。
できること一覧は bq-command-line-tool を参照。
Google Cloud SDK をインストールすれば、
CLIツール(bq コマンド) もまとめてインストールされる。
クライアントマシンで下記コマンドを実行する。
$ curl https://sdk.cloud.google.com | bash
途中で、展開場所を聞かれるので指定する (デフォルトだと実行ユーザの home 直下)。
また改善データをGoogleに送信するかを問われるので、Y/n
好きな方を選択する。
また、実行ファイルのパスを追加するかどうか問われるので、基本的に Y
を選ぶ
(.bashrc の場所を問われるがデフォルトのままでOK)。
以下のコマンドを実行して、.bashrc
を再読み込み。
$ source ~/.bashrc
次に、コンソールからGoogleアカウントの接続情報を登録する。
下記コマンドを実行する。
$ gcloud auth login
https://accounts.google.com/o/oauth2/auth?redirect_uri= …
というリンクが表示されるので、クライアントマシンのブラウザに入力し、
管理用アカウントでログインする。
アプリケーションの認証を行うと、認証コードが発行されるので、
コンソールに入力する。
次に、対象のプロジェクトを指定する。
tweet-streaming-api-test
プロジェクトのプロジェクトIDを入力する。
$ gcloud config set project [PROJECT_ID]
bq コマンドの実行テスト。
$ bq show publicdata:samples.shakespeare
Table publicdata:samples.shakespeare
Last modified Schema Total Rows Total Bytes Expiration
----------------- ------------------------------------ ------------ ------------- ------------
21 Oct 21:27:07 |- word: string (required) 164656 6432064
|- word_count: integer (required)
|- corpus: string (required)
|- corpus_date: integer (required)
$ bq query "SELECT corpus FROM publicdata:samples.shakespeare GROUP BY corpus LIMIT 5;"
Waiting on bqjob_r6fb53bf9abfc8c9c_0000014d2dd82009_1 ... (0s) Current status: DONE
+---------------------+
| corpus |
+---------------------+
| loverscomplaint |
| merrywivesofwindsor |
| 3kinghenryvi |
| coriolanus |
| kinghenryviii |
+---------------------+
Fluentd によるログ収集サーバの構築
新規AWS EC2 サーバの準備
以下の内容で新規インスタンスを作成する。
AMI: Amazon Linux AMI 2015.03
インスタンスタイプ: t2.micro
ストレージ: General Purpose (SSD) 30 GB
作成手順の詳細は今回は割愛する。
以下、作成したインスタンスにログインして作業。
ファイルディスクリプタの上限値の変更
1プロセスが同時に開くことが出来るファイルディスクリプタの数の上限を増やす。
下記ファイルに以下の内容を追記する (sudo 必要)。
# これらを追記
root soft nofile 65536
root hard nofile 65536
* soft nofile 65536
* hard nofile 65536
その後、仮想マシンをリブートし、ulimit -n
コマンドを
実行した際の値を確認し下記のようになっていれば良い。
$ ulimit -n
65536
Fluentd のインストール
TresureData のリポジトリを利用してインストールする。
下記コマンドを実行してインストール。
$ curl -L http://toolbelt.treasuredata.com/sh/install-redhat-td-agent2.sh | sh
下記コマンドで Fluentd を起動確認。
$ sudo service td-agent start
Starting td-agent: [ OK ]
問題なく起動できることが確認できたら、一度サービスを止めておく。
$ sudo service td-agent stop
Shutting down td-agent: [ OK ]
Twitter Streaming API + Fluentd によるデータ取得
Twitter OAuth 認証キーの取得
まずは使用する Twitter のアカウントを新規作成する。
メール認証と携帯電話番号の登録を済ませ、ログイン状態を維持したまま
Twitter Developers ページ にアクセスする。
画面下部にあるメニューの中から Manage Your Apps
をクリックし、
リンク先のページの Create New App
をクリックする。
Application Details に下記内容を入力する。
- Name: 適当にクローラーの名前をつける
- Description: 適当にクローラーの説明を書く。
- Website: 自分のサイトのURLを入力。仮でも可。
- Callback URL: 空欄のままで可
Developer Agreement を読み、Yes, I agree にチェックを入れ、
Create your Twitter application
をクリック。
なお、電話番号登録が終わっていないと、次に進めないので注意。
アプリケーションが作成できたら、画面上部のタブの
Keys and Access Tokens
をクリックし、
画面遷移後、画面下部の Create my access token
をクリックする。
これで Twitter Streaming API に必要な以下の4つの情報を確認できる。
- consumer_key
- consumer_secret
- Access Token
- Access Token Secret
fluentd の設定ファイルにこれらを記述する必要があるので、
どこかにメモしておく。
fluent-plugin-twitter のインストール
下記コマンドを実行すれば、インストールできる。
$ sudo yum -y install openssl-devel libcurl libcurl-devel gcc-c++
$ sudo td-agent-gem install install eventmachine
$ sudo td-agent-gem install fluent-plugin-twitter
td-agent.conf の設定
/etc/td-agent/td-agent.conf
を編集する。
<source>
type twitter
consumer_key YOUR_CONSUMER_KEY
consumer_secret YOUR_CONSUMER_SECRET
oauth_token YOUR_ACCESS_TOKEN
oauth_token_secret YOUR_ACCESS_TOKEN_SECRET
tag input.twitter.sampling
timeline sampling
lang ja
output_format nest
</source>
<match input.twitter.sampling>
type stdout
</match>
動作確認
以下のコマンドを実行する。
この状態だと、ログファイルにTweetデータが次々と書き込まれていくので、
入力部分の動作が確認できたら、すぐに Fluentd のサービスを止める。
$ sudo /etc/init.d/td-agent start
# 数秒程度待ってから
$ sudo /etc/init.d/td-agent stop
$ tail /var/log/td-agent/td-agent.log
...
2015-05-08 06:10:24 +0000 input.twitter.sampling: {"created_at":"Fri May 08 06:10:24 +0000 2015","id":596557745366536193,"id_str":"596557745366536193","text":"たくさん笑って。\n笑うことは私たちの一番の財産で、\n見た目や雰囲気を変えてくれるだけでなく、気持ちも変えてくれる。\n\n(画像 三浦春馬) http://t.co/LNZE5UTdvT","source":"<a href=\"http://twittbot.net/\" rel=\"nofollow\">twittbot.net</a>","truncated":false,"in_reply_to_status_id":null,"in_reply_to_status_id_str":null,"in_reply_to_user_id":null,"in_reply_to_user_id_str":null,"in_reply_to_screen_name":null,"user":{"id":1456165825,"id_str":"1456165825","name":"癒しのイケメン達","screen_name":"iyasaretaizo","location":"","url":null,"description":"ストレス社会…癒し必要ですよね。明日もがんばろうと思える言葉、元気が出る言葉をイケメン画像と一緒にお届けします♡Wの力で癒し度UP間違いなし!!\r\n気に入ったらフォロー&リツイートして広めてね♡\r\n※言葉と画像は全く関係はありません。","protected":false,"verified":false,"followers_count":233,"friends_count":0,"listed_count":1,"favourites_count":10,"statuses_count":10235,"created_at":"Sat May 25 06:16:40 +0000 2013","utc_offset":null,"time_zone":null,"geo_enabled":false,"lang":"ja","contributors_enabled":false,"is_translator":false,"profile_background_color":"C0DEED","profile_background_image_url":"http://abs.twimg.com/images/themes/theme1/bg.png","profile_background_image_url_https":"https://abs.twimg.com/images/themes/theme1/bg.png","profile_background_tile":false,"profile_link_color":"0084B4","profile_sidebar_border_color":"C0DEED","profile_sidebar_fill_color":"DDEEF6","profile_text_color":"333333","profile_use_background_image":true,"profile_image_url":"http://pbs.twimg.com/profile_images/3707077480/d1a40337ecfaf4b99309bc44e2819032_normal.jpeg","profile_image_url_https":"https://pbs.twimg.com/profile_images/3707077480/d1a40337ecfaf4b99309bc44e2819032_normal.jpeg","default_profile":true,"default_profile_image":false,"following":null,"follow_request_sent":null,"notifications":null},"geo":null,"coordinates":null,"place":null,"contributors":null,"retweet_count":0,"favorite_count":0,"entities":{"hashtags":[],"trends":[],"urls":[],"user_mentions":[],"symbols":[],"media":[{"id":339351879110782976,"id_str":"339351879110782976","indices":[69,91],"media_url":"http://pbs.twimg.com/media/BLWevHBCcAAD9MM.jpg","media_url_https":"https://pbs.twimg.com/media/BLWevHBCcAAD9MM.jpg","url":"http://t.co/LNZE5UTdvT","display_url":"pic.twitter.com/LNZE5UTdvT","expanded_url":"http://twitter.com/iyasaretaizo/status/339351879102394368/photo/1","type":"photo","sizes":{"small":{"w":276,"h":320,"resize":"fit"},"medium":{"w":276,"h":320,"resize":"fit"},"thumb":{"w":150,"h":150,"resize":"crop"},"large":{"w":276,"h":320,"resize":"fit"}},"source_status_id":339351879102394368,"source_status_id_str":"339351879102394368"}]},"extended_entities":{"media":[{"id":339351879110782976,"id_str":"339351879110782976","indices":[69,91],"media_url":"http://pbs.twimg.com/media/BLWevHBCcAAD9MM.jpg","media_url_https":"https://pbs.twimg.com/media/BLWevHBCcAAD9MM.jpg","url":"http://t.co/LNZE5UTdvT","display_url":"pic.twitter.com/LNZE5UTdvT","expanded_url":"http://twitter.com/iyasaretaizo/status/339351879102394368/photo/1","type":"photo","sizes":{"small":{"w":276,"h":320,"resize":"fit"},"medium":{"w":276,"h":320,"resize":"fit"},"thumb":{"w":150,"h":150,"resize":"crop"},"large":{"w":276,"h":320,"resize":"fit"}},"source_status_id":339351879102394368,"source_status_id_str":"339351879102394368"}]},"favorited":false,"retweeted":false,"possibly_sensitive":false,"filter_level":"low","lang":"ja","timestamp_ms":"1431065424659"}
2015-05-08 06:10:25 +0000 [info]: shutting down fluentd
2015-05-08 06:10:25 +0000 [info]: process finished code=0
このような形でTweetデータの収集が行えていることが確認できればOK。
BigQuery へのデータアウトプット設定
fluent-plugin-bigquery のインストール
下記コマンドを実行すれば、インストールできる。
$ sudo td-agent-gem install fluent-plugin-bigquery
サービスアカウントの発行
以下の作業はクライアントマシンで行う。
作成した tweet-streaming-api-test
プロジェクトの
プロジェクトダッシュボードを表示し、画面左部分から
API と認証
-> 認証情報
-> OAuth
-> 新しいクライアントIDを作成
をクリックする。
サービスアカウント
を選択し、P12キーを選択し鍵ファイルを作成する。
ダウンロードが始まるので、ローカルマシンにダウンロードし、
scp コマンド等で fluentd 用のサーバに転送しておく。
転送した鍵ファイルは /opt/td-agent/google-keys/
以下に設置した。
また、このサービスアカウントの項目で確認できるメールアドレス
は
fluentd の設定でも利用するのでメモっておく。
データセットの作成
下記コマンドで、収集した Tweet を記録するためのデータセットを作成する。
$ bq mk [PROJECT_ID]:tweets
Dataset ‘XXXX-XXXX:tweets' successfully created.
テーブルのスキーマ定義と作成
Tweet 情報のスキーマをこのリンク先のページのように定義し、
/opt/td-agent/bq_schema/
を作成、 bq_tweet.json
として保存する。
users.status は今回カットした。
また、place.coordinates が3重入れ子のリストになっていたが、今回は文字列として格納している。
$ wget https://gist.github.com/Salinger/ef39b81ad2c48516b596
$ mkdir -p /opt/td-agent/bq_schema
$ sudo mv bq_tweet.json /opt/td-agent/bq_schema/
$ sudo chmod 666 /opt/td-agent/bq_schema/bq_tweet.json
下記コマンドで事前にデータの格納先テーブルを作成しておく。
$ bq mk -t [PROJECT_ID]:tweets.streaming_test /opt/td-agent/bq_schema/bq_tweet.json
td-agent.conf の設定
Twitter Streaming API から情報を取得する際に記述した設定のうち
<match input.twitter.sampling> ~ </match>
の部分を書き換える。
設定ファイルの email の部分は、OAuth 認証のサービスアカウントで確認できる
XXXXXXX@developer.gserviceaccount.com
を入力する。
設定は下記ページを参考にした。
fluent-plugin-bigquery の設定
<match input.twitter.sampling>
type copy
<store>
type bigquery
buffer_type file
buffer_path /var/log/td-agent/buffer/twi.*.buf
method insert
auth_method private_key
email XXXXXXXXXX-XXXXXXXXX@developer.gserviceaccount.com
private_key_path /opt/td-agent/google-keys/tweet-streaming-api-test-XXXXXXXXXX.p12
project XXXXXX-XXXXX-XXXXX
dataset tweets
table streaming_test
flush_interval 1
buffer_chunk_limit 1000000
buffer_queue_limit 5000
flush_interval 1
try_flush_interval 0.05
num_threads 4
queued_chunk_flush_interval 0.01
time_format %s
time_field log_time
schema_path /opt/td-agent/bq_schema/bq_tweet.json
log_level error
</store>
</match>
下記コマンドで実行結果の確認。
$ sudo /etc/init.d/td-agent start
これでBigQueryのWebコンソール上で無事Tweetデータが格納され始めているのが確認できればOK。
動作が怪しい場合は、/var/log/td-agent/td-agent.log
を見てエラーがないかどうかを確認する。
また、Google Developers Console でAPIのログも確認し、エラーが出てないかどうか確認する。
InsertErrors になっていても、HTTPのステータスは2xx
が返り正常に見える点には注意。
さいごに
今回は可能な限り元ツイートデータの構造を保ったまま格納しているが、
必要なデータのみ格納したい場合は bq_tweet.json
を書き換えればOK。
また、Tweet データバックアップのために、BigQuery と同時に
AWS S3 への同時格納も検討すべきである。
※ 長期間BigQueryのサービスがダウンし、fluentd 用マシンのバッファからあふれた場合Tweetデータが消失するのを防ぐため
Tweetデータの要素が一部なかった場合 (例えば、entities.urls 等)
BigQuery へどのように格納されるかは下記ページが参考になる。
BigQueryでデータロード時に欠損値の読み込みルールまとめ