この投稿は、並列処理して、ループ計算時間を短縮するpythonプログラムを作成した時の記録です。並列処理のクラスターには上の写真の4台のRaspberry Pi 4Bを使いました。Raspberry Pi 4Bは大きめのヒートシンクが必須です。今回はまだファンは回していません。もう少し発熱が激しくなったら水冷にしようかとも思っていました。
主題の今回作成したPythonプログラムは動くようになり、4台のクラスターでの並列処理で、下表のように計算時間を4秒から1.1秒へ、約1/4に短縮できました。また1台だけで1スレッドから4スレッドへ変えただけでも、計算時間を8秒から4秒へ短縮できました。 この部分削除start **** これはスレッドセーフでないコードの頻度が少ないため、GILによる影響が小さくこの部分削除end **** 1台内の4個のスレッドによる並列処理が効果的であった例外です。しかしプログラムが完成するまでには多くの紆余曲折(エラーが出続ける)がありました。この奮戦記では、特にエラーをクリアするのに苦労した箇所をピックアップして紹介します。また本稿の末尾に作成したプログラムを添付しています。
機種 | Raspberry Pi 4B | Raspberry Pi 4B | Raspberry Pi 4B |
---|---|---|---|
台数 | 1台 | 1台 | 4台 |
スレッド数 | 1 スレッド | 4 スレッド | 4 スレッド(各Pi) |
処理時間 | 8秒 | 4秒 | 1.1秒 |
import モジュール | time | time threading | time threading socket |
LANネットワーク構成とOS
奮戦記の前に、今回使ったネットワーク構成を簡単に紹介します。4台のRaspberry Pi 4Bのクラスターと1台のRaspberry Pi Zeroで構成しています。OSは全てRaspberry Pi OS with desktop : 2020-12-02-raspios-buster-armhf.img です。各Piは、コンソールからキーボード・マウス・モニターそしてタッチでの操作、またはLANネットワーク経由のsshでの操作、どちらも可能にしました。プログラムの都合上、全て固定IPアドレスにしました。
各PiのOSインストールおよび初期設定はコンソールから実施しました。具体的な初期設定のポイントは4点です。 ① OSインストール用のSDカードに空のsshファイルを追加します。 ② 同じくSDカードのconfig.txtファイルの末尾にHDMIモニター用設定項目を追加します。そしてPiの電源をONにして起動します。 ③ 起動後、上図右のようにネットワーク設定で固定IPアドレスにします。 最後の設定④はキーボード機種により必要ないかもしれません。今回、テンキーの無いコンパクトな新しいキーボードを買ったので'@'がキーボードから入力できませんでした。④ キーボード設定: Localisationで、TimezoneはAsia Tokyo、Keyboardは105, US, US、LocaleはEnglish, US, US, UTF-8にしました。これで'@'がキーボードから入力可能になりました。
ここまでの初期設定は無難に通過しました。しかし次のpythonのプログラム作成時に、解決不可能(初心者の私には)と思えるエラーが続出しました。ここからが奮戦記の始まりです。またプログラム作成中はWindows 10 (x64)上で、Python 3.7.9 IDLEを使っています。さらにWindowsから各Piへのssh接続にはPuttyを、同じくWindowsから各Piへのファイル転送にはWinSCPを使いました。Raspberry Piとの暗号化接続を自動でやってくれ操作もシンプルで優れもののソフトで大変助かりました。ベストセラーです。
最初は、解決不可能と思ったポイント
・1台での4スレッド処理
・各スレッド計算値の合計
・浮動小数点の2台間のsocket通信
・リストの2台間のsocket通信
まず、1台での4スレッド処理でつまずきました。Googleでずっと探しました。しかしありません。これはダメかなとあきらめながら、以下のサイトを見たりもしました。
https://docs.python.org/ja/3/library/threading.html?highlight=thread#threading.Thread
定義などがわかりますが、何を書いてあるのかわからい部分も多く、自分のプログラム自体は動きませんでした。multiprocessingがいいようなことを書いてあったので試してみました。しかし全く動きません。threadingもmultiprocessingも、とにかく並列処理はあきらめました。
先に2台間のsocket通信に進みます。このsocket通信は逆にあっさりと順調で、相互のモニターに受信した文字列が次々と表示されます。3台、4台と増やしていっても、socket通信での文字列の送受信はうまくいきました。この時点で複数台間での並列処理の原型が一応完成しました。しかし目標は多重ループ計算ですから、割算をすれば浮動小数点の計算結果が出ます。これを複数台からsocket通信で受信し、そして合計値を出さなければなりません。しかし、この浮動小数点の2台間のsocket通信でつまずきました。
浮動小数点の2台間のsocket通信
うまくいっていた文字列のsocket通信で、文字列を浮動小数点に変えただけです。突然、float(浮動小数点)はbyte型には変換できませんとエラーが出ました。socket通信ではbyte型しか送受信できず、byte型にencodeして送信し、受信側でdecodeして文字列に戻すと理解していました。しかしエラー表示で浮動小数点はbyte型にencodeできないことがわかりました。このエラーは、送信側で文字列に変換するstr、受信側で文字列を浮動小数点に変換するfloatで、各1行を追加するだけで、以下のようにスッキリとクリアしました。
送信側
# 浮動小数点の数値
total21 = 12345.6789
# 浮動小数点の数値を文字列にする
total21str = str(total21) #### strで文字列へ変換
# 接続中のsocket21から文字列をbyte型にencodeして送信
socket21.send(bytes(total21str, 'utf-8')
受信側
# 接続中のclientsocket21で、encodeされたbyte型の文字列受信
total21strEncode = clientsocket21.recv(1016)
# decodeして文字列に戻す
total21str = total21strEncode.decode("utf-8")
# 文字列を浮動小数点の数値に戻す
total21 = float(total21str) #### floatで浮動小数点の数値へ変換
同様に、文字列を整数に変換するintを使えば、整数の数値もsocket通信できます。次に、整数の数値および浮動小数点の数値が混在したリストをまとめてsocket通信できないかを試行しました。エラー表示が出ました。リストはbyte型にencodeできません。
リストの2台間のsocket通信
これも文字列にして送信することを試みました。以外にあっさりと文字列に変換され送信側は以下のようにクリアしました。
送信側
# リスト
lists21 = [10.01, 1234]
# リストを文字列にする
list21str = str(lists21) #### strで文字列へ変換
# 接続中のsocket21から文字列list21strをbyte型にencodeして送信
socket21.send(bytes(list21str, 'utf-8')
受信側
# 接続中のclientsocket21で、encodeされたbyte型の文字列受信
list21strEncode = clientsocket21.recv(1016)
# decodeして文字列に戻す
list21str=list21strEncode.decode("utf-8")
list21str = list21str.replace('[', '')####
list21str = list21str.replace(']', '')####
lists21str=[]
lists21str=list21str.split(', ')
# ["10.01", "1234"]
lists21=[]
lists21.append(float(lists21str[0]))
lists21.append(int(lists21str[1]))
# [10.01, 1234]
受信側は全くスッキリとはいきませんでしたが、どうにか送信前のリストを取得することができました。
1台での4スレッド処理
再びthreadingにチャレンジしました。Googleでずっと探しました。意外なことにargsの引数の後に","1個追加することで解決しました。Googleでずっと探していた時に、どこかで見たことがあったのですが削除していました。なぜ解決したのか再び探したのですが原因は分かりませんでした。
追記:関数の引数を丸括弧()のタプルとした場合、要素が1個のpythonのタプルの仕様と同様に、1個の引数の後つまり末尾にカンマ','が必要ということでした。
各スレッド計算値の合計
動き始めたスレッドでの並列処理で、各スレッドの計算値を合計しなければなりません。しかしスレッドの中の変数は使えませんでした。returnで戻しても使えませんでした。計算したが合計値が出せないとどうしようもありません。ここで長らく止まりました。これもappendをtotals(リスト)に適用すれば解決できました。
totals.append(total21)
以下のサイトに詳しく載っていました。
https://docs.python.org/ja/3/tutorial/datastructures.html
======= この部分を削除 start =======
通常は何も気にならない1行です。しかし次に示す「作成したpythonプログラム」中では、グローバルなtotals(リスト)を、各threadが同時期に共通して使います。つまり今回の「作成したpythonプログラム」中では、スレッドセーフでないコードになってしまいます。しかしPythonではGILが効いて「各thread間の競合を自動で回避」してくれます。ソフトウェア―版のDMAチャンネルに似た優れた機能があるようです。
======= この部分を削除 end =======
PythonのGILとはマルチスレッディングでも1coreに限定し、キーボード・マウス・モニターへの余力を確保する機能でした。「各thread間の競合を自動で回避」してくれる機能ではありませんでした。前記は認識違いでした。Raspberry Pi 3と4のCPUの4core全てを使うにはマルチプロセッシングでプログラムを構築しなければなりませんでした。但しキーボード・マウス・モニターがほとんど反応しなくなります。
作成したpythonプログラム
cluster21.pyをPiフォルダーに置いて、python3 で実行して待機。他の3台はcluster21.pyをコピペし、それぞれ 22, 23, 24 へ「すべて置換」する。同様にPiフォルダーに置いて、python3 で実行して待機。以下、pythonプログラムらしくない、pythonプログラムなのですが、公開します。
import time
import threading
import socket
total = 0
totalsG0 = []####変更追加
totalsG1 = []####変更追加
totalsG2 = []####変更追加
totalsG3 = []####変更追加
# totals = []####変更コメントアウト
def calc_1s(n):
if n == 0:
print(n)
total0 = 0
for t0 in range(list21lint, list21mint):
for u0 in range(L0):
total0 += t0*u0*list21cfloat0
print(f"End of {n}")
totalsG0.append(total0)####修正追加
print(total0)
print(totalsG0)
#totals.append(total0)####GIL適用は1回だけ ←間違いコメントアウト
elif n == 1:
print(n)
total1 = 0
for t1 in range(list21nint, list21oint):
for u1 in range(L1):
total1 += t1*u1*list21dfloat1
print(f"End of {n}")
totalsG1.append(total1)####修正追加
print(total1)
print(totalsG1)
#totals.append(total1)####GIL適用は1回だけ ←間違いコメントアウト
elif n == 2:
print(n)
total2 = 0
for t2 in range(list21pint, list21qint):
for u2 in range(L2):
total2 += t2*u2*list21efloat2
print(f"End of {n}")
totalsG2.append(total2)####修正追加
print(total2)
print(totalsG2)
#totals.append(total2)####GIL適用は1回だけ ←間違いコメントアウト
elif n == 3:
print(n)
total3 = 0
for t3 in range(list21rint, list21sint):
for u3 in range(L3):
total3 += t3*u3*list21ffloat3
print(f"End of {n}")
totalsG3.append(total3)####修正追加
print(total3)
print(totalsG3)
#totals.append(total3)####GIL適用は1回だけ ←間違いコメントアウト
# 計算用リスト受信
socket21 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket21.bind(('192.168.2.21', 60021))
socket21.listen(5)
clientsocket21, address21 = socket21.accept()
list21strEncode = b''
list21strEncode = clientsocket21.recv(1016)
list21str=list21strEncode.decode("utf-8")
print(list21str)
list21str = list21str.replace('[', '')
list21str = list21str.replace(']', '')
lists21str=[]#lists21str:文字列用にリスト新設
lists21str=list21str.split(', ')
list21aint=int(lists21str[0])
list21bint=int(lists21str[1])
list21cfloat0=float(lists21str[2])
list21cfloat1=list21cfloat0
list21cfloat2=list21cfloat0
list21cfloat3=list21cfloat0
list21dfloat0=float(lists21str[3])
list21dfloat1=list21dfloat0
list21dfloat2=list21dfloat0
list21dfloat3=list21dfloat0
list21efloat0=float(lists21str[4])
list21efloat1=list21efloat0
list21efloat2=list21efloat0
list21efloat3=list21efloat0
list21ffloat0=float(lists21str[5])
list21ffloat1=list21ffloat0
list21ffloat2=list21ffloat0
list21ffloat3=list21ffloat0
list21gfloat=float(lists21str[6])
list21hfloat=float(lists21str[7])
list21ifloat=float(lists21str[8])
list21jfloat=float(lists21str[9])
list21kfloat=float(lists21str[10])
list21lint=int(lists21str[11])
list21mint=int(lists21str[12])
list21nint=int(lists21str[13])
list21oint=int(lists21str[14])
list21pint=int(lists21str[15])
list21qint=int(lists21str[16])
list21rint=int(lists21str[17])
list21sint=int(lists21str[18])
list21tint=int(lists21str[19])
lists21=[]#lists21:数値用にリスト新設
lists21.clear()####
lists21.append(int(lists21str[0]))
lists21.append(int(lists21str[1]))
lists21.append(float(lists21str[2]))
lists21.append(float(lists21str[3]))
lists21.append(float(lists21str[4]))
lists21.append(float(lists21str[5]))
lists21.append(float(lists21str[6]))
lists21.append(float(lists21str[7]))
lists21.append(float(lists21str[8]))
lists21.append(float(lists21str[9]))
lists21.append(float(lists21str[10]))
lists21.append(int(lists21str[11]))
lists21.append(int(lists21str[12]))
lists21.append(int(lists21str[13]))
lists21.append(int(lists21str[14]))
lists21.append(int(lists21str[15]))
lists21.append(int(lists21str[16]))
lists21.append(int(lists21str[17]))
lists21.append(int(lists21str[18]))
lists21.append(int(lists21str[19]))
print(lists21)
clientsocket21.send(bytes("OK 21 Start ....", 'utf-8'))
clientsocket21.close()
socket21.close()
if list21aint == 1:
# calc_1s Calcuration Start
L0 = list21bint
L1 = list21bint
L2 = list21bint
L3 = list21bint
initial_time = time.time()
print(f"CPU is calculating now !")
# Start Threads
threads = []
for n in range(4):# 4 thread
thread = threading.Thread(target=calc_1s, args=(n,))#
thread.start()
threads.append(thread)
# Wait Threads
for thread in threads:
thread.join()
total += totalsG0[0]####変更追加
total += totalsG1[0]####変更追加
total += totalsG2[0]####変更追加
total += totalsG3[0]####変更追加
#for x in totals:#####変更コメントアウト
# total += x#####変更コメントアウト
print(f"Answer :")
print (total)
print(f"Time :")
print(str(time.time() - initial_time))
# Calculation Result Return === Fixed 20 ===
socket21 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket21.connect(('192.168.2.20', 60021))
totalstr = str(total)
socket21.send(bytes(totalstr, 'utf-8'))
msg21 = b''
msg21 = socket21.recv(16)
socket21.close()
print(msg21.decode("utf-8"))
time.sleep(10) # for display
import time
import threading
import socket
## 4000回の2重ループ計算用のリスト 4台のRaspberry Pi 4B で 約1.1秒
lists21 = [1, 4001, 1.0, 1.0, 1.0, 1.0, 6.6, 7.7, 8.8, 9.9, 10.10, 0, 251, 251, 501, 501, 751, 751, 1001, 19]
lists22 = [1, 4001, 1.0, 1.0, 1.0, 1.0, 6.6, 7.7, 8.8, 9.9, 10.10, 1001, 1251, 1251, 1501, 1501, 1751, 1751, 2001, 19]
lists23 = [1, 4001, 1.0, 1.0, 1.0, 1.0, 6.6, 7.7, 8.8, 9.9, 10.10, 2001, 2251, 2251, 2501, 2501, 2751, 2751, 3001, 19]
lists24 = [1, 4001, 1.0, 1.0, 1.0, 1.0, 6.6, 7.7, 8.8, 9.9, 10.10, 3001, 3251, 3251, 3501, 3501, 3751, 3751, 4001, 19]
total = 0.
totalsG21 = [0.]####変更追加
totalsG22 = [0.]####変更追加
totalsG23 = [0.]####変更追加
totalsG24 = [0.]####変更追加
# totals = []#**#変更コメントアウト
def connectto_ipports(n):
if n == 60021:
print(n)
socket21 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket21.connect(('192.168.2.21', 60021))
list21str=''
list21str=str(lists21)
socket21.send(bytes(list21str, 'utf-8'))#リストをclusterへ送信
msg21 = b''
msg21 = socket21.recv(16)
socket21.close()
print(msg21.decode("utf-8"))
elif n == 60022:
print(n)
socket22 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket22.connect(('192.168.2.22', 60022))
list22str=''
list22str=str(lists22)
socket22.send(bytes(list22str, 'utf-8'))#リストをclusterへ送信
msg22 = b''
msg22 = socket22.recv(16)
socket22.close()
print(msg22.decode("utf-8"))
elif n == 60023:
print(n)
socket23 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket23.connect(('192.168.2.23', 60023))
list23str=''
list23str=str(lists23)
socket23.send(bytes(list23str, 'utf-8'))#リストをclusterへ送信
msg23 = b''
msg23 = socket23.recv(16)
socket23.close()
print(msg23.decode("utf-8"))
elif n == 60024:
print(n)
socket24 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket24.connect(('192.168.2.24', 60024))
list24str=''
list24str=str(lists24)
socket24.send(bytes(list24str, 'utf-8'))#リストをclusterへ送信
msg24 = b''
msg24 = socket24.recv(16)
socket24.close()
print(msg24.decode("utf-8"))
def accept_ports(n):
if n == 60021:
print(n)
socket21 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket21.bind(('192.168.2.20', 60021))
socket21.listen(5)
clientsocket21, address21 = socket21.accept()
total21en = b''
total21en = clientsocket21.recv(1016)
total21 = float(total21en.decode("utf-8"))
totalsG21.append(total21)###修正追加
del totalsG21[0]###修正追加
#totals.append(total21)####GIL適用は1回だけ ←間違いコメントアウト
print(total21)
clientsocket21.send(bytes("Thank you ......", 'utf-8'))
clientsocket21.close()
socket21.close()
elif n == 60022:
print(n)
socket22 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket22.bind(('192.168.2.20', 60022))
socket22.listen(5)
clientsocket22, address22 = socket22.accept()
total22en = b''
total22en = clientsocket22.recv(1016)
total22 = float(total22en.decode("utf-8"))
totalsG22.append(total22)###修正追加
del totalsG22[0]###修正追加
#totals.append(total22)####GIL適用は1回だけ ←間違いコメントアウト
print(total22)
clientsocket22.send(bytes("Thank you ......", 'utf-8'))
clientsocket22.close()
socket22.close()
elif n == 60023:
print(n)
socket23 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket23.bind(('192.168.2.20', 60023))
socket23.listen(5)
clientsocket23, address23 = socket23.accept()
total23en = b''
total23en = clientsocket23.recv(1016)
total23 = float(total23en.decode("utf-8"))
totalsG23.append(total23)###修正追加
del totalsG23[0]###修正追加
#totals.append(total23)####GIL適用は1回だけ ←間違いコメントアウト
print(total23)
clientsocket23.send(bytes("Thank you ......", 'utf-8'))
clientsocket23.close()
socket23.close()
elif n == 60024:
print(n)
socket24 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket24.bind(('192.168.2.20', 60024))
socket24.listen(5)
clientsocket24, address24 = socket24.accept()
total24en = b''
total24en = clientsocket24.recv(1016)
total24 = float(total24en.decode("utf-8"))
totalsG24.append(total24)###修正追加
del totalsG24[0]###修正追加
#totals.append(total24)####GIL適用は1回だけ ←間違いコメントアウト
print(total24)
clientsocket24.send(bytes("Thank you ......", 'utf-8'))
clientsocket24.close()
socket24.close()
threads = []
# Thread Client Start
for n in range(60024, 60025):
thread = threading.Thread(target=connectto_ipports, args=(n,)) #
thread.start()
threads.append(thread)
# Wait Threads
for thread in threads:
thread.join()
# Result Display
print("All Clusters Calculation Start")
threads.clear()
# Accept Thread Server Start
for n in range(60024, 60025):
thread = threading.Thread(target=accept_ports, args=(n,)) #
thread.start()
threads.append(thread)
# Wait Threads
for thread in threads:
thread.join()
total += totalsG21[0]####変更追加
total += totalsG22[0]####変更追加
total += totalsG23[0]####変更追加
total += totalsG24[0]####変更追加
# for x in totals:#####変更コメントアウト
# total += x #####変更コメントアウト
# Result Display
print("Calculation Result:")
print(total)
time.sleep(10) # for display
# total20.py 固定IPアドレス 192.168.2.20 (pyを最後に実行して、計算開始)
# cluster21.py 固定IPアドレス 192.168.2.21 (pyを先に実行して待機)
# cluster22.py 固定IPアドレス 192.168.2.22 (pyを先に実行して待機)
# cluster23.py 固定IPアドレス 192.168.2.23 (pyを先に実行して待機)
# cluster24.py 固定IPアドレス 192.168.2.24 (pyを先に実行して待機)
## OSは全てRaspberry Pi OS with Desktop : 2020-12-02-raspios-buster-armhf.img
最後までご覧いただき、ありがとうございました。
今回はQiitaの主旨に同意して投稿していますので、公開したプログラムは書き換え、改変などでのご使用は自由です。著作権に関する問題も発生しません。
ただし、Raspberry Pi 4Bを使う場合にはCPUに特に大きめのヒートシンクが必須です。LAN通信の頻度の少ない今回のプログラムのような場合、LANチップは熱くなりません。しかし計算時間が継続するとCPUの激しい発熱で分かるように相当電力を使っています。電源USB Cタイプの後ろにある黒い小さなチップも熱くなりますので、ファンでの風流なども必要です。