自分用:言語指定込み(en, ja, 無指定)でハッシュタグ付きの投稿を取得。
取得時刻をランダムサンプリングに近づけるため、1日をN個(Nは24の約数)の時間帯に分割し、各時間帯の中でランダムに取得時間幅(1分間)を1つ生成する。
動くことは確認したがコードの適切性は入念に検証していない。あと改行が適当すぎる。
事前に開発者プラットフォーム
https://developer.x.com/
でログインして、APP作成&プロジェクト作成を済ませる
import random
import requests
# 日付系。calendarなどはルーティーン処理したかったので追加した
import datetime
from time import sleep
from dateutil.relativedelta import relativedelta
import calendar
# 保存用
# import csv
import json
import tweepy
# Credentials for the X (Twitter) API. Copy them from the page of the app you
# want to use on the Twitter Developer Platform.
# main_search() hands them to tweepy.Client as consumer_key, consumer_secret,
# access_token, access_token_secret and bearer_token respectively.
# NOTE(review): fill these in locally only; avoid committing real values.
API_Key = ''
API_Sec = ''
Token = ''
Token_Sec = ''
BEARER = ''
# 指定した時刻から1分前までの最大10投稿を取得(※APIのmax_resultsは10以上制約があるっぽい)
def main_search(time, filter_lang=None):
    """Fetch up to 10 hashtagged, non-retweet posts from the minute before *time*.

    Args:
        time: ``datetime.datetime`` marking the end of the search window. It
            is formatted with ``isoformat()`` plus a trailing ``Z``, so it is
            expected to be a naive value that should be read as UTC.
        filter_lang: Language code appended to the query as ``lang:<code>``,
            or ``None`` to search without a language filter.

    Returns:
        A list with one dict per post. Every dict has the keys ``tweet_id``,
        ``created_at`` (ISO 8601 string), ``text``, ``author_id`` and
        ``conversation_id``; ``in_reply_to_user_id``, ``attachments``,
        ``entities`` and ``geo`` are added only when the post carries them.
        When the window contains no posts the list holds a single empty dict
        as a placeholder.
    """
    client = tweepy.Client(
        bearer_token=BEARER,
        consumer_key=API_Key,
        consumer_secret=API_Sec,
        access_token=Token,
        access_token_secret=Token_Sec,
    )
    # The recent-search endpoint does not accept max_results below 10.
    count = 10
    results = []
    search_query = '"#" -is:retweet has:hashtags'
    if filter_lang is not None:
        search_query = search_query + ' lang:' + filter_lang
    response = client.search_recent_tweets(
        query=search_query,
        tweet_fields=[
            "created_at", "lang", "entities", "attachments",
            "conversation_id", "geo", "edit_history_tweet_ids",
            "public_metrics",
        ],
        expansions=["attachments.media_keys", "author_id", "in_reply_to_user_id"],
        max_results=count,
        end_time=time.isoformat(timespec='seconds') + 'Z',
        start_time=(time - datetime.timedelta(minutes=1)).isoformat(timespec='seconds') + 'Z',
    )
    tweets = response.data
    # response.includes is deliberately not indexed here: it is empty when the
    # search matched nothing, and the old unused ``includes["users"]`` lookup
    # raised KeyError before the empty-result branch below could run.
    sleep(1)
    if tweets is not None:
        for tweet in tweets:
            obj = {
                "tweet_id": tweet.id,
                "created_at": tweet.created_at.isoformat(),
                "text": tweet.text,
                "author_id": tweet.author_id,
                "conversation_id": tweet.conversation_id,
            }
            # Optional fields are stored only when the post actually has them.
            for field in ("in_reply_to_user_id", "attachments", "entities", "geo"):
                value = getattr(tweet, field)
                if value is not None:
                    obj[field] = value
            results.append(obj)
    else:
        results.append({})
    print(results[0])
    return results
# ランダム時刻生成
def get_random_time_within_hour(day, hour_range=6, hour_add=0):
    """Return a random datetime on *day* inside one slot of the day.

    The hour is drawn from ``hour_add`` to ``hour_add + hour_range - 1``
    (both inclusive); minute and second are each drawn from 0 to 59.
    ``hour_range`` is the slot length in hours obtained by splitting the
    24-hour day.
    """
    # Draw order is hour, minute, second.
    offset_in_slot = random.randint(0, hour_range - 1)
    picked_minute = random.randint(0, 59)
    picked_second = random.randint(0, 59)
    return datetime.datetime(
        year=day.year,
        month=day.month,
        day=day.day,
        hour=offset_in_slot + hour_add,
        minute=picked_minute,
        second=picked_second,
    )
def make_loop_in_a_day(target_day, separate_num = 6, filter_lang="ja"):
    """Search one random minute in each of ``separate_num`` slots of a day.

    The day is cut into ``separate_num`` slots of ``24 // separate_num``
    hours. For every slot a random time is generated and passed to
    main_search; a failing search is printed and contributes nothing.
    Returns the concatenated search results of all slots.
    """
    slot_count = int(separate_num)
    slot_hours = 24 // slot_count
    collected = []
    for slot_index in range(slot_count):
        moment = get_random_time_within_hour(
            target_day,
            hour_range=slot_hours,
            hour_add=slot_index * slot_hours,
        )
        found = []
        try:
            found = main_search(time=moment, filter_lang=filter_lang)
        except Exception as e:
            print(e)
        collected.extend(found)
        # Pause between requests.
        sleep(5)
    return collected
時刻リストと言語リストを与えて検索してもらう。
保存してもらう。
そのあたりのまとめ関数。
# 時刻リストと言語リスト→検索する。
def make_search_in_time_lang_list(random_time_list, filter_lang_list=("ja", "en", None)):
    """Run main_search for every language/time pair and group results by language.

    Args:
        random_time_list: Search end times, each passed to main_search as
            ``time``.
        filter_lang_list: Language filters to search with. ``None`` means
            "no language filter" and is stored under the label
            ``"unspecified"``. The default is an immutable tuple so it cannot
            be altered between calls.

    Returns:
        A dict mapping each label (the language code, or ``"unspecified"``)
        to the list of results collected over all times. A search that raises
        is printed and contributes nothing.
    """
    results_dict = {}
    for filter_lang in filter_lang_list:
        label = "unspecified" if filter_lang is None else filter_lang
        results_total = []
        for search_time in random_time_list:
            try:
                results_total.extend(main_search(time=search_time, filter_lang=filter_lang))
            except Exception as e:
                # Best effort: report the failure and keep going with the next time.
                print(e)
            # Pause between requests.
            sleep(5)
        results_dict[label] = results_total
    return results_dict
# 保存用
def makejson_from_twlists(filename: str, twlist):
    """Write *twlist* to a JSON file, indented by 2 and without ASCII escaping.

    Args:
        filename: Target path. ``.json`` is appended unless the name already
            ends with it.
        twlist: JSON-serialisable data to store.
    """
    # Check the suffix, not a substring: "x.json.bak" still needs the extension.
    if not filename.endswith('.json'):
        filename = filename + ".json"
    # ensure_ascii=False emits raw non-ASCII characters, so the encoding is
    # fixed to UTF-8 instead of depending on the platform default.
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(twlist, f, indent=2, ensure_ascii=False)
# 日付与えて検索と保存をしてもらう。
def search_and_save_jaens_on(year=2024, month=9, day=3, separate_num=6):
    """Search one day with ja / en / no language filter and save one JSON file each.

    The day is split into ``separate_num`` equal slots, one random time is
    generated per slot, and the same list of times is used for all three
    language settings. Results are written to
    ``tw_ht_rand_<separate_num>_<YYYYMMDD>_<label>.json``.

    Args:
        year: Year of the target day.
        month: Month of the target day.
        day: Day of month of the target day.
        separate_num: Number of slots the day is split into; must be a
            positive divisor of 24.

    Returns:
        Always ``True``.

    Raises:
        ValueError: If ``separate_num`` is not a positive divisor of 24.
    """
    separate_num = int(separate_num)
    # Validate first so a bad value fails before any request is sent.
    if separate_num < 1 or 24 % separate_num != 0:
        raise ValueError(
            "separate_num must be a positive divisor of 24, got " + str(separate_num)
        )
    target_day = datetime.date(year, month, day)
    str_target_day = target_day.strftime('%Y%m%d')
    print(str_target_day)
    # NOTE(review): the original author expected separate_num=12 with two
    # languages to stay within the 10,000-post limit; not verified here.
    hour_range = 24 // separate_num
    random_time_list = []
    for loop_count in range(separate_num):
        hour_add = loop_count * hour_range
        time = get_random_time_within_hour(target_day, hour_range=hour_range, hour_add=hour_add)
        print(time)
        random_time_list.append(time)
    # The language settings are fixed here: Japanese, English, and no filter.
    filter_lang_list = ["ja", "en", None]
    results_dict = make_search_in_time_lang_list(
        random_time_list=random_time_list,
        filter_lang_list=filter_lang_list,
    )
    for label, posts in results_dict.items():
        makejson_from_twlists(
            filename="tw_ht_rand_" + str(separate_num) + "_" + str_target_day + "_" + label,
            twlist=posts,
        )
    return True
ここからは日付横断用
def search_and_save_jaens_matome(year=2024, month=9, day1=4, day2=6):
    """Run search_and_save_jaens_on for consecutive days within one month.

    ``day1`` is the first day handled and ``day2`` is exclusive, so pass the
    last wanted day plus one. Always returns ``True``.
    """
    days_to_fetch = range(day1, day2)
    for day_number in days_to_fetch:
        search_and_save_jaens_on(year=year, month=month, day=day_number)
        # Pause after each day's batch.
        sleep(10)
    return True
# 今日の前の日までのデータを取る。
def easy_routine(days_width=5):
    """Search and save posts for the ``days_width`` days that precede today.

    Today itself is not included. When the window reaches back past the 1st
    of the current month, the remaining days are taken from the end of the
    previous month (December of the previous year when today is in January).

    Args:
        days_width: Number of days before today to collect.

    Returns:
        Always ``True``.
    """
    today = datetime.date.today()
    print(today)
    year = today.year
    month = today.month
    day = today.day
    if days_width < day:
        # The whole window lies inside the current month.
        search_and_save_jaens_matome(year=year, month=month, day1=day - days_width, day2=day)
        return True

    # The window crosses the month boundary, so it is handled in two parts.
    # Part 1: days 1 .. day-1 of the current month (nothing when today is the 1st).
    search_and_save_jaens_matome(year=year, month=month, day1=1, day2=day)

    # Part 2: the remaining days_width - (day - 1) days at the end of last month.
    if month == 1:
        prev_year, prev_month = year - 1, 12
    else:
        prev_year, prev_month = year, month - 1
    lm_end_day = calendar.monthrange(prev_year, prev_month)[1]
    # Example: today is 9/2. days_width=2 -> only 8/31; days_width=5 -> 8/28..8/31.
    # The start day is the month end minus (days_width - day); the previous
    # code added that amount instead, which produced an empty range.
    # max(1, ...) keeps the start inside the previous month, so a window
    # longer than that month is cut off at its 1st day.
    day1 = max(1, lm_end_day - (days_width - day))
    search_and_save_jaens_matome(year=prev_year, month=prev_month, day1=day1, day2=lm_end_day + 1)
    return True