はじめに
TCP Socket を使って、プログラム間通信を行うPythonの並列計算プログラムを作りました。並列計算でMandelbrot集合の絵を描きます。
本来、分散コンピュータで並列計算をする場合は、MPIを使うのだと思うのですが、せっかく作ったので、また、Socketを直接使ったプログラム間通信プログラムは、私の今後のプログラミングにも役立つ可能性がありますし、誰か他の人の役に立つかもしれないので公開したいと思います。
IoTってIPを使った並列計算だと思うので、IoTLTのアドベントカレンダーに載せるの、許してください。
Mandelbrot 集合とはどんなものか?については以下のWikipediaの記事などに書かれています。
平面上の各点 $(x,y)$を複素数 $C=x+yi$とし、式
\begin{cases}
z_{n+1}=z^2_{n}+C \\
z_0=0
\end{cases}
を、$|z_{n}|$が一定値以内に収まっている間繰り返し、その繰り返し回数に応じて $(x,y)$に色をつけることにより、綺麗なフラクタル絵を描くことができます。$n$は無限に大きくなる可能性もあるので、繰返し回数の最大値を決め、その回数以内での繰り返し回数を数えます。なお、$z_{n+1}=z^2_{n}+C$は$z_{n+1}=z^2_{n}-C$としても構いません。今回作成したプログラムでは、$z_{n+1}=z^2_{n}-C$を使っています。
この式は、都合の良いことに、$C=x+yi$以外の値の影響を受けません。従って、平面上の複数の点について同時に計算することができます。並列計算によって描画時間を短くすることができる代表的な例の一つです。
普通ならこうすべき?
今のパソコンのCPUの多くはマルチコアなので、NumPyを使うことで内部的に並列計算をしてくれたりします。
奥村晴彦先生がMandelbrot集合の絵を描くプログラムを公表されています。
NumPy を使うことで高速化を行うとともに、NumPy.vectorizeを使うことで、for文をできるだけ使うことなしに、簡潔に記述されています。
奥村先生の説明のページでは、その上で Numba を使った高速化が示されています。
作成したプログラム
これは、複数のコンピュータを使って、Mandelbrot集合の絵の描画の高速化を試みる、べたな、並列計算プログラムです。NumPyは使っていません。
プログラム中でNumPyをimportする行はコメントアウトしてます。
NumPyを使うと、もっと速くなるかもしれません。
MasterプログラムとWorkerプログラムの2種類のプログラムがあって、Masterプログラムがどこか1つのコンピュータで動作し、Workerプログラムが別の複数のコンピュータで動作し、Masterと複数のWorkerの間で情報交換することで並列計算を行います。
MasterとWorkerは、1つのプログラムにまとめて、内部でif文でMasterとWorkerを区別することで、SPMD (Single Program, Multiple Data)にしています。
それぞれのコンピュータ間はIP reachable であることが必要になります。また、Masterとしてプログラムを動かすコンピュータの 9998番ポートを開けておく必要があります。
このプログラムを起動するときの引数によって、Masterプログラムとして動く場合と、Masterから指示を受けて計算を行うWorkerプログラムとして動く場合の区別がつくようにしています。
利用方法
1.
MasterとWorkerを動かすコンピュータを複数台用意しておきます。Masterは1つのWorkerと共有できます。(このご時世、PCもマルチコアなので、1台のPCで複数のWorkerを動かしても速くなるかもしれません)
2.
つぎに、上のプログラムをMasterとWorkerになるコンピュータそれぞれにコピーします。
3.
Masterになるコンピュータの端末(terminal)で、
$ python3 parallel_mandel.py master <np>
を実行します(python3はraspberry pi の場合)。ここで <np >は Workerの数です。このあと、
master step1, start TCP server, wait worker connection.
172.16.17.x
のように、Masterの IPアドレスが表示されます。(localhost, 127.0.1.1が表示されるかもしれません。ごめんなさい)。その後、
workerが<np>個起動され、masterに接続されるまで実行を待ちます。
4.
Workerになるコンピュータの端末(terminal)で、
$ python3 parallel_mandel.py client 172.16.17.x
を実行します。ここで、172.16.17.x がMasterのIPアドレスです。
5.
Masterの端末に、Workerが起動してTCP接続が行われるたびにそのメッセージが表示されます。3.で指定した<np>個すべてのWorkerがMasterに接続されたら、
master step2, send the pn(number of workers) and pid(assign) to workers.
send 4 0 to the 0th client
send 4 1 to the 1th client
...
master step 3. wait for the results of workers.
のように、各Workerに対して、Workerの数と、そのWorkerのIDが送信されることと、Workerからの計算結果を待つことを表示します。この後、Workerから途中の計算結果が届くたびに、Mandelbrot集合の表示の更新が行われます。
6.
すべての計算と表示が終了すると、MasterのMasterの端末に
nrec=1
nrec=2
...
master step 4.sum of all results of workers and output
lapse=38.64804553985596
OK?
が表示されます。lapse=の右は、計算と表示にかかった時間(秒)を表します。
プログラムの概要
Master プログラムが1つ以上のWorker プログラムに対して、計算する部分を指示し、
それぞれのWorkerプログラムは、その指示に従って、担当部分の計算を行います。
計算の割り当て
並列プログラムの計算時間をできるだけ短くするテクニックの1つとして、コンピュータの負荷をできるだけ均等にする、というのがあります。負荷が1箇所に集中すると、他が余裕があるのに、負荷が集中した箇所で時間がかかってしまう、という訳です。
今回、Masterはちょっと違うのですが、それぞれのWorkerの負荷をできるだけ均等にするために、負荷が重そうなところも軽そうなところも、
できるだけその負荷を分散するために、計算領域をx軸方向の1行ずつに分けて、それぞれを順番に別のコンピュータに割り当てるようにしています。
ソースコード
# parallel_mandel.py
# -*- coding: utf-8 -*-
#
import socket
import threading
import time
import sys
#import matplotlib.pyplot as plt
#import numpy as np
import tkinter as tk
import re
# 通信クラス
class Com:
HOST = ''
PORT = 9998
clxa = [] # 複数のclientのsocketとアドレスの組を格納するリスト
return_from_worker = []
server_receiver = None
client_receiver = None
def start_server(self):
handle_thread = threading.Thread(target=self.server_thread, args=())
# 生成した handle_threadの実行開始(start)
handle_thread.start()
def server_thread(self):
print("start server socket")
self.pid = 0
# socket.socket()...接続先のsocketを作成
self.soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.soc.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# sock.binde ...socket をこのホスト('')のIPアドレスとPortに結合
self.soc.bind((self.HOST, self.PORT))
# sock.listen(10)...clientからの接続を待つ。10は最大の接続要求可能数
self.soc.listen(10)
while True:
# sock.accept()...client から接続があると、クライアントと接続する
# socket conと接続元IP address を返す
con, address = self.soc.accept()
clx = (con, address, self.pid) # clxに tuple (con, address,pid)を代入
self.clxa.append(clx) # clxaにclxを追加
print("[connect]{}".format(address))
self.pid = self.pid+1
# clientと送受信するためのthread, handler を生成。
handle_thread = threading.Thread(
target=self.server_handler, args=(clx,))
# 生成した handle_threadの実行開始(start)
handle_thread.start()
# client からsocketを通じてstreamの入力を行うthreadのhander
def server_handler(self, clx):
print("at server, connected, start handler")
con = clx[0] # clx[0] は con(socket),
address = clx[1] # clx[1] は address
pid = clx[2] # cls[2] は processor id
fx=con.makefile('r')
try:
while True:
l=fx.readline()
self.server_receiver.server_rx(l,pid)
except socket.error: # socketにエラーが発生したとき
print("socket error. exit")
con.close() # socketをclose
except:
import traceback
traceback.print_exc()
print("server_handler receive error.")
#recv_line = ""
#receiving=""
#while True:
# try:
# #data = con.recv(1024) # socket con から最大1024byte 受信 一般的には複数の行を含む。
# #recv_line = recv_line+data.decode("utf-8") #前の行が途中で終わっていた場合、その部分を先頭に加える。
# ##print("[receive]{} - {}".format(address, recv_line))
# #if recv_line.endswith('\n'): #行が途中で終わっていない。
# # #print("recv_line is endswith newline")
# # receiving="no rest"
# #else: #行が途中で終わっている。
# # #print("recv_line is not endswith newline")
# # receiving="rest"
# #lx=recv_line.splitlines()
# ##print("receiving="+receiving)
# #for il in range(len(lx)-1): #最後の行の手前まで、一行ずつ処理
# # l=lx[il]
# # self.server_receiver.server_rx(l, pid) #受け取った1行を処理
# #last_line=lx[len(lx)-1] #最後の行。
# ##print("last_line="+last_line)
# #if receiving=="rest": #行が途中で終わっている場合、その部分を次の行の先頭に加える。
# # #print("rest, recv_line="+last_line)
# # recv_line=last_line
# #else:
# # #print("no rest")
# # self.server_receiver.server_rx(last_line,pid)
# # recv_line=""
# except socket.error: # socketにエラーが発生したとき
# print("socket error. exit")
# con.close() # socketをclose
# break
# except:
# import traceback
# traceback.print_exc()
# print("server_handler receive error.")
# break
#
def send_ith_worker(self, line, i):
# print("send_ith_worker("+line+","+str(i)+")")
cx = self.clxa[i]
try:
cx[0].send(bytes(line+'\n','utf-8'))
except:
print("send_ith_worker error")
def start_client(self, host):
print("start client socket, connect, host="+host)
try:
# socket.socket ... socket を作成し、socに代入
self.soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.soc.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# IPアドレスがhostのserver socketにsocを接続
self.soc.connect((host, self.PORT))
# 受信担当の関数handlerをthreadでうごかすhandle_threadを生成。
self.handle_thread = threading.Thread(
target=self.client_handler, args=(self.soc,))
# handle_threadをstart
self.handle_thread.start()
except: # 接続時にエラーがあった場合の処理
import traceback
traceback.print_exc()
print("connect error.")
# 受信の処理。送信threadとは別に、並行して処理を行う。
def client_handler(self, soc):
print("at client, connected, start handler")
fx=soc.makefile('r')
try:
while True:
l=fx.readline()
(self.client_receiver).worker_rx(l)
except socket.error: # socketにエラーが発生したとき
print("socket error. exit")
con.close() # socketをclose
except:
import traceback
traceback.print_exc()
print("server_handler receive error.")
#recv_line = ""
#receiving=""
#while True:
# try:
# data = soc.recv(1024) # socket con から最大1024byte 受信 一般的には複数の行を含む。
# recv_line = recv_line+data.decode("utf-8") #前の行が途中で終わっていた場合、その部分を先頭に加える。
# #print("[receive]{} - {}".format(address, recv_line))
# if recv_line.endswith('\n'): #行が途中で終わっていない。
# #print("recv_line is endswith newline")
# receiving="no rest"
# else: #行が途中で終わっている。
# #print("recv_line is not endswith newline")
# receiving="rest"
# lx=recv_line.splitlines()
# #print("receiving="+receiving)
# for il in range(len(lx)-1): #最後の行の手前まで、一行ずつ処理
# l=lx[il]
# (self.client_receiver).worker_rx(l) #受け取った1行を処理
# last_line=lx[len(lx)-1] #最後の行。
# #print("last_line="+last_line)
# if receiving=="rest": #行が途中で終わっている場合、その部分を次の行の先頭に加える。
# #print("rest, recv_line="+last_line)
# recv_line=last_line
# else:
# #print("no rest")
# (self.client_receiver).worker_rx(last_line) #受け取った1行を処理
# recv_line=""
# except socket.error: # socketにエラーが発生したとき
# print("socket error. exit")
# soc.close() # socketをclose
# break
# except:
# import traceback
# traceback.print_exc()
# print("server_handler receive error.")
# break
def get_client_number(self):
return len(self.clxa)
def send_to_server(self, line):
self.soc.send(bytes((line+'\n'), "utf-8"))
#
# server, clientにおいて、受け取ったデータを処理するreceiverの設定
def set_server_receiver(self, rec):
self.server_receiver = rec
def set_client_receiver(self, rec):
self.client_receiver = rec
#
def close_all(self):
for cx in self.clxa:
cx[0].close()
# Mandel 並列計算クラス
class Mandel:
isMaster = False
results = []
cmap=["#000000","#050000","#000500","#000005",
"#100000","#001000","#000010",
"#200000","#002000","#000020",
"#505000","#500050","#005050",
"#7f0000","#007f00","#00007f"]
receiving=""
M = 100
def mandel(self, c):
k = 0
z = 0
while k < self.M and abs(z) < 2:
z = z**2 - c
k += 1
return k
def server_continue_thread(self,dmy):
if self.isMaster:
# master step 3: worker からの結果を待つ
print("master step 3. wait for the results of workers.")
while self.nrec < self.pn:
time.sleep(0.01)
# master step 4: 結果の総和と出力
print("master step 4.sum of all results of workers and output")
end_time = time.time()
print("lapse="+str(end_time-self.start_time))
dmy=input("OK?")
self.com.close_all()
self.root.destroy()
exit(1)
def draw_point(self, x, y, z):
#print("draw_point x="+str(x)+" y="+str(y)+" z="+str(z))
color = "#000000"
if z >= 100:
color = "#ff0000"
elif z > 80:
color = "#00ff00"
elif z > 60:
color = "#0000ff"
elif z > 40:
color = "#ffff00"
elif z > 30:
color = "#0fff0f"
elif z > 20:
color = "#fff0f0"
elif z > 10:
color = "#ff00ff"
elif z > 8:
color = "#00ffff"
elif z>0:
ix=int(2*z)
if ix <16:
color=self.cmap[ix]
#print("x="+str(x)+" y="+str(y))
ix = (x+1.0)*((self.xmax)/(3.2))
iy = (y+1.2)*((self.ymax)/(2.4))
#print("x="+str(x)+" y="+str(y)+" z="+str(z)+" ix="+str(ix)+" iy="+str(iy)+" color="+color)
if self.canvas != None:
self.canvas.create_rectangle(float(ix), float(iy), float(ix), float(iy), fill=color, width=0)
#print("end draw_point")
def __init__(self, mx, px):
# SPMD Single Program, Multiple Data で表現
# px: mx が masterのときははworkerの数、workerのときは masterのIPアドレス。
self.com = Com()
if mx == "master":
self.isMaster = True
self.pn = int(px)
self.com.set_server_receiver(self)
else:
self.isMaster = False
self.master_ip = px
self.px = 0
self.pn = 0
self.com.set_client_receiver(self)
self.xmax = 640
self.ymax = 480
if self.isMaster:
# for display
self.root = tk.Tk()
self.root.geometry("700x500")
# Canvasの作成
self.canvas = tk.Canvas(
self.root, bg="white", width=self.xmax, height=self.ymax)
# Canvasを配置
self.canvas.pack(fill=tk.BOTH, expand=True)
# master step 1: tcpサーバー起動、worker の接続を待つ
print("master step1, start TCP server, wait worker connection.")
host = socket.gethostname()
# ipアドレスを取得、表示
ip = socket.gethostbyname(host)
print(ip) # 192.168.○○○.○○○
print("workder number="+str(self.pn))
self.nrec = 0
self.results = [0.0]*self.pn
# workerからの接続待ちstart
self.com.start_server()
# workerの数がpxになるまで待つ。
i = 0
while i < self.pn:
i = self.com.get_client_number()
# master sterp 2: wokerに、worker数と担当部分を送る。
print(
"master step2, send the pn(number of workers) and pid(assign) to workers.")
self.nrec = 0
self.start_time = time.time()
for i in range(self.pn):
sending = str(self.pn)+" "+str(i)
print("send "+sending+" to the "+str(i)+"th client")
self.com.send_ith_worker(sending, i) # send n
continue_thread = threading.Thread(
target=self.server_continue_thread, args=(None,), daemon=True)
continue_thread.start()
self.root.mainloop()
else:
# worker step 1: masterに接続
print("worker step1, connect to the master.")
(self.com).start_client(self.master_ip)
# worker step 2: master からの総数(pn)と割り当て通知を待つ
print("worker step2, wait until receiving pn and pid.")
while self.pn == 0:
time.sleep(0.001)
# worker step 3. 担当部分のpiの計算
print("worker step3, calculate the assinge part of pi")
s = 0.0
# 割り当てられた部分のmandelの計算
for i in range(0, self.ymax):
if self.pid == i % self.pn: # 割り当て部分は、この式で求める。
for xx in range(0, self.xmax):
#yy = (self.ymax/self.pn)*i+self.pid
y = -1.2+float(i)*2.4/float(self.ymax)
x = -1.0+xx*3.2/float(self.xmax)
z = self.mandel(x+y*1j)
self.send_result(x, y, z)
self.send_end_makar()
self.com.close_all()
exit(1)
def server_rx(self, line, pid):
#print("line="+line)
rx=line.split(',')
kx = rx[0]
if kx == "p":
#print("len_rx="+str(len_rx))
try:
self.draw_point(float(rx[1]), float(rx[2]), float(rx[3]))
except:
import traceback
traceback.print_exc()
print("except receiving="+self.receiving+" line="+line)
return
elif kx == "e":
self.nrec = self.nrec+1
print("nrec="+str(self.nrec))
def worker_rx(self, data):
print("worker_rx data="+data)
# print("pn="+str(self.pn))
rx = data.split()
self.pn = int(rx[0])
self.pid = int(rx[1])
print("pn="+str(self.pn))
def send_result(self, x, y, z):
xline = "p,"+str(x)+","+str(y)+","+str(z)
#print("c->s "+xline)
self.com.send_to_server(xline)
def send_end_makar(self):
xline = "e,0.0,0.0,0.0"
self.com.send_to_server(xline)
# 起動
# master: $ python3 parallel_mandel.py master <np>
# <np>は workerの数。
# workerが<np>個起動され、masterに接続されるまで待つ。
# master-ip=172.16.17.x #masterを起動すると、masterのIPアドレスが表示される。
#
# worker: $ python3 parallel_mandel.py client 172.16.17.x
# masterのIPを指定して clientを起動
if __name__ == "__main__":
# master: python3 parallel_mandel.py master <no.of clients>
# client: python3 parallel_mandel.py worker <serverのIPアドレス>
args = sys.argv
calpi = Mandel(args[1], args[2])
class Com: はプログラム間通信に関する機能をまとめたクラスです。もともとrawな recv を使ってデータの受信をしていたのですが、makefile とreadlineを使うことによって、大分、短くなりました。元のrecvを使ったプログラムはコメントアウトすることで残しています。recvを使うと、TCPなのにデータの取りこぼしを防ぐために、結構苦労しました。きっと何か良い方法があると思うのですが、とりあえず、makefileとreadlineが良いのかな?
class Mandel: がMandelbrot集合を並列計算して描画するプログラムの部分です。
実行例
手元にRaspberry Pi 3のクラスタがあるので、それを使って実行してみました。
Workerをそれぞれ異なるRaspberry Pi 3 に割り当てます。
Worker が2つの場合
実行時間、227.0秒
Workerが4つの場合
実行時間、38.6秒。
あれ? 227.0の半分は113.5くらいになるはずなんだけど、5倍くらい速くなってますね。なぜだろ?