0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

【Python】Twitter API V2 StreamingClient

Last updated at Posted at 2023-02-21

前準備(APIキー&トークン取得)

下記のTwitter Developerのサイトにログインして、Projects&App->Overview->+Add Appで新しいアプリ用をAPIキーを取得する。

必要なAPIキー&トークン

  • bearer_token
  • consumer_key
  • consumer_secret
  • access_token
  • access_token_secret
twiterAPI.PNG

その他のTwitter APIのサンプルコードは以下を参照。

各APIメゾッドの制限は以下を参照。

以降、説明するStreamでは以下の制限が課せられている。

  • 1秒当たり50 Tweetsまで
  • ルールは25個まで
  • ルールの長さは512文字まで
  • 1つのアプリで1接続のみ
  • 1 か月あたり最大 200 万 Tweetsまで
  • 15分あたり50 リクエストのみ

StreamingClientの使い方は以下を参照。

Python コード

import time
import tweepy
import configparser
import logging
from logging import getLogger,Formatter,StreamHandler
import json

BT= #bearer_token
CK= #consumer_key
CS= #consumer_secret
AT= #access_token
AS= #access_token_secret

target_user=['アカウントID1','アカウントID2']
target_keywords=['keyword1','keyword2']

target_keyword = '('
for idx, word in enumerate(target_keywords):
    target_keyword = target_keyword + '"' + word + '"' + ' OR '
target_keyword = target_keyword[:-4] + ')'

logger=getLogger('autoLikeAndRT')
streamFormat=Formatter('%(name)s:%(levelname)s:%(message)s')
stream=StreamHandler()
stream.setFormatter(streamFormat)
logger.addHandler(stream)
logger.setLevel(logging.DEBUG)

class Listner(tweepy.StreamingClient):
    def on_tweet(self, tweet):
        logger.info(f' id:{tweet.author_id} tweeted. text:{tweet.text}')
        for user in self.includes:
            if user.get('id')==tweet.author_id:
                self.username=user.get('username')
            else:
                self.username='?'
        self.client.like(tweet.id)
        logger.info(f'Liked tweet from id:{tweet.author_id} userid:{self.username} (tweet_id:{tweet.id} text:{tweet.text})')
        self.client.retweet(tweet.id)
        logger.info(f'Retweeted tweet from id:{tweet.author_id} userid:{self.username} (tweet_id:{tweet.id} text:{tweet.text})')

    def on_connect(self):
        logger.info('Connecting to streaming.')
        response=self.get_rules()
        if response.data!=None:
            for rule in response.data:
                self.delete_rules(rule.id)
                logger.info(f'Deleted previous rule({rule.value})')
        self.target=target_user
        self.keywords=target_keyword
        logger.info(f'Loaded target{self.target}')
        for target in self.target:
            keywords = self.keywords
            self.add_rules(tweepy.StreamRule(f'{keywords} from:{target} -is:reply -is:retweet -is:quote'))
            logger.info(f'Added rule.({keywords} from:{target})')
        self.client=tweepy.Client(bearer_token=BT,consumer_key=CK,consumer_secret=CS,access_token=AT,access_token_secret=AS,wait_on_rate_limit=True)
        logger.info(f'Created Twitter API client.')
        self.includes=[]
        self.ids=[]
        self.username='?'
        self.error_count={'time':time.time()}

    def on_includes(self, includes):
        for user in includes.get('users'):
            if user.get('id') not in self.ids:
                self.ids.append(user.get('id'))
                self.includes.append({'id':includes.get('users')[0].id,'name':includes.get('users')[0].name,'username':includes.get('users')[0].username})

    def on_errors(self, errors):
        print(errors)

    def on_exception(self, exception):
        logger.warning(f'Exception occured.\n{exception}\nRestarting Streaming.')
        time.sleep(3)
        if time.time()-self.error_count['time'] > 10:
            listner = Listner(BT)
            listner.filter(expansions='author_id',user_fields='username',tweet_fields='author_id')
        else:
            logger.error('Error occured within 10s after the instance started. ')

    def on_data(self, data):
        json_data = json.loads(data)
        print("--json-data--",json_data)

listner = Listner(BT)
listner.filter(expansions='author_id',user_fields='username',tweet_fields='author_id')

まとめ

Twitter API V2 StreamingClientを用いたtwitterのツイート・リツイートの監視(ストリーミング)方法を紹介した。

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?