注意事項の追記(2022年5月時点)
最初に作成したときから既に7年とか経ちそうな割に、未だにLGTMとかされているため、念のために追記します。
本稿記載のAPIは、現時点で既に存在しないAPI(2018/08に廃止)です。
代替として、「/statuses/filter」を用いる手段がありましたが、こちらも2020年10月末の廃止が宣言されました。
API 2.0の「/2/tweets/search/stream」が使えるようですが、こちらについては調べていません。
私自身は、「/search/tweets」を使って遡って取得する方向に逃げました。詳細はこちら。
前回まで
- 要求仕様:OK
- 最優先課題:OK
- 実行環境整備:OK
-
追加機能と動作の実証
- DMでエラーを通知
- 時間になったら止める機能
注:だいたいリアルタイムに書いてたりするので、書いてる途中で脱線したり停滞したりします。
その他の機能追加
基本的に現状で、ほぼ動作に問題がないことが確認できました。そうなると次にやるべきは、「実行したまま忘れるのを防ぐために、期限が来たら終了するようにする」と、「エラー終了に気付かずにいる事態を避けるために、DMで連絡する」の第二優先機能の実装です。
時間が来たら終了する機能
毎回 datetime.now() 読んでうんたら、というのも芸がないなーとか思ったのですが、他になんかこれという手段も思いつかないし、速度面の問題もなさげなのでシンプルに。
ExitLimit = datetime(2017, 1, 16, 0, 0, 0, 000000) - timedelta(hours=9)
こんな感じで時間を切って、取得時の処理を行う on_status()で、
if(datetime.now() > ExitLimit):
print("時間が来たので終了")
self.send_message("時間経過で終了")
self.StopFlg = True
return False
こんな感じで止める。最後の1個は(厳密には)時間外の取得になるけど、まあいいや。
DMでエラー通知する機能
TweepyのStream.pyを覗くと、そのものずばりAPI()なるメソッドがあるのですが、これで取得したオブジェクトでは、DMの送信には使えないみたいです。
そんなわけでメイン処理でtweepy.API()を呼んでオブジェクトを取得する方向で。
def send_message(self, message):
if(message is None) or (self.API is None):
return
if(len(DM_DEST) != 0 and len(message) != 0):
d = '{:%Y/%m/%d %H:%M:%S}'.format(datetime.now() + timedelta(hours=9))
mes = "[TWEET CRAWLER]\n" + d + "\n" + message
self.API.send_direct_message(screen_name=DM_DEST, text=mes.encode('utf8'))
print('DM送信:' + message)
else:
print('DM送信しませんでした')
return
こんな感じのメソッドを作って、事象発生時にはこれを呼ぶようにしましょう。
その他の懸案事項
DB落ちたらどうするの?
MongoDBだけ器用に落ちる、という事態はたぶんない、と思うのですが、念のため、
落ちた時にはテキストファイルに書き出す、という処理を加えます。
def insert(self, json_src):
# MongoへのJSONデータのinsert。
try:
self.Collection.insert(json_src) # ここで格納
self.dbErrCnt = 0
except pymongo.errors.PyMongoError as exc:
# mongoDBの謎ストップ
self.dbErrCnt += 1
print('エラー ' + str(self.dbErrCnt) + '回目')
if(self.dbErrCnt < 3):
self.send_message('MongoDB エラー発生。') #複数同一内容連打対策
#dir(exc)
with open('MongoInsertError.txt','a') as f:
f.write(str(json_src) + '\n')
print('再処理失敗、ファイルに書いた')
f.close()
だいたいこんなもんかな。接続が生きてるかチェックする機能、みたいのがないか探したのですが、これというのもない感じだったので割愛。
ちなみに、MongoDBが復活すれば、特に何かの再設定等必要なく再接続されてデータの格納が再開されたので、そういうもんだということにしてます(たぶんダメ
キーワードの指定は?
イベント(隠語)まわりは複数のキーワードで呼ばれたりしてるので、複数のキーワードのORで指定しないとねーとなるわけです。
- 正式名で登録商標
- 登録商標で短め(5文字)
- 登録商標で短め(3文字)
- ハッシュタグに使われがちな英字1文字+数字
- 四季1文字+名称先頭2文字
- 英字(5文字)
あたりを設定すれば、だいたい網羅できるでしょう。一部はハッシュタグと、タグなし両方入れておきます。
Twitter Streaming APIの説明によれば、日本語のタグなしキーワードは、スペース等で分離されていないとヒットしない、とのことなので、タグなしキーワードは無くてもよさげではありますが……
TRACKWORD = ['#D51,#機関車,#蒸気機関車,#丘蒸気,#locomotive,機関車,蒸気機関車']
こんな風に指定して(キーワードは適当にいじくっています)
stream.filter(track=TRACKWORD)
呼び出しはこんな感じに修正。
あまりたくさんのキーワードを入れると、スパム投稿が気になるところです。実際、本番用のキーワードでチェックしてみるとやたらと多いのがオークションbotで、「全部保存と言っても、これはいらんだろ」みたいなものもあるのが(一番静かな今の時期だからこそ)よくわかります。
botはだいたい、特定のクライアントを使っているため、取得したjson中のキー"souce"を参照し、まとめて削除とかできそうです。まあ、「botの投稿数」とかもネタになりそうだから、削除はあとからということで割愛。
実働版ソース
そんなこんなで、Pythonド素人が1か月(実働1週間くらい)で作った完成版がこちら。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import tweepy
from tweepy.api import API
import pymongo
from pymongo import MongoClient
import json
from datetime import datetime, timedelta
import sys
# Twitter アクセス関係の変数
CK = '' # Consumer Key
CS = '' # Consumer Secret
AT = '' # Access Token
AS = '' # Accesss Token Secert
TRACKWORD = ['#(隠語)'] # Public Stream Filter動作時のキーワード
DM_DEST = '' # DMの送信先
# MongoDB 接続関係の変数
HOST = 'mongo' # ホスト
PORT = 27017 # ポート(デフォルト:27017)
DB_NAME = 'TwitterDB' # DB名
COL_NAME= 'Twitter' # コレクション名
ExitLimit = datetime(2017, 1, 16, 0, 0, 0, 000000) - timedelta(hours=9)
class Listener(tweepy.StreamListener):
def __init__(self, api=None): # コンストラクタ
tweepy.StreamListener.__init__(self) # 親クラスのコンストラクタ
print('コンストラクタ')
self.StopFlg = False # ストップフラグ。
self.mongo_init()
self.API = None
self.dbErrCnt = 0
print(ExitLimit)
return
def mongo_init(self): # MongoDBの初期化
try:
Client = MongoClient(HOST, PORT)
db = Client[DB_NAME]
self.Collection = db[COL_NAME]
print('DB準備完了')
except pymongo.errors.PyMongoError as exc:
# 接続エラー
print('DB接続エラー')
return
def on_status(self, status):
#print('ツイート(' + str(self.TweetCnt) + ')')
self.insert(status._json)
if(datetime.now() > ExitLimit):
print("時間が来たので終了")
self.send_message("時間経過で終了")
self.StopFlg = True
return False
return True
def on_error(self, status_code):
print('エラー発生: ' + str(status_code))
self.send_message("ERR:" + str(status_code))
return True
def on_connect(self):
print('接続しました')
self.send_message('接続しました')
return
def on_disconnect(self, notice):
print('切断されました:' + str(notice.code))
self.send_message("DISCCONECT:" + str(notice.code))
return
def on_limit(self, track):
print('受信リミットが発生しました:' + str(track))
self.send_message("RCV_LIMIT:" + str(track))
return
def on_timeout(self):
print('タイムアウト')
self.send_message("TIMEOUT")
return True
def on_warning(self, notice):
print('警告メッセージ:' + str(notice.message))
self.send_message("WARN:" + str(notice.message))
return
def on_exception(self, exception):
print('例外エラー:' + str(exception))
self.send_message("EXCEPTION:" + str(exception))
return
def send_message(self, message):
# DMを送るメソッド
if(message is None) or (self.API is None):
return
if(len(DM_DEST) != 0 and len(message) != 0):
d = '{:%Y/%m/%d %H:%M:%S}'.format(datetime.now() + timedelta(hours=9))
mes = "[TWEET CRAWLER]\n" + d + "\n" + message
self.API.send_direct_message(screen_name=DM_DEST, text=mes.encode('utf8'))
print('DM送信:' + message)
else:
print('DM送信しませんでした')
return
def insert(self, json_src):
# MongoへのJSON入力。
try:
self.Collection.insert(json_src) # ここで格納
self.dbErrCnt = 0
except pymongo.errors.PyMongoError as exc:
# エラー発生
self.dbErrCnt += 1
print('エラー ' + str(self.dbErrCnt) + '回目')
if(self.dbErrCnt < 3):
self.send_message('MongoDB エラー発生。') # 同内容で何度も通知が発生するのを抑止する
# ファイルへの書き出しを実施
with open('MongoInsertError.txt','a') as f:
f.write(str(json_src) + '\n')
print('再処理失敗、ファイルに書いた')
f.close()
# ここからメイン処理
auth = tweepy.OAuthHandler(CK, CS)
auth.set_access_token(AT, AS)
ExitCode = 255
while (True): # 無限ループ
try:
listener = Listener()
stream = tweepy.Stream(auth, listener)
listener.API = tweepy.API(auth)
#どれか選択の上コメントアウト外す。
print(TRACKWORD)
stream.filter(track=TRACKWORD)
#stream.sample()
#stream.userstream()
#ストップフラグによるストップ判定
if(listener.StopFlg == True):
ExitCode = 0
break
except KeyboardInterrupt:
# CTRL+C
print('CTRL + C で終了。')
ExitCode = 0
break
except:
print('エラー終了')
pass # 例外全部無視してループさせる
sys.exit(ExitCode)
素人目にもツッコミどころしかないソースですが、何分あと数日(ぼかす)で動かさないといけないし、ある程度長時間動かしてもエラー発生しないので、恐る恐るではありますがこれで動かすことにします。
$ nohup python TweetCrawler.py >normal.log 2>error.log &
こんな感じでぽちっとな、の予定。
急募
つっこみどころと手直しどころ。
(続く。)