1
2

More than 3 years have passed since last update.

[Python3 入門 22日目]11章 並行処理とネットワーク(11.1〜11.3)

Posted at

11.1 並行処理

コンピュータで何かを待っているときは、以下の二つの理由である。

  • I/Oバウンド:I/O処理待ち
  • CPUバウンド:CPU待ち

並行処理に関する用語。

  • 同期的:葬式の行列のように順番に処理すること。
  • 非動的:タスクが独立して処理されること。

11.1.1 キュー

  • キューの要素は片方の端に追加され、反対側の端から取り出される。

11.1.2 プロセス

dishes.py

import multiprocessing as mp

def washer(dishes, output):
    for dish in dishes:
        print("Washing",dish, "dish")
        output.put(dish)

#get()はキューからアイテムを取り除きそれを返す。
#task_doneはget()の後に呼び出されると処理が完了した事をキューに伝える。
#join()はキューにある全てのアイテムが取り出されるまでブロックする。
def dryer(input):
    while True:
        dish=input.get()
        print("Drying",dish,"dish")
        input.task_done()

dish_queue=mp.JoinableQueue()
dryer_proc=mp.Process(target=dryer, args=(dish_queue,))
dryer_proc.daemon=True
dryer_proc.start()

dishes=["salad","bread","entree","dessert"]
washer(dishes, dish_queue)
dish_queue.join()

実行結果

python dishes.py
Washing salad dish
Washing bread dish
Washing entree dish
Washing dessert dish
Drying salad dish
Drying bread dish
Drying entree dish
Drying dessert dish

11.1.3 スレッド

  • スレッドはプロセス内で実行され、プロセス内の全てのものにアクセスできる。
  • multiprocessingとthreadingの違いはthreadingにはterminate()関数が無いという違いがある。
threads.py

import threading

def do_this(what):
    whoami(what)

#threading.current_thread()は関数呼び出し処理のスレッドに対応するオブジェクトを生成する。
def whoami(what):
    print("Thread %s says: %s" % (threading.current_thread(),what))

#スレッドオブジェクトを生成するとスレッドのstart()メソッドを呼び出して活動開始する。
#targetはrunメソッドによって起動される呼び出し可能オブジェクト。
#argsはtargetを呼び出すときの引数タプル。
if __name__=="__main__":
    whoami("I am the main program")
    for n in range(4):
        p=threading.Thread(target=do_this, args=("I am function %s" % n,))
        p.start()

実行結果

python threads.py
Thread <_MainThread(MainThread, started 4530769344)> says: I am the main program
Thread <Thread(Thread-1, started 123145448275968)> says: I am function 0
Thread <Thread(Thread-2, started 123145453531136)> says: I am function 1
Thread <Thread(Thread-3, started 123145448275968)> says: I am function 2
Thread <Thread(Thread-4, started 123145453531136)> says: I am function 3

thread_dishes.py

import threading, dish_queue
import time

def washer(dishes, dish_queue):
    for dish in dishes:
        print("Washing",dish)
        time.sleep(5)
        dish_queue.put(dish) #itemをキューにいれる。

def dryer(dish_queue):
    while True:
        dish=dish_queue.get() #キューからアイテムを取り除き、それを返す。
        print("Drying", dish)
        time.sleep(10)
        dish_queue.task_done() #get()の後にtask_done()を呼び出し、取り出したタスクに対する処理が完了したことを示す。

dish_queue =queue.Queue() #FIFOのキューのコンストラクタ。
for n in range(2):
    dryer_thread=threading.Thread(target=dryer, args=(dish_queue,))
    dryer_thread.start()

dishes=["salad","bread","entree","dessert"]
washer(dishes, dish_queue)
dish_queue.join() #キューにある全てのアイテムが取り出されて処理されるまでブロック。タスクが完了したらブロック解除。

11.1.4 グリーンスレッドとgevent

  • geventはsocketなどのPythonの標準のオブジェクトの多くを書き換え、ブロックせずに geventのメカニズムを使うようにさせる。
gevent_test.py
import gevent
from gevent import monkey
monkey.patch_socket()

hosts = ['www.crappytaxidermy.com', 'www.walterpottertaxidermy.com', 'www.taxidermy.net']
jobs = [gevent.spawn(gevent.socket.gethostbyname, host) for host in hosts]
gevent.joinall(jobs, timeout=5)
for job in jobs:
    print(job.value)

実行結果
python gevent_test.py
66.6.44.4
104.27.172.75
104.18.63.71

gevent_monkey.py
import gevent
from gevent import monkey; monkey.patch_all()
import socket
hosts = ['www.crappytaxidermy.com', 'www.walterpottertaxidermy.com', 'www.taxidermy.net']
jobs = [gevent.spawn(socket.gethostbyname, host) for host in hosts]
gevent.joinall(jobs, timeout=5)
for job in jobs:
    print(job.value)

実行結果
python gevent_monkey.py
66.6.44.4
104.27.173.75
104.18.62.71

11.1.5 twisted

  • 非同期型のイベント駆動ネットワーキングフレームワーク。データ受信や接続切断といったイベントに関数を結び付けるとそれらのイベントが発生した時に結びつけれらた関数が呼び出される。(コールバック)
knock_sever.py

from twisted.internet import protocol, reactor

class Knock(protocol.Protocol):
    def dataReceived(self, data):
        print(Client, data)
        if data.startswith("Knock, knock"):
            response = "Who is there?"
        else:
            response = data + " who?"
        print(Server, response)
        self.transport.write(response)

class KnockFactory(protocol.Factory):
    def buildProtocol(self, addr):
        return Knock()

reactor.listenTCP(8000, KnockFactory())
reactor.run()

knock_client.py
from twisted.internet import reactor, protocol

class KnockClient(protocol.Protocol):
    def connectionMade(self):
        self.transport.write("Knock, knock")

    def dataReceived(self, data):
        if data.startswith("Who is there"):
            response = "Disappearing client"
            self.transport.write(response)
        else:
            self.transport.loseConnection()
            reactor.stop()

class KnockFactory(protocol.ClientFactory):
    protocol = KnockClient

def main():
    f = KnockFactory()
    reactor.connectTCP('localhost', 8000, f)
    reactor.run()

if __name__ == '__main__':
    main()

11.1.7 Redis

  • Redisリストを使えば、キューを手っ取り早く作れる。
redis_washer.py

#"dishes"というリストををRedisサーバー内に生成。
import redis
conn=redis.Redis()
print("Washer is starting")
dishes=["salad","bread","entree","dessert"]
for dish in dishes:
    msg=dish.encode("utf-8")
    conn.rpush("dishes",msg)#rpushは新しい要素を末尾に追加している。
    print("Washed",dish)
conn.rpush("dishes","quit")
print("Washer is done")

redis_dryer.py
#"dishes"になっているメッセージを待ち、それぞれ乾燥させたことを示すメッセージを表示する。
import redis
conn=redis.Redis()
print("Dryer is starting")
while True:
    msg=conn.blpop("dishes")
    if not msg:
        break
    val=msg[1].decode("utf-8")
    if val=="quit":
        break
    print("Dried",val)
print("Dishes are dried")

実行結果
$ python redis_dryer.py &
[1] 43950

$ Dryer is starting

$ python redis_washer.py
Washer is starting
Washed salad
Dried salad
Washed bread
Dried bread
Washed entree
Dried entree
Washed dessert
Dried dessert
Washer is done
Dishes are dried
[1]+  Done                    python redis_dryer.py


redis_dryer2.py

#dryerプロセスを複数作る。
#番兵を探すのではなくdryerプロセスにタイムアウト機能追加。
def dryer():
    import redis
    import os
    import time
    conn=redis.Redis()
    pid=os.getpid()
    timeout=20
    print("Dryer process % is starting" %pid)
    while True:
        msg=conn.blpop("dishes",timeout)#リストの先頭要素を取り出し(LPOP)キーと共に返す。
        if not msg:
            break
        val=msg[1].decode("utf-8")#BLPOPは2つの要素の配列で最初の要素はキー、2番目の要素がポップされた値となる。
        #よってmsg[0]ではなくmsg[1]となる。
        if val=="quit":
            break
        print("&%s: dried %s" % (pid,val))
        time.sleep(0.1)
    print("Dryer process %s is done" %pid)

import multiprocessing
DRYERS=3
for num in range(DRYERS):
    p = multiprocessing.Process(target=dryer)
    p.start()

実行結果

python redis_dryer2.py &
[1] 44162

$ Dryer process  44179s starting
Dryer process  44178s starting
Dryer process  44180s starting
Dryer process 44180 is done
Dryer process 44178 is done
Dryer process 44179 is done

[1]+  Done                    python redis_dryer2.py


11.2 ネットワーク

11.2.1 パターン

  • 一般的なパターンは要求/応答(クライアント/サーバー)
  • パブリッシュ/サブスクライブはパブリッシャがデータを送り出し、サブスクライブバがコピーを受け取る。また、サブスクライバは特定のタイプデータ(トピック)だけを受け取りたいと指定できる。
  • トピックに対するサブスクライバがなければそのデータは無視される。

11.2.2 パブリッシュ/サブスクライブモデル

  • キューではなくブロードキャストである。

11.2.2.1 Redis

redis_pub.py
import redis
import random

conn = redis.Redis()
cats = ['siamese', 'persian', 'maine coon', 'norweigian forest']
hats = ['stovepipe', 'bowler', 'tam-o-shanter', 'fedora']
for msg in range(10):
    cat = random.choice(cats)
    hat = random.choice(hats)
    print('Publish: %s wears a %s' % (cat, hat))
    conn.publish(cat, hat)

redis_sub.py
import redis
conn=redis.Redis()

topics=["maine coon", "persian"]
sub=conn.pubsub()
sub.subscribe(topics)
for msg in sub.listen():
    if msg["type"]=="message":
        cat=msg["channel"]
        hat=msg["data"]
        print("Subscribe: %s wears a %s" % (cat, hat))

実行結果

$ python redis_pub.py
Publish: maine coon wears a bowler
Publish: maine coon wears a bowler
Publish: norweigian forest wears a stovepipe
Publish: siamese wears a bowler
Publish: maine coon wears a bowler
Publish: norweigian forest wears a tam-o-shanter
Publish: persian wears a stovepipe
Publish: persian wears a stovepipe
Publish: persian wears a fedora
Publish: persian wears a bowler

$ python redis_sub.py
Subscribe: b'persian' wears a b'fedora'
Subscribe: b'persian' wears a b'bowler'
Subscribe: b'persian' wears a b'bowler'
Subscribe: b'maine coon' wears a b'bowler'


11.2.2.2 ZeroMQ

  • ZeroMQは中央のサーバーという存在がないので個々のパブリッシャが全てのサブスクライバに書き込みをする。
zmq_pub.py
import zmq
import random
import time
host="*"
port=6789
ctx=zmq.Context()
pub=ctx.socket(zmq.PUB)
pub.bind('tcp://%s:%s' % (host, port))
cats=["siamese", "persian", "maine coon", "norwegian forest"]
hats=["stovepipe", "bowler", "tam-o-shanter", "fedora"]
time.sleep(1)
#トピックと値の文字列にUTF-8を使っていることに注意する。
for msg in range(10):
    cat=random.choice(cats)
    cat_bytes=cat.encode("utf-8")
    hat=random.choice(hats)
    hat_bytes=hat.encode("utf-8")
    print("Publish: %s wears a %s" % (cat, hat))
    pub.send_multipart([cat_bytes, hat_bytes])

zmq_sub.py
import zmq
host="127.0.0.1"
port=6789
ctx=zmq.Context()
sub=ctx.socket(zmq.SUB)
sub.connect('tcp://%s:%s' % (host, port))
topics=["maine coon", "persian"]
for topic in topics:
    sub.setsockopt(zmq.SUBSCRIBE, topic.encode("utf-8"))
while True:
    cat_bytes, hat_bytes=sub.recv_multipart()
    cat=cat_bytes.decode("utf-8")
    hat=hat_bytes.decode("utf-8")
    print("Subscribe: %s wears a %s" % (cat, hat))

実行結果

$ python zmq_pub.py
Publish: maine coon wears a fedora
Publish: maine coon wears a stovepipe
Publish: persian wears a fedora
Publish: norwegian forest wears a fedora
Publish: persian wears a stovepipe
Publish: persian wears a fedora
Publish: norwegian forest wears a fedora
Publish: norwegian forest wears a tam-o-shanter
Publish: persian wears a stovepipe
Publish: maine coon wears a bowler

$ python zmq_sub.py
Subscribe: maine coon wears a fedora
Subscribe: maine coon wears a stovepipe
Subscribe: persian wears a fedora
Subscribe: persian wears a stovepipe
Subscribe: persian wears a fedora
Subscribe: persian wears a stovepipe
Subscribe: maine coon wears a bowler

11.2.3 TCP/IP

  • インターネットは接続の開設、データの交換、接続の切断、タイムアウトの処理といったことをどのようにすべきかを決めた規則を基礎としている。これらをプロトコルと呼び、レイヤに分けられている。

  • UDP:短いデータの交換に使われる。

  • TCP:UDPより寿命の長い接続のために使われる。

11.2.4 ソケット

udp_server.py
from datetime import datetime
import socket

server_address=("localhost",6789)
max_size=4096

print("Starting the server at", datetime.now())
print("Waiting for a client to call.")
#第一のメソッドはソケットを作り、第二のメソッドはソケットにバインドする。(そのIPアドレスとポートに届いたあらゆるデータをリスンする。)
#AF_INETはインターネットを作るという意味。
#SOCK_DGRAMはデータグラムを送受信するという意味でUDPを使うということ。
server=socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server.bind(server_address)

#recvfromはデータグラムが届くのを待っている。
data, client=server.recvfrom(max_size)

print("At", datetime.now(), client , "said", data)
server.sendto(b"Are you talking to me?", client)
server.close()

udp_client.py
import socket
from datetime import datetime

server_address=("localhost",6789)
max_size=4096

print("Starting the client at", datetime.now())
client=socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
client.sendto(b"Hey!", server_address)
data, server=client.recvfrom(max_size)
print("At", datetime.now(), server, "said", data)
client.close()

実行結果

$ python udp_server.py
Starting the server at 2020-02-01 09:51:33.707462
Waiting for a client to call.
At 2020-02-01 09:52:24.053328 ('127.0.0.1', 54667) said b'Hey!'

$ python udp_client.py
Starting the client at 2020-02-01 09:52:48.897087
At 2020-02-01 09:52:48.898221 ('127.0.0.1', 6789) said b'Are you talking to me?'

tcp_client.py
from datetime import datetime
import socket

address=("localhost",6789)
max_size=4096

print("Starting the server at", datetime.now())
#ストリーミングプロトコルであるTCPを使うためにSOCK_STREAMを使う。
client=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
#ストリームをセットアップするためにconnect()を追加
client.connect(address)
#UDPサーバーが応答のためにclient.sendto()を呼び出していたのに注意。
client.sendall(b"Hey!")
data=client.recv(max_size)
print("At", datetime.now(), "someone replied", data)
client.close()

tcp_server.py
from datetime import datetime
import socket

address=("localhost",6789)
max_size=4096

print("Starting the server at", datetime.now())
print("Waiting for a client to call.")
server=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(address)
server.listen(5)

client, addr=server.accept()
data=client.recv(max_size)


print("At", datetime.now(), client , "said", data)
client.sendall(b"Are you talking to me?")
server.close()
server.close()

実行結果

$ python tcp_server.py
Starting the server at 2020-02-01 10:16:53.333266
Waiting for a client to call.
At 2020-02-01 10:16:57.520042 <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 6789), raddr=('127.0.0.1', 49223)> said b'Hey!'


$ python tcp_client.py
Starting the server at 2020-02-01 10:15:25.298030
At 2020-02-01 10:15:25.301961 someone replied b''


11.2.5 ZeroMQ

  • ZeroMQはライブラリだが、強化版ソケットと呼ばれることがある。
    • メッセージの全体の交換
    • 接続の再試行
    • 送受信でタイミングが合わない時にデータを守るためのバッファリング。
zmq_server.py
import zmq

host="127.0.0.1"
port=6789
#Context()は状態を管理するZeroMQオブジェクトを生成
context=zmq.Context()
#サーバーは応答REPを出す。
server=context.socket(zmq.REP)
#サーバーが特定のIPアドレスとポードをリスンするようにしている。
server.bind("tcp://%s:%s" %(host, port))
while True:
#wait for next request from recv()
    request_bytes=server.recv()
    request_str=request_bytes.decode("utf-8")
    print("That voice in my head says: %s" %request_str)
    reply_str="Stop saying:%s" %request_str
    reply_bytes=bytes(reply_str, "utf-8")
    server.send(reply_bytes)

zmq_client.py
import zmq

host="127.0.0.1"
port=6789

context=zmq.Context()
#クライアントはサーバーに要求REQを出す。
client=context.socket(zmq.REQ)
#bind()ではなくconnect()を使う。
client.connect("tcp://%s:%s" %(host, port))
for num in range(1,6):
#wait for next request from recv()
    request_str="message #%s" % num
    request_bytes=request_str.encode("utf-8")
    client.send(request_bytes)
    reply_bytes=client.recv()
    reply_str=reply_bytes.decode("utf-8")
    print("Sent %s, received %s" % (request_str, reply_str))

実行結果

$ python zmq_client.py
That voice in my head says: message #1
Sent message #1, received Stop saying:message #1
That voice in my head says: message #2
Sent message #2, received Stop saying:message #2
That voice in my head says: message #3
Sent message #3, received Stop saying:message #3
That voice in my head says: message #4
Sent message #4, received Stop saying:message #4
That voice in my head says: message #5
Sent message #5, received Stop saying:message #5


python zmq_server.py &
[2] 47417
$ Traceback (most recent call last):
  File "zmq_server.py", line 8, in <module>
    server.bind("tcp://%s:%s" %(host, port))
  File "zmq/backend/cython/socket.pyx", line 550, in zmq.backend.cython.socket.Socket.bind
  File "zmq/backend/cython/checkrc.pxd", line 25, in zmq.backend.cython.checkrc._check_rc
zmq.error.ZMQError: Address already in use

11.2.7 インターネットサービス

11.2.7.1 DNS


>>> import socket
>>> socket.gethostbyname("www.crappytaxidermy.com")
`66.6.44.4`
>>> socket.gethostbyname_ex("www.crappytaxidermy.com")
(`crappytaxidermy.com`, [`www.crappytaxidermy.com`], [`66.6.44.4`])

>>> socket.getaddrinfo("www.crappytaxidermy.com",80)
[(<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_DGRAM: 2>, 17, ``, (`66.6.44.4`, 80)), (<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, ``, (`66.6.44.4`, 80))]
>>> socket.getaddrinfo("www.crappytaxidermy.com",80,socket.AF_INET,
... socket.SOCK_STREAM)
[(<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, ``, (`66.6.44.4`, 80))]

>>> import socket
>>> socket.getservbyname("http")
80
>>> socket.getservbyport(80)
`http`


11.2.9 リモート処理

11.2.9.1 RPC

xmlrpc_server.py

from xmlrpc.server import SimpleXMLRPCServer

def double(num):
    return num*2

server=SimpleXMLRPCServer(("localhost",6666))
server.register_function(double,"double")
server.serve_forever()
xmlrpc_client.py
import xmlrpc.client

proxy=xmlrpc.client.ServerProxy("http://localhost:6666/")
num=7
result=proxy.double(num)
print("Double %s is %s" % (num, result))

実行結果

$ python xmlrpc_client.py
Double 7 is 14

$ python xmlrpc_server.py
127.0.0.1 - - [01/Feb/2020 14:54:50] "POST / HTTP/1.1" 200 -

msgpack_server.py
from msgpackrpc import Server, Address

class Services():
    def double(self, num):
        return num*2

server =Server(Services())
server.listen(Address("localhost",5555))
server.start()


msgpack_client.py
from msgpackrpc import Client,Address

client=Client(Address("localhost",5555))
num=8
result=client.call("double",num)
print("Double %s is %s" % (num, result))


実行結果
$ python msppack_client.py
Double 8 is 16

11.3 復習課題

11-1 プレーンなsocketを使って現在時サービスを実装しよう。クライアントがサーバーにtimeという文字列を送ると、ISO文字で現在の日時を返すものとする。

udp_time_server.py
import socket
from datetime import datetime

server_address=("localhost",6111)
max_size=4096

print("Starting the client at", datetime.now())
server=socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server.bind(server_address)

while True:
    data, client_addr=server.recvfrom(max_size)
    if data == b"time":
        now=str(datetime.utcnow())
        data=now.encode("utf-8")
        server.sendto(data, client_addr)
        print("Server sent", data)
server.close()


udp_time_client.py
import socket
from datetime import datetime
from time import sleep

address=("localhost",6111)
max_size=4096

print("Starting the client at", datetime.now())
client=socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
while True:
    sleep(5)
    client.sendto(b"time", address)
    data, server_addr=client.recvfrom(max_size)
    print("Client read", data)
client.close()

実行結果
$ python udp_time_server.py
Starting the client at 2020-02-01 17:11:51.527771
Server sent b'2020-02-01 08:11:59.365796'
Server sent b'2020-02-01 08:12:04.370495'
Server sent b'2020-02-01 08:12:09.371627'

$ python udp_time_client.py
Starting the client at 2020-02-01 17:10:03.510044
Client read b'2020-02-01 08:10:08.514726'
Client read b'2020-02-01 08:10:13.521450'
Client read b'2020-02-01 08:10:18.527667'
Client read b'2020-02-01 08:10:23.529492'
Client read b'2020-02-01 08:10:28.531994'
Client read b'2020-02-01 08:10:33.535134'
Client read b'2020-02-01 08:10:38.541067'


11-2 ZeroMQのREQ、REPソケットを使って同じことをしてみよう。

zmq_time_server.py

import zmq
from datetime import datetime

host="127.0.0.1"
port=1111

context=zmq.Context()
server=context.socket(zmq.REP)
server.bind("tcp://%s:%s" % (host, port))
print("Server started at", datetime.utcnow())

while True:
    message=server.recv()
    if message == b"time":
        now=datetime.utcnow()
        reply=str(now)
        server.send(bytes(reply,"utf-8"))
        print("Server sent", reply)
zmq_time_client.py
import zmq
from datetime import datetime
from time import sleep

host="127.0.0.1"
port=1111

context=zmq.Context()
server=context.socket(zmq.REQ)
server.bind("tcp://%s:%s" % (host, port))
print("Client started at", datetime.utcnow())

while True:
    sleep(5)
    requst=b"time"
    client.send(request)
    reply=client.recv()
    print("Client sent", reply)

実行結果

$ python zmq_time_server.py
Server started at 2020-02-01 08:27:16.448842

11-3 XMLRPCで同じことをしてみよう。

xmlrpc_time_server.py
from xmlrpc.server import SimpleXMLRPCServer
from datetime import datetime

def current_time():
    now = str(datetime.now())
    print('Server sent %s', now)
    return now

server = SimpleXMLRPCServer(("localhost", 6789))
server.register_function(current_time, "current_time")
server.serve_forever()


cmlrpc_time_client.py
import xmlrpc.client
from datetime import datetime
from time import sleep

proxy = xmlrpc.client.ServerProxy("http://localhost:6789/")
while True:
    sleep(3)
    result = proxy.current_time()
    print("Current time is %s" % result)

実行結果

$ python xmlrpc_time_server.py
Server sent %s 2020-02-01 17:44:06.341654
127.0.0.1 - - [01/Feb/2020 17:44:06] "POST / HTTP/1.1" 200 -
Server sent %s 2020-02-01 17:44:09.346517
127.0.0.1 - - [01/Feb/2020 17:44:09] "POST / HTTP/1.1" 200 -
Server sent %s 2020-02-01 17:44:12.352605
127.0.0.1 - - [01/Feb/2020 17:44:12] "POST / HTTP/1.1" 200 -

$ python cmlrpc_time_client.py
Current time is 2020-02-01 17:44:06.341654
Current time is 2020-02-01 17:44:09.346517
Current time is 2020-02-01 17:44:12.352605

11-4

redis_choc_supply.py
import redis
import random
from time import sleep

conn=redis.Redis()
varieties=["T","C","C","N"]
conveyor="Chocolates"

while True:
    seconds=random.random()
    sleep(seconds)
    piece=random.choice(varieties)
    conn.rpush(conveyor, piece)

redis_lucy.py
import redis
from datetime import datetime
from time import sleep

conn=redis.Redis()
timeout=10
conveyor="Chocolates"
while True:
    sleep(0.5)
    msg=conn.blpop(conveyor, timeout)
    remaining=conn.llen(conveyor)
    if msg:
        piece=msg[1]
        print("Lucy got a", piece, "at", datetime.utcnow(),
        ", only", remaining, "left")

実行結果
$ python redis_lucy.py
Lucy got a b'T' at 2020-02-01 09:05:54.780153 , only 116 left
Lucy got a b'N' at 2020-02-01 09:05:55.282109 , only 117 left
Lucy got a b'T' at 2020-02-01 09:05:55.783487 , only 117 left
Lucy got a b'N' at 2020-02-01 09:05:56.284971 , only 118 left
Lucy got a b'C' at 2020-02-01 09:05:56.787798 , only 118 left
Lucy got a b'T' at 2020-02-01 09:05:57.289434 , only 117 left
Lucy got a b'N' at 2020-02-01 09:05:57.794357 , only 118 left
Lucy got a b'C' at 2020-02-01 09:05:58.295897 , only 119 left
Lucy got a b'C' at 2020-02-01 09:05:58.800536 , only 119 left
Lucy got a b'C' at 2020-02-01 09:05:59.303087 , only 119 left
Lucy got a b'C' at 2020-02-01 09:05:59.805465 , only 119 left
Lucy got a b'C' at 2020-02-01 09:06:00.308003 , only 119 left
Lucy got a b'C' at 2020-02-01 09:06:00.810408 , only 120 left
Lucy got a b'C' at 2020-02-01 09:06:01.312918 , only 120 left
Lucy got a b'C' at 2020-02-01 09:06:01.818497 , only 119 left
Lucy got a b'N' at 2020-02-01 09:06:02.324028 , only 120 left
Lucy got a b'C' at 2020-02-01 09:06:02.826697 , only 119 left
Lucy got a b'T' at 2020-02-01 09:06:03.329229 , only 120 left
Lucy got a b'T' at 2020-02-01 09:06:03.835205 , only 120 left


11-5 ZeroMQを使って7.3節の単語を一つずつパブリッシュしよう。また、母音で始まる単語を表示するZeroMQサブスクライバと、5字の単語を表示する別のサブスクライバを書こう。

poem_pub.py
import string
import zmq
from time import sleep

host="127.0.0.1"
port=9999
ctx=zmq.Context()
pub=ctx.socket(zmq.PUB)
pub.bind("tcp://%s:%s" % (host, port))
sleep(1)

with open("mammonth.txt","rt") as poem:
    words=poem.read()
for word in words.split():
    word=word.strip(string.punctuation)
    data=word.encode("utf-8")
    if word.startswith(("a", "e", "i", "u","o","A","E","I","U","O")):
        print("vowels",data)
        pub.send_multipart([b"vowels", data])
    if len(word) ==5:
        print("five",data)
        pub.send_multipart([b"five", data])

poem_sub.py
import string
import zmq

host="127.0.0.1"
port=9999
ctx=zmq.Context()
sub=ctx.socket(zmq.SUB)
sub.connect("tcp://%s:%s" % (host,port))
sub.setsockopt(zmq.SUBSCRIBE, b"vowels")
sub.setsockopt(zmq.SUBSCRIBE, b"five")

while True:
    topic, word=sub.recv_multipart()
    print(topic,word)

実行結果
#最初の数行
$ python poem_pub.py
five b'queen'
vowels b'of'
five b'Lying'
vowels b'at'
vowels b'ease'
vowels b'evening'
five b'flies'


$ python poem_sub.py
b'five' b'queen'
b'vowels' b'of'
b'five' b'Lying'
b'vowels' b'at'
b'vowels' b'ease'
b'vowels' b'evening'
b'five' b'flies'
b'five' b'seize'

感想

かなり雑な復習となった。ボリューミーだったが、最近では自らサーバ立てなくてもAWSやOpen Stack等のクラウドを使うのかな。並列処理やmultiprocessingの理解が曖昧なので使いながら覚えるしかないのかな。

参考文献

「Bill Lubanovic著 『入門 Python3』(オライリージャパン発行)」

「queue --- 同期キュークラス」
https://docs.python.org/ja/3/library/queue.html#queue.Queue.task_done

「threading --- スレッドベースの並列処理」
https://docs.python.org/ja/3/library/threading.html

「multiprocessing --- プロセスベースの並列処理¶」
https://docs.python.org/ja/3/library/multiprocessing.html#pipes-and-queues

「Redis」
http://redis.shibu.jp/commandreference/lists.html

1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2