Help us understand the problem. What is going on with this article?

ツイートを長期間収集する実験(プログラム準備編(5))

More than 3 years have passed since last update.

前回まで

  • 要求仕様:OK
  • 最優先課題:OK
  • 実行環境整備:OK
  • 追加機能と動作の実証
    • DMでエラーを通知
    • 時間になったら止める機能

注:だいたいリアルタイムに書いてたりするので、書いてる途中で脱線したり停滞したりします。

その他の機能追加

 基本的に現状で、ほぼ動作に問題がないことが確認できました。そうなると次にやるべきは、「実行したまま忘れるのを防ぐために、期限が来たら終了するようにする」と、「エラー終了に気付かずにいる事態を避けるために、DMで連絡する」の第二優先機能の実装です。

時間が来たら終了する機能

 毎回 datetime.now() 読んでうんたら、というのも芸がないなーとか思ったのですが、他になんかこれという手段も思いつかないし、速度面の問題もなさげなのでシンプルに。

ExitLimit = datetime(2017, 1, 16, 0, 0, 0, 000000) - timedelta(hours=9)

 こんな感じで時間を切って、取得時の処理を行う on_status()で、

        if(datetime.now() > ExitLimit):
            print("時間が来たので終了")
            self.send_message("時間経過で終了")
            self.StopFlg = True
            return False

 こんな感じで止める。最後の1個は(厳密には)時間外の取得になるけど、まあいいや。

DMでエラー通知する機能

 TweepyのStream.pyを覗くと、そのものずばりAPI()なるメソッドがあるのですが、これで取得したオブジェクトでは、DMの送信には使えないみたいです。
 そんなわけでメイン処理でtweepy.API()を呼んでオブジェクトを取得する方向で。

    def send_message(self, message):
        if(message is None) or (self.API is None):
            return
        if(len(DM_DEST) != 0  and len(message) != 0):
            d = '{:%Y/%m/%d %H:%M:%S}'.format(datetime.now() + timedelta(hours=9))
            mes = "[TWEET CRAWLER]\n" + d + "\n" + message

            self.API.send_direct_message(screen_name=DM_DEST, text=mes.encode('utf8'))
            print('DM送信:' + message)
        else:
            print('DM送信しませんでした')
        return

 こんな感じのメソッドを作って、事象発生時にはこれを呼ぶようにしましょう。

その他の懸案事項

DB落ちたらどうするの?

 MongoDBだけ器用に落ちる、という事態はたぶんない、と思うのですが、念のため、
落ちた時にはテキストファイルに書き出す、という処理を加えます。

    def insert(self, json_src):
        # MongoへのJSONデータのinsert。
        try:
            self.Collection.insert(json_src)        # ここで格納
            self.dbErrCnt = 0
        except pymongo.errors.PyMongoError as exc:
            # mongoDBの謎ストップ
            self.dbErrCnt += 1
            print('エラー ' + str(self.dbErrCnt) + '回目')
            if(self.dbErrCnt < 3):
                self.send_message('MongoDB エラー発生。')   #複数同一内容連打対策
                #dir(exc)

            with open('MongoInsertError.txt','a') as f:
                f.write(str(json_src) + '\n')
                print('再処理失敗、ファイルに書いた')
                f.close()

 だいたいこんなもんかな。接続が生きてるかチェックする機能、みたいのがないか探したのですが、これというのもない感じだったので割愛。
 ちなみに、MongoDBが復活すれば、特に何かの再設定等必要なく再接続されてデータの格納が再開されたので、そういうもんだということにしてます(たぶんダメ

キーワードの指定は?

 イベント(隠語)まわりは複数のキーワードで呼ばれたりしてるので、複数のキーワードのORで指定しないとねーとなるわけです。

  • 正式名で登録商標
  • 登録商標で短め(5文字)
  • 登録商標で短め(3文字)
  • ハッシュタグに使われがちな英字1文字+数字
  • 四季1文字+名称先頭2文字
  • 英字(5文字)

 あたりを設定すれば、だいたい網羅できるでしょう。一部はハッシュタグと、タグなし両方入れておきます。
 Twitter Streaming APIの説明によれば、日本語のタグなしキーワードは、スペース等で分離されていないとヒットしない、とのことなので、タグなしキーワードは無くてもよさげではありますが……

TRACKWORD = ['#D51,#機関車,#蒸気機関車,#丘蒸気,#locomotive,機関車,蒸気機関車'] 

 こんな風に指定して(キーワードは適当にいじくっています)

stream.filter(track=TRACKWORD)

 呼び出しはこんな感じに修正。
 あまりたくさんのキーワードを入れると、スパム投稿が気になるところです。実際、本番用のキーワードでチェックしてみるとやたらと多いのがオークションbotで、「全部保存と言っても、これはいらんだろ」みたいなものもあるのが(一番静かな今の時期だからこそ)よくわかります。

 botはだいたい、特定のクライアントを使っているため、取得したjson中のキー"souce"を参照し、まとめて削除とかできそうです。まあ、「botの投稿数」とかもネタになりそうだから、削除はあとからということで割愛。

実働版ソース

 そんなこんなで、Pythonド素人が1か月(実働1週間くらい)で作った完成版がこちら。

TweetCrawler.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import tweepy
from tweepy.api import API

import pymongo
from pymongo import MongoClient
import json

from datetime import datetime, timedelta

import sys

# Twitter アクセス関係の変数
CK = ''                            # Consumer Key
CS = ''   # Consumer Secret
AT = ''   # Access Token
AS = ''        # Accesss Token Secert

TRACKWORD = ['#(隠語)']  # Public Stream Filter動作時のキーワード

DM_DEST = ''      # DMの送信先

# MongoDB 接続関係の変数
HOST = 'mongo'      # ホスト
PORT = 27017            # ポート(デフォルト:27017)
DB_NAME = 'TwitterDB'   # DB名
COL_NAME= 'Twitter'    # コレクション名

ExitLimit = datetime(2017, 1, 16, 0, 0, 0, 000000) - timedelta(hours=9)

class Listener(tweepy.StreamListener):
    def __init__(self, api=None):   # コンストラクタ
        tweepy.StreamListener.__init__(self)    # 親クラスのコンストラクタ
        print('コンストラクタ')

        self.StopFlg = False          # ストップフラグ。
        self.mongo_init()

        self.API = None

        self.dbErrCnt = 0

        print(ExitLimit)

        return

    def mongo_init(self):           # MongoDBの初期化
        try:
            Client = MongoClient(HOST, PORT)
            db = Client[DB_NAME]
            self.Collection = db[COL_NAME]
            print('DB準備完了')
        except pymongo.errors.PyMongoError as exc:
            # 接続エラー
            print('DB接続エラー')
        return

    def on_status(self, status):
        #print('ツイート(' + str(self.TweetCnt) + ')')

        self.insert(status._json)

        if(datetime.now() > ExitLimit):
            print("時間が来たので終了")
            self.send_message("時間経過で終了")
            self.StopFlg = True
            return False

        return True

    def on_error(self, status_code):
        print('エラー発生: ' + str(status_code))
        self.send_message("ERR:" + str(status_code))
        return True

    def on_connect(self):
        print('接続しました')

        self.send_message('接続しました')
        return

    def on_disconnect(self, notice):
        print('切断されました:' + str(notice.code))
        self.send_message("DISCCONECT:" + str(notice.code))
        return

    def on_limit(self, track):
        print('受信リミットが発生しました:' + str(track))
        self.send_message("RCV_LIMIT:" + str(track))
        return

    def on_timeout(self):
        print('タイムアウト')
        self.send_message("TIMEOUT")
        return True

    def on_warning(self, notice):
        print('警告メッセージ:' + str(notice.message))
        self.send_message("WARN:" + str(notice.message))
        return

    def on_exception(self, exception):
        print('例外エラー:' + str(exception))
        self.send_message("EXCEPTION:" + str(exception))
        return

    def send_message(self, message):
        # DMを送るメソッド
        if(message is None) or (self.API is None):
            return
        if(len(DM_DEST) != 0  and len(message) != 0):
            d = '{:%Y/%m/%d %H:%M:%S}'.format(datetime.now() + timedelta(hours=9))
            mes = "[TWEET CRAWLER]\n" + d + "\n" + message

            self.API.send_direct_message(screen_name=DM_DEST, text=mes.encode('utf8'))
            print('DM送信:' + message)
        else:
            print('DM送信しませんでした')
        return

    def insert(self, json_src):
        # MongoへのJSON入力。
        try:
            self.Collection.insert(json_src)        # ここで格納
            self.dbErrCnt = 0
        except pymongo.errors.PyMongoError as exc:
            # エラー発生
            self.dbErrCnt += 1
            print('エラー ' + str(self.dbErrCnt) + '回目')
            if(self.dbErrCnt < 3):
                self.send_message('MongoDB エラー発生。')   # 同内容で何度も通知が発生するのを抑止する

            # ファイルへの書き出しを実施
            with open('MongoInsertError.txt','a') as f:
                f.write(str(json_src) + '\n')
                print('再処理失敗、ファイルに書いた')
                f.close()

# ここからメイン処理
auth = tweepy.OAuthHandler(CK, CS)
auth.set_access_token(AT, AS)

ExitCode = 255

while (True):     # 無限ループ
    try:
        listener = Listener()
        stream = tweepy.Stream(auth, listener)
        listener.API = tweepy.API(auth)

        #どれか選択の上コメントアウト外す。
        print(TRACKWORD)
        stream.filter(track=TRACKWORD)
        #stream.sample()
        #stream.userstream()

        #ストップフラグによるストップ判定
        if(listener.StopFlg == True):
            ExitCode = 0
            break

    except KeyboardInterrupt:
        # CTRL+C
        print('CTRL + C で終了。')
        ExitCode = 0
        break
    except:
        print('エラー終了')
        pass    # 例外全部無視してループさせる

sys.exit(ExitCode)

 素人目にもツッコミどころしかないソースですが、何分あと数日(ぼかす)で動かさないといけないし、ある程度長時間動かしてもエラー発生しないので、恐る恐るではありますがこれで動かすことにします。

$ nohup python TweetCrawler.py >normal.log 2>error.log &

 こんな感じでぽちっとな、の予定。

急募

 つっこみどころと手直しどころ。

(続く。)

yuhkan
へっぽこプログラマです。
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away