LoginSignup
1
2

More than 3 years have passed since last update.

twitter API で大量ツイート収集(77 tweet/s)(streamとsearchの並列処理で~)

Last updated at Posted at 2020-02-08
collect.py
import tweepy
import datetime
import re
import collections
from pytz import timezone
import time
import MeCab
#import threading
#from multiprocessing import Pool
import os
#import multiprocessing
import concurrent.futures
#↑インポート


consumer_key = ""
consumer_secret = ""
access_token = ""
access_token_secret = ""
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)
authapp = tweepy.AppAuthHandler(consumer_key,consumer_secret)
apiapp = tweepy.API(authapp)
#↑認証

m = MeCab.Tagger("-Ochasen")

N=0
#↑取得ツイート数

lang_dict="{'en': '英語', 'und': '不明', 'is': 'アイスランド語', 'ay': 'アイマラ語', 'ga': 'アイルランド語', 'az': 'アゼルバイジェン語', 'as': 'アッサム語', 'aa': 'アファル語', 'ab': 'アプハジア語', 'af': 'アフリカーンス語', 'am': 'アムハラ語', 'ar': 'アラビア語', 'sq': 'アルバニア語', 'hy': 'アルメニア語', 'it': 'イタリア語', 'yi': 'イディッシュ語', 'iu': 'イヌクティトット語', 'ik': 'イヌピア語', 'ia': 'インターリングア', 'ie': 'インターリング語', 'in': 'インドネシア語', 'ug': 'ウイグル語', 'cy': 'ウェールズ語', 'vo': 'ヴォラピュック語', 'wo': 'ウォロフ語', 'uk': 'ウクライナ語', 'uz': 'ウズベク語', 'ur': 'ウルドゥー語', 'et': 'エストニア語', 'eo': 'エスペラント語', 'or': 'オーリア語', 'oc': 'オキタン語', 'nl': 'オランダ語', 'om': 'オロモ語', 'kk': 'カザフ語', 'ks': 'カシミール語', 'ca': 'カタラン語', 'gl': 'ガリシア語', 'ko': '韓国語', 'kn': 'カンナダ語', 'km': 'カンボジア語', 'rw': 'キヤーワンダ語', 'el': 'ギリシャ語', 'ky': 'キルギス語', 'rn': 'キルンディ語', 'gn': 'グアラニー語', 'qu': 'クエチュア語', 'gu': 'グジャラト語', 'kl': 'グリーンランド語', 'ku': 'クルド語', 'ckb': '中央クルド語', 'hr': 'クロアチア語', 'gd': 'ゲーリック語', 'gv': 'ゲーリック語', 'xh': 'コーサ語', 'co': 'コルシカ語', 'sm': 'サモア語', 'sg': 'サングホ語', 'sa': 'サンスクリット語', 'ss': 'シスワティ語', 'jv': 'ジャワ語', 'ka': 'ジョージア語', 'sn': 'ショナ語', 'sd': 'シンド語', 'si': 'シンハラ語', 'sv': 'スウェーデン語', 'su': 'スーダン語', 'zu': 'ズールー語', 'es': 'スペイン語', 'sk': 'スロヴァキア語', 'sl': 'スロヴェニア語', 'sw': 'スワヒリ語', 'tn': 'セツワナ語', 'st': 'セト語', 'sr': 'セルビア語', 'sh': 'セルボクロアチア語', 'so': 'ソマリ語', 'th': 'タイ語', 'tl': 'タガログ語', 'tg': 'タジク語', 'tt': 'タタール語', 'ta': 'タミル語', 'cs': 'チェコ語', 'ti': 'チグリニャ語', 'bo': 'チベット語', 'zh': '中国語', 'ts': 'ヅォンガ語', 'te': 'テルグ語', 'da': 'デンマーク 語', 'de': 'ドイツ語', 'tw': 'トウィ語', 'tk': 'トルクメン語', 'tr': 'トルコ語', 'to': 'トンガ語', 'na': 'ナウル語', 'ja': '日本語', 'ne': 'ネパール語', 'no': 'ノルウェー語', 'ht': 'ハイチ語', 'ha': 'ハウサ語', 'be': '白ロシア語', 'ba': 'バシキール語', 'ps': 'パシト語', 'eu': 'バスク語', 'hu': 'ハンガリー語', 'pa': 'パンジャビ語', 'bi': 'ビスラマ語', 'bh': 'ビハール語', 'my': 'ビルマ語', 'hi': 'ヒンディー語', 'fj': 'フィジー語', 'fi': 'フィンランド語', 'dz': 'ブータン語', 'fo': 'フェロー語', 'fr': 'フランス語', 'fy': 'フリジア語', 'bg': 'ブルガリア語', 'br': 'ブルターニュ語', 'vi': 'ベトナム語', 'iw': 'ヘブライ語', 'fa': 'ペルシャ語', 'bn': 'ベンガル語', 'pl': 'ポーランド語', 'pt': 'ポルトガル語', 'mi': 'マオリ語', 'mk': 'マカドニア語', 'mg': 'マダガスカル語', 'mr': 'マラッタ語', 'ml': 'マラヤーラム語', 'mt': 'マルタ語', 'ms': 'マレー語', 'mo': 'モルダビア語', 'mn': 'モンゴル語', 'yo': 'ヨルバ語', 'lo': 'ラオタ語', 'la': 'ラテン語', 'lv': 'ラトビア語', 'lt': 'リトアニア語', 'ln': 'リンガラ語', 'li': 'リンブルク語', 'ro': 'ルーマニア語', 'rm': 'レートロマンス語', 'ru': 'ロシア語'}"
lang_dict=eval(lang_dict)
#↑辞書

def process(ty,tw):
 pass
#↑行いたい言語処理

def judge_tweet_type(tweet):
 if tweet.in_reply_to_status_id_str:
  return "reply"
 else:
  head= str(tweet.text).split(":")
 if len(head) >= 2 and "RT" in head[0]:
  return "retwe"
 else:
  return "tweet"
#↑リプ、リツイート、ツイートか判断

def get_time(id):
 two_raw=format(int(id),'016b').zfill(64)
 unixtime = int(two_raw[:-22],2) + 1288834974657
 unixtime_th = datetime.datetime.fromtimestamp(unixtime/1000)
 tim = str(unixtime_th).replace(" ","_")[:-3]
 return tim,unixtime
#↑idからツイート時間

def gather(tweet,type):
 global N
 global all_time
 tim,unix=get_time(tweet.id)
 original_text=tweet.text.replace("\n","")
 tweet_type=judge_tweet_type(tweet)
 nowtime=time.time()
 tweet_pertime=str(round(N/(nowtime-all_time),1))
 lag=str(round(nowtime-unix/1000,1))
 lang=lang_dict[tweet.lang]
 process(tweet_type,original_text)
 print(N,tweet_pertime,"/s","+"+lag,tim,type,tweet_type,lang)
 #print(N,tweet_pertime,"/s","+"+lag,tim,type,tweet_type,lang,original_text)
 N=N+1
#↑streamとsearchの全ツイートが集まる


def search(last_id):
 time_search =time.time()
 for status in apiapp.search(q="filter:safe OR -filter:safe ",count="100",result_type="recent",since_id=last_id):
   gather(status,"search")
#↑search本体


interval = 2.1
#↑search呼び出し間隔

trysearch=0
#↑search呼び出し回数

class StreamingListener(tweepy.StreamListener):
    def on_status(self, status):
        global time_search
        global trysearch
        gather(status,"stream")
        time_stream=time.time()
        time_stream-time_search % interval
        #print(time_stream-time_search)
        #print(interval*trysearch)
        if time_stream-time_search-interval>interval*trysearch:
           last_id=status.id
           executor = concurrent.futures.ThreadPoolExecutor(max_workers=8)
           #↑並列定義
           executor.submit(search(last_id))
           trysearch=trysearch+1
#↑streaming本体

def carry():
 listener = StreamingListener()
 streaming = tweepy.Stream(auth, listener)
 streaming.sample()
#↑stream呼び出し関数

time_search =time.time()
#↑searchだがstream前に定義

executor = concurrent.futures.ThreadPoolExecutor(max_workers=8)
#↑並列定義
all_time=time.time()
executor.submit(carry)
#↑stream呼び出し関数呼び出し

これを実行すると、
bandicam 2020-02-08 12-37-51-538.jpg
こんな感じで、6900目のツイートを1秒間に80ツイートほど、2秒の遅延で、streamとsearchの区別も付けて、リプやRTなどの種別も着けて、言語まで表示してくれます。
途中の関数のprocessでテキストをどう、自然言語処理行うか、ここにプラスしていくと、大きいと思います。

ありがとうございました。また付け加えます。

定型文
ツイッター(@kenkensz9)にいつもいるので何かあればどうぞ
よろしければいいねお願いします!

1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2