LoginSignup
17
15

More than 5 years have passed since last update.

geventやasync/awaitで切断検知付きソケットサーバを実装

Last updated at Posted at 2015-12-09

アプセトネデブ って覚えてますか?私の仕事はこの存在がなければ誕生すらしていなかったと思います。OSI参照モデルの覚え方です。

OSI参照モデルの中でもトランスポート層のソケット通信を行うソケットサーバを並行処理を駆使して実装するお話です。この記事は、今年の春にスゴイ級エンジニアにC#のTaskやasync/awaitについて教えてもらったときの内容をPythonで書き換えた記事となっています。部屋の片隅で埃かぶってた『マスタリングTCP/IP』片手に実装。

目的

  • 並列(Parallel)なノンブロッキング(non-blocking)処理を独力で書けるようになる。
  • geventとasync/awaitでそれぞれ軽量スレッドを使った処理を実装できるようになる。
  • C10k問題がうっすら理解できるようになる。

マルチスレッド/プロセスまとめ(Ruby編)の記事を読んで、そろそろ復習しようかと思ってコード書きました。よくまとめられているので出てくる単語の意味が分からなかったら読んでみると良いと思います。(自分自身も読みながら書きました。)

概要とSocketサーバ仕様

TCP/IPでトランスポート層の機能を果たす代表的なプロトコルが、「TCP」と「UDP」です。TCPやUDPを利用して通信するときには、ソケットとよばれるOSのAPIが広く使われています。今回はソケットサーバとクライアントを実装していきます。その過程でgevent版とasync/await版の軽量スレッドを利用します。

ソケットサーバ仕様と動作フロー

echoサーバを拡張した仕様です。前回分と今回分を応答(echo)する機能と、切断検知機能の仕様を盛り込んでいます。

  1. クライアントからソケット通信を受け付ける
  2. クライアントからメッセージを受信したら、前回受信分と今回受信分を応答(echo)する
  3. 通信が切断していたら、ソケット通信を閉じる

■ 完成品の動作gif
loop.gif

クライアント仕様と動作フロー

通信を開いたあと1秒毎に計10回ソケットサーバと送受信を行い切断します。
ソケット通信を行うクライアントは100個同時起動され、100ループ分処理を行います。

  1. ソケットサーバに接続する
  2. サーバにHelloWorld{n} と送信する。n=0からスタート。
  3. サーバから前回と今回の送信内容を受信する
  4. 1秒まつ ← 今回のキモ
  5. n++ して2. の処理に戻る
  6. n >= 10 になれば通信を切断する
クライアントの実行イメージ
サーバ接続
送信:Hello World!:0
受信:Hello World!:0
送信:Hello World!:1
受信:Hello World!:0 Hello World!:1
....
送信:Hello World!:8
受信:Hello World!:7 Hello World!:8
送信:Hello World!:9
受信:Hello World!:8 Hello World!:9
サーバから切断

失敗事例:普通に書くと並列動作しないサーバが出来上がる

仕様ではクライアントはソケット通信を同時100接続起動して、それぞれ1秒おきに計10回通信してから切断していきます。ソケットサーバをシリアル(直列)なコードで書くと、最初の接続が完了して切断されないと次の接続を開始できないためスループットが極めて悪くなります。

py35_tcp_non_parallel_server.py
# -*- coding: utf-8 -*-
import socket

# INET の STREAM ソケットを作る
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# そのソケットを公開ホストのウェルノウンポートにつなぐ
server.bind(('127.0.0.1', 1234))
# サーバソケットになる
server.listen(10000)

while True:
    client_socket, address = server.accept()
    while True:
        # 受信
        fp = client_socket.makefile()
        line = fp.readline()
        print(line, type(line))
        if not line:
            # 切断されてたらループ終了
            break

        # 応答
        client_socket.send(line.encode('utf-8', 'ignore'))
        print('send:{}'.format(line))

■ 実行結果
通信毎に順番に処理しているため、大変遅い。
wrong_loop.gif

Python3.5 gevent版のソケットサーバ

geventだとwait_readwait_write が存在するので接続のTimeout処理が実装できました。便利。gevent版とasync/await版は互換性があるように書いています。

geventのmonkey_patch版

ポイントはgevent.monkey.patch_socket()sock.accept() がnon-blockingな処理に変更されている点と、gevent.spawn でサーバの処理を軽量スレッドに放りこんでいる点

gevent_tcp_server_by_monkey_patch.py
# -*- coding: utf-8 -*-
from gevent.pool import Group
import gevent
import socket
import gevent.monkey
# gevent.monkey.patch_all()
gevent.monkey.patch_socket()


def task_stream(client_socket):
    _prev_message = ""
    while True:
        # 受信
        fp = client_socket.makefile()
        line = fp.readline()
        if not line:
            # 切断されてたらループ終了
            break

        # 応答
        s = _prev_message + line
        client_socket.send(s.encode('utf-8', 'ignore'))
        print('send:{}'.format(s))
        _prev_message = line


def gen_server(host, port):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind((host, port))
    sock.listen(10000)
    name = 1
    while True:
        conn, addr = sock.accept()
        gevent.spawn(worker, str(name), conn, addr)
        name += 1


def worker(name, sock, addr):
    print('new connection!')
    task = gevent.spawn(task_stream(sock))
    group.add(task)


def main():
    host, port = '127.0.0.1', 1234
    server = gevent.spawn(gen_server, host, port)
    group.add(server)
    group.join()
    server.kill()

group = Group()
main()

geventのStreamServer版

wait_read で通信が来るまでnon-blockingで待機している点とタイムアウト処理を追加している点がポイント

gevent_tcp_server_by_stream_server.py
# -*- coding: utf-8 -*-
from gevent.pool import Pool
from gevent.server import StreamServer
import socket as py_socket
from gevent.socket import wait_read, wait_write


class TooLong(Exception):
    pass


def handle(socket, address):
    print('new connection!')

    fp = socket.makefile()
    try:
        _prev_message = ""
        while True:
            wait_read(socket.fileno(), timeout=5, timeout_exc=TooLong)
            line = fp.readline()
            if line:
                # 前回受信分を含めた応答文字列を生成
                s = _prev_message + line
                # 送信
                socket.send(s.encode('utf-8', 'ignore'))
                fp.flush()
                print('send:{}'.format(_prev_message + line))
                _prev_message = line
    except TooLong:
        print('timeout')

    # Timeoutが発生したら切断する
    socket.shutdown(py_socket.SHUT_RDWR)
    socket.close()


pool = Pool(10000)  # do not accept more than 10000 connections
server = StreamServer(('127.0.0.1', 1234), handle, spawn=pool)
server.serve_forever()

gevent_tcp_client.py
# -*- coding: utf-8 -*-
import gevent
from gevent import socket
from functools import wraps
import time

HOST = '127.0.0.1'
PORT = 1234


def client():
    conn = socket.create_connection((HOST, PORT))
    for x in range(10):
        message = 'Hello World!:{}\n'.format(x)
        # 送信
        conn.send(message.encode('utf-8', 'ignore'))
        recv_message = conn.recv(3000)
        print('recv:' + recv_message.decode('utf-8'))
        # 1秒待つ
        gevent.sleep(1)
    conn.close()


def main():
    for x in range(100):
        jobs = [gevent.spawn(client) for x in range(100)]
        gevent.joinall(jobs)


main()


Python3.5 async/await版のソケットサーバ

async def task_echo(reader, writer) でreaderとwriterが引数になっているが気持ち悪いですが、Python3 のdocumentにソケット通信を扱うサンプルにはそう書いてあったのでそのまま実装してみました。StreamReader でラップしてるからなんでしょうけど、何か気持ち悪い.. non-blockingなソケット通信を行えるクラスが存在すればStreamReader から書き換えることが出来るので今後修正を加える可能性があります。

py35_tcp_server.py
# -*- coding: utf-8 -*-
import asyncio

HOST = '127.0.0.1'
PORT = 1234


async def task_echo(reader, writer):
    print('new connection!')
    _prev_message = ''
    while True:
        line = await reader.readline()
        if line == b'':
            # 切断検知 切断されてるときは、なぜかreader.readline()がb''を返却するためループを抜ける
            break
        if line:
            writer.write(line)
            await writer.drain()
            print('send:{}'.format(_prev_message + line.decode()))
            _prev_message = line.decode()

    # 切断を検知したらソケット通信を閉じる
    print("Close the client socket")
    await writer.drain()
    writer.close()


def main():
    loop = asyncio.get_event_loop()
    coro = asyncio.start_server(task_echo, HOST, PORT, loop=loop)
    server = loop.run_until_complete(coro)

    # Serve requests until CTRL+c is pressed
    print('Serving on {}'.format(server.sockets[0].getsockname()))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass

    # Close the server
    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()


main()

py35_tcp_client.py
# -*- coding: utf-8 -*-
import asyncio

HOST = '127.0.0.1'
PORT = 1234


async def tcp_echo_client():
    reader, writer = await asyncio.open_connection(HOST, PORT)
    for x in range(10):
        message = 'Hello World!:{}\n'.format(str(x))
        # 送信
        writer.write(message.encode())
        # 受信
        data = (await reader.read(100)).decode()
        print('recv:{}'.format(data))
        # 1秒待つ
        await asyncio.sleep(1)
    writer.close()
    return "1"


def main():
    loop = asyncio.get_event_loop()
    for x in range(100):
        tasks = asyncio.wait([tcp_echo_client() for x in range(100)])
        loop.run_until_complete(tasks)
    loop.close()

main()


まとまってないまとめ

実力不足でまとめきれませんでした

上記コードの致命的な問題

ソケットサーバとして上記コード群は致命的な問題を抱えています。1CPUしか扱えないのです。GIL(Global Interpreter Lock)の制約で1つのCPUしか扱えないPythonは少し残念です。(5年以上前から言われてて未だに改善しないからこれが限界なのでしょう)

主観で話せば本当に欲しかったものって軽量スレッドつかって並行処理を書けば、利用者が意識しなくてもマルチプロセッサで並列処理され超早いC#や、軽量スレッド作り放題でスレッド間はメッセージパッシングで通信自由自在、利用者はCPUの数なんて一切認識しないErlangみたいな軽量スレッドだったわけです。

Pythonのマルチプロセッサー対応はGIL問題を回避するためにフォークするか、デーモンで複数起動するしかありません。

ただサーバ規模が10台や100台になると他の言語でも同様の現象が起こります。異なるサーバ間を論理的な単一マシンとして扱って稼働する実用的な言語って私は知りません。Pythonはシングルプロセスで動作するものと割り切って設計してしまいCPUの数だけデーモンを稼働させてしてしまえばいいのです。必然的にバックエンドはメッセージキューサービスかストレージを利用する設計になるので、簡単に複数台のサーバにスケールするはずです。

もちろん副作用として設計は複雑になります。chatサービスを実装するとして、複数サーバに分散して接続している同一chatルームのアクティブなユーザに投稿時のpush通知を送るのってどう設計するのが正しいのでしょうか?

軽量スレッドを扱えるようになるメリット

軽量スレッドを扱えるようになると、外部と通信する効率良いサーバを書くことが出来るようになります。通信待ちしている処理を非同期にすることで、待ち時間を実際は待たず並行して他の作業にCPUを割り振ることが出来るようになるからです。

geventとasync/awaitの違い

外部ライブラリのgevent(Greenlet)じゃないと扱えなかった軽量スレッドがPython3.4 のasyncioが実装されてpure Pythonで扱えるようになりました。さらにPython3.5のasync/awaitが実装され、より簡潔な記述で軽量スレッドを扱えるようになりました。

async/await版を書いているときblockingなIOをどうやったらnon-blockingIOになるのかずっと考えて続けていました。async/await版を書いたあとに、geventを触ると出来るだけ暗黙的にIOをnon-blocking処理に変換してくれる設計思想って素晴らしいなーと思います。

geventのチュートリアルより抜粋
gevent が本当の力を発揮するのは、ネットワークやIOバウンドの、協調的に スケジュールできる関数を
実行するために利用する場合です。 gevent はすべての詳細部分の面倒を見て、可能な限りネットワーク
ライブラリが 暗黙的に greenlet コンテキストを譲渡するようにします。

geventには痒いところに手が届く便利な関数やクラスが多く、当分はgevent を利用することが多いだろうと思います。

実務ではTCPか UDPを利用しよう

生ソケットは、巨大なデータを送信すると到達順序が保証されていないためバラバラで届いたり、切断検知が上手にできないので、実務ではTCPか UDPプロトコルを必ず利用しましょう。

宿題1 - node.jsで書こう

社内でトップクラスのエンジニアに軽量スレッドについて相談したら、node.js で前回と今回受信分を応答するechoサーバを書くと、普通に書くとcallback地獄で大変 だよ。でも一度書いておくと良い経験になるよとアドバイスをもらったので、いつか挑戦したいです。

宿題2 - geventのTimeoutクラスの再実装

geventのコードブロックの実行時間に対して制約を掛けれるTimeoutクラスが便利そうなので、pure Pythonで再実装しておきたいです。

geventチュートリアル-Timeouts
import gevent
from gevent import Timeout

time_to_wait = 5 # seconds

class TooLong(Exception):
    pass

with Timeout(time_to_wait, TooLong):
    gevent.sleep(10)

参考

マスタリングTCP/IP
gevent チュートリアル
gevent: Asynchronous I/O made easy
18.5. asyncio – 非同期 I/O、イベントループ、コルーチンおよびタスク
Tutorial: Writing a TCP server in Python

17
15
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
17
15