Posted at

ツイートを長期間収集する実験(プログラム準備編(5))

More than 3 years have passed since last update.


前回まで



  • 要求仕様:OK


  • 最優先課題:OK


  • 実行環境整備:OK


  • 追加機能と動作の実証



    • DMでエラーを通知


    • 時間になったら止める機能



注:だいたいリアルタイムに書いてたりするので、書いてる途中で脱線したり停滞したりします。


その他の機能追加

 基本的に現状で、ほぼ動作に問題がないことが確認できました。そうなると次にやるべきは、「実行したまま忘れるのを防ぐために、期限が来たら終了するようにする」と、「エラー終了に気付かずにいる事態を避けるために、DMで連絡する」の第二優先機能の実装です。


時間が来たら終了する機能

 毎回 datetime.now() 読んでうんたら、というのも芸がないなーとか思ったのですが、他になんかこれという手段も思いつかないし、速度面の問題もなさげなのでシンプルに。

ExitLimit = datetime(2017, 1, 16, 0, 0, 0, 000000) - timedelta(hours=9)

 こんな感じで時間を切って、取得時の処理を行う on_status()で、

        if(datetime.now() > ExitLimit):

print("時間が来たので終了")
self.send_message("時間経過で終了")
self.StopFlg = True
return False

 こんな感じで止める。最後の1個は(厳密には)時間外の取得になるけど、まあいいや。


DMでエラー通知する機能

 TweepyのStream.pyを覗くと、そのものずばりAPI()なるメソッドがあるのですが、これで取得したオブジェクトでは、DMの送信には使えないみたいです。

 そんなわけでメイン処理でtweepy.API()を呼んでオブジェクトを取得する方向で。

    def send_message(self, message):

if(message is None) or (self.API is None):
return
if(len(DM_DEST) != 0 and len(message) != 0):
d = '{:%Y/%m/%d %H:%M:%S}'.format(datetime.now() + timedelta(hours=9))
mes = "[TWEET CRAWLER]\n" + d + "\n" + message

self.API.send_direct_message(screen_name=DM_DEST, text=mes.encode('utf8'))
print('DM送信:' + message)
else:
print('DM送信しませんでした')
return

 こんな感じのメソッドを作って、事象発生時にはこれを呼ぶようにしましょう。


その他の懸案事項


DB落ちたらどうするの?

 MongoDBだけ器用に落ちる、という事態はたぶんない、と思うのですが、念のため、

落ちた時にはテキストファイルに書き出す、という処理を加えます。

    def insert(self, json_src):

# MongoへのJSONデータのinsert。
try:
self.Collection.insert(json_src) # ここで格納
self.dbErrCnt = 0
except pymongo.errors.PyMongoError as exc:
# mongoDBの謎ストップ
self.dbErrCnt += 1
print('エラー ' + str(self.dbErrCnt) + '回目')
if(self.dbErrCnt < 3):
self.send_message('MongoDB エラー発生。') #複数同一内容連打対策
#dir(exc)

with open('MongoInsertError.txt','a') as f:
f.write(str(json_src) + '\n')
print('再処理失敗、ファイルに書いた')
f.close()

 だいたいこんなもんかな。接続が生きてるかチェックする機能、みたいのがないか探したのですが、これというのもない感じだったので割愛。

 ちなみに、MongoDBが復活すれば、特に何かの再設定等必要なく再接続されてデータの格納が再開されたので、そういうもんだということにしてます(たぶんダメ


キーワードの指定は?

 イベント(隠語)まわりは複数のキーワードで呼ばれたりしてるので、複数のキーワードのORで指定しないとねーとなるわけです。


  • 正式名で登録商標

  • 登録商標で短め(5文字)

  • 登録商標で短め(3文字)

  • ハッシュタグに使われがちな英字1文字+数字

  • 四季1文字+名称先頭2文字

  • 英字(5文字)

 あたりを設定すれば、だいたい網羅できるでしょう。一部はハッシュタグと、タグなし両方入れておきます。

 Twitter Streaming APIの説明によれば、日本語のタグなしキーワードは、スペース等で分離されていないとヒットしない、とのことなので、タグなしキーワードは無くてもよさげではありますが……

TRACKWORD = ['#D51,#機関車,#蒸気機関車,#丘蒸気,#locomotive,機関車,蒸気機関車'] 

 こんな風に指定して(キーワードは適当にいじくっています)

stream.filter(track=TRACKWORD)

 呼び出しはこんな感じに修正。

 あまりたくさんのキーワードを入れると、スパム投稿が気になるところです。実際、本番用のキーワードでチェックしてみるとやたらと多いのがオークションbotで、「全部保存と言っても、これはいらんだろ」みたいなものもあるのが(一番静かな今の時期だからこそ)よくわかります。

 botはだいたい、特定のクライアントを使っているため、取得したjson中のキー"souce"を参照し、まとめて削除とかできそうです。まあ、「botの投稿数」とかもネタになりそうだから、削除はあとからということで割愛。


実働版ソース

 そんなこんなで、Pythonド素人が1か月(実働1週間くらい)で作った完成版がこちら。


TweetCrawler.py

#!/usr/bin/env python

# -*- coding:utf-8 -*-

import tweepy
from tweepy.api import API

import pymongo
from pymongo import MongoClient
import json

from datetime import datetime, timedelta

import sys

# Twitter アクセス関係の変数
CK = '' # Consumer Key
CS = '' # Consumer Secret
AT = '' # Access Token
AS = '' # Accesss Token Secert

TRACKWORD = ['#(隠語)'] # Public Stream Filter動作時のキーワード

DM_DEST = '' # DMの送信先

# MongoDB 接続関係の変数
HOST = 'mongo' # ホスト
PORT = 27017 # ポート(デフォルト:27017)
DB_NAME = 'TwitterDB' # DB名
COL_NAME= 'Twitter' # コレクション名

ExitLimit = datetime(2017, 1, 16, 0, 0, 0, 000000) - timedelta(hours=9)

class Listener(tweepy.StreamListener):
def __init__(self, api=None): # コンストラクタ
tweepy.StreamListener.__init__(self) # 親クラスのコンストラクタ
print('コンストラクタ')

self.StopFlg = False # ストップフラグ。
self.mongo_init()

self.API = None

self.dbErrCnt = 0

print(ExitLimit)

return

def mongo_init(self): # MongoDBの初期化
try:
Client = MongoClient(HOST, PORT)
db = Client[DB_NAME]
self.Collection = db[COL_NAME]
print('DB準備完了')
except pymongo.errors.PyMongoError as exc:
# 接続エラー
print('DB接続エラー')
return

def on_status(self, status):
#print('ツイート(' + str(self.TweetCnt) + ')')

self.insert(status._json)

if(datetime.now() > ExitLimit):
print("時間が来たので終了")
self.send_message("時間経過で終了")
self.StopFlg = True
return False

return True

def on_error(self, status_code):
print('エラー発生: ' + str(status_code))
self.send_message("ERR:" + str(status_code))
return True

def on_connect(self):
print('接続しました')

self.send_message('接続しました')
return

def on_disconnect(self, notice):
print('切断されました:' + str(notice.code))
self.send_message("DISCCONECT:" + str(notice.code))
return

def on_limit(self, track):
print('受信リミットが発生しました:' + str(track))
self.send_message("RCV_LIMIT:" + str(track))
return

def on_timeout(self):
print('タイムアウト')
self.send_message("TIMEOUT")
return True

def on_warning(self, notice):
print('警告メッセージ:' + str(notice.message))
self.send_message("WARN:" + str(notice.message))
return

def on_exception(self, exception):
print('例外エラー:' + str(exception))
self.send_message("EXCEPTION:" + str(exception))
return

def send_message(self, message):
# DMを送るメソッド
if(message is None) or (self.API is None):
return
if(len(DM_DEST) != 0 and len(message) != 0):
d = '{:%Y/%m/%d %H:%M:%S}'.format(datetime.now() + timedelta(hours=9))
mes = "[TWEET CRAWLER]\n" + d + "\n" + message

self.API.send_direct_message(screen_name=DM_DEST, text=mes.encode('utf8'))
print('DM送信:' + message)
else:
print('DM送信しませんでした')
return

def insert(self, json_src):
# MongoへのJSON入力。
try:
self.Collection.insert(json_src) # ここで格納
self.dbErrCnt = 0
except pymongo.errors.PyMongoError as exc:
# エラー発生
self.dbErrCnt += 1
print('エラー ' + str(self.dbErrCnt) + '回目')
if(self.dbErrCnt < 3):
self.send_message('MongoDB エラー発生。') # 同内容で何度も通知が発生するのを抑止する

# ファイルへの書き出しを実施
with open('MongoInsertError.txt','a') as f:
f.write(str(json_src) + '\n')
print('再処理失敗、ファイルに書いた')
f.close()

# ここからメイン処理
auth = tweepy.OAuthHandler(CK, CS)
auth.set_access_token(AT, AS)

ExitCode = 255

while (True): # 無限ループ
try:
listener = Listener()
stream = tweepy.Stream(auth, listener)
listener.API = tweepy.API(auth)

#どれか選択の上コメントアウト外す。
print(TRACKWORD)
stream.filter(track=TRACKWORD)
#stream.sample()
#stream.userstream()

#ストップフラグによるストップ判定
if(listener.StopFlg == True):
ExitCode = 0
break

except KeyboardInterrupt:
# CTRL+C
print('CTRL + C で終了。')
ExitCode = 0
break
except:
print('エラー終了')
pass # 例外全部無視してループさせる

sys.exit(ExitCode)


 素人目にもツッコミどころしかないソースですが、何分あと数日(ぼかす)で動かさないといけないし、ある程度長時間動かしてもエラー発生しないので、恐る恐るではありますがこれで動かすことにします。

$ nohup python TweetCrawler.py >normal.log 2>error.log &

 こんな感じでぽちっとな、の予定。


急募

 つっこみどころと手直しどころ。

(続く。)