Qiita Teams that are logged in
You are not logged in to any team

Log in to Qiita Team
Community
OrganizationAdvent CalendarQiitadon (β)
Service
Qiita JobsQiita ZineQiita Blog
Help us understand the problem. What is going on with this article?

Redis+Pythonでpub/subを試す

More than 1 year has passed since last update.

Redis+Pythonでpub/subを試す

はじめに

Redisに触れる事があり、KVSだと思っていたRedisにpub/sub機能があることを知りました。
せっかくなのでpub/subの挙動を試してみました。
ついでにpythonの並列動作も確認がてら実装してみました。
言語はPython3、redis.pyを使用しています。

サンプルはこちら
https://github.com/ryoutoku/redis-sample

今回作成した環境は以下です。

  1. Vagrantを使用してVirtualBox(ゲスト側)にUbuntu14.04+Redisサーバを起動
  2. Redis内のデータに対して変更があった場合、そのデータのkeyをchannelにしてkeyのデータをpubで通知するpub側スクリプト
  3. Redisに購読(sub)を登録し、Redisサーバにデータをpushするsub側スクリプト

3.でpushしたデータを、2.で検知し通知、3.で受ける、という感じです。

イメージ的にはこんな感じ
archtecture.png

今回はpub/subのスクリプトを全てホスト側で行っていますが、ゲスト側で行っても問題はありません。

必要環境

使用するRedisの機能について

私自身Redisについて詳しくないので今回の記事で使用する機能を以下にざっくり説明します。

  • Key Value Store(KVS)
    • いわゆるNoSQLというやつで、keyに対してValueを紐付けてデータを管理する
    • Pythonでいうdictのようなもの
    • 今回はtestというkeyにlistのデータとして追加する
  • pub/sub
    • メッセージ通知のモデル
    • Publish(通知)とSubscribe(購読)という2つの役割がある:参考
    • channelという概念があり、channelを決めて購読、通知を行う事で、そのchannelに対してのみデータ通知などが行える

コード解説

以下のコードのキモ(という私が実装で時間のかかった所)をそれぞれ説明します。

  • Vagrantfile
  • Publisher.py
  • Subscriber.py

Vagrantfile

Vagrantfileでは主に以下の事を行っています。

  1. ゲスト側のIPを固定(192.168.56.101)
  2. Redisの設定ファイル
  3. Redisの設定ファイルの配置
Vagrantfile
Vagrant.configure("2") do |config|

  # ホストオンリーアダプタをipアドレス固定で追加
  config.vm.network "private_network", ip: "192.168.56.101"

 # 中略 ##########################################

  # redisの設定ファイルのコピー
  config.vm.provision "file", source: "./conf/redis.conf", destination: "/home/vagrant/redis.conf"

  config.vm.provision "shell", inline: <<-SHELL
    apt-get update

    # pip, redis.pyインストール
    apt-get install python-pip -y
    pip install redis

    # redisインストール
    sudo apt-get install redis-server -y

    # redis設定ファイルを移動
    mv -f /home/vagrant/redis.conf /etc/redis/redis.conf

    # redisの起動
    redis-server &
  SHELL
end

キモになるところは以下です。

  • redisはデフォルトではローカルIP(127.0.0.)しか接続を受け付けないため、設定ファイルを上書きする必要がある
    • 直接上書きできないので、/home/vagrant以下にredis.confをコピー
    • コピーしたredis.confを/etc/redisに移動した後redisを起動

pub(publisher)

pubの動作としては、以下のような感じです。

  1. while
    1. redisのkeyを全て取得し、keyに対して
    2. keydataが追加されていればkey=channelとしてdataを通知
    3. dataの最終がendの場合終了

publisherのコア部分を以下に示します。

publisher.py
class Publisher(object):
    def send_message(self):

        while True:
            keys = self._connection.keys()

            is_break = False
            for key in keys:
                old_data = self._data.setdefault(key, [])
                data = self._connection.lrange(key, 0, -1)

                if len(old_data) == len(data):
                    continue

                key_str = key.decode('utf-8')
                data_str = [x.decode('utf-8') for x in data]

                print("publish:", data_str)

                self._connection.publish(key_str, data_str)
                self._data[key] = data
                if data_str[-1] == "end":
                    self._connection.rpop(key)
                    is_break = True
            if is_break:
                break

キモになるところは以下です。
* redisのkey,dataはbyte型であるため、decodeする必要がある

sub(subscriber)

subでは次の動作をする様になっています。

  1. 別プロセスを起動し、channel='test'としてデータ購読
  2. while
    1. 標準入力から、文字列を取得
    2. 取得した文字列をkey=testとしてredisにデータを追加
    3. 取得した文字列がendの場合は終了

subscriberのコア部分を以下に示します。

subscriber.py
class SubscriberSubject(object):
    _re_format = "\'(.*?)\'"

   # 中略 ##########################################

    def _receive_core(self, channel, end_word):
        pubsub = self._connection.pubsub()
        pubsub.subscribe([channel])
        for data in pubsub.listen():
            [x(data) for x in self._callbacks]
            if isinstance(data['data'], bytes):
                data_str = self._re.findall(data['data'].decode('utf-8'))
                if data_str[-1] == end_word:
                    break
        pubsub.unsubscribe()

    def start_receive(self, channel, end_word='end'):
        if self._job:
            return
        self._job = Process(target=self._receive_core,
                            args=(channel, end_word))
        self._job.start()

    def end_receive(self):
        self._job.join()

    def add_data(self, key, value):
        self._connection.rpush(key, value)

if __name__ == "__main__":

    # 中略 ##########################################
    # sub
    subject = SubscriberSubject(host, port)
    subject.add_callback(
        lambda x: print("callback:", xt)
    )
    subject.start_receive(channel)

    # 中略 ##########################################
    # 標準入力部
    while(True):
        data = sys.stdin.readline().strip()
        subject.add_data(channel, data)
        if data == "end":
            subject.end_receive()
            break

キモになるところは以下です。
* 1度目のpubsub.listen()では、取得したdictにdata:1が設定されている
* 2度目以降のpubsub.listen()では、取得したdictにdata:b[(データ)]が設定されている
* bytes型なのでdecodeした後、正規表現(_re_format)でstr型として分割

動作

ターミナルを2つ起動しそれぞれを実行します
* python publisher.py
* redisのデータを確認して、変更があれば通知
* python subscriber.py
* 標準入力で入力されたデータをredisに追加
* publisherからデータを購読

動作はこんな感じ。
ターミナルの上がpublisher.py、下がsubscriber.pyです。

qqded-bbfis.gif

  1. 下で入力した値をredisに登録
  2. 上でredis内のデータをpublish(データ表示)
  3. 下で受け取ったデータを表示
ryoutoku
なんだかんだフルスタックエンジニアっぽくなっているエンジニア。 浅く広くで知識と技術が器用貧乏になりがちである。 ソフトウェアアーキとか設計とか考えるのが好き。 フリーランスですが、MPSという団体でプログラム作ったりなんだりもしてます。 https://www.mpsamurai.org/
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away