背景
新しいプラットフォーム上にアプリを移設したのに伴いMetricsとLogを確認したい。そのためにある程度の規模のリクエストを一定時間以上送信し続ける必要がある。このような場合、通常Apache Jmeter
を用いているが、今回はリクエストごとにUserAgentとリクエストにのせるパラメータをランダムに切り替えたいので自前で用意した。
要件
- リクエストごとにUserAgentとクエリパラメータをランダムに切り替えたい
- 単位時間あたりのトラフィックをある程度大きくしたい
Code
traffic_generator.py
# coding:utf-8
import requests
import random
import threading
from time import sleep
user_agents = [
# Safari User-Agents
"Mozilla/5.0 (iPhone; CPU iPhone OS 16_6_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1",
"Mozilla/5.0 (iPad; CPU OS 16_6_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1",
# Chrome User-Agents
"Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Mobile Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36",
# Microsoft Edge User-Agents
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Edg/117.0.0.0 Safari/537.36 EdgA/117.0.0.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Edg/117.0.0.0 Safari/537.36",
]
# please change accordingly
queries = ["query1", "query2"]
num_requests = random.randint(1, 10)
def send_requests():
while True:
try:
with open("url.txt", "r") as file:
urls = file.readlines()
if not urls:
print("No more shops in the list. Exiting...")
break # Exit the loop if there are no more shops to process
url = random.choice(urls).rstrip()
user_agent = random.choice(user_agents)
query = random.choice(queries)
header = {'User-Agent': user_agent}
# please change "?query=" accordingly
request_url = url + "?query=" + query
print(request_url)
response = requests.get(request_url, headers=header)
print("Status Code:", response.status_code)
sleep(0.5)
except Exception as e:
print("An error occurred:", str(e))
continue # Continue to the next iteration if an error occurs
threads = []
for _ in range(num_requests):
thread = threading.Thread(target=send_requests)
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
url.txt
url_1
url_2
url_3
...
url_N