0
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

ツイートを長期間収集する実験(プログラム準備編(5))

Last updated at Posted at 2016-10-25

注意事項の追記(2022年5月時点)

最初に作成したときから既に7年とか経ちそうな割に、未だにLGTMとかされているため、念のために追記します。
本稿記載のAPIは、現時点で既に存在しないAPI(2018/08に廃止)です。
代替として、「/statuses/filter」を用いる手段がありましたが、こちらも2020年10月末の廃止が宣言されました。
API 2.0の「/2/tweets/search/stream」が使えるようですが、こちらについては調べていません。
私自身は、「/search/tweets」を使って遡って取得する方向に逃げました。詳細はこちら

前回まで

  • 要求仕様:OK
  • 最優先課題:OK
  • 実行環境整備:OK
  • 追加機能と動作の実証
    • DMでエラーを通知
    • 時間になったら止める機能

注:だいたいリアルタイムに書いてたりするので、書いてる途中で脱線したり停滞したりします。

その他の機能追加

 基本的に現状で、ほぼ動作に問題がないことが確認できました。そうなると次にやるべきは、「実行したまま忘れるのを防ぐために、期限が来たら終了するようにする」と、「エラー終了に気付かずにいる事態を避けるために、DMで連絡する」の第二優先機能の実装です。

時間が来たら終了する機能

 毎回 datetime.now() 読んでうんたら、というのも芸がないなーとか思ったのですが、他になんかこれという手段も思いつかないし、速度面の問題もなさげなのでシンプルに。

ExitLimit = datetime(2017, 1, 16, 0, 0, 0, 000000) - timedelta(hours=9)

 こんな感じで時間を切って、取得時の処理を行う on_status()で、

        if(datetime.now() > ExitLimit):
            print("時間が来たので終了")
            self.send_message("時間経過で終了")
            self.StopFlg = True
            return False

 こんな感じで止める。最後の1個は(厳密には)時間外の取得になるけど、まあいいや。

DMでエラー通知する機能

 TweepyのStream.pyを覗くと、そのものずばりAPI()なるメソッドがあるのですが、これで取得したオブジェクトでは、DMの送信には使えないみたいです。
 そんなわけでメイン処理でtweepy.API()を呼んでオブジェクトを取得する方向で。

    def send_message(self, message):
        if(message is None) or (self.API is None):
            return
        if(len(DM_DEST) != 0  and len(message) != 0):
            d = '{:%Y/%m/%d %H:%M:%S}'.format(datetime.now() + timedelta(hours=9))
            mes = "[TWEET CRAWLER]\n" + d + "\n" + message
            
            self.API.send_direct_message(screen_name=DM_DEST, text=mes.encode('utf8'))
            print('DM送信:' + message)
        else:
            print('DM送信しませんでした')
        return

 こんな感じのメソッドを作って、事象発生時にはこれを呼ぶようにしましょう。

その他の懸案事項

DB落ちたらどうするの?

 MongoDBだけ器用に落ちる、という事態はたぶんない、と思うのですが、念のため、
落ちた時にはテキストファイルに書き出す、という処理を加えます。

    def insert(self, json_src):
        # MongoへのJSONデータのinsert。
        try:
            self.Collection.insert(json_src)        # ここで格納
            self.dbErrCnt = 0
        except pymongo.errors.PyMongoError as exc:
            # mongoDBの謎ストップ
            self.dbErrCnt += 1
            print('エラー ' + str(self.dbErrCnt) + '回目')
            if(self.dbErrCnt < 3):
                self.send_message('MongoDB エラー発生。')   #複数同一内容連打対策
                #dir(exc)

            with open('MongoInsertError.txt','a') as f:
                f.write(str(json_src) + '\n')
                print('再処理失敗、ファイルに書いた')
                f.close()

 だいたいこんなもんかな。接続が生きてるかチェックする機能、みたいのがないか探したのですが、これというのもない感じだったので割愛。
 ちなみに、MongoDBが復活すれば、特に何かの再設定等必要なく再接続されてデータの格納が再開されたので、そういうもんだということにしてます(たぶんダメ

キーワードの指定は?

 イベント(隠語)まわりは複数のキーワードで呼ばれたりしてるので、複数のキーワードのORで指定しないとねーとなるわけです。

  • 正式名で登録商標
  • 登録商標で短め(5文字)
  • 登録商標で短め(3文字)
  • ハッシュタグに使われがちな英字1文字+数字
  • 四季1文字+名称先頭2文字
  • 英字(5文字)

 あたりを設定すれば、だいたい網羅できるでしょう。一部はハッシュタグと、タグなし両方入れておきます。
 Twitter Streaming APIの説明によれば、日本語のタグなしキーワードは、スペース等で分離されていないとヒットしない、とのことなので、タグなしキーワードは無くてもよさげではありますが……

TRACKWORD = ['#D51,#機関車,#蒸気機関車,#丘蒸気,#locomotive,機関車,蒸気機関車'] 

 こんな風に指定して(キーワードは適当にいじくっています)

stream.filter(track=TRACKWORD)

 呼び出しはこんな感じに修正。
 あまりたくさんのキーワードを入れると、スパム投稿が気になるところです。実際、本番用のキーワードでチェックしてみるとやたらと多いのがオークションbotで、「全部保存と言っても、これはいらんだろ」みたいなものもあるのが(一番静かな今の時期だからこそ)よくわかります。

 botはだいたい、特定のクライアントを使っているため、取得したjson中のキー"souce"を参照し、まとめて削除とかできそうです。まあ、「botの投稿数」とかもネタになりそうだから、削除はあとからということで割愛。

実働版ソース

 そんなこんなで、Pythonド素人が1か月(実働1週間くらい)で作った完成版がこちら。

TweetCrawler.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import tweepy
from tweepy.api import API

import pymongo
from pymongo import MongoClient
import json

from datetime import datetime, timedelta

import sys

# Twitter アクセス関係の変数
CK = ''                            # Consumer Key
CS = ''   # Consumer Secret
AT = ''   # Access Token
AS = ''        # Accesss Token Secert

TRACKWORD = ['#(隠語)']  # Public Stream Filter動作時のキーワード

DM_DEST = ''      # DMの送信先

# MongoDB 接続関係の変数
HOST = 'mongo'      # ホスト
PORT = 27017            # ポート(デフォルト:27017)
DB_NAME = 'TwitterDB'   # DB名
COL_NAME= 'Twitter'    # コレクション名

ExitLimit = datetime(2017, 1, 16, 0, 0, 0, 000000) - timedelta(hours=9)

class Listener(tweepy.StreamListener):
    def __init__(self, api=None):   # コンストラクタ
        tweepy.StreamListener.__init__(self)    # 親クラスのコンストラクタ
        print('コンストラクタ')

        self.StopFlg = False          # ストップフラグ。
        self.mongo_init()

        self.API = None
        
        self.dbErrCnt = 0
        
        print(ExitLimit)

        return

    def mongo_init(self):           # MongoDBの初期化
        try:
            Client = MongoClient(HOST, PORT)
            db = Client[DB_NAME]
            self.Collection = db[COL_NAME]
            print('DB準備完了')
        except pymongo.errors.PyMongoError as exc:
            # 接続エラー
            print('DB接続エラー')
        return

    def on_status(self, status):
        #print('ツイート(' + str(self.TweetCnt) + ')')
        
        self.insert(status._json)
        
        if(datetime.now() > ExitLimit):
            print("時間が来たので終了")
            self.send_message("時間経過で終了")
            self.StopFlg = True
            return False

        return True

    def on_error(self, status_code):
        print('エラー発生: ' + str(status_code))
        self.send_message("ERR:" + str(status_code))
        return True

    def on_connect(self):
        print('接続しました')

        self.send_message('接続しました')
        return

    def on_disconnect(self, notice):
        print('切断されました:' + str(notice.code))
        self.send_message("DISCCONECT:" + str(notice.code))
        return

    def on_limit(self, track):
        print('受信リミットが発生しました:' + str(track))
        self.send_message("RCV_LIMIT:" + str(track))
        return

    def on_timeout(self):
        print('タイムアウト')
        self.send_message("TIMEOUT")
        return True

    def on_warning(self, notice):
        print('警告メッセージ:' + str(notice.message))
        self.send_message("WARN:" + str(notice.message))
        return

    def on_exception(self, exception):
        print('例外エラー:' + str(exception))
        self.send_message("EXCEPTION:" + str(exception))
        return
        
    def send_message(self, message):
        # DMを送るメソッド
        if(message is None) or (self.API is None):
            return
        if(len(DM_DEST) != 0  and len(message) != 0):
            d = '{:%Y/%m/%d %H:%M:%S}'.format(datetime.now() + timedelta(hours=9))
            mes = "[TWEET CRAWLER]\n" + d + "\n" + message
            
            self.API.send_direct_message(screen_name=DM_DEST, text=mes.encode('utf8'))
            print('DM送信:' + message)
        else:
            print('DM送信しませんでした')
        return

    def insert(self, json_src):
        # MongoへのJSON入力。
        try:
            self.Collection.insert(json_src)        # ここで格納
            self.dbErrCnt = 0
        except pymongo.errors.PyMongoError as exc:
            # エラー発生
            self.dbErrCnt += 1
            print('エラー ' + str(self.dbErrCnt) + '回目')
            if(self.dbErrCnt < 3):
                self.send_message('MongoDB エラー発生。')   # 同内容で何度も通知が発生するのを抑止する

            # ファイルへの書き出しを実施
            with open('MongoInsertError.txt','a') as f:
                f.write(str(json_src) + '\n')
                print('再処理失敗、ファイルに書いた')
                f.close()

# ここからメイン処理
auth = tweepy.OAuthHandler(CK, CS)
auth.set_access_token(AT, AS)

ExitCode = 255

while (True):     # 無限ループ
    try:
        listener = Listener()
        stream = tweepy.Stream(auth, listener)
        listener.API = tweepy.API(auth)

        #どれか選択の上コメントアウト外す。
        print(TRACKWORD)
        stream.filter(track=TRACKWORD)
        #stream.sample()
        #stream.userstream()

        #ストップフラグによるストップ判定
        if(listener.StopFlg == True):
            ExitCode = 0
            break

    except KeyboardInterrupt:
        # CTRL+C
        print('CTRL + C で終了。')
        ExitCode = 0
        break
    except:
        print('エラー終了')
        pass    # 例外全部無視してループさせる

sys.exit(ExitCode)

 素人目にもツッコミどころしかないソースですが、何分あと数日(ぼかす)で動かさないといけないし、ある程度長時間動かしてもエラー発生しないので、恐る恐るではありますがこれで動かすことにします。

$ nohup python TweetCrawler.py >normal.log 2>error.log &

 こんな感じでぽちっとな、の予定。

急募

 つっこみどころと手直しどころ。

(続く。)

0
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?