はじめに
Pythonを使ってツイートを収集する方法を紹介します。私的には最も簡単な方法だと思ってます。
以下の2つの方法のコードを書いています。
前準備
参考などに従って、Twitterのアクセストークンを取得している前提です。
- Consumer Key (API Key)
- Consumer Secret (API Secret)
- Access Token
- Access Token Secret
使用するライブラリTweepyは以下のコマンドでインストール可能です。
コマンド
$ pip install tweepy
コード
- 言語: Python
- 使用ライブラリ: Tweepyなど
- GitHubリポジトリ: https://github.com/haradai1262/get_twitter_streaming_python
特定のキーワードやハッシュタグが付与されたツイートを収集する
pythonスクリプト
import os, sys, json
import tweepy
class StdOutListener(tweepy.StreamListener):
def on_data(self, data):
global stream_num
global dir_idx
tweet = json.loads(data)
if not tweet['retweeted'] and 'RT @' not in tweet['text']: # retweetは取得しない
save_path = "%s/%s.json" % ( query_dir, tweet["id_str"] )
f = open(save_path, "w")
json.dump(tweet, f)
f.close()
stream_num += 1
return True
return True
def on_error(self, status):
print( status )
if __name__ == '__main__':
query = "#乃木坂工事中" # 取得したい特定のキーワードやハッシュタグ
with open( '../secret/twitterapi.json', 'r') as f: key_dic = json.load(f)
key_id = "1"
savepath = "../data/tweet"
query_dir = "%s/%s" % (savepath, query)
if not os.path.exists( query_dir ):
os.mkdir(query_dir)
stream_num = 0
l = StdOutListener()
auth = tweepy.OAuthHandler( key_dic[key_id]["app_key"],
key_dic[key_id]["app_secret"] )
auth.set_access_token( key_dic[key_id]["oauth_token"],
key_dic[key_id]["oauth_token_secret"] )
def start_stream():
while True :
stream = tweepy.Stream(auth, l)
stream.filter(track = [query])
print( 'target query:', query )
start_stream()
ツイートを収集しながら10000件毎に圧縮する
pythonスクリプト
import os, sys, json
import tweepy
import tarfile
import shutil
def tar_compress_dir( path ):
zip_targets = []
base = os.path.basename(path)
zipfilepath = os.path.abspath('%s.zip' % path)
for dirpath, dirnames, filenames in os.walk(path):
for filename in filenames:
filepath = os.path.join(dirpath, filename)
if filepath == zipfilepath:
continue
arc_name = os.path.relpath(filepath, os.path.dirname(path))
zip_targets.append((filepath, arc_name))
for dirname in dirnames:
filepath = os.path.join(dirpath, dirname)
arc_name = os.path.relpath(filepath, os.path.dirname(path)) + os.path.sep
zip_targets.append((filepath, arc_name))
_tar = tarfile.open('%s.tar.gz' % path,'w:gz')
for filepath, name in zip_targets:
_tar.add(filepath, arcname = name)
_tar.close()
shutil.rmtree(path)
return
class StdOutListener(tweepy.StreamListener):
def on_data(self, data):
global stream_num
global dir_idx
tweet = json.loads(data)
if not tweet['retweeted'] and 'RT @' not in tweet['text']: # retweetは取得しない
if dir_idx != 0 and ( stream_num % 10000 ) == 0:
tar_compress_dir( "%s/%d" % ( query_dir, dir_idx ) ) # 圧縮
if ( stream_num % 10000 ) == 0:
dir_idx += 1
new_dir = "%s/%d" % ( query_dir, dir_idx )
os.mkdir(new_dir)
save_path = "%s/%d/%s.json" % ( query_dir, dir_idx, tweet["id_str"] )
f = open(save_path, "w")
json.dump(tweet, f)
f.close()
stream_num += 1
return True
return True
def on_error(self, status):
print( status )
if __name__ == '__main__':
query = "#乃木坂工事中" # 取得したいツイートに含まれるキーワード
with open( '../secret/twitterapi.json', 'r') as f: key_dic = json.load(f)
key_id = "1"
savepath = "../data/tweet"
query_dir = "%s/%s" % (savepath, query)
if not os.path.exists( query_dir ):
os.mkdir(query_dir)
dir_idx = 0
stream_num = 0
l = StdOutListener()
auth = tweepy.OAuthHandler( key_dic[key_id]["app_key"],
key_dic[key_id]["app_secret"] )
auth.set_access_token( key_dic[key_id]["oauth_token"],
key_dic[key_id]["oauth_token_secret"] )
def start_stream():
while True :
stream = tweepy.Stream(auth, l)
stream.filter(track = [query])
print( 'target query:', query )
start_stream()