16
16

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Pythonでtweepyを使ってつぶやきを収集し、MongoDBに保存する

Posted at

普段は仕事上C#やJavaを使うことが多いのですが、以前からPythonに興味があり、
データ分析や機械学習が流行っているので、これを機にPythonを勉強しようと思い立った次第です!!

Pythonについては、最近発売した入門Python3を読んで学習しています。

今回は、すでに色々な方が紹介してくださっていますが、Twitter検索した結果をMongoDBに保存するプログラムを書いてみたので紹介します。
もっとこうしたほうがいいとか色々突っ込んでいただけるととても嬉しいです!!

環境

  • Python 3.5
  • PyYAML 3.1.1
  • pymongo 3.2
  • tweepy 3.5.0

各種設定

各自の環境に合わせた設定をしてください。

config.py
# coding=utf-8
# write code...

# mongodb
HOST = 'localhost'
PORT = 27017
DB_NAME = 'twitter-archive'
COLLECTION_NAME = 'tweets'

# twitter
CONSUMER_KEY = ''
CONSUMER_SECRET = ''
ACCESS_TOKEN_KEY = ''
ACCESS_TOKEN_SECRET = ''

検索キーワード

Twitter検索する際に指定するキーワードは、YAMLファイルで管理することにしました。

keywords.yml
# Twitterの検索キーワードをリストとして定義する。
# 以下は、例です。
- 'ハンバーグ'
- '野球'
- 'クリスマス'

ログ出力用クラス

loggingの使い方を調べながらラッパークラスを作成しました。
まだ理解できていないことも多く、細かい設定は勉強中ですが、ログ出力できるところまでは確認しました。

logger.py
import logging
from logging.handlers import TimedRotatingFileHandler

# coding=utf-8
# write code...

class Logger:
    def __init__(self, log_type):
        logger = logging.getLogger(log_type)
        logger.setLevel(logging.DEBUG)
        # 日ごとにローテーションしたいけどまだできてない。。。
        handler = TimedRotatingFileHandler(filename='archive.log', when='D', backupCount=30)
        formatter = logging.Formatter('[%(asctime)s] %(name)s %(levelname)s %(message)s')
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        self.logger = logger

    def info(self, msg, *args, **kwargs):
        self.logger.info(msg, *args, **kwargs)

    def debug(self, msg, *args, **kwargs):
        self.logger.debug(msg, *args, **kwargs)

    def error(self, msg, *args, **kwargs):
        self.logger.error(msg, *args, **kwargs)

    def exception(self, msg, *args, exc_info=True, **kwargs):
        self.logger.exception(msg, *args, exc_info, **kwargs)

メインの検索&保存処理

週に一度バッチ起動して定期的につぶやきを蓄積していこうと考えています。
思っていたよりもTwitter APIの仕様の理解に苦労しました。
重複したつぶやきを取得しないようにsince_idとmax_idを使って制御する方法を考えてみました。
どうやってやるのがよかったのだろうか。。。

archive.py
import sys
import config
import yaml
from tweepy import *
from tweepy.parsers import JSONParser
from pymongo import *
from logger import Logger


# coding: UTF-8
# write code...

def archive():

    # YAMLファイルから検索キーワードのリストを読み取り、OR検索用の文字列を生成する。
    with open('keywords.yml', 'r') as file:
        keywords = yaml.load(file)
    query_string = ' OR '.join(keywords)

    # ログ出力用オブジェクトの初期化
    logger = Logger('archive')

    # Twitter検索用のクライアント生成
    auth = OAuthHandler(config.CONSUMER_KEY, config.CONSUMER_SECRET)
    auth.set_access_token(config.ACCESS_TOKEN_KEY, config.ACCESS_TOKEN_SECRET)
    # JSONで結果を受け取りたいので、JSONParserを設定する。
    # 検索の上限に達してもライブラリ側でよろしくやってくれる。はず。
    twitter_client = API(auth, parser=JSONParser(), wait_on_rate_limit=True, wait_on_rate_limit_notify=True)
    if twitter_client is None:
        logger.error('認証に失敗しました。')
        sys.exit(-1)

    # つぶやきを保存するmongodbのコレクションを初期化
    client = MongoClient(config.HOST, config.PORT)
    tweet_collection = client[config.DB_NAME][config.COLLECTION_NAME]

    # 取得済のつぶやきの中から最新のつぶやきを取得し、そのつぶやきのid以降を取得するように設定しておく。
    last_tweet = tweet_collection.find_one(sort=[('id', DESCENDING)])
    since_id = None if last_tweet is None else last_tweet['id']

    # 初回の検索時は、max_idの設定をしないように-1を設定しておく。
    max_id = -1

    # tweet_countがmax_tweet_countまで達したら、検索を終了する。
    # max_tweet_countには大きな値を設定しておく。
    tweet_count = 0
    max_tweet_count = 100000

    logger.info('最大{0}個のつぶやきを収集します。'.format(max_tweet_count))
    while tweet_count < max_tweet_count:
        try:
            params = {
                'q': query_string,
                'count': 100,
                'lang': 'ja'
            }
            # max_idとsince_idは設定されている場合のみ、パラメータとして渡すようにする。
            if max_id > 0:
                params['max_id'] = str(max_id - 1)
            if since_id is not None:
                params['since_id'] = since_id

            search_result = twitter_client.search(**params)
            statuses = search_result['statuses']

            # 最後まで検索できたかチェック
            if statuses is None or len(statuses) == 0:
                logger.info('つぶやきが見つかりませんでした。')
                break

            tweet_count += len(statuses)
            logger.debug('{0}個のつぶやきを取得しました。'.format(tweet_count))

            result = tweet_collection.insert_many([status for status in statuses])
            logger.debug('MongoDBに保存しました。IDは、{0}です。'.format(result))

            # 最後に取得したTweetのIDで更新する。
            max_id = statuses[-1]['id']

        except (TypeError, TweepError) as e:
            print(str(e))
            logger.exception(str(e))
            break

if __name__ == '__main__':
    archive()

まとめ

Pythonはまだ全然使いこなせていませんが、やりたいことがスッキリ書ける言語だと思いました。
継続して学習しようと思います。
今後は、データ分析用のライブラリを使って、収集したつぶやきを解析してみようと思います!!

16
16
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
16
16

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?