21
Help us understand the problem. What is the problem?

More than 3 years have passed since last update.

posted at

updated at

Pythonで特定のキーワードが付与されたツイートを収集する方法

はじめに

Pythonを使ってツイートを収集する方法を紹介します。私的には最も簡単な方法だと思ってます。
以下の2つの方法のコードを書いています。

  • 特定のキーワードやハッシュタグが付与されたツイートを収集する
  • ツイートを収集しながら10000件毎に圧縮する

前準備

参考などに従って、Twitterのアクセストークンを取得している前提です。

  • Consumer Key (API Key)
  • Consumer Secret (API Secret)
  • Access Token
  • Access Token Secret

使用するライブラリTweepyは以下のコマンドでインストール可能です。

コマンド
$ pip install tweepy

コード

特定のキーワードやハッシュタグが付与されたツイートを収集する

pythonスクリプト
import os, sys, json
import tweepy

class StdOutListener(tweepy.StreamListener):
    """Stream listener that saves every non-retweet status as a JSON file.

    Relies on two module-level names created by the script entry point:
    ``query_dir`` (directory the files are written to) and ``stream_num``
    (running count of saved tweets).
    """

    def on_data(self, data):
        """Handle one raw message received from the stream.

        Args:
            data: JSON text of a single streaming message.

        Returns:
            Always ``True``, whether or not the message was saved.
        """
        global stream_num

        tweet = json.loads(data)
        # The stream can also deliver non-status messages (e.g. limit or
        # delete notices) that carry no "text" field; indexing them used
        # to raise KeyError and abort the whole collector.
        if not isinstance(tweet, dict) or 'text' not in tweet:
            return True
        # Skip retweets.
        if tweet.get('retweeted') or 'RT @' in tweet['text']:
            return True

        save_path = "%s/%s.json" % (query_dir, tweet["id_str"])
        # The context manager closes the file even if serialization fails.
        with open(save_path, "w") as f:
            json.dump(tweet, f)
        stream_num += 1
        return True

    def on_error(self, status):
        """Print the status value tweepy passes in when the stream errors."""
        print(status)

if __name__ == '__main__':
    # Keyword or hashtag that collected tweets must contain.
    query = "#乃木坂工事中"

    # Credentials are read from a JSON file keyed by an account id; each
    # entry holds app_key / app_secret / oauth_token / oauth_token_secret.
    with open('../secret/twitterapi.json', 'r') as f:
        key_dic = json.load(f)
    key_id = "1"
    savepath = "../data/tweet"
    query_dir = "%s/%s" % (savepath, query)
    # makedirs also creates missing parent directories (os.mkdir failed when
    # ../data/tweet did not exist yet) and tolerates an existing directory.
    os.makedirs(query_dir, exist_ok=True)

    # Number of tweets saved so far; updated by StdOutListener.on_data.
    stream_num = 0

    l = StdOutListener()
    auth = tweepy.OAuthHandler(key_dic[key_id]["app_key"],
                               key_dic[key_id]["app_secret"])
    auth.set_access_token(key_dic[key_id]["oauth_token"],
                          key_dic[key_id]["oauth_token_secret"])

    def start_stream():
        """Open a filtered stream for ``query``; reopen it whenever it returns."""
        while True:
            stream = tweepy.Stream(auth, l)
            stream.filter(track=[query])

    print('target query:', query)
    start_stream()

ツイートを収集しながら10000件毎に圧縮する

pythonスクリプト
import os, sys, json
import tweepy
import tarfile
import shutil

def tar_compress_dir( path ):
    """Pack the directory ``path`` into ``<path>.tar.gz`` and delete it.

    Archive members are stored relative to the parent of ``path``, i.e.
    under the directory's own name (``<name>/file.json`` ...).

    Args:
        path: Directory to compress. A trailing path separator is accepted.

    Returns:
        None. The source directory is removed only after the archive has
        been written and closed; if archiving raises, the directory is kept.
    """
    # Normalise first: with a trailing separator the archive name used to
    # become "<path>/.tar.gz", i.e. a file inside the directory that is
    # removed right afterwards.
    path = os.path.normpath(path)
    archive_path = '%s.tar.gz' % path

    # TarFile.add() recurses into directories by itself. The previous manual
    # walk also added every sub-directory explicitly, so files in nested
    # directories were written to the archive more than once.
    with tarfile.open(archive_path, 'w:gz') as _tar:
        _tar.add(path, arcname=os.path.basename(path))

    shutil.rmtree(path)
    return

class StdOutListener(tweepy.StreamListener):
    """Stream listener that saves non-retweet statuses in numbered batches.

    Tweets are written to ``<query_dir>/<dir_idx>/<id>.json``. Each time
    ``BATCH_SIZE`` tweets have been saved, the finished batch directory is
    handed to ``tar_compress_dir`` and a new one is started.

    Relies on module-level names created by the script entry point:
    ``query_dir``, ``dir_idx`` (current batch number, starts at 0) and
    ``stream_num`` (running count of saved tweets).
    """

    # Number of tweets stored per batch directory before it is compressed.
    BATCH_SIZE = 10000

    def on_data(self, data):
        """Handle one raw message received from the stream.

        Args:
            data: JSON text of a single streaming message.

        Returns:
            Always ``True``, whether or not the message was saved.
        """
        global stream_num
        global dir_idx

        tweet = json.loads(data)
        # The stream can also deliver non-status messages (e.g. limit or
        # delete notices) that carry no "text" field; indexing them used
        # to raise KeyError and abort the whole collector.
        if not isinstance(tweet, dict) or 'text' not in tweet:
            return True
        # Skip retweets.
        if tweet.get('retweeted') or 'RT @' in tweet['text']:
            return True

        if stream_num % self.BATCH_SIZE == 0:
            # A batch boundary: compress the batch that was just filled
            # (there is none before the very first tweet) ...
            if dir_idx != 0:
                tar_compress_dir("%s/%d" % (query_dir, dir_idx))
            # ... and open the next one. exist_ok avoids a crash when an
            # uncompressed directory is left over from an earlier run.
            dir_idx += 1
            os.makedirs("%s/%d" % (query_dir, dir_idx), exist_ok=True)

        save_path = "%s/%d/%s.json" % (query_dir, dir_idx, tweet["id_str"])
        # The context manager closes the file even if serialization fails.
        with open(save_path, "w") as f:
            json.dump(tweet, f)
        stream_num += 1
        return True

    def on_error(self, status):
        """Print the status value tweepy passes in when the stream errors."""
        print(status)

if __name__ == '__main__':
    # Keyword that collected tweets must contain.
    query = "#乃木坂工事中"

    # Credentials are read from a JSON file keyed by an account id; each
    # entry holds app_key / app_secret / oauth_token / oauth_token_secret.
    with open('../secret/twitterapi.json', 'r') as f:
        key_dic = json.load(f)
    key_id = "1"
    savepath = "../data/tweet"
    query_dir = "%s/%s" % (savepath, query)
    # makedirs also creates missing parent directories (os.mkdir failed when
    # ../data/tweet did not exist yet) and tolerates an existing directory.
    os.makedirs(query_dir, exist_ok=True)

    # Batch directory number and count of saved tweets; both are updated by
    # StdOutListener.on_data.
    dir_idx = 0
    stream_num = 0

    l = StdOutListener()
    auth = tweepy.OAuthHandler(key_dic[key_id]["app_key"],
                               key_dic[key_id]["app_secret"])
    auth.set_access_token(key_dic[key_id]["oauth_token"],
                          key_dic[key_id]["oauth_token_secret"])

    def start_stream():
        """Open a filtered stream for ``query``; reopen it whenever it returns."""
        while True:
            stream = tweepy.Stream(auth, l)
            stream.filter(track=[query])

    print('target query:', query)
    start_stream()

参考

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Sign up / Login
21
Help us understand the problem. What is the problem?