前置き
Pythonでサーバをマルチクライアント対応にさせたいとき、よく実装されるのはthreadingやmultiprocessingを使用して実装するケースが多いと思います。
そもそもクライアント毎にスレッドまたはプロセスを割り当てなければならないのは、***recv()やsend()***によって処理が完了するまでソケットオブジェクトがブロックされるので新たなクライアントとのコネクションが確立できないこと、サーバ自身のソケットとクライアント用に新規作成されるソケットを同時に扱えないこと(競合)が原因です。
しかし、selectやselectorsモジュールを使用すればスレッドやプロセスをフォークさせる必要はありません。select(selectors)は監視対象の複数のソケットオブジェクトに対して定期的に問い合わせを行い、送受信が可能かどうか(読み込み、書き込み、例外)をチェックします。そして、準備が整っているソケットオブジェクトをリストに格納して返します。返されたオブジェクトリストをfor文
でオブジェクト毎に処理してあげれば、複数のクライアントを捌くことが可能となります。
また、selectなどの多重化I/Oとスレッドやマルチプロセスを組み合わせれば、かなりのスケーラビリティが実現できるでしょう。
長ったらしい説明はこれぐらいにして、実装方法を見ていきましょう!!
環境
- Ubuntu 18.04
- Python 3.6
実装
実装方法として、selectとselectorsモジュールの2つを紹介します。
selectorsモジュールは3.4で追加されたので検証する際は3.4以上を使用して下さい。
ソースコードはこちらにあります。
client
サーバがマルチクライアント対応できているか確認するためのデモ用の簡易的なクライアントを準備しました。
1 import sys
2 import socket
3 from time import sleep
4 from threading import Thread
5
6 SERVER = '127.0.0.1'
7 PORT = 9999
8
9 def client(timeout=0):
10 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
11 try:
12 sock.connect((SERVER, PORT))
13 except:
14 sys.exit(sys.exc_info()[1])
15
16 try:
17 if sock.sendall(b'hello') is not None:
18 sock.shutdown(socket.SHUT_WR)
19 except:
20 sys.exit(sys.exc_info()[1])
21 else:
22 sleep(timeout)
23
24 if __name__ == '__main__':
25 c1 = Thread(target=client, args=(3,))
26 c2 = Thread(target=client, args=(1,))
27 c3 = Thread(target=client)
28 c1.start()
29 c2.start()
30 c3.start()
selectモジュール
selectモジュールでは、通常のソケットオブジェクト、ポーリングオブジェクト、カーネルキューオブジェクト、カーネルイベントオブジェクト、エッジポーリングオブジェクト、/dev/pollポーリングオブジェクトを扱うことができます。
ここではポーリングオブジェクトを使用します。
1 import sys
2 import socket
3 import select
4
5 HOST = '127.0.0.1'
6 PORT = 9999
7
8 fd_to_socket = {}
9
10 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
11 sock.bind((HOST, PORT))
12 sock.listen(5)
13 print("Server is listening at {}:{}".format(HOST, PORT))
14
15 sock.setblocking(False)
16
17 poll = select.poll()
18 poll.register(sock, select.POLLIN)
19
20 try:
21 while True:
22 ready = poll.poll()
23
24 if not ready:
25 break
26
27 for fd, event in ready:
28 if fd == sock.fileno():
29 connection, client_address = sock.accept()
30 print("Accepted from", client_address)
31
32 fd_to_socket[connection.fileno()] = connection
33 poll.register(connection, select.POLLIN)
34 else:
35 connection = fd_to_socket[fd]
36 raw_data = connection.recv(1024)
37
38 if not raw_data:
39 print("Closed connection to", connection.getpeername())
40 poll.unregister(connection)
41 connection.close()
42 break
43
44 print("Reveived {} from {}".format(raw_data.decode(), connection.getpeername()))
45 except KeyboardInterrupt:
46 print("EXIT")
47 except:
48 print(sys.exc_info()[1])
17行目
ポーリングオブジェクトの作成
18行目
監視対象にサーバ自身のソケットオブジェクトを追加
監視イベントは読み出し可能なデータの存在
22行目
ポーリングを行った結果をもとに(fd, event)のタプルを返す
28~33行目
サーバ自身のソケットオブジェクトに対し読み込みデータがあるということは、新規コネクションを確立しようとしているので、コネクションをアクセプトして新規ソケットを監視対象に追加
35~44行目
既存のコネクションソケットオブジェクトに対し読み込みデータがあるということは、クライアントからのパケットを受信したことを意味するのでそれを読み込む。データがなければクライアントとの接続は切れたということなので、監視対象から外してコネクションを閉じる。
検証
実際にマルチクライアント対応ができているか確認してみましょう。
demo_select.py
を実行したら、別タブでdemo_client.py
を実行します。
$ python3 demo_select.py
Server is listening at 127.0.0.1:9999
Accepted from ('127.0.0.1', 35714)
Accepted from ('127.0.0.1', 35716)
Reveived hello from ('127.0.0.1', 35714)
Reveived hello from ('127.0.0.1', 35716)
Accepted from ('127.0.0.1', 35718)
Reveived hello from ('127.0.0.1', 35718)
Closed connection to ('127.0.0.1', 35718)
Closed connection to ('127.0.0.1', 35716)
Closed connection to ('127.0.0.1', 35714)
ポート番号が違うものが3つある、つまり3つのクライアントからの接続が確立していることからマルチクライアント対応ができていることが分かりますね。
selectorsモジュール
1 import sys
2 import socket
3 import selectors
4
5 HOST = '127.0.0.1'
6 PORT = 9999
7
8 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
9 sock.bind((HOST, PORT))
10 sock.listen(5)
11 print("Server is listening at {}:{}".format(HOST, PORT))
12 sock.setblocking(False)
13
14 with selectors.DefaultSelector() as selector:
15 selector.register(sock, selectors.EVENT_READ)
16
17 try:
18 while True:
19 ready = selector.select()
20
21 if not ready:
22 break
23
24 for key, _ in ready:
25 if key.fileobj == sock:
26 try:
27 connection, client_address = sock.accept()
28 print("Accepted from", client_address)
29 except:
30 sys.exc_info()[1]
31 else:
32 selector.register(connection, selectors.EVENT_READ)
33 else:
34 connection = key.fileobj
35
36 raw_data = connection.recv(1024)
37
38 if not raw_data:
39 print("Closed connection to", connection.getpeername())
40 selector.unregister(connection)
41 connection.close()
42 break
43
44 print("Received {} from {}".format(raw_data.decode(), connection.getpeername()))
45 except KeyboardInterrupt:
46 print("EXIT")
47 except:
48 sys.exc_info()[1]
14行目
selectorの作成
selectorsモジュールの基底クラスはコンテキストマネージャーをサポートしているのでwith文
を使用可能
15行目
サーバ自身のソケットオブジェクトを監視対象に追加
監視するイベントは読み込みのみ
19行目~
使用しているモジュールは違えど、やっていることはdemo_select.py
とほとんど変わらない
検証
実際にマルチクライアント対応ができているか確認してみましょう。
demo_selectors.py
を実行したら、別タブでdemo_client.py
を実行します。
$ python3 demo_selectors.py
Server is listening at 127.0.0.1:9999
Accepted from ('127.0.0.1', 35126)
Accepted from ('127.0.0.1', 35128)
Received hello from ('127.0.0.1', 35126)
Accepted from ('127.0.0.1', 35130)
Received hello from ('127.0.0.1', 35128)
Received hello from ('127.0.0.1', 35130)
Closed connection to ('127.0.0.1', 35130)
Closed connection to ('127.0.0.1', 35128)
Closed connection to ('127.0.0.1', 35126)
ポート番号が違うものが3つある、つまり3つのクライアントからの接続が確立していることからマルチクライアント対応ができていることが分かりますね