LocustでWebsocketの負荷試験をする
locust良いですよね。 locust。最近の負荷試験は全部locustで済ませています。複雑なシナリオでもpythonでかけるのですごい楽です。
とはいえ最近は常時接続型のアプリも増えてきて単純なreq/resモデルではうまく負荷がかけれない状況も多い感じです。
というわけで、LocustのタスクでWebsocketを使った負荷をかけて、Webの画面から確認できるようにしてみました。
デフォルトだと単位がmsなんですが、ソケット通信でmsだと普通に0が連発されてしまうので単位はμsになります。
コードみるとわかりますが単にTaskの中でsocketつくって通信し、結果をlocustに通知してるだけなのでlocust本体にはまったく手を入れていません。
locustは設計がシンプルで分散環境の構築部分とリクエストしたりその結果を集計する部分が分かれてるのでこういう事が簡単に出来るのも良いです。
# -*- coding:utf-8 -*-
from __future__ import absolute_import
from __future__ import unicode_literals
from __future__ import print_function
import json
import uuid
import time
import gevent
from websocket import create_connection
import six
from locust import HttpLocust, TaskSet, task
from locust.events import request_success
class ChatTaskSet(TaskSet):
def on_start(self):
self.user_id = six.text_type(uuid.uuid4())
ws = create_connection('ws://127.0.0.1:5000/chat')
self.ws = ws
def _receive():
while True:
res = ws.recv()
data = json.loads(res)
end_at = time.time()
response_time = int((end_at - data['start_at']) * 1000000)
request_success.fire(
request_type='WebSocket Recv',
name='test/ws/chat',
response_time=response_time,
response_length=len(res),
)
gevent.spawn(_receive)
@task
def sent(self):
start_at = time.time()
body = json.dumps({'message': 'hello, world', 'user_id': self.user_id, 'start_at': start_at})
self.ws.send(body)
request_success.fire(
request_type='WebSocket Sent',
name='test/ws/chat',
response_time=int((time.time() - start_at) * 1000000),
response_length=len(body),
)
class ChatLocust(HttpLocust):
task_set = ChatTaskSet
min_wait = 0
max_wait = 100
ポイントは events.request_success.fire で結果を通知する事と、受け取り用の gevent thread を立ち上げる事です。
普通に受け取りを@taskの中で定義するとその間taskが止まり、同じTaskSet内の他のtaskもストップしてしまいます。
ちなみに負荷をかけたサンプルのEcho&PubSubサーバーは以下のスクリプトになります。
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import unicode_literals
from __future__ import print_function
from collections import defaultdict
import json
from geventwebsocket.handler import WebSocketHandler
from gevent.pywsgi import WSGIServer
from flask import Flask, request
from werkzeug.exceptions import abort
app = Flask(__name__)
ctr = defaultdict(int)
@app.route('/echo')
def echo():
ws = request.environ['wsgi.websocket']
if not ws:
abort(400)
while True:
message = ws.receive()
if message is not None:
r = json.loads(message)
ctr[r['user_id']] += 1
ws.send(message)
@app.route('/report')
def report():
return '\n'.join(['{}:\t{}'.format(user_id, count) for user_id, count in ctr.items()])
socket_handlers = set()
@app.route('/chat')
def chat():
ws = request.environ['wsgi.websocket']
socket_handlers.add(ws)
while True:
message = ws.receive()
for socket_handler in socket_handlers:
try:
socket_handler.send(message)
except:
socket_handlers.remove(socket_handler)
if __name__ == '__main__':
http_server = WSGIServer(('', 5000), app, handler_class=WebSocketHandler)
http_server.serve_forever()
ファイルは以下のレポジトリにもupしてあります。
https://gist.github.com/yamionp/9112dd6e54694d594306