Redis+Pythonでpub/subを試す
はじめに
Redisに触れる事があり、KVSだと思っていたRedisにpub/sub機能があることを知りました。
せっかくなのでpub/subの挙動を試してみました。
ついでにpythonの並列動作も確認がてら実装してみました。
言語はPython3、redis.pyを使用しています。
サンプルはこちら
https://github.com/ryoutoku/redis-sample
今回作成した環境は以下です。
- Vagrantを使用してVirtualBox(ゲスト側)にUbuntu14.04+Redisサーバを起動
- Redis内のデータに対して変更があった場合、そのデータのkeyをchannelにしてkeyのデータをpubで通知するpub側スクリプト
- Redisに購読(sub)を登録し、Redisサーバにデータをpushするsub側スクリプト
3.でpushしたデータを、2.で検知し通知、3.で受ける、という感じです。
今回はpub/subのスクリプトを全てホスト側で行っていますが、ゲスト側で行っても問題はありません。
必要環境
- Vagrant
- Python3
- redis-py
使用するRedisの機能について
私自身Redisについて詳しくないので今回の記事で使用する機能を以下にざっくり説明します。
- Key Value Store(KVS)
- いわゆるNoSQLというやつで、keyに対してValueを紐付けてデータを管理する
- Pythonでいうdictのようなもの
- 今回は
test
というkeyにlistのデータとして追加する
- pub/sub
- メッセージ通知のモデル
- Publish(通知)とSubscribe(購読)という2つの役割がある:参考
- channelという概念があり、channelを決めて購読、通知を行う事で、そのchannelに対してのみデータ通知などが行える
コード解説
以下のコードのキモ(という私が実装で時間のかかった所)をそれぞれ説明します。
- Vagrantfile
- Publisher.py
- Subscriber.py
Vagrantfile
Vagrantfileでは主に以下の事を行っています。
- ゲスト側のIPを固定(192.168.56.101)
- Redisの設定ファイル
- Redisの設定ファイルの配置
Vagrant.configure("2") do |config|
# ホストオンリーアダプタをipアドレス固定で追加
config.vm.network "private_network", ip: "192.168.56.101"
# 中略 ##########################################
# redisの設定ファイルのコピー
config.vm.provision "file", source: "./conf/redis.conf", destination: "/home/vagrant/redis.conf"
config.vm.provision "shell", inline: <<-SHELL
apt-get update
# pip, redis.pyインストール
apt-get install python-pip -y
pip install redis
# redisインストール
sudo apt-get install redis-server -y
# redis設定ファイルを移動
mv -f /home/vagrant/redis.conf /etc/redis/redis.conf
# redisの起動
redis-server &
SHELL
end
キモになるところは以下です。
- redisはデフォルトではローカルIP(127.0.0.)しか接続を受け付けないため、設定ファイルを上書きする必要がある
- 直接上書きできないので、/home/vagrant以下にredis.confをコピー
- コピーしたredis.confを/etc/redisに移動した後redisを起動
pub(publisher)
pubの動作としては、以下のような感じです。
-
while
- redisの
key
を全て取得し、key
に対して -
key
にdata
が追加されていればkey
=channel
としてdata
を通知 -
data
の最終がend
の場合終了
- redisの
publisherのコア部分を以下に示します。
class Publisher(object):
def send_message(self):
while True:
keys = self._connection.keys()
is_break = False
for key in keys:
old_data = self._data.setdefault(key, [])
data = self._connection.lrange(key, 0, -1)
if len(old_data) == len(data):
continue
key_str = key.decode('utf-8')
data_str = [x.decode('utf-8') for x in data]
print("publish:", data_str)
self._connection.publish(key_str, data_str)
self._data[key] = data
if data_str[-1] == "end":
self._connection.rpop(key)
is_break = True
if is_break:
break
キモになるところは以下です。
- redisのkey,dataはbyte型であるため、decodeする必要がある
sub(subscriber)
subでは次の動作をする様になっています。
- 別プロセスを起動し、
channel='test'
としてデータ購読 -
while
- 標準入力から、文字列を取得
- 取得した文字列を
key=test
としてredisにデータを追加 - 取得した文字列が
end
の場合は終了
subscriberのコア部分を以下に示します。
class SubscriberSubject(object):
_re_format = "\'(.*?)\'"
# 中略 ##########################################
def _receive_core(self, channel, end_word):
pubsub = self._connection.pubsub()
pubsub.subscribe([channel])
for data in pubsub.listen():
[x(data) for x in self._callbacks]
if isinstance(data['data'], bytes):
data_str = self._re.findall(data['data'].decode('utf-8'))
if data_str[-1] == end_word:
break
pubsub.unsubscribe()
def start_receive(self, channel, end_word='end'):
if self._job:
return
self._job = Process(target=self._receive_core,
args=(channel, end_word))
self._job.start()
def end_receive(self):
self._job.join()
def add_data(self, key, value):
self._connection.rpush(key, value)
if __name__ == "__main__":
# 中略 ##########################################
# sub
subject = SubscriberSubject(host, port)
subject.add_callback(
lambda x: print("callback:", xt)
)
subject.start_receive(channel)
# 中略 ##########################################
# 標準入力部
while(True):
data = sys.stdin.readline().strip()
subject.add_data(channel, data)
if data == "end":
subject.end_receive()
break
キモになるところは以下です。
- 1度目の
pubsub.listen()
では、取得したdictにdata:1
が設定されている - 2度目以降の
pubsub.listen()
では、取得したdictにdata:b[(データ)]
が設定されている- bytes型なのでdecodeした後、正規表現(
_re_format
)でstr型として分割
- bytes型なのでdecodeした後、正規表現(
動作
ターミナルを2つ起動しそれぞれを実行します
- python publisher.py
- redisのデータを確認して、変更があれば通知
- python subscriber.py
- 標準入力で入力されたデータをredisに追加
- publisherからデータを購読
動作はこんな感じ。
ターミナルの上がpublisher.py、下がsubscriber.pyです。
- 下で入力した値をredisに登録
- 上でredis内のデータをpublish(データ表示)
- 下で受け取ったデータを表示