はじめに
最近、Pythonで並列処理させるthreading
を使う機会があった。
ハードのI/O待ちが発生する組み込み開発において活躍してくれた。
この機会にthreadingを完全に理解したという状態にしておきたい。
自分がよく使いそうなケース別にサンプルを書いてみた。
正確な理解には公式ドキュメントを。
今回のサンプルは、GitHubに全部まとめてUPしている。
記事の章立てと番号は合わせている。
事例
Case01
メインスレッドの終了と同時に、生成したスレッドを終了させたい
→threading.Thread(daemon=True)
python main.py
-> ctrl+C
しても終了できないゾンビスレッドの生成を抑制できる。
ちゃんとスレッドの終了をコントロールしたい場合はthreading.Event()
が公式的に推奨。
import threading
import time
def threadFoo():
while True:
print("foo")
time.sleep(1)
if __name__ == "__main__":
#daemonをTrueにすると、メインスレッドと同時に終了できる
thread = threading.Thread(target=threadFoo, name='thread1',daemon = True)
thread.start()
time.sleep(5)
print("---end---")
Case02
生成したスレッドの処理が終わるまでメインスレッドをブロックしたい
->threading.Thread.join()
を使う
thread処理が終了するまでmainスレッドの処理をブロックできる。
メインスレッドと生成したスレッドで同期的な処理が書ける。
import threading
import time
def threadFoo():
for i in range(5):
print("foo" + str(i))
time.sleep(1)
if __name__ == "__main__":
thread = threading.Thread(target=threadFoo, name='thread1',daemon=True)
thread.start()
thread.join()
print("---end---")
foo0
foo1
foo2
foo3
foo4
---end---
Case03
生成したスレッドの処理が終わるまでメインスレッドをブロックしたいが、
メインスレッドのブロックに時間制約を設けたい。
->threading.Thread.join(timeout=**)
生成スレッドの処理が永遠に終わらなく、メインスレッドがデッドロックになってしまう状態を防げる。
import threading
import time
def threadFoo():
for i in range(5):
print("foo" + str(i))
time.sleep(1)
if __name__ == "__main__":
thread = threading.Thread(target=threadFoo, name='thread1',daemon=True)
thread.start()
thread.join(timeout=2)
print("---end---")
time.sleep(5)
foo0
foo1
foo2
---end---
foo3
foo4
Case04
生成したスレッドをメインスレッドから途中で止めたい
->threading.Event()
import threading
import time
event = threading.Event()
def threadFoo():
while True:
print("thread start")
time.sleep(0.2)
#eventのフラグが立つまで待ち受ける(defalut:False)
event.wait()
print("thread end")
thread = threading.Thread(target=threadFoo,daemon=True)
thread.start()
ii = 0
print("-------------------defalut-----------------------")
while True:
for i in range(5):
time.sleep(0.3)
print("main thread" + str(i))
if ii%2 == 0:
#set()によって待ち受け状態が解放される
print("-------------------event.set()-----------------------")
event.set()
else:
#clear()でフラグを落とす
print("-------------------event.clear()-----------------------")
event.clear()
ii += 1
-------------------defalut-----------------------
main thread0
main thread1
main thread2
main thread3
main thread4
-------------------event.set()-----------------------
thread end
thread start
thread end
thread start
main thread0
thread end
thread start
main thread1
thread end
thread start
thread end
thread start
main thread2
thread end
thread start
main thread3
thread end
thread start
thread end
thread start
main thread4
-------------------event.clear()-----------------------
main thread0
main thread1
main thread2
main thread3
main thread4
Case05
生成したスレッドをメインスレッドから途中で止めたいが、
生成スレッドのブロックに時間制約を付けたい
->threading.Event().wait(timeout=**)
import threading
import time
event = threading.Event()
def threadFoo():
while True:
print("thread start")
time.sleep(0.1)
#timeoutを使うことでデッドロックを解放できる
event.wait(timeout=3)
print("thread end")
thread = threading.Thread(target=threadFoo,daemon=True)
thread.start()
while True:
for i in range(5):
time.sleep(0.3)
print("main thread" + str(i))
Case06
生成した複数のスレッドが行う処理の競合を防ぐ
->threading.Lock().acquire()
bad example:排他にならない競合処理
import threading
import time
b = 0
def threadFoo(addVal):
global b
localb = b
time.sleep(3)
localb += addVal
b = localb
print("val in thread: " + str(b))
thread1 = threading.Thread(target=threadFoo,args=(1,),daemon=True)
time.sleep(1)
thread2 = threading.Thread(target=threadFoo,args=(2,),daemon=True)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print("final val: " + str(b))
good example:threading.Lock()
を活用して排他にする
import threading
import time
lock = threading.Lock()
b = 0
def threadFoo(lock, addVal):
global b
with lock:
localb = b
time.sleep(3)
localb += addVal
b = localb
print("val in thread: " + str(b))
thread1 = threading.Thread(target=threadFoo,args=(lock, 1,),daemon=True)
time.sleep(1)
thread2 = threading.Thread(target=threadFoo,args=(lock, 2,),daemon=True)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print("final val: " + str(b))
Case07
生成したスレッドを識別したい
->threading.Thread(name=**)
or threading.Thread().ident
indentはスレッドの識別子IDである。
明示的にスレッド名name
を付けなくてもIDを取得できる。
import threading
import time
def threadFoo():
#localに使う変数を定義できる
i = threading.local()
i=0
while i<3:
time.sleep(1)
i+=1
thread = threading.Thread(target=threadFoo,name="nameOfThread",daemon=True)
thread.start()
print(thread.ident)
print(thread.name)
thread.join()
print("---end---")
Case08
生成したスレッドとメインスレッド間でデータのやり取りをしたい
->queue.Queue()
queue.Queue()
はFIFOである。
import queue
import threading
import time
queue01 = queue.Queue() # main-> newthread
queue02 = queue.Queue() # newThread -> main
def threadFoo(q1,q2):
time.sleep(1)
localVal = threading.local()
while True:
if q1.empty():
time.sleep(0.2)
print("thread waiting..")
else:
localVal = q1.get()
print("thread q1.get():"+str(localVal))
sendVal = localVal+100
print("thread q1.put():"+str(sendVal))
q2.put(sendVal)
if __name__ == "__main__":
thread = threading.Thread(target=threadFoo, args=(queue01,queue02,),daemon = True)
thread.start()
i=0
while True:
print("main q1.put():"+str(i))
queue01.put(i)
time.sleep(1)
if queue02.empty():
pass
else:
val = queue02.get()
print("thread q2.get():"+str(val))
print("------------")
i+=1
Case09
キューを消費する際にロングポーリングさせたい
->queue.Queue().get(timeout=**)
タイムアウト時間が来てもキューがからの場合はエラーとなるため、tryで書く必要あり。
消費する側のスレッドがブロックされるが、empty()時に待たせたりする処理を書かなくて良くなる。
import queue
import threading
import time
queue = queue.Queue() # newthread -> main
def threadFoo(q):
i = 0
while True:
time.sleep(1)
q.put(i)
print("thread put:" + str(i))
i+=1
if __name__ == "__main__":
thread = threading.Thread(target=threadFoo, args=(queue,),daemon = True)
thread.start()
while True:
try:
print("main get:"+str(queue.get(timeout=3)))
except :
print("error")
以上。また事例が増えたら追加する。