Edited at

ツイートを長期間収集する実験(プログラム準備編(2))

More than 3 years have passed since last update.


前回まで



  • 3か月ぶっ通しで動作させる、Twitterのツイート収集プログラム作るよ!


    • リリース予定は10月末。言語はPython、DBはMongoDB




  • じゃあひとまず作り始めるベー


    • 開発環境! Windows! VS2013! (Pythonです)

    • 目標は、Twitterにつながって、データ取得できて、自動で再接続できること。それ以外は後回し。




  • まずはTwitter関係の処理からかなー?


    • 30行チョイ スクリプト書いたら動いた。ほんとにこれでいいのか??

    • 元ソースに肉付けしていこう  <- New




とりあえずソースを見直す

 かなり適当に作った割には意外とちゃんと動いた前回のプログラムですが、見よう見まねで作ったものなのでこれだけでほんとに良いの?というあたりの疑問が抜けません。

 オフィシャルの説明はペラ一枚レベルで、APIリファレンスなし。Stream APIって実は需要無くて放置人気無いんでしょうか……?


  • 「tweepy.StreamListener」クラスのメソッド、オフィシャルの説明だと「on_status」「on_data」「on_error」があるみたいだが、ググってみると「on_timeout」ってのが見つかる。

  • 「on_error」の説明で「Falseを返すと終了される」とか書いてあるけど、True返すとどうなるの?

  • 『接続しました』『切断しました』って表示(ログ出力)したいんだけどどうしたらいいの?

 たぶん詳細に解説してるページもどこかにあるのでしょうが、英語ほとんど読めないのでよくわかりません。Tweepy使ってる人も、主要なモノ以外見てないのか、紹介している箇所以外は自明と考えてるのか、出だしの使い方はともかくその先の使い方が想像以上に少なくて困ります。


わかんないときはソースを読むしかない。

 とにかく想像以上にStream API周りの情報が少なすぎです。なによりオフィシャルのドキュメント放置が一番シャレにならない。ほしい機能が「実は実装されてました」とか、絶対にあるから怖い。

 こうなったら、ソース直接チェックするしかない。幸いというかなんというか、TweepyはMIT Licenseで公開されてるオープンソースのソフトで、GitHubで公開されています。つまり、読み解きしようと思えばできないわけではない--時間とやる気と能力があれば--……はずです。


具体的にソースを見てみる。

 streaming.pyという、そのものずばりな名前のソースがあったのでこちらを開いてみてみますと、クラスが3つ。

* StreamListener

* ReadBuffer

* Stream

 ReadBuffer 以外の2つは、最初に作ったプログラムで使ってます。ReadBufferクラスが名前の通りのものであるならば、とりあえず放置して問題ないでしょう。「StreamListener」と「Stream」の2つを見てみることにします。


StreamListener クラス

 Streamクラスが受け取った情報を渡して、実際に処理するクラス……という理解でよいのでしょーか。実際にはこのクラスを継承した先で親クラスのメソッドをオーバーライドして独自動作を実装、それ以外は親クラスの所定動作を行う……であってるよね?

オーバーライド対象になるメソッドは、結構たくさんありました。以下がその一覧(説明文はdev.twitter.comのリファレンスと突合して確認)。


  • on_connect
    接続したタイミングで呼ばれます。
    既定動作は「pass」……何もしない? なんでこれだけ他と違ってreturnではないのだろう??

  • on_data
    処理すべきデータが飛んできたら呼ばれます。
    この処理の中で取得したデータをチェックし、下の各メソッド(斜体にしたもの)を呼び出ししてます。

  • keep_alive
    何もデータが流れていない状態で接続維持の改行が送られたときに呼ばれます。


  • on_status
    いわゆる一般的なツイートを受け取ったときに呼ばれます。

  • on_exception
    Streamオブジェクトでの処理中に例外が発生した場合に呼ばれます(呼べるようなエラーなら)。


  • on_delete
    メッセージの削除があった場合に呼ばれます。


  • on_event(UserStream時限定)
    RTされた、いいねされた、ブロックされたなどの通知の際に呼ばれます。


  • on_direct_message(UserStream時限定)
    DMが届いたときに呼ばれます。


  • on_friends(UserStream時限定)
    User Stream開始時に呼ばれます。フレンドのIDリストが送られます。


  • on_limit
    Filterして届けてるはずのメッセージが、それでも流量オーバーした際に呼ばれます。取得できなかった数も取得できます。

  • on_error
    Streamオブジェクトでの処理中に、ステータスコード200以外が帰ってきた場合に呼ばれます。

  • on_timeout
    Streamのコネクションがタイムアウトした際に呼ばれます。


  • on_disconnect
    Twitter側から切断された際に呼ばれます。メッセージ付き。


  • on_warning
    Twitter側からの処理警告が来た際に呼ばれます。メッセージ付き。

 なんだよ、結構なメッセージが送られてるじゃないか?

 ちゃんと「接続しました」メッセージはあるようです。ちょっとホッとしました。

 on_dataから呼ばれるメソッドについては、戻り値をFalseとすると切断される、ようです。on_errorはオーバーライドしないとFalseを返すように実装されてます。これはStream クラスをみて、呼び出し元をチェックしないと挙動がわかりません。

 ちなみに、Stream APIで送られてくるメッセージはdev.twitter.comで公開されてますが、「status_withheld」「user_withheld」「scrub_geo」「for_user」「control」のメッセージはon_dataでチェックしていないようです。あんま意味ないからですかね? これらまで見ようと思うと、on_dataをオーバーライドして、上のほとんどのメソッドをオーバーライドしないといけない、と。 いらんからええわ。


Stream クラス

 接続、切断、受信ループが実装されたクラス……のようです。

 接続に当たっての初期設定を 「init(コンストラクタ)」で行い、「userstream」「firehose」「retweet」「sample」「filter」「sitestream」の、Stream APIに対応したメソッドをコールすると「_start」をへて「_run」を呼び出します。

 「while self.running:」からのループで具体的な接続処理を行い、接続後の実行ループは「_read_loop」が担っています。

 「self.session.request()」で接続処理に失敗した場合、StreamListenerクラスのon_errorを呼び、Falseの場合はWhileループ脱出、それ以外の場合はエラーカウンターを回しつつ一定時間待ちながら再接続……

 って、再接続機能あるじゃねーか!

 そりゃそーだよねー、めんどくさいあたりのはなしを、ライブラリで実装しないわけないよねー。ユーザーにやらせるはなしじゃないよねー。

 要求仕様の最重要事項、「不意の切断時に再接続を実施する機能」について、いきなりハードルが下がってしまいました。うれしい反面、なんでこういう重要な機能の詳細が、わかるように記載されていないんでしょうかと小一時間問い詰めたい……

 ちなみに「_read_loop」内では、StreamListenerクラスの「on_data」を呼んでいて、ここでFalseが返る場合ループを脱出します。このときFalseが格納されるself.runningを呼び出し元の「_run」の接続ループでも見ていて、接続ループ脱出、セッションを終了します。


ソースを見て方針を決める。

 気を取り直して。

 ソースを眺めた結果、

1. StreamListenerクラスのon_dataから分岐するメソッドは、Falseを返すと切断されて終了する。
 →元メソッドでFalseを返しているものはすべてオーバーライドしてTrueを返すようにする。

2. on_error、on_timeoutは、Falseを返すと再接続処理を行わずに終了
 →これもオーバーライドしてTrueを返すようにする。

3. on_exceptionは対処できないので、何か手段を考える

 具体的にソースを見ていくと、(1)に当たるものはなし。ていうか、戻り値としてTrueもFalseも返してません。……大丈夫なのかこれ、「is False:」でチェックしてるので、値が戻らない場合の比較でイコールにならないからOKってこと?

 (2)については、on_timeoutは値を返さずreturnしているので、on_errorだけオーバーライドしてTrue返すようにすればOK。

 (3)は……どうしましょうね?

 とはいえ、エラーに限らず警告などについては、ログを残しておいたほうが後々のためになりそうなので、「on_connect」「on_disconnect」「on_limit」「on_timeout」「on_warning」「on_exception」はオーバーライドすることにします。


例外発生への対処

 例外は例外なので、対処に困ります。いちおう、Streamクラスの処理ループ中の例外は、StreamListenerのon_exceptionを呼んで伝えたのちにraiseしているので、これを呼び出し元でexceptしてやればよい、はずです。

 それ以外だと、StreamListenerでの個々の処理中に発生したら?になりますが、呼び出し元をたどればStreamListenerなので、最終的にはそこに集まる……であってる?

 それでも例外発生で止まるようなら……バッチファイルでスクリプトをループで呼び出すようにしますか??


ここまでのソース

最優先事項である「不意の切断時に再接続を実施する機能」を踏まえた、現在までのソースはこんな感じ。


tweetcheck2.py

#!/usr/bin/env python

# -*- coding:utf-8 -*-

import tweepy

#自分で取得して入れる
CK = '' # Consumer Key
CS = '' # Consumer Secret
AT = '' # Access Token
AS = '' # Accesss Token Secert

class Listener(tweepy.StreamListener):
def on_status(self, status):
print('ツイート') # どうせ読めないのでメッセージがあったことだけ伝えることにする
return True

def on_error(self, status_code):
print('エラー発生: ' + str(status_code))
return True

def on_connect(self):
print('接続しました')
return

def on_disconnect(self, notice):
print('切断されました:' + str(notice.code))
return

def on_limit(self, track):
print('受信リミットが発生しました:' + str(track))
return

def on_timeout(self):
print('タイムアウト')
return True

def on_warning(self, notice):
print('警告メッセージ:' + str(notice.message))
return

def on_exception(self, exception):
print('例外エラー:' + str(exception))
return

# ここからメイン処理
auth = tweepy.OAuthHandler(CK, CS)
auth.set_access_token(AT, AS)

while True: # 無限ループ
try:
listener = Listener()
stream = tweepy.Stream(auth, listener)

#どれか選択の上コメントアウト外す。
#stream.filter(track=['#xxxxxx'])
stream.sample()
#stream.userstream()
except:
pass # 例外全部無視してループさせる


 エラーや警告など、実行について危うくさせるメッセージについてはすべて表示するようにし、エラー発生時は再接続を行うようにします。万が一例外が発生した場合、メイン処理部でexceptして例外無視、ループさせて再実行します。

 終了? ……Ctrl+Cで……

(実際に試してみたところ、止められませんでした(アホ 。Ctrl+Cの時だけループ脱出、って、手段ありますっけ? )

 Twitter関係の処理の基本形は、とりあえずこんなもんでしょう。本当にこれでいいのかと悩むところですが……ツッコミ求む

 次回はMongoDBの側、最優先事項の「受信したデータのMongoDBへの格納」に取り掛かろうかと思います。

(続く)