2つのタスクを別のスレッドで実行し、各スレッドでキューに値を入れてEventをセットする。メインスレッドは5秒間処理を行わず、その間、サブスレッドがキューに値をいれる。q1の要素数が0じゃなければpopし、q2の要素が3以上ならpopする。
import threading
import time
from collections import deque
import random
class MyEvent():
def __init__(self) -> None:
self.event1 = threading.Event()
self.event2 = threading.Event()
class MyDeque():
def __init__(self) -> None:
self.q1 = deque([])
self.q2 = deque([])
def pop_q1(event: MyEvent, q: MyDeque):
# 要素数0じゃなければ実行
if q.q1:
print('main_thread pop_q1', q.q1.popleft())
event.event1.clear()
def pop_q2(event: MyEvent, q: MyDeque):
# 要素数3以上なら実行
if len(q.q2) >= 3:
print('main_thread pop_q2', q.q2.popleft())
event.event2.clear()
def is_wait_event(event: MyEvent):
return True if event.event1.is_set() or event.event2.is_set() else False
def main_thread(event: MyEvent, q: MyDeque):
begin = time.time()
while True:
try:
if is_wait_event(event):
pop_q1(event, q)
pop_q2(event, q)
time.sleep(0.01)
except KeyboardInterrupt as e:
print(e)
def task1(event: MyEvent, q: MyDeque):
while True:
# キューへ追加したらeventセット
time.sleep(random.random())
q.q1.append('q1_elem')
print(f'task1 append elems: {len(q.q1)}')
event.event1.set()
time.sleep(random.random())
def task2(event: MyEvent, q: MyDeque):
while True:
# キューへ追加したらeventセット
time.sleep(random.random())
q.q2.append('q2_elem')
print(f'task2 append elems: {len(q.q2)}')
event.event2.set()
time.sleep(random.random())
my_q = MyDeque()
my_event = MyEvent()
target_list = [task1, task2]
for target in target_list:
th = threading.Thread(target=target, args=(my_event, my_q,), daemon=True)
th.start()
# 5秒間スリープしてスレッドの処理を行わせる
time.sleep(5)
main_thread(my_event, my_q)
実行結果
task1 append th1 1
th1
task2 append th2 1
task1 append th1 1th1
task1 append th1 1th1
task2 append th2 2