LoginSignup
56
55

More than 5 years have passed since last update.

LocustでWebsocketの負荷試験をする

Posted at

LocustでWebsocketの負荷試験をする

locust良いですよね。 locust。最近の負荷試験は全部locustで済ませています。複雑なシナリオでもpythonでかけるのですごい楽です。

とはいえ最近は常時接続型のアプリも増えてきて単純なreq/resモデルではうまく負荷がかけれない状況も多い感じです。

というわけで、LocustのタスクでWebsocketを使った負荷をかけて、Webの画面から確認できるようにしてみました。

websocketlocust.png

デフォルトだと単位がmsなんですが、ソケット通信でmsだと普通に0が連発されてしまうので単位はμsになります。
コードみるとわかりますが単にTaskの中でsocketつくって通信し、結果をlocustに通知してるだけなのでlocust本体にはまったく手を入れていません。

locustは設計がシンプルで分散環境の構築部分とリクエストしたりその結果を集計する部分が分かれてるのでこういう事が簡単に出来るのも良いです。

locustfile.py
# -*- coding:utf-8 -*-
from __future__ import absolute_import
from __future__ import unicode_literals
from __future__ import print_function

import json
import uuid
import time
import gevent

from websocket import create_connection
import six

from locust import HttpLocust, TaskSet, task
from locust.events import request_success


class ChatTaskSet(TaskSet):
    def on_start(self):
        self.user_id = six.text_type(uuid.uuid4())
        ws = create_connection('ws://127.0.0.1:5000/chat')
        self.ws = ws

        def _receive():
            while True:
                res = ws.recv()
                data = json.loads(res)
                end_at = time.time()
                response_time = int((end_at - data['start_at']) * 1000000)
                request_success.fire(
                    request_type='WebSocket Recv',
                    name='test/ws/chat',
                    response_time=response_time,
                    response_length=len(res),
                )

        gevent.spawn(_receive)

    @task
    def sent(self):
        start_at = time.time()
        body = json.dumps({'message': 'hello, world', 'user_id': self.user_id, 'start_at': start_at})
        self.ws.send(body)
        request_success.fire(
            request_type='WebSocket Sent',
            name='test/ws/chat',
            response_time=int((time.time() - start_at) * 1000000),
            response_length=len(body),
        )



class ChatLocust(HttpLocust):
    task_set = ChatTaskSet
    min_wait = 0
    max_wait = 100

ポイントは events.request_success.fire で結果を通知する事と、受け取り用の gevent thread を立ち上げる事です。
普通に受け取りを@taskの中で定義するとその間taskが止まり、同じTaskSet内の他のtaskもストップしてしまいます。

ちなみに負荷をかけたサンプルのEcho&PubSubサーバーは以下のスクリプトになります。

server.py
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import unicode_literals
from __future__ import print_function

from collections import defaultdict
import json

from geventwebsocket.handler import WebSocketHandler
from gevent.pywsgi import WSGIServer
from flask import Flask, request
from werkzeug.exceptions import abort


app = Flask(__name__)

ctr = defaultdict(int)


@app.route('/echo')
def echo():
    ws = request.environ['wsgi.websocket']
    if not ws:
        abort(400)

    while True:
        message = ws.receive()
        if message is not None:
            r = json.loads(message)
            ctr[r['user_id']] += 1

        ws.send(message)


@app.route('/report')
def report():
    return '\n'.join(['{}:\t{}'.format(user_id, count) for user_id, count in ctr.items()])


socket_handlers = set()


@app.route('/chat')
def chat():
    ws = request.environ['wsgi.websocket']
    socket_handlers.add(ws)

    while True:
        message = ws.receive()
        for socket_handler in socket_handlers:
            try:
                socket_handler.send(message)
            except:
                socket_handlers.remove(socket_handler)


if __name__ == '__main__':
    http_server = WSGIServer(('', 5000), app, handler_class=WebSocketHandler)
    http_server.serve_forever()

ファイルは以下のレポジトリにもupしてあります。
https://gist.github.com/yamionp/9112dd6e54694d594306

56
55
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
56
55