0
0

【Python】Traffic Generatorを作った

Last updated at Posted at 2023-10-15

背景

新しいプラットフォーム上にアプリを移設したのに伴いMetricsとLogを確認したい。そのためにある程度の規模のリクエストを一定時間以上送信し続ける必要がある。このような場合、通常Apache Jmeterを用いているが、今回はリクエストごとにUserAgentとリクエストにのせるパラメータをランダムに切り替えたいので自前で用意した。

要件

  • リクエストごとにUserAgentとクエリパラメータをランダムに切り替えたい
  • 単位時間あたりのトラフィックをある程度大きくしたい

Code

traffic_generator.py
# coding:utf-8
import requests
import random
import threading
from time import sleep

user_agents = [
    # Safari User-Agents
    "Mozilla/5.0 (iPhone; CPU iPhone OS 16_6_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1",
    "Mozilla/5.0 (iPad; CPU OS 16_6_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1",
    # Chrome User-Agents
    "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Mobile Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36",
    # Microsoft Edge User-Agents
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Edg/117.0.0.0 Safari/537.36 EdgA/117.0.0.0",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Edg/117.0.0.0 Safari/537.36",
]

# please change accordingly
queries = ["query1", "query2"]
num_requests = random.randint(1, 10) 

def send_requests():
    while True:
        try:
            with open("url.txt", "r") as file:
                urls = file.readlines()
                if not urls:
                    print("No more shops in the list. Exiting...")
                    break  # Exit the loop if there are no more shops to process
                url = random.choice(urls).rstrip()
            
            user_agent = random.choice(user_agents)
            query = random.choice(queries)
            header = {'User-Agent': user_agent}

            # please change "?query=" accordingly
            request_url = url + "?query=" + query

            print(request_url)
            response = requests.get(request_url, headers=header)
            print("Status Code:", response.status_code)
            sleep(0.5)
        except Exception as e:
            print("An error occurred:", str(e))
            continue  # Continue to the next iteration if an error occurs

threads = []
for _ in range(num_requests):
    thread = threading.Thread(target=send_requests)
    thread.start()
    threads.append(thread)

for thread in threads:
    thread.join()
url.txt
url_1
url_2
url_3
...
url_N
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0