LoginSignup
3
6

More than 3 years have passed since last update.

特定のキーワードを含むツイートを毎日収集してcsvに保存するPythonプログラム

Posted at

目的

ツイッターなどのSNSの情報が新型コロナウイルス感染クラスタの発生リスクモニタリングに使えないかというアイデアがあり、「今日 飲み会」などのキーワード検索によるツイートの収集を行いたい。無料の検索APIでは1週間前のツイートしか遡れないので、今後の研究に活かせる可能性を考えて毎日データを自動で取れる仕組みを構築する。

新型コロナウイルス感染クラスタの発生リスク評価に使える検索ワードの良いアイデアがあれば是非コメント下さい!

参考URL

本家
https://developer.twitter.com/en/docs/twitter-api
とてもわかりやすかった解説サイト
https://gaaaon.jp/blog/twitterapi
本記事が上記リンクでいうところの「やってみた系自己満コード」にすら達していないのが悲しいので、場合によっては記事を非公開にするかもしれない。

方法

下記のnomikai_tweets.pyを実行する。

# coding: utf-8
# nomikai_tweets.py

import pandas as pd
import json
import schedule
from time import sleep
from requests_oauthlib import OAuth1Session
import datetime
from datetime import date, timedelta
import pytz

def convert_to_datetime(datetime_str):
    """
    日時のフォーマット変換
    """
    tweet_datetime = datetime.datetime.strptime(datetime_str,'%a %b %d %H:%M:%S %z %Y')
    return(tweet_datetime)


def job():
    """
    main内で繰り返すプログラム
    """

    # 検索キーワード リツイートを除く
    keyword = "今日 飲み会 exclude:retweets" 
    #  保存ディレクトリ 先に作っておく
    DIR = 'nomikai/' 

    #  API通信、developper登録をして得る情報
    Consumer_key = 'bT*****************'
    Consumer_secret = 've*****************'
    Access_token = '25*****************'
    Access_secret = 'NT*****************'
    url = "https://api.twitter.com/1.1/search/tweets.json"
    twitter = OAuth1Session(Consumer_key, Consumer_secret, Access_token, Access_secret)

    #  収集に使うパラメタ
    max_id = -1
    count = 100
    params = {'q' : keyword, 'count' : count, 'max_id' : max_id, 'lang' : 'ja', 'tweet_mode' : 'extended'}

    #  日本時間における日付処理 utc とjstを比較する準備
    today =datetime.datetime.now(pytz.timezone('Asia/Tokyo'))
    today_beggining_of_day = today.replace(hour=0, minute=0, second=0, microsecond=0)
    yesterday_beggining_of_day = today_beggining_of_day - timedelta(days=1)
    yesterday_str = datetime.datetime.strftime(yesterday_beggining_of_day, '%Y-%m-%d')

    #  tweet情報を格納するDF while文内のrecordと対応
    columns = ['time', 'user.id', 'user.location', 'full_text', 'user.followers_count', 'user.friends_count', 'user.description', 'id']
    df = pd.DataFrame(index=[], columns=columns)


    while(True):
        if max_id != -1: #  ツイートidを既に格納したツイートよりも遡る
            params['max_id'] = max_id - 1
        req = twitter.get(url, params = params)

        if req.status_code == 200: #  正常に取得できていれば
            search_timeline = json.loads(req.text)

            if search_timeline['statuses'] == []:  #  全ツイートを取り終えたら
                break

            for tweet in search_timeline['statuses']:
                #  ツイート時刻のutc化
                tweet_datetime = convert_to_datetime(tweet['created_at'])
                #  昨日のツイートでなければskip
                in_jst_yesterday = today_beggining_of_day > tweet_datetime >= yesterday_beggining_of_day

                if not in_jst_yesterday:  #  昨日のツイートでなければskip
                    continue

                #  DFへ格納
                record = pd.Series([tweet_datetime,
                                    tweet['user']['id'],
                                    tweet['user']['location'],
                                    tweet['full_text'],
                                    tweet['user']['followers_count'],
                                    tweet['user']['friends_count'],
                                    tweet['user']['description'],
                                    tweet['id'],
                                   ],index=df.columns)
                df = df.append(record, ignore_index=True)

            max_id = search_timeline['statuses'][-1]['id']

        else: #  アクセス頻度制限に引っかかった場合15分待つ
            print("Total", df.shape[0], "tweets were extracted", sep=" ")
            print('wainting for 15 min ...')
            sleep(15*60)

    #  保存
    df = df.set_index("time")
    df.index = df.index.tz_convert('Asia/Tokyo')
    df.to_pickle(DIR + yesterday_str + keyword +".pkl")
    df.to_csv(DIR + yesterday_str + keyword +".csv")
    print(today, "Total", df.shape[0], "tweets were extracted!\nnext start at 01:00 tommorow")

def main():
    print("start at 01:00 tommorow")
    # 毎日01時に実行する
    schedule.every().day.at("01:00").do(job)

    while True:
        schedule.run_pending()
        sleep(1)

if __name__ == '__main__':
    main()

結果

データが集まり次第解析したい。一週間の過去データだけでは曜日による周期変化すら評価できないため。

コメント

毎日データを取るために日本時間と標準時刻の取り扱いに気をつけなければいけないことを学んだ。
datetime.datetime.now()はプログラムを動かす環境に依存するので、このソースを他国にあるマシンで実行しても正常に動かないだろうことに留意。schedule.every().day.at("01:00").do(job)も同様。

抽出できた過去の「今日」「飲み会」が含まれるツイートのうち、「オンライン」が含まれるのは10%程度であった。
また、ツイッタラーは会社の飲み会が嫌いな人が多い。

3
6
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
6