import os
import random
import threading
import time
from multiprocessing import Process, Queue
def write_and_read(q, write_threads_count):
write_threads = [threading.Thread(target=write, args=(q,)) for _ in range(write_threads_count)]
read_thread = threading.Thread(target=read, args=(q,))
for thread in write_threads:
thread.start()
read_thread.start()
for thread in write_threads:
thread.join()
read_thread.join()
def write(q):
for value in ["A", "B", "C"]:
print(f"Put {value} to queue from {threading.current_thread().name}")
q.put(value)
time.sleep(random.random())
def read(q):
while True:
try:
value = q.get(timeout=2)
print(f"Get {value} from queue.")
except Exception:
# queueが空
break
def main():
q = Queue()
write_threads_count = 1
write_and_read_process = Process(target=write_and_read, args=(q, write_threads_count))
write_and_read_process.start()
write_and_read_process.join()
if __name__ == "__main__":
main()
Register as a new user and use Qiita more conveniently
- You get articles that match your needs
- You can efficiently read back useful information
- You can use dark theme