はじめに
過去の記事(※1)で自然言語処理をする機会が増えてますとお伝えしてました。
その対象としては、Twitterも含まれていて、Tweetの取得を試みた時期がありました。
この2~3か月くらい、筆者は仕事でデータ収集(BeautifulSoupやSeleniumを使ったWebスクレイピング)と自然言語処理(MeCabを使った形態素解析とgensim使った単語の分散表現と感情分析)をする機会が増えてます。
もともと趣味で、プログラムからTweet(投稿)したり、他者のTweetを数件取得したりは経験していました。
が、大量データが必要になったので、既存の有志ブログに掲載のCodeを参考にしながら紹介します。(掲載のCodeが素晴らしく、大変助かりました。)
TwitterAPI でツイートを大量に取得。サーバー側エラーも考慮(pythonで)
※対象は「大量にダウンロード」の節です。
本稿では、元Codeに対して手を加えた部分を簡単に説明し、最後にCode全体を再掲してゆく。
本稿で紹介すること
- 元Codeに対する変更点
- 完成版のCode全体
本稿で紹介しないこと
- Twitter APIの利用手続きおよび仕様
- Pythonライブラリ(requests_oauthlib、requests、codecs、etc.)の全般
元Codeに対する変更点
大きく、3つです。
- HTTPリクエストの発行
- Waningメッセージの抑止
- Tweet詳細情報の保存
1. HTTPリクエストの発行
ProxyやらUser-Agentやらを設定するため、慣れ親しんだrequestsを使うように変更しました。
以下の事項をrequests.get()のOptionパラメータで指定
- timeoutの設定
- SSL証明書チェックのSkip(無効化)
- Header(User-Agent)の設定
- Proxyの設定 ★Proxy環境化で実行する場合は必須
特に、4点目の答えにたどり着くまでに筆者は少し悩みましたw
#----------------
# ツイート取得
#----------------
cnt = 0
unavailableCnt = 0
while True:
res = self.session.get(url, params = params)
PROXIES = {
'http' :'http://${ProxyサーバのIPアドレス}:${ProxyサーバのPort番号}/',
'https':'http://${ProxyサーバのIPアドレス}:${ProxyサーバのPort番号}/'
}
HEADERS = {
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36'
}
~
(中略)
~
#----------------
# ツイート取得
#----------------
cnt = 0
unavailableCnt = 0
while True:
#res = self.session.get(url, params = params)
res = requests.get(url, auth=self.auth, timeout=(10.0), verify=False, headers=HEADERS, proxies=PROXIES, params=params)
2. Waningメッセージの抑止
「1. HTTPリクエストの発行」の対応だけではWaningメッセージが大量に出力されて結果ログが見づらくなるため、Code冒頭でWarningメッセージを抑止する設定を入れました。
Advanced Usage — requests-docs-ja 1.0.4 documentation
本稿では通信相手を安全と判断して扱いますが、読者の所属組織(プログラム実行環境)のセキュリティルールを鑑みて適宜対応を願います。
# SSL証明書の検証をSkipし、以下のメッセージを抑止(無効化)する
# InsecureRequestWarning: Unverified HTTPS request is being made. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings
import urllib3
from urllib3.exceptions import InsecureRequestWarning
urllib3.disable_warnings(InsecureRequestWarning)
3. Tweet詳細情報の保存
リツイート数やいいね数まで取得し、コンソール表示ではなくファイル出力するように変更しました。
収集対象数の上限は、とりあえず500で指定
sys.maxsizeで指定した場合は、Twitter APIの時間あたりのリクエスト上限数を鑑みながら大量にダウンロードする動き
if __name__ == '__main__':
# キーワードで取得
getter = TweetsGetter.bySearch(u'渋谷')
# ユーザーを指定して取得 (screen_name)
#getter = TweetsGetter.byUser('AbeShinzo')
cnt = 0
for tweet in getter.collect(total = 3000):
cnt += 1
print ('------ %d' % cnt)
print ('{} {} {}'.format(tweet['id'], tweet['created_at'], '@'+tweet['user']['screen_name']))
print (tweet['text'])
if __name__ == '__main__':
# キーワードで取得
getter = TweetsGetter.bySearch(u'新型コロナウイルス感染症')
f = codecs.open('tweet_bySearch.log', 'w', 'utf-8')
cnt = 0
#for tweet in getter.collect(total=sys.maxsize):
for tweet in getter.collect(total=500):
cnt += 1
#print ('------ %d' % cnt)
#print ('{} {} {}'.format(tweet['id'], tweet['created_at'], '@'+tweet['user']['screen_name']))
#print (tweet['text'])
f.write('"{}","{}","{}","{}","{}","{}"\n'.format(tweet['id'], tweet['created_at'].strip(), '@'+tweet['user']['screen_name'].strip(), tweet['retweet_count'], tweet['favorite_count'], tweet['text'].strip().replace('\n', '')))
f.close()
完成版のCode全体
以下、完成版のCode全体です。
Twitter APIのConsumer Keyや社内Proxyサーバ、User-Agentなどは読者で各位設定ください。
# SSL証明書の検証をSkipし、以下のメッセージを抑止(無効化)する
# InsecureRequestWarning: Unverified HTTPS request is being made. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings
import urllib3
from urllib3.exceptions import InsecureRequestWarning
urllib3.disable_warnings(InsecureRequestWarning)
from requests_oauthlib import OAuth1Session, OAuth1
import requests
import json
import datetime, time, sys
from abc import ABCMeta, abstractmethod
import codecs
CK = '${YOUR Consumer Key}' # Consumer Key
CS = '${YOUR Consumer Secret}' # Consumer Secret
AT = '${YOUR Access Token}' # Access Token
AS = '${YOUR Access Token Secert}' # Access Token Secert
PROXIES = {
'http' :'http://${ProxyサーバのIPアドレス}:${ProxyサーバのPort番号}/', # Proxy for HTTP
'https':'http://${ProxyサーバのIPアドレス}:${ProxyサーバのPort番号}/' # Proxy for HTTPS
}
HEADERS = {
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36'
}
class TweetsGetter(object):
__metaclass__ = ABCMeta
def __init__(self):
#self.session = OAuth1Session(CK, CS, AT, AS)
self.auth = OAuth1(CK, CS, AT, AS)
@abstractmethod
def specifyUrlAndParams(self, keyword):
'''
呼出し先 URL、パラメータを返す
'''
@abstractmethod
def pickupTweet(self, res_text, includeRetweet):
'''
res_text からツイートを取り出し、配列にセットして返却
'''
@abstractmethod
def getLimitContext(self, res_text):
'''
回数制限の情報を取得 (起動時)
'''
def collect(self, total = -1, onlyText = False, includeRetweet = False):
'''
ツイート取得を開始する
'''
#----------------
# 回数制限を確認
#----------------
self.checkLimit()
#----------------
# URL、パラメータ
#----------------
url, params = self.specifyUrlAndParams()
params['include_rts'] = str(includeRetweet).lower()
# include_rts は statuses/user_timeline のパラメータ。search/tweets には無効
#----------------
# ツイート取得
#----------------
cnt = 0
unavailableCnt = 0
while True:
#res = self.session.get(url, params = params)
res = requests.get(url, auth=self.auth, timeout=(10.0), verify=False, headers=HEADERS, proxies=PROXIES, params=params)
if res.status_code == 503:
# 503 : Service Unavailable
if unavailableCnt > 10:
raise Exception('Twitter API error %d' % res.status_code)
unavailableCnt += 1
print ('Service Unavailable 503')
self.waitUntilReset(time.mktime(datetime.datetime.now().timetuple()) + 30)
continue
unavailableCnt = 0
if res.status_code != 200:
raise Exception('Twitter API error %d' % res.status_code)
tweets = self.pickupTweet(json.loads(res.text))
if len(tweets) == 0:
# len(tweets) != params['count'] としたいが
# count は最大値らしいので判定に使えない。
# ⇒ "== 0" にする
# https://dev.twitter.com/discussions/7513
break
for tweet in tweets:
if (('retweeted_status' in tweet) and (includeRetweet is False)):
pass
else:
if onlyText is True:
yield tweet['text']
else:
yield tweet
cnt += 1
if cnt % 100 == 0:
print ('%d件 ' % cnt)
if total > 0 and cnt >= total:
return
params['max_id'] = tweet['id'] - 1
# ヘッダ確認 (回数制限)
# X-Rate-Limit-Remaining が入ってないことが稀にあるのでチェック
if ('X-Rate-Limit-Remaining' in res.headers and 'X-Rate-Limit-Reset' in res.headers):
if (int(res.headers['X-Rate-Limit-Remaining']) == 0):
self.waitUntilReset(int(res.headers['X-Rate-Limit-Reset']))
self.checkLimit()
else:
print ('not found - X-Rate-Limit-Remaining or X-Rate-Limit-Reset')
self.checkLimit()
def checkLimit(self):
'''
回数制限を問合せ、アクセス可能になるまで wait する
'''
unavailableCnt = 0
while True:
url = "https://api.twitter.com/1.1/application/rate_limit_status.json"
#res = self.session.get(url)
res = requests.get(url, auth=self.auth, timeout=(10.0), verify=False, headers=HEADERS, proxies=PROXIES)
if res.status_code == 503:
# 503 : Service Unavailable
if unavailableCnt > 10:
raise Exception('Twitter API error %d' % res.status_code)
unavailableCnt += 1
print ('Service Unavailable 503')
self.waitUntilReset(time.mktime(datetime.datetime.now().timetuple()) + 30)
continue
unavailableCnt = 0
if res.status_code != 200:
raise Exception('Twitter API error %d' % res.status_code)
remaining, reset = self.getLimitContext(json.loads(res.text))
if (remaining == 0):
self.waitUntilReset(reset)
else:
break
def waitUntilReset(self, reset):
'''
reset 時刻まで sleep
'''
seconds = reset - time.mktime(datetime.datetime.now().timetuple())
seconds = max(seconds, 0)
print ('\n =====================')
print (' == waiting %d sec ==' % seconds)
print (' =====================')
sys.stdout.flush()
time.sleep(seconds + 10) # 念のため + 10 秒
@staticmethod
def bySearch(keyword):
return TweetsGetterBySearch(keyword)
@staticmethod
def byUser(screen_name):
return TweetsGetterByUser(screen_name)
class TweetsGetterBySearch(TweetsGetter):
'''
キーワードでツイートを検索
'''
def __init__(self, keyword):
super(TweetsGetterBySearch, self).__init__()
self.keyword = keyword
def specifyUrlAndParams(self):
'''
呼出し先 URL、パラメータを返す
'''
url = 'https://api.twitter.com/1.1/search/tweets.json'
params = {'q':self.keyword, 'count':100}
return url, params
def pickupTweet(self, res_text):
'''
res_text からツイートを取り出し、配列にセットして返却
'''
results = []
for tweet in res_text['statuses']:
results.append(tweet)
return results
def getLimitContext(self, res_text):
'''
回数制限の情報を取得 (起動時)
'''
remaining = res_text['resources']['search']['/search/tweets']['remaining']
reset = res_text['resources']['search']['/search/tweets']['reset']
return int(remaining), int(reset)
class TweetsGetterByUser(TweetsGetter):
'''
ユーザーを指定してツイートを取得
'''
def __init__(self, screen_name):
super(TweetsGetterByUser, self).__init__()
self.screen_name = screen_name
def specifyUrlAndParams(self):
'''
呼出し先 URL、パラメータを返す
'''
url = 'https://api.twitter.com/1.1/statuses/user_timeline.json'
params = {'screen_name':self.screen_name, 'count':200}
return url, params
def pickupTweet(self, res_text):
'''
res_text からツイートを取り出し、配列にセットして返却
'''
results = []
for tweet in res_text:
results.append(tweet)
return results
def getLimitContext(self, res_text):
'''
回数制限の情報を取得 (起動時)
'''
remaining = res_text['resources']['statuses']['/statuses/user_timeline']['remaining']
reset = res_text['resources']['statuses']['/statuses/user_timeline']['reset']
return int(remaining), int(reset)
if __name__ == '__main__':
# キーワードで取得
getter = TweetsGetter.bySearch(u'新型コロナウイルス感染症')
f = codecs.open('tweet_bySearch.log', 'w', 'utf-8')
cnt = 0
#for tweet in getter.collect(total=sys.maxsize):
for tweet in getter.collect(total=500):
cnt += 1
#print ('------ %d' % cnt)
#print ('{} {} {}'.format(tweet['id'], tweet['created_at'], '@'+tweet['user']['screen_name']))
#print (tweet['text'])
f.write('"{}","{}","{}","{}","{}","{}"\n'.format(tweet['id'], tweet['created_at'].strip(), '@'+tweet['user']['screen_name'].strip(), tweet['retweet_count'], tweet['favorite_count'], tweet['text'].strip().replace('\n', '')))
f.close()
# ユーザーを指定して取得 (screen_name)
getter = TweetsGetter.byUser('Qiita')
f = codecs.open('tweet_byUser.log', 'w', 'utf-8')
cnt = 0
#for tweet in getter.collect(total=sys.maxsize):
for tweet in getter.collect(total=500):
cnt += 1
#print ('------ %d' % cnt)
#print ('{} {} {}'.format(tweet['id'], tweet['created_at'], '@'+tweet['user']['screen_name']))
#print (tweet['text'])
f.write('"{}","{}","{}","{}","{}","{}"\n'.format(tweet['id'], tweet['created_at'].strip(), '@'+tweet['user']['screen_name'].strip(), tweet['retweet_count'], tweet['favorite_count'], tweet['text'].strip().replace('\n', '')))
f.close()
出力ファイルを開くと、それぞれ以下のようになっていました。(本稿執筆、2022/01/11時点)
"1480732674314899458","Tue Jan 11 02:45:54 +0000 2022","@nobusinsan","0","0","新型コロナウイルス 感染症専門医なる人たちは素人にも納得のいく説明をhttps://t.co/5a1cbldiod https://t.co/iitIImMWWd"
"1480732637777956866","Tue Jan 11 02:45:46 +0000 2022","@Redpanda_hkd","0","0","[千歳市] 新型コロナウイルス感染症に係る注意喚起 (北海道 千歳市) https://t.co/lIFK2GPRip"
"1480732523420590080","Tue Jan 11 02:45:18 +0000 2022","@DrugstoreCon","0","0","新型コロナウイルス感染症相談コールセンターの入電状況について|沖縄県・9時前後は特に繋がりにくくなっておりますので、入電数の少ない時間帯に電話していただくか、少し時間を空けておかけ直しいただきますよう、ご協力ください… https://t.co/byjbCjYQus"
"1480732192372555776","Tue Jan 11 02:43:59 +0000 2022","@HKT48cheerweb","0","1","池袋駅前にて新型コロナウイルス感染症モニタリングPCR検査を都が無料で行う準備をテントを建てて行ってました。 https://t.co/lpKqV0LfLc"
"1480731387145900035","Tue Jan 11 02:40:48 +0000 2022","@yoshiki7111","0","0","本日(1/10)、飯田保健所管内において、48名の新型コロナウイルス感染症の陽性者が確認されたとの公表がありました。感染された方、ご家族をはじめ関係者の ...リンク:https://t.co/BWvBTn8ylhタ グ:#ホームページ"
"1480720264652800001","Tue Jan 11 01:56:36 +0000 2022","@Qiita","0","0","○○LGTM達成や○○Contributions達成のお知らせツイートは @qiita_milestone でおこなっていますので是非フォローをお願いします! https://t.co/pkDG0n229U"
"1480700066403074049","Tue Jan 11 00:36:20 +0000 2022","@Qiita","1","3","QiitaAzure記事投稿キャンペーン11月テーマ「マイクロソフト認定資格を取得する際の学習方法や経験談、おすすめ学習リソースなどを紹介しよう!」11月の月間Contributorの発表です。キャンペーンに参加したきっ… https://t.co/7sMoJ6DdYL"
"1479329476119269376","Fri Jan 07 05:50:06 +0000 2022","@Qiita","0","2","📝リリースノート訪問済みリンクのスタイルの追加と、検索結果ページにおけるautofocusを外しました。https://t.co/G2XOVqfzmN"
"1479271101985603590","Fri Jan 07 01:58:08 +0000 2022","@Qiita","0","0","○○LGTM達成や○○Contributions達成のお知らせツイートは @qiita_milestone でおこなっていますので是非フォローをお願いします! https://t.co/4WcHuWxSw0"
"1478957891084500992","Thu Jan 06 05:13:33 +0000 2022","@Qiita","0","0","○○LGTM達成や○○Contributions達成のお知らせツイートは @qiita_milestone でおこなっていますので是非フォローをお願いします! https://t.co/EsOgA9UJOV"
まとめ
PythonプログラムからTwitter APIを利用して、大量のTweetを取得することができました。
これを遊休マシンに仕込んでおけば、大量のTweetを溜め込むことができるようになるかな、と。