search
LoginSignup
98

posted at

updated at

PythonでTwitterスクレイピング&データフレーム化

なぜこの記事を書いたか

twitter感情分析に必要なデータの準備、すなわち
(1)PythonでTwitterスクレイピング⇨**(2)スクレイピング結果をデータフレームとして出力**」をシームレスに解説している記事がなかったからです。

「Twiiterスクレイピング」や「Pythonで感情分析(ネガポジ分析)」、「データフレーム」はそれぞれポピュラーなテーマなので単体ではよく解説記事を見かけます。しかしどれも部分的で、当時初心者レベルの自分には痒いところに手が届かず、中々苦労しました。

本記事の内容

本記事のコードをコピペすればそのまま任意のキーワードやユーザーからツイートを取得、データフレームとして出力が出来ます。ただし、TwiiterAPIを取得していることが前提です。

TwitterAPI取得方法はこちら

##注意点
・スクレイピングはTwiiter社の規約に則って行いましょう(このコードだと多分問題ないと思うのですが・・・。問題あれば是非ご指摘ください)
・twitterAPIの制限で1週間前までのツイートしか取得できません
・本記事ではテキストデータの整形は扱いません(いつか追記するかも)
・本記事ではデータフレームとして出力したデータを感情分析するコードは扱いません(別の記事で書くかも)

環境

・Google Colaboratory
・Python3

コード

準備

まず、各種ライブラリのインポートと、TwitterAPIのセットを行います。
個人で取得した4つのTwitterAPIキーをCK,CS,AT,AS''内に入れてください。


# -*- coding: utf-8 -*-
import pandas as pd
from requests_oauthlib import OAuth1Session
import json
import datetime, time, sys
from abc import ABCMeta, abstractmethod
 
CK = '??????????' # Consumer Key
CS = '??????????' # Consumer Secret
AT = '??????????' # Access Token
AS = '??????????' # Accesss Token Secert

ツイートを取得

ツイートを取得します。
(1)ある単語を含むツイートを取得 or (2)あるユーザーIDのツイートを取得 が可能です。お好みで。
一番下のtotal=内に任意の数値を入れれば、その数までのツイートを取得できます。


class TweetsGetter(object):
    __metaclass__ = ABCMeta
 
    def __init__(self):
        self.session = OAuth1Session(CK, CS, AT, AS)
 
    @abstractmethod
    def specifyUrlAndParams(self, keyword):
        '''
        呼出し先 URL、パラメータを返す
        '''
 
    @abstractmethod
    def pickupTweet(self, res_text, includeRetweet):
        '''
        res_text からツイートを取り出し、配列にセットして返却
        '''
 
    @abstractmethod
    def getLimitContext(self, res_text):
        '''
        回数制限の情報を取得 (起動時)
        '''
 
    def collect(self, total = -1, onlyText = False, includeRetweet = False):
        '''
        ツイート取得を開始する
        '''
 
        #----------------
        # 回数制限を確認
        #----------------
        self.checkLimit()
 
        #----------------
        # URL、パラメータ
        #----------------
        url, params = self.specifyUrlAndParams()
        params['include_rts'] = str(includeRetweet).lower()
        # include_rts は statuses/user_timeline のパラメータ。search/tweets には無効
 
        #----------------
        # ツイート取得
        #----------------
        cnt = 0
        unavailableCnt = 0
        while True:
            res = self.session.get(url, params = params)
            if res.status_code == 503:
                # 503 : Service Unavailable
                if unavailableCnt > 10:
                    raise Exception('Twitter API error %d' % res.status_code)
 
                unavailableCnt += 1
                print ('Service Unavailable 503')
                self.waitUntilReset(time.mktime(datetime.datetime.now().timetuple()) + 30)
                continue
 
            unavailableCnt = 0
 
            if res.status_code != 200:
                raise Exception('Twitter API error %d' % res.status_code)
 
            tweets = self.pickupTweet(json.loads(res.text))
            if len(tweets) == 0:
                # len(tweets) != params['count'] としたいが
                # count は最大値らしいので判定に使えない。
                # ⇒  "== 0" にする
                # https://dev.twitter.com/discussions/7513
                break
 
            for tweet in tweets:
                if (('retweeted_status' in tweet) and (includeRetweet is False)):
                    pass
                else:
                    if onlyText is True:
                        yield tweet['text']
                    else:
                        yield tweet
 
                    cnt += 1
                    if cnt % 100 == 0:
                        print ('%d件 ' % cnt)
 
                    if total > 0 and cnt >= total:
                        return
 
            params['max_id'] = tweet['id'] - 1
 
            # ヘッダ確認 (回数制限)
            # X-Rate-Limit-Remaining が入ってないことが稀にあるのでチェック
            if ('X-Rate-Limit-Remaining' in res.headers and 'X-Rate-Limit-Reset' in res.headers):
                if (int(res.headers['X-Rate-Limit-Remaining']) == 0):
                    self.waitUntilReset(int(res.headers['X-Rate-Limit-Reset']))
                    self.checkLimit()
            else:
                print ('not found  -  X-Rate-Limit-Remaining or X-Rate-Limit-Reset')
                self.checkLimit()
 
    def checkLimit(self):
        '''
        回数制限を問合せ、アクセス可能になるまで wait する
        '''
        unavailableCnt = 0
        while True:
            url = "https://api.twitter.com/1.1/application/rate_limit_status.json"
            res = self.session.get(url)
 
            if res.status_code == 503:
                # 503 : Service Unavailable
                if unavailableCnt > 10:
                    raise Exception('Twitter API error %d' % res.status_code)
 
                unavailableCnt += 1
                print ('Service Unavailable 503')
                self.waitUntilReset(time.mktime(datetime.datetime.now().timetuple()) + 30)
                continue
 
            unavailableCnt = 0
 
            if res.status_code != 200:
                raise Exception('Twitter API error %d' % res.status_code)
 
            remaining, reset = self.getLimitContext(json.loads(res.text))
            if (remaining == 0):
                self.waitUntilReset(reset)
            else:
                break
 
    def waitUntilReset(self, reset):
        '''
        reset 時刻まで sleep
        '''
        seconds = reset - time.mktime(datetime.datetime.now().timetuple())
        seconds = max(seconds, 0)
        print ('\n     =====================')
        print ('     == waiting %d sec ==' % seconds)
        print ('     =====================')
        sys.stdout.flush()
        time.sleep(seconds + 10)  # 念のため + 10 秒
 
    @staticmethod
    def bySearch(keyword):
        return TweetsGetterBySearch(keyword)
 
    @staticmethod
    def byUser(screen_name):
        return TweetsGetterByUser(screen_name)
 

 
class TweetsGetterBySearch(TweetsGetter):
    '''
    キーワードでツイートを検索
    '''
    def __init__(self, keyword):
        super(TweetsGetterBySearch, self).__init__()
        self.keyword = keyword
        
    def specifyUrlAndParams(self):
        '''
        呼出し先 URL、パラメータを返す
        '''
        url = 'https://api.twitter.com/1.1/search/tweets.json'
        params = {'q':self.keyword, 'count':100}
        return url, params
 
    def pickupTweet(self, res_text):
        '''
        res_text からツイートを取り出し、配列にセットして返却
        '''
        results = []
        for tweet in res_text['statuses']:
            results.append(tweet)
 
        return results
 
    def getLimitContext(self, res_text):
        '''
        回数制限の情報を取得 (起動時)
        '''
        remaining = res_text['resources']['search']['/search/tweets']['remaining']
        reset     = res_text['resources']['search']['/search/tweets']['reset']
 
        return int(remaining), int(reset)
    
 
class TweetsGetterByUser(TweetsGetter):
    '''
    ユーザーを指定してツイートを取得
    '''
    def __init__(self, screen_name):
        super(TweetsGetterByUser, self).__init__()
        self.screen_name = screen_name
        
    def specifyUrlAndParams(self):
        '''
        呼出し先 URL、パラメータを返す
        '''
        url = 'https://api.twitter.com/1.1/statuses/user_timeline.json'
        params = {'screen_name':self.screen_name, 'count':200}
        return url, params
 
    def pickupTweet(self, res_text):
        '''
        res_text からツイートを取り出し、配列にセットして返却
        '''
        results = []
        for tweet in res_text:
            results.append(tweet)
 
        return results
 
    def getLimitContext(self, res_text):
        '''
        回数制限の情報を取得 (起動時)
        '''
        remaining = res_text['resources']['statuses']['/statuses/user_timeline']['remaining']
        reset     = res_text['resources']['statuses']['/statuses/user_timeline']['reset']
 
        return int(remaining), int(reset)


if __name__ == '__main__':
 
    # キーワードで取得
    getter = TweetsGetter.bySearch(u'Python')
    
    # ユーザーを指定して取得 (screen_name)
    #getter = TweetsGetter.byUser('@realDonaldTrump')
 
    list_text = []
    list_id = []
    list_user_screenname = []
    list_created_at = []
    
    for tweet in getter.collect(total = 3000):
        list_text.append(tweet['text'])
        list_id.append(tweet['id'])
        list_user_screenname.append(tweet['user']['screen_name'])
        list_created_at.append(tweet['created_at'])
       

スクレイピング結果をデータフレームとして出力

Columns: [text, id, user, created_at]
Row: ツイートの数分(例えば3000個ツイートを取得したなら3000)
のデータフレームが出来ます。

df = pd.DataFrame(columns=['text', 'id', 'user', 'created_at'])
df_new = df.assign(text=list_text, id=list_id, user=list_user_screenname, created_at=list_created_at)

print(df_new)

[2/3]最後のfor文でリストにスクレピング結果を格納して、リストからデータフレームを作っています。
が、本来[2/3]の最後のfor文の部分で直接データフレームを作ることも可能です。
とりあえず動くものの、そもそももっとスマートに書ける部分はいくらでもありますね。時間あるときに校正します。
スクレイピングの実行部分とデータフレーム作る部分を分けたかったため、このような書き方をしています。

参考

人工知能特化型プログラミング学習サービスAidemy Premium Plan【Aidemy】

自走できるAI人材になるための6ヶ月長期コース【キカガク】

現役シリコンバレーエンジニアが教えるPython 3 入門 + 応用 +アメリカのシリコンバレー流コードスタイル【Udemy】

Pythonによるビジネスに役立つWebスクレイピング(BeautifulSoup、Selenium、Requests)
【Udemy】

【世界で18万人が受講】実践 Python データサイエンス【Udemy】

【画像判定AIアプリ開発・パート1】TensorFlow・Python・Flaskで作る画像判定AIアプリ開発入門【Udemy】

【キカガク流】プログラミング力向上のためのPythonで学ぶアルゴリズム論(前編)
【Udemy】

Python 1年生 体験してわかる!会話でまなべる!プログラミングのしくみ
Python2年生 スクレイピングのしくみ 体験してわかる!会話でまなべる!

入門 Python 3
独学プログラマー Python言語の基本から仕事のやり方まで

人工知能プログラミングのための数学がわかる本
[第2版]Python 機械学習プログラミング 達人データサイエンティストによる理論と実践

Kaggleで勝つデータ分析の技術
スクレイピング・ハッキング・ラボ Pythonで自動化する未来型生活


Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
What you can do with signing up
98