0
0

[自分用] X API(Basic)でハッシュタグ付きの投稿リストを取得(投稿言語指定)[Python3]

Last updated at Posted at 2024-09-07

自分用:言語指定込み(en, ja, 無指定)でハッシュタグ付きの投稿を取得。
ランダムに近くするように、1日をN(24の約数)分割して、その中で取得時間幅(1分)を生成。

動くことは確認したがコードの適切性は入念に検証していない。あと改行が適当すぎる。

事前に開発者プラットフォーム
https://developer.x.com/
でログインして、APP作成&プロジェクト作成を済ませる

import random
import requests

# 日付系。calendarなどはルーティーン処理したかったので追加した
import datetime
from time import sleep
from dateutil.relativedelta import relativedelta
import calendar

# 保存用
# import csv
import json

import tweepy
# Credentials for the X API, taken from the page of the app you want to use on
# the Twitter (X) Developer Platform. They are intentionally blank here and
# must be filled in before running; all five are passed to tweepy.Client in
# main_search().
# NOTE(review): avoid committing real secrets to version control.
API_Key     = ''
API_Sec     = ''
Token       = ''
Token_Sec   = ''
BEARER = ''
# 指定した時刻の1分前から指定時刻までの最大10投稿を取得(※APIのmax_resultsは10〜100の範囲という制約がある)
def main_search(time, filter_lang=None):
    """Fetch up to 10 hashtagged, non-retweet posts from the minute ending at ``time``.

    Args:
        time: ``datetime.datetime`` marking the end of the search window. Its
            ISO-8601 string gets a literal ``Z`` appended, so it is sent to the
            API as a UTC timestamp.
            NOTE(review): this presumes a naive datetime already expressed in
            UTC -- confirm against the callers.
        filter_lang: Language code appended to the query as ``lang:<code>``.
            ``None`` applies no language filter.

    Returns:
        A list with one dict per post. Every dict has ``tweet_id``,
        ``created_at`` (ISO string), ``text``, ``author_id`` and
        ``conversation_id``; ``in_reply_to_user_id``, ``attachments``,
        ``entities`` and ``geo`` are present only when the API supplied them.
        When the search yields no posts the list holds a single empty dict as
        a placeholder for the sampled window.

    Raises:
        Whatever ``tweepy.Client.search_recent_tweets`` raises (authentication,
        rate limiting, invalid time range, ...). Callers in this file catch it.
    """
    client = tweepy.Client(bearer_token=BEARER, consumer_key=API_Key, consumer_secret=API_Sec, access_token=Token, access_token_secret=Token_Sec)

    count = 10

    search_query = '"#" -is:retweet has:hashtags'
    if filter_lang is not None:
        search_query = search_query + ' lang:' + filter_lang

    response = client.search_recent_tweets(
        query=search_query,
        tweet_fields=["created_at", "lang", "entities","attachments","conversation_id","geo","edit_history_tweet_ids","public_metrics"],
        expansions=["attachments.media_keys", "author_id", "in_reply_to_user_id"],
        max_results=count,
        end_time=time.isoformat(timespec='seconds') + 'Z',
        start_time=(time - datetime.timedelta(minutes=1)).isoformat(timespec='seconds') + 'Z',
    )

    tweets = response.data
    # The previous version also read response.includes["users"] without using
    # it; that lookup raised KeyError whenever the response carried no "users"
    # expansion (e.g. an empty result), so the empty-result branch below could
    # never run. The unused lookup has been removed.

    # Short pause after every request to stay gentle on the rate limit.
    sleep(1)

    results = []
    if tweets:
        optional_fields = ("in_reply_to_user_id", "attachments", "entities", "geo")
        for tweet in tweets:
            obj = {
                "tweet_id": tweet.id,
                "created_at": tweet.created_at.isoformat(),
                "text": tweet.text,
                "author_id": tweet.author_id,
                "conversation_id": tweet.conversation_id,
            }
            # Optional fields are stored only when the API returned a value.
            for field in optional_fields:
                value = getattr(tweet, field)
                if value is not None:
                    obj[field] = value
            results.append(obj)
    else:
        # No posts in this window: keep an empty placeholder entry.
        results.append({})

    # Progress/debug output: first entry of this window.
    print(results[0])

    return results
# ランダム時刻生成
def get_random_time_within_hour(day, hour_range=6, hour_add=0):
    """Return a random datetime on ``day`` inside one block of hours.

    The hour is drawn uniformly from ``hour_add`` .. ``hour_add + hour_range - 1``
    (``hour_range`` is the length of one block when 24 hours are split into
    equal parts); minute and second are drawn uniformly from 0..59.

    Args:
        day: Object exposing ``year``, ``month`` and ``day`` attributes.
        hour_range: Number of hours in the block.
        hour_add: First hour of the block.

    Returns:
        A naive ``datetime.datetime`` on the given day.
    """
    # Draw order (hour, minute, second) is kept so seeded runs are reproducible.
    hour_offset = random.randint(0, hour_range - 1)
    picked_minute = random.randint(0, 59)
    picked_second = random.randint(0, 59)
    return datetime.datetime(
        year=day.year,
        month=day.month,
        day=day.day,
        hour=hour_offset + hour_add,
        minute=picked_minute,
        second=picked_second,
    )
def make_loop_in_a_day(target_day, separate_num = 6, filter_lang="ja"):
    """Sample one random minute from each block of ``target_day`` and search it.

    The day is split into ``separate_num`` blocks of ``24 // separate_num``
    hours. For each block a random time is generated and passed to
    ``main_search``; a failing search is printed and contributes nothing.

    Args:
        target_day: Day to sample (needs ``year``/``month``/``day`` attributes).
        separate_num: Number of blocks; converted with ``int()``.
        filter_lang: Language code forwarded to ``main_search``.

    Returns:
        All search results of the day concatenated into one list.
    """
    slot_count = int(separate_num)
    slot_hours = 24 // slot_count

    collected = []
    for slot_index in range(slot_count):
        sampled_time = get_random_time_within_hour(
            target_day,
            hour_range=slot_hours,
            hour_add=slot_index * slot_hours,
        )
        try:
            found = main_search(time=sampled_time, filter_lang=filter_lang)
        except Exception as e:
            # Best effort: report the failure and carry on with the next block.
            print(e)
            found = []
        collected.extend(found)

        # Pause between requests.
        sleep(5)

    return collected

時刻リストと言語リストを与えて検索してもらう。
保存してもらう。
そのあたりのまとめ関数。

# 時刻リストと言語リスト→検索する。

def make_search_in_time_lang_list(random_time_list, filter_lang_list=("ja", "en", None)):
    """Run ``main_search`` for every (language, time) combination.

    Args:
        random_time_list: Times to search; each one is passed to
            ``main_search`` as the end of its search window.
        filter_lang_list: Language codes to search with. ``None`` means "no
            language filter" and is stored under the label ``"unspecified"``.
            The default is an immutable tuple so it cannot be altered between
            calls.

    Returns:
        A dict mapping each language label to the concatenated list of results
        from all times. A search that raises is printed and contributes no
        results.
    """
    # Materialise once so one-shot iterables can be walked twice below.
    filter_langs = list(filter_lang_list)
    labels = ["unspecified" if lang is None else lang for lang in filter_langs]

    results_dict = {label: [] for label in labels}

    for label, filter_lang in zip(labels, filter_langs):
        results_total = []
        for search_time in random_time_list:
            try:
                results_now = main_search(time=search_time, filter_lang=filter_lang)
            except Exception as e:
                # Best effort: report the failure and continue with the next time.
                print(e)
                results_now = []
            results_total.extend(results_now)
            # Pause between requests.
            sleep(5)
        results_dict[label] = results_total

    return results_dict


# 保存用

def makejson_from_twlists(filename: str, twlist):
    """Write ``twlist`` to a JSON file, overwriting any existing file.

    Args:
        filename: Output path. ``.json`` is appended unless the path already
            ends with it (a suffix check, so ``data.json.bak`` becomes
            ``data.json.bak.json``).
        twlist: JSON-serialisable object to store.
    """
    if not filename.endswith('.json'):
        filename = filename + ".json"

    # ensure_ascii=False writes non-ASCII text (Japanese, emoji) verbatim, so
    # the encoding is pinned to UTF-8 instead of relying on the locale default.
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(twlist, f, indent=2, ensure_ascii=False)


# 日付与えて検索と保存をしてもらう。

def search_and_save_jaens_on(year=2024, month=9, day=3, separate_num=6):
    """Sample posts for one day in each language and save them as JSON files.

    The day is split into ``separate_num`` blocks of ``24 // separate_num``
    hours and one random time is drawn per block. The same times are then
    searched for each of "ja", "en" and no language filter, and each language's
    results are written to
    ``tw_ht_rand_<separate_num>_<YYYYMMDD>_<label>`` via
    ``makejson_from_twlists``.

    Args:
        year: Year of the target day.
        month: Month of the target day.
        day: Day of month of the target day.
        separate_num: Number of blocks the day is split into. It was previously
            hard-coded to 6 in both the loop and the file name; the default
            keeps that behaviour.

    Returns:
        ``True`` once all files have been written.
    """
    target_day = datetime.date(year, month, day)
    str_target_day = target_day.strftime('%Y%m%d')

    print(str_target_day)

    # Original author's note: with two languages, separate_num=12 should still
    # stay within the 10000-post limit.
    hour_range = 24 // separate_num

    random_time_list = []
    for loop_count in range(separate_num):
        sampled_time = get_random_time_within_hour(
            target_day,
            hour_range=hour_range,
            hour_add=loop_count * hour_range,
        )
        print(sampled_time)
        random_time_list.append(sampled_time)

    # The languages are fixed here; None means "no language filter".
    filter_lang_list = ["ja", "en", None]

    results_dict = make_search_in_time_lang_list(random_time_list=random_time_list, filter_lang_list=filter_lang_list)

    # Derive the prefix from separate_num so the file name matches the sampling.
    prefix = "tw_ht_rand_" + str(separate_num) + "_" + str_target_day + "_"
    for label, tweets in results_dict.items():
        makejson_from_twlists(filename=prefix + label, twlist=tweets)

    return True

ここからは日付横断用


def search_and_save_jaens_matome(year=2024, month=9, day1=4, day2=6):
    """Run ``search_and_save_jaens_on`` for a run of days within one month.

    Args:
        year: Year of the target month.
        month: Target month.
        day1: First day of month to process (inclusive).
        day2: End of the range (exclusive), i.e. last wanted day + 1.

    Returns:
        ``True`` after every day in ``range(day1, day2)`` has been processed.
    """
    target_days = range(day1, day2)
    for target in target_days:
        search_and_save_jaens_on(year=year, month=month, day=target)
        # Pause between days.
        sleep(10)
    return True


# 今日の前の日までのデータを取る。

def easy_routine(days_width=5):
    """Collect and save posts for the ``days_width`` days before today.

    Today itself is excluded. When the window fits inside the current month it
    is handled with one ``search_and_save_jaens_matome`` call. Otherwise the
    current-month days are processed first, then the remaining days at the end
    of the previous month.

    Args:
        days_width: Number of days to go back from today.

    Returns:
        ``True`` when all days have been processed.
    """
    today = datetime.date.today()
    print(today)
    year = today.year
    month = today.month
    day = today.day

    if days_width < day:
        # The whole window lies in the current month: days day-days_width .. day-1.
        search_and_save_jaens_matome(year=year, month=month, day1=day - days_width, day2=day)
        return True

    # The window crosses the month boundary, so split it in two.
    # Current month: days 1 .. day-1 (empty when today is the 1st).
    search_and_save_jaens_matome(year=year, month=month, day1=1, day2=day)

    # Last day of the previous month; date arithmetic handles the January ->
    # December-of-previous-year rollover.
    prev_month_end = today.replace(day=1) - datetime.timedelta(days=1)

    # Days still missing after the current-month part. Example: today is 9/2
    # and days_width is 2 -> 9/1 is covered above and one day (8/31) remains.
    remaining = days_width - (day - 1)

    # The previous version computed `end - (day - days_width)`, which moved the
    # start past the month end and produced an empty range whenever
    # day < days_width. Count back from the month end instead. The start is
    # clamped to 1 so a window longer than the previous month cannot produce
    # an invalid day number.
    day1 = max(1, prev_month_end.day - remaining + 1)
    search_and_save_jaens_matome(
        year=prev_month_end.year,
        month=prev_month_end.month,
        day1=day1,
        day2=prev_month_end.day + 1,
    )

    return True

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0