LoginSignup
30
27

More than 3 years have passed since last update.

Twitter Streaming API用の完璧で汎用的なモジュールを作ってみる

Posted at

こんにちは!すずともです!
今回は、TwitterのStreaming APIについて。

(1)Twitter Streaming APIとは

TweetDeckみたいに、ツイートをリアルタイムで取得できるAPIのことです。

といっても、ユーザのTLをリアルタイムで取得できる「User Streams API」は2年ほど前に廃止されました。

現在利用できるStreaming APIは「Filter realtime Tweets」というものです。
これは、検索条件を指定して、その条件にマッチしているツイートをリアルタイムできるAPIです。

これには、2種類のAPIが用意されています。

  • statuses/filter API
  • PowerTrack API

この2つの違いは、無料版か有料版かの違いで様々な制限の違いがあります。
詳しくは、OverViewのページを見てください。

僕はそんなにお金持ちじゃないので、今回は無料版の「statuses/filter API」についての記事です。

(2)完璧なモジュールとは

Twitter APIを使う上で、制限というのはつきもの。
というのはAPIを使った人にはわかりますよね?

Streaming APIにも様々な制限があります。
制限についてはGuideのページを見てください!
といってもいろいろ書いてあってなんかめんどくさいな。

そんなあなたに、Pythonで簡単にStreaming APIが使えたら便利ですよね!

(3)使用例

(4)で示すtweest.pyと同じフォルダに置いて実行してください。
また、各Keyは自分の物を入れてください。

tweest_test.py
CK = ''
CS = ''
AT = ''
AS = ''

import tweest

def test(status):
    print(status)

tweest.set_auth(CK,CS,AT,AS)
tweest.start({'track':'あ'},test)

set_authに各APIキーを渡してください。

start関数の第1引数にはAPIに渡すパラメータ、第2引数には受信したときに実行する関数を渡してください。
パラメータに関しては、下記ページを参照してください。
https://developer.twitter.com/en/docs/tweets/filter-realtime/api-reference/post-statuses-filter

第2関数に指定する関数は、引数が1つ必要です。
この引数には、受信したツイートの情報が辞書形式で入っています。
詳しい中身に関しては下記のページを参照してください。
https://developer.twitter.com/en/docs/tweets/data-dictionary/overview/intro-to-tweet-json


と、なんともシンプルな形でStreaming APIが書けました!
ちなみにこの例は「あ」という文字が含まれるツイートをリアルタイムで取得できます。
実際に実行してみると、結構な頻度でツイートを取得できると思います。

(4)コード

コードを置いておきます。
しかし、動作は保証してませんのであしからず。

なにか不具合あればコメントなどで言ってください。

あっ、ちなみにPython3.6で作りました。

モジュールのファイルとしてtweest.pyを作ります。

tweest.py
import json
import sys
import threading
import time


from requests_oauthlib import OAuth1Session

auth = None

last_time = 0

re_t_network = 16
re_t_http = 5
re_t_420 = 60

def __reset_backoff_time():
    global re_t_network
    global re_t_http
    global re_t_420
    re_t_network = 16
    re_t_http = 5
    re_t_420 = 60


class TweestError(Exception):
    pass


def set_auth(CK,CS,AT,AS):
    # authの作成
    global auth
    auth = OAuth1Session(CK,CS,AT,AS)


def __streaming_thread(request,func):
    global last_time
    last_time = time.time()
    for line in request.iter_lines(decode_unicode=True):

        last_time = time.time()
        if line :
            # 取得したJsonデータ(バイト列)を辞書形式に変換
            func(json.loads(line))


def start(params,func):
    while True:
        try:
            # リクエストを送る
            r = auth.post('https://stream.twitter.com/1.1/statuses/filter.json',
                            params = params,
                            stream = True)

            r.encoding = 'utf-8'

            # リクエストのステータスコードを確認
            if r.status_code == 200:
                __reset_backoff_time()

                thread = threading.Thread(target=__streaming_thread,args=([r,func]))
                thread.setDaemon(True)
                thread.start()

                # 90秒間受信データがない場合、whileを抜け再接続
                while time.time() - last_time < 90:
                    time.sleep(90 - (time.time() - last_time))

            elif r.status_code == 401:
                raise TweestError('404 : Unauthorized')
            elif r.status_code == 403:
                raise TweestError('403 : Forbidden')
            elif r.status_code == 406:
                raise TweestError('406 : Not Acceptable')
            elif r.status_code == 413:
                raise TweestError('413 : Too Long')
            elif r.status_code == 416:
                raise TweestError('416 : Range Unacceptable')
            elif r.status_code == 420:
                # 420エラーの場合、待機時間を2倍に伸ばす(制限なし)
                print(f'420 : Rate Limited. Recconecting... wait {re_t_420}s')
                time.sleep(re_t_http)
                re_t_http *= 2
            elif r.status_code == 503:
                # 再接続が必要なHTTPエラーの場合、待機時間を2倍に伸ばす(最大320秒)
                print(f'503 : Service Unavailable. Reconnecting... wait {re_t_http}s')
                time.sleep(re_t_http)
                re_t_http *= 2
                if re_t_http > 320:
                    raise TweestError('503 : Service Unavailable.')

            else:
                raise TweestError(f'HTTP ERRORE : {r.status_code}')

        except KeyboardInterrupt: # Ctrl + C で強制終了できる
            break
        except ConnectionError:
            time.sleep(re_t_network)
            re_t_network += 16
            if re_t_network > 250:
                raise TweestError('Network Error')
        except:
            raise

(5)このモジュールについて

簡単にこのモジュールの仕様を示しておきます。

依存するモジュール

requests_oauthlibを使用してます。
あらかじめこのモジュールをインストールしておいてください。

具体的な制限・エラーと、対処法

様々な制限、エラーに引っかかった場合に、このモジュールがどのような仕組みで対処するかを示してあります。
ちなみに、どの状態でもCtrl+Cで強制的に終了できます。

また、各待機時間は正常に接続されたときにリセットされます。

a. キープアライブ信号

APIに接続中はキープアライブ信号という空白行が30秒おきに送られてきます。
これは、きちんと接続されているかの確認用の信号です。

途中でツイートを受信した場合は、そこから30秒後にキープアライブ信号が来るはずです。

このキープアライブ信号が90秒間(つまり3回)来なかったら、接続が何かしらの事情で切れていると考えます。

そのため、前回の受信から90秒間受信がなかった場合には再接続するようになっています。

b. TCP/IPレベルのネットワークエラー

TCP/IPレベル、つまりTwitterに接続する前の段階で失敗したということです。

この場合は、線形バックオフという再接続方法を行います。
最初は16秒待ってから再接続、次は32秒、その次は48秒...というように、線形関数的に再接続を繰り返します。
最大250秒で、待機時間が250秒を超えた場合、例外TweestError('Network Error')をスローします。

このモジュールでは、requests_oauthlibpostするときに発生する例外ConnectionErrorをTCP/IPレベルのエラーと考えて、このバックオフを行っています。

c. HTTPエラー

HTTPエラーの場合は、指数関数パックオフという再接続方法を行います。
最初は5秒、次は10秒、20秒、40秒...
というように、再接続ごとに待機時間を2倍にしていきます。
最大320秒で、待機時間が320秒を超えた場合には、例外TweestError('503 : Service Unavailable.')をスローします。

APIのページを見てみる限りHTTP 503エラーの時のみ「再接続が適切であるHTTPエラー」に該当するように思えたので、このモジュールではその時に、この線形バックオフを行います。

ちなみに、503エラーは、Twitter側の問題(鯖落ちなど)です。

d. HTTP 420エラー

420エラーの場合にも、指数関数バックオフを用います。
この時の初期待機時間は1分からとなっており、HTTPエラーの時より長い待機時間となっています。

というのも、420エラーは、主にAPI接続しすぎの時に返されるエラーなので、制限される時間が長いからです。

このあたりの待機時間に関してはいまいち仕様が分からないので、何回再接続しても420エラーが返ってくる場合には、このモジュールの再接続アルゴリズムを信用せず、一度プログラムを終了して、十分な間隔をあけて手動で再接続することをお勧めします。

e. HTTPエラーコード

APIのGuideページにエラーコードの詳細が書いてあります。

これらのエラーコードが返されたときには、エラーコードと、簡単な説明が書かれたTweestError例外がスローされます。
(420,503エラーは先述のとおり除く)

また、APIのページに書かれていない他のエラーコードが返ってきた場合には、そのエラーコードが示されたTweestErrorがスローされます。

モジュールの動作の確実性

普通にモジュールを使っている場合には、基本エラーが返ってくることはなく、再接続のアルゴリズムや、例外のスローなど、確認しきれていない箇所が複数あります。

全てのエラーを確認することは、方法がないわけではありませんが、なかなか難しいところがあります。

そのため、このモジュールを使ってくれた方の中で不具合が起きたなどありましたらコメントなどくれるとありがたく思います。
正常に再接続アルゴリズムや例外のスローが動いた場合にも報告してくださると助かります。

改善点

python初心者なので...

僕は主にC言語を触る人間で、PythonはTwitterのBotを作る以外にあまり使いません。
そのため、Pythonの構文的におかしいところや、便利な関数などありましたら教えて下さい。

一応、基本的な命名規則には則ったつもりです。(Cとは規則が違ってびっくりした)

キープアライブ信号による接続状態の判定部分

(5)a.の部分ですね。

ツイートを監視する際、r.iter_lines()というところで、次のツイートが来るまで処理が止まってしまうため、同じスレッド内では90秒受信が無かったかの判別が出来ないと思い、データ受信部分を別スレッド化しました。

requests_oauthlibの元となっているrequestモジュールの関数にキープアライブ信号を検知する機能はないか探してみましたが、ありませんでした。

※ちなみに、streamしないときはtimeoutを設定することで最大接続待ち時間を設定できるようですが、このオプションはstream=Trueの時には適用されないようです。

スレッドを使用することで、APIへの再接続の際に受信部分が強制的に終了する形になるため、あまり安全ではありませんね。
(というかスレッドが終了するかどうかも分からない。thread変数のスコープが外れればスレッドが終わるだろうというC的な考えで実装してます)
本来ならスレッドの強制終了を行うのはいけないコードですが、他の実装方法が思いつかなかったのでこの形にしました。

おそらくもっといい方法があると思いますので、教えて下さい🥺

(6)さいごに

このモジュールを作ったのは、誰のためでもない僕のためです。
僕が使える範囲で作ったので、テストなど甘い部分が多々あります。

このモジュールを使用する際には自己責任でお願いします。

30
27
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
30
27