10
12

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Pythonにおけるthreading実践

Posted at

はじめに

最近、Pythonで並列処理させるthreadingを使う機会があった。
ハードのI/O待ちが発生する組み込み開発において活躍してくれた。
この機会にthreadingを完全に理解したという状態にしておきたい。
自分がよく使いそうなケース別にサンプルを書いてみた。

正確な理解には公式ドキュメントを。

今回のサンプルは、GitHubに全部まとめてUPしている。
記事の章立てと番号は合わせている。

事例

Case01

メインスレッドの終了と同時に、生成したスレッドを終了させたい
threading.Thread(daemon=True)

python main.py-> ctrl+Cしても終了できないゾンビスレッドの生成を抑制できる。
ちゃんとスレッドの終了をコントロールしたい場合はthreading.Event()が公式的に推奨。

import threading
import time


def threadFoo():
    while True:
        print("foo")
        time.sleep(1)

if __name__ == "__main__":

    #daemonをTrueにすると、メインスレッドと同時に終了できる
    thread = threading.Thread(target=threadFoo, name='thread1',daemon = True)
    thread.start()
    time.sleep(5)
    print("---end---")

Case02

生成したスレッドの処理が終わるまでメインスレッドをブロックしたい
->threading.Thread.join()を使う

thread処理が終了するまでmainスレッドの処理をブロックできる。
メインスレッドと生成したスレッドで同期的な処理が書ける。

import threading
import time


def threadFoo():
    for i in range(5):
        print("foo" + str(i))
        time.sleep(1)

if __name__ == "__main__":
    thread = threading.Thread(target=threadFoo, name='thread1',daemon=True)
    thread.start()

    thread.join()
    print("---end---")
foo0
foo1
foo2
foo3
foo4
---end---

Case03

生成したスレッドの処理が終わるまでメインスレッドをブロックしたいが、
メインスレッドのブロックに時間制約を設けたい。
->threading.Thread.join(timeout=**)

生成スレッドの処理が永遠に終わらなく、メインスレッドがデッドロックになってしまう状態を防げる。

import threading
import time


def threadFoo():
    for i in range(5):
        print("foo" + str(i))
        time.sleep(1)

if __name__ == "__main__":
    thread = threading.Thread(target=threadFoo, name='thread1',daemon=True)
    thread.start()

    thread.join(timeout=2)
    print("---end---")

    time.sleep(5)
foo0
foo1
foo2
---end---
foo3
foo4

Case04

生成したスレッドをメインスレッドから途中で止めたい
->threading.Event()

import threading
import time

event = threading.Event()

def threadFoo():
    while True:
        print("thread start")
        time.sleep(0.2)
        #eventのフラグが立つまで待ち受ける(defalut:False)
        event.wait()
        print("thread end")

thread = threading.Thread(target=threadFoo,daemon=True)
thread.start()

ii = 0

print("-------------------defalut-----------------------")
while True:
    for i in range(5):
        time.sleep(0.3)
        print("main thread" + str(i))

    if ii%2 == 0:
        #set()によって待ち受け状態が解放される
        print("-------------------event.set()-----------------------")
        event.set()
    else:
        #clear()でフラグを落とす
        print("-------------------event.clear()-----------------------")

        event.clear()

    ii += 1

-------------------defalut-----------------------
main thread0
main thread1
main thread2
main thread3
main thread4
-------------------event.set()-----------------------
thread end
thread start
thread end
thread start
main thread0
thread end
thread start
main thread1
thread end
thread start
thread end
thread start
main thread2
thread end
thread start
main thread3
thread end
thread start
thread end
thread start
main thread4
-------------------event.clear()-----------------------
main thread0
main thread1
main thread2
main thread3
main thread4

Case05

生成したスレッドをメインスレッドから途中で止めたいが、
生成スレッドのブロックに時間制約を付けたい
->threading.Event().wait(timeout=**)

import threading
import time

event = threading.Event()

def threadFoo():
    while True:
        print("thread start")
        time.sleep(0.1)
        #timeoutを使うことでデッドロックを解放できる
        event.wait(timeout=3)
        print("thread end")

thread = threading.Thread(target=threadFoo,daemon=True)
thread.start()

while True:
    for i in range(5):
        time.sleep(0.3)
        print("main thread" + str(i))

Case06

生成した複数のスレッドが行う処理の競合を防ぐ
->threading.Lock().acquire()

bad example:排他にならない競合処理

import threading
import time

b = 0


def threadFoo(addVal):
    global b
    localb = b
    time.sleep(3)
    localb += addVal
    b = localb
    print("val in thread: " + str(b))

thread1 = threading.Thread(target=threadFoo,args=(1,),daemon=True)
time.sleep(1)
thread2 = threading.Thread(target=threadFoo,args=(2,),daemon=True)

thread1.start()
thread2.start()

thread1.join()
thread2.join()

print("final val: " + str(b))

good example:threading.Lock()を活用して排他にする

import threading
import time

lock = threading.Lock()
b = 0

def threadFoo(lock, addVal):
    global b
    with lock:
        localb = b
        time.sleep(3)
        localb += addVal
        b = localb
        print("val in thread: " + str(b))

thread1 = threading.Thread(target=threadFoo,args=(lock, 1,),daemon=True)
time.sleep(1)
thread2 = threading.Thread(target=threadFoo,args=(lock, 2,),daemon=True)

thread1.start()
thread2.start()

thread1.join()
thread2.join()

print("final val: " + str(b))

Case07

生成したスレッドを識別したい
->threading.Thread(name=**) or threading.Thread().ident

indentはスレッドの識別子IDである。
明示的にスレッド名nameを付けなくてもIDを取得できる。

import threading
import time


def threadFoo():
    #localに使う変数を定義できる
    i = threading.local()
    i=0
    while i<3:
        time.sleep(1)        
        i+=1

thread = threading.Thread(target=threadFoo,name="nameOfThread",daemon=True)
thread.start()

print(thread.ident)
print(thread.name)

thread.join()

print("---end---")

Case08

生成したスレッドとメインスレッド間でデータのやり取りをしたい
->queue.Queue()

queue.Queue()はFIFOである。

import queue
import threading
import time

queue01 = queue.Queue() # main-> newthread
queue02 = queue.Queue() # newThread -> main

def threadFoo(q1,q2):
    time.sleep(1)
    localVal = threading.local()
    while True:
        if q1.empty():
            time.sleep(0.2)
            print("thread waiting..")
        else:
            localVal = q1.get()
            print("thread q1.get():"+str(localVal))
            
            sendVal = localVal+100
            print("thread q1.put():"+str(sendVal))
            q2.put(sendVal)


if __name__ == "__main__":
    thread = threading.Thread(target=threadFoo, args=(queue01,queue02,),daemon = True)
    thread.start()

    i=0
    while True:
        print("main q1.put():"+str(i))
        queue01.put(i)

        time.sleep(1)
        if queue02.empty():
            pass
        else:
            val = queue02.get()
            print("thread q2.get():"+str(val))
            print("------------")

        i+=1

Case09

キューを消費する際にロングポーリングさせたい
->queue.Queue().get(timeout=**)

タイムアウト時間が来てもキューがからの場合はエラーとなるため、tryで書く必要あり。
消費する側のスレッドがブロックされるが、empty()時に待たせたりする処理を書かなくて良くなる。

import queue
import threading
import time

queue = queue.Queue() # newthread -> main

def threadFoo(q):
    i = 0
    while True:
        time.sleep(1)
        q.put(i)
        print("thread put:" + str(i))
        i+=1

if __name__ == "__main__":
    thread = threading.Thread(target=threadFoo, args=(queue,),daemon = True)
    thread.start()

    while True:
        try:
            print("main get:"+str(queue.get(timeout=3)))
        except :
            print("error")

以上。また事例が増えたら追加する。

10
12
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
10
12

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?