はじめに
-
WebSocketを使用するようなプロダクトの場合において、ロードバランサーといったNW機器の更改をすると毎回頭を悩ませるのが、TCPの疎通性はあるけど、Websocket通信が正常であるのか、だと思います
-
TCP80/443レベルで疎通確認はもちろんするものの、いきなり本番環境に組み込むのが非常に怖いため、社内で構築、試験段階でWebScoket通信を簡易的に確認するためのツールをPythonで作ってみました
-
ツールとして以下を用意しました
- WebSocketクライアント兼WebSocketサーバツール(for Windows)
- WindowsPCを複数台用意できる場合は、サーバとしても利用可能
- WebSocketサーバツール(for Linux)
- このとき手元にたまたまラズパイが転がっていたので、ラズパイにUbuntuをインストールして環境を用意しました
- WebSocketクライアント兼WebSocketサーバツール(for Windows)
-
いずれも簡易的にWebSocket通信を確認するだけなので、本番アプリのように
認証後にWebsocket通信を開始する
ようなことはできない点に注意してください
それができたらインフラなんてせずに、アプリ作る側になってます
使い方
Windowsの場合(WebSocketサーバ)
- WebSocketTool_xxx.exeを起動する
- 起動後、以下パラメータを入力する
- 待ち受けIPアドレス(例:localhost, 192.168.100.10など)
- 待ち受けポート(1-65535の範囲内)
- WebSocketサーバ開始ボタンを押下すると、WebSocketサーバが起動する
- WebSocketサーバ側からソケットを接続すること(実装が)が困難であったため、アプリ再起動ボタンを押下して強制的に再起動する
- 再起動後、再度上記設定を実施する必要がある
- Quitボタン、またはxを押下すると、アプリを終了する
- 起動後、以下パラメータを入力する
Windowsの場合(WebSocketクライアント)
- 前提として、本番システムのように
前段で認証が入る場合は接続エラー
となり使用できないことに注意すること - WebSocketTool_xxx.exeを起動する
- また、WebSocketサーバを別ウインドウ(別端末)で起動しておくこと
- 起動後、以下パラメータを入力する
- WebSocket接続先(例:wss://test-server.jp, ws://192.168.100.10など)
- 接続先のポート(1-65535の範囲内)
- メッセージ(内容は任意。また、クライアントの時刻を取得してメッセージに付加して送信する)
- 送信間隔(1-30秒の範囲)
- WebSocket接続&メッセージ送信ボタンを押下すると、WebSocket接続を開始し、上記パラメータでメッセージを送信する
- WebSocket切断ボタンを押下すると、メッセージ送信を停止する。WebSocketサーバとのソケットが生きている場合は、再度WebSocket接続&メッセージ送信ボタンを押下すると同じセッションでメッセージ送信を再開する
- WebSocketサーバ側の障害等でセッションが切れた場合は、再度WebSocket接続&メッセージ送信ボタンを押下して新規セッションを開始する
- Quitボタン、またはxを押下すると、アプリを終了する
- 起動後、以下パラメータを入力する
Linuxの場合(WebSocketサーバ)
-
前提として、Python3環境と、各種ライブラリをインストールしておく必要がある
pip install -r requirements.txt
-
websocket-server_for_linux.pyをサーバに配置する
-
上記スクリプトに実行権限を付与する
chmod +x ./websocket-server_for_linux.py
-
以下コマンドでWebSocketサーバを起動する
./websocket-server_for_linux.py --host 192.168.1.1 --port 1234
- --hostでサーバの待ち受けIPを指定する。(指定しない場合はlocalhostを自動で使用する)
- --portでサーバの待ち受けポートを指定する。(指定しない場合は8080を自動で使用する)
-
WebSocketサーバを終了する場合は、Ctrl+Cで終了する
ソースコード
requirements.txt
- Pythonで使用するモジュールは以下の通り。事前にpip installをお願いします
websocket-client==1.6.3
websocket-server @ git+https://github.com/Pithikos/python-websocket-server@56af8aeed025465e70133f19f96db18113e50a91
websockets==11.0.3
WebSocketTool.py
- GUI化するために、
PySimpleGUI
を使用しています
この辺りを参考にしています
- また、WebSocketのモジュールを利用するので、以下を参考に作成しています
- 注意点として、SSLサーバ証明書を利用したWebScoket通信もできるようになりますが、
オプションでSSLハンドシェイクを無視することもできるので、安直に使用しないよう
に注意してください
余談ですが...(失敗談)
- 余談ですが、ロードバランサーでSSLアクセラレーション機能を使用しており、証明書のインポート設定に不備があり、ツールを利用しての通信が失敗した
- しかしそこでロードバランサーの不備を疑わず、ツールを改良してSSLハンドシェイクを無視した形で社内環境での疎通試験を実施した
- 本番環境に投入後、一部アプリケーションの動作確認が取れず、
あえなく切り戻し
import websocket
import PySimpleGUI as sg
import datetime
import threading
import time
import logging
from websocket_server import WebsocketServer
import os
import sys
import signal
#####WebSocetクライアント################
def on_message(ws, message):
#print("受信: " + message)
window['-OUTPUT-'].update("受信: " + message + '\n', append=True, autoscroll=True)
def on_error(ws, error):
window['-OUTPUT-'].update("WebSocket connection closed, because: " + str(error))
def on_close(ws, close_status_code, close_msg):
window['-OUTPUT-'].update("### クライアントが切断しました ###\n" + "WebSocket connection closed, because: " + str(close_msg) + "\n", append=True, autoscroll=True)
def on_open(ws, msg, time_peoriod,uri,port):
def run(*args):
while True:
time.sleep(time_peoriod)
send_msg = msg + str(datetime.datetime.now())
if ws.sock and ws.sock.connected:
ws.send(send_msg)
window['-OUTPUT-'].update("[送信:" + uri + ":" + str(port) + "へ送信:\nメッセージ:\n===" + send_msg + '===]\n', append=True, autoscroll=True)
threading.Thread(target=run).start()
#WebSocketクライアントのメイン処理
def ws_client(uri,port,msg,time_period):
global ws
websocket.enableTrace(True)
time_period = time_period
ws = websocket.WebSocketApp(uri + ":" + str(port),
on_open=lambda ws: on_open(ws, msg, time_period,uri,port), # on_open関数にmsgを渡す
on_message=on_message,
on_error=on_error,
on_close=on_close)
try:
threading.Thread(target=ws.run_forever).start()
except Exception as e:
window['-OUTPUT-'].update("Error sending message: " + e, append=True, autoscroll=True)
#WebSocketクライアントの停止処理
def stop_ws_client():
global ws
if ws is not None:
try:
ws.send("### クライアントが切断しました ###\n")
except Exception as e:
window['-OUTPUT-'].update("Error sending message: " + e, append=True, autoscroll=True)
finally:
ws.close()
ws = None
#Websocketクライアントの初期設定
ws = None
time_period = 1
time_flg = False
ws_flg = True
########################################
#####WebSocetサーバ################
def new_client(client, server):
server.send_message_to_all(client["address"][0]+":" +str(client["address"][1]) + "が接続しました\n")
#window['-OUTPUT2-'].update("### クライアントが切断しました ###" + '\n', append=True, autoscroll=True)
def on_recieve(client, server, message):
window['-OUTPUT2-'].update("[受信:" + client["address"][0]+":" + str(client["address"][1]) + "から受信\nメッセージ:\n=== " + message + '===]\n', append=True, autoscroll=True)
def ws_server(host, port):
global server
server = WebsocketServer(port=port, host=host, loglevel=logging.INFO)
server.set_fn_new_client(new_client)
server.set_fn_message_received(on_recieve)
server.run_forever()
def stop_ws_server():
global server
if server is not None:
server.server_close()
server = None
#Websocketサーバの初期設定
server = None
ws_server_flg = True
##################################
##ポップアップウインドウ位置調整
def pop_location():
#初期化
pos = [0,0]
#現在のウインドウ位置を取得
pos_now = window.CurrentLocation()
pos[0] = pos_now[0]
pos[1] = pos_now[1]
#ポップアップウインドウを表示する位置を調整
pos = (pos[0] + 120, pos[1] + 200)
return pos
#1つのタブでのみ操作を許可する
tab_selct = None #Trueの場合はWebSocketクライアント
#テーマ
#テーマの種類は、sg.preview_all_look_and_feel_themes()で確認可能
sg.theme("LightGray2")
#表示する画面の設定
#WebSocketクライアント用タブ
tab_layout1=[
[sg.Frame("接続先",
[
[sg.Text("WebSocket接続先を入力してください:")],
[sg.Text("HTTP通信の場合はws://を、HTTPS通信の場合はwss://を使用します")],
[sg.InputText("ws://localhost",key="server_uri")],
[sg.Text("接続先のポート番号を入力してください:")],
[sg.Spin(values=list(range(0, 65536)), initial_value=8080,key="send_port" ,size=(10, 1))]
], expand_x=True, expand_y=True
)
],
[sg.Frame("メッセージ",
[
[sg.Text("送信するメッセージを入力してください:")],[sg.InputText("test",key="client_send_msg", size=(40, 1))],
[sg.Text("送信間隔を指定してください:")],
[sg.Text("※最小1秒/最大30秒")],
[sg.Spin(values=list(range(0, 31)), initial_value=1, size=(5, 1), key="time_period")]
]
)
],
[
sg.Button("WebSocket接続&メッセージ送信", key="client_connect"),
sg.Button("WebSocket切断", key="client_disconnect")
],
[sg.Button("Quit",key="client_quit")],
[sg.Multiline(size=(50, 10), expand_x=True, expand_y=True, key='-OUTPUT-')]
]
#WebSocketサーバ用タブ
tab_layout2=[
[sg.Frame("待ち受け情報",
[
[sg.Text("待ち受けのIPアドレスを入力してください:")],
[sg.InputText("localhost",key="server_ip" ,size=(10, 1))],
[sg.Text("待ち受けのポート番号を入力してください:")],
[sg.Spin(values=list(range(0, 65536)), initial_value=8080,key="server_port" ,size=(10, 1))]
], expand_x=True, expand_y=True
)
],
[
sg.Button("WebSocketサーバ開始", key="server_connect"),
sg.Button("アプリ再起動", key="server_restart")
],
[sg.Button("Quit",key="server_quit")],
[sg.Multiline(size=(50, 10), expand_x=True, expand_y=True, key='-OUTPUT2-')]
]
layout = [
[sg.TabGroup(
[
[sg.Tab("WebSocketクライアント", tab_layout1, background_color='#fdf5e6', title_color='#ff0000', expand_x=True, expand_y=True)],
[sg.Tab("WebSocketサーバ", tab_layout2, background_color='#f0ffff', title_color='#ff0000', expand_x=True, expand_y=True)]
]
)
]
]
#ウインドウ
window=sg.Window("WebSocketテストツール", layout, resizable=True)
#無限ループで画面を表示
while True:
event,values = window.read()
if event == sg.WIN_CLOSED:
break
####WebSocketクライアント画面処理####
if event == "client_quit":
#ポップアップウインドの表示
confirm = sg.popup_yes_no("終了しますか?", title="exit", location=pop_location())
if confirm == "Yes":
threading.Thread(target=stop_ws_client).start()
break
#WebSocketを開始する(クライアント側)
elif event == "client_connect":
#ポップアップウインドの表示
confirm = sg.popup_yes_no("WebSocketを接続しますか?", title="WebSocket接続", location=pop_location())
if confirm == "Yes":
if tab_selct != False:
if ws_flg:
uri = values["server_uri"]
port = values["send_port"]
time_period = values["time_period"]
msg = str(values["client_send_msg"]) + " "
threading.Thread(target=ws_client, args=(uri, port, msg, time_period)).start()
#WSが二重起動できないようにする
ws_flg = False
#WSサーバを開始できないようにする
tab_selct = True
else:
sg.popup("既にWebSocket接続を開始しています。", title="WebSocket接続中", location=pop_location())
else:
sg.popup("WebSocketサーバを開始しています。\nクライアントを開始するにはWebSocketサーバを停止してください。", title="WebSocketサーバ起動中", location=pop_location())
#WebSocketを切断する(クライアント側)
elif event == "client_disconnect":
confirm = sg.popup_yes_no("WebSocketを切断しますか?", title="WebSocket切断", location=pop_location())
if confirm =="Yes":
if ws is not None:
threading.Thread(target=stop_ws_client).start()
#flgを元に戻し、WS送信ができるようにする
ws_flg = True
#WSサーバを開始できるようにする
tab_selct = None
else:
sg.popup("WebSocket接続を開始していません。", title="WebSocket未接続", location=pop_location())
###########################
####WebSocketサーバ画面処理####
if event == "server_quit":
confirm = sg.popup_yes_no("終了しますか?", title="exit", location=pop_location())
if confirm == "Yes":
if server is not None:
threading.Thread(target=ws_server, args=(host,port)).start()
break
#WebSocketを開始する(サーバ側)
elif event == "server_connect":
confirm = sg.popup_yes_no("WebSocketを接続しますか?", title="WebSocket接続", location=pop_location())
if confirm == "Yes":
if tab_selct != True:
if ws_server_flg:
port = values["server_port"]
host = values["server_ip"]
threading.Thread(target=ws_server, args=(host,port)).start()
#WSサーバを二重起動できないようにする
ws_server_flg = False
#WSクライアントを開始できないようにする
tab_selct = False
else:
sg.popup("WebSocketクライアントを開始しています。\nサーバを開始するにはWebSocketクライアントを停止してください。", title="WebSocketクライアント起動中", location=pop_location())
#WebSocketサーバを再起動する(サーバ側)
elif event == "server_restart":
confirm = sg.popup_yes_no("WebSocketサーバを再起動しますか?", title="WebSocketサーバ再起動", location=pop_location())
if confirm =="Yes":
#flgを元に戻し、WSサーバを開始できるようにする
ws_server_flg = True
#flgを元に戻し、WSクライアントを開始できるようにする
tab_selct = None
os.execl(sys.executable, sys.executable, *sys.argv)
#########################
window.close()
os.kill(os.getpid(), signal.SIGTERM)
websocket-server_for_linux.py
- こちらもWebSocketを使用しますが、GUI化しない、サーバで待ち受けるだけということで、Windows版と比べて簡素化されています
#!/usr/bin/env python3
import argparse
import logging
from websocket_server import WebsocketServer
def new_client(client, server):
server.send_message_to_all(client["address"][0]+":" +str(client["address"][1]) + "が接続しました\n")
def on_recieve(client, server, message):
print( client["address"][0]+":" + str(client["address"][1]) + "から受信 : " + message)
def ws_server(host, port):
server = WebsocketServer(port=port, host=host, loglevel=logging.INFO)
server.set_fn_new_client(new_client)
server.set_fn_message_received(on_recieve)
try:
server.run_forever()
except KeyboardInterrupt:
print("Ctrl+Cが入力されました。サーバーを停止します。")
server.server_close()
#メイン処理
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='WebSocket server.')
parser.add_argument('--host', type=str, default='localhost', help='Host for the WebSocket server.')
parser.add_argument('--port', type=int, default=8080, help='Port for the WebSocket server.')
args = parser.parse_args()
#threading.Thread(target=ws_server, args=(args.host, args.port)).start()
ws_server(args.host, args.port)
exe化
- WindowsPCで.exeで配布して利用したい場合は、WebSocketTool.pyをauto_py_to_exeモジュールを使用することで.exe化できます
まとめ
- これで社内環境を利用した検証、試験が捗り、インフラエンジニアとしてはうれしい限りです
- 自社プロダクトなので、本来はきちんと予算建てして社内にミニチュアの検証環境を準備するべきでしょうが、中々そうもいかない場合が往々にしてあると思いますので、参考にしてもらえると幸いです