0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

PHPでRedis Streams操作のメモ

Posted at

PHPでStream Consumingしたい、ということでRedis Streamsを使って試してみたという記録です。Swooleを使ってサーバー1台でスケールするのかも試してみました。
関連ドキュメントをざっと読んで試してみた程度の内容なので間違っている箇所などあるかもしれません。

Redis Streams

Redis5で実装されたStreaming API. Kafkaインスパイアな作りになっている。

PHPからRedis Streamsの使用

phpredisの4.2でサポートされた模様。以前は、導入が楽だという理由でpredisを使うことが多かったけど、predisではサポートされていない。predisは完全に開発が止まってそう。

Redis Server

試すだけならDockerでいいですね。

docker run -d --rm -p 6379:6379 redis:5.0.9

一応Streams APIが動くか確認したかったのでredis-cliなど使って

$ redis-cli
> XADD mystream * key value
> XREAD COUNT 1 STREAMS mystream 0-0
1) 1) "mystream"
   2) 1) 1) "1596783209134-0"
         2) 1) "key"
            2) "value"

みたいな結果になることを確認する。コマンドが複雑で手で打つと(私は)だいたい何回かエラーになるのでコピペする方が楽だと思う。
XADD, XREADのコマンドの詳細はredisのサイトを見てください。

Redis StreamsのConsumerにはメッセージングシステムは無く、Pub/SubのようにSubscribeしておけばProduceされたことを通知される、という挙動にはならない。

Consumer

ざっくりこんな感じのコードで動いた。

<?php

$redis = new Redis();
$redis->connect('redis-server', 6379);

$consumer = new Consumer($redis);

while (true) {
    $values = $consumer->consume();
    foreach ($values[$streamName] ?? [] as $consumed) {
        foreach ($consumed as $key => $value) {
            print_r($value);
        }
    }
}

class Consumer {
    private $redis;
    public function __construct(Redis $redis) {
        $this->redis = $redis;
        $this->createGroupIfNotExists();
    }

    private function createGroupIfNotExists() {
        // XREADではなくXREADGROUPを使うため、事前にConsumerGroupが存在することを保証する
        // わざわざxInfoで確認してるけど、XGROUPにはGROUPがなかったときに作成する機能もあり
        // ライブラリもサポートしているので素直にxGroupのMKSTREAMオプションを有効にする方が良いでしょう
        $groupInfo = $this->redis->xInfo('GROUP', 'mystream', 'mygroup');
        if (!$groupInfo) {
            $this->redis->xGroup('CREATE', 'mystream', 'mygroup', '>');
        }
        // これと等価
        // '>' の意味は概要を後述しますが、Redis StreamsのIntroductionを読むと良いです。
        // $this->redis->xGroup('CREATE', 'mystream', 'mygroup', '>', $mkStream = true);
    }

    public function consume() {
        return $this->redis->xReadGroup('group1', 'consumer1', ['mystream' => '>'], $count = 10, $block = 2000);
    }
}

なぜXREADではなくXREADGROUPを使うかというと、スケーラビリティのためと、単純にどこまでのIDを読み取ったか管理するのがめんどくさいから。XREADにはRedis側にどこのキーまで読んだか、というオフセットを記録するような仕組みが無いようなので、Consumer単位で自動的に未取得部分のレコードを取得したい場合はXREADGROUPを使用する必要がありそう。

$this->redis->xReadGroup('group1', 'consumer1', ['mystream' => '>'], $count = 10, $block = 2000);

consumeメソッドのこの行の意味は
Consumer Group group1 の Consumer consumer1 で、mystreamというキーのストリームから、group1で未取得のレコードを最大10件取得する。すべてのレコードを取得済みの場合、レコードが追加されることを最大2,000msec待つ。
という挙動になる。このうち、 未取得のレコードを最大10件取得する を実現するために ['mystream' => '>'] と指定をする。

スケーラビリティ

Consumerがスケールするのかを見てみる。

KafkaだとConsumerをスケールさせるためにはTopicにパーティションを設定し、Consumer GroupにConsumerを追加したり、Fan Outさせたりするのだと思う(実運用したことが無いので自信は無い)。

一方Redis Streamsにはこういうパーティションの概念は無さそう。しかし、XREADGROUP'>' はGROUPに対しての値なので、GROUPにConsumerを追加してXREADGROUPを叩けば自動的にそれぞれのConsumerで取得できる。リバランスも気にする必要は無い。
サーバー台数(プロセス数)が増えてBLOCKのリクエストが溜まったときのレイテンシーどうなんだろう。READリクエストがキューに積まれて順番に配信されるのかな。特定のConsumerが特定のパーティションに紐づくわけでもないのでホットスポットも考えなくて良い、となるのかな。

とりあえずは単純にサーバー台数(プロセス数)を増やせばスケールできるのだと思うが、別プロセスで動かすとなるとConsumerの名前決めがめんどくさそう。名前はGROUP内でユニークにしろという制約があるので、マジメにやると大変そう。同じ名前にしても動きはするので、consumerの状態を見るとか、メトリクスを管理するとか運用上の理由が大きいのかな。

Swoole

Redis Streamsにはいわゆるメッセージングシステムが無く、BLOCKを使いながら同期的に処理できるのでPlain-PHPと相性がいいんじゃないか?と思うけど、Swooleを使って単一プロセスでスケールできるか試してみる。

方針として、XREADGROUPは単一プロセスで十分なスループットだという前提の上で、値を取得したらConsumerのワーカー部分を並行で稼働できようにしている。XREADGROUPが追いつかないようになったらRedisへのリクエストも多重化すれば良さそう。

コードはこんな感じになった。

<?php

$messageBufferChannel = new Co\Channel(3);

// Worker
Co\run(function () use ($messageBufferChannel) {
    // Worker内の処理を多重度を制限するためのChannel
    $concurrencyCapChannel = new Co\Channel(5);

    while (true) {
        $values = $messageBufferChannel->pop();
        if ($values) {
            $concurrencyCapChannel->push(true);
            go(function() use ($values, $concurrencyCapChannel) {
                echo "got values " . count($values['mystream']) . "\n";
                $concurrencyCapChannel->pop();
            });
        }
    }
});

// Consumer
Co\run(function () use ($messageBufferChannel) {
    $redis = new Redis();
    $redis->connect('redis-server', 6379);

    while (true) {
        $values = $redis->xReadGroup('group1', 'consumer10', ['mystream' => '>'], $count = 10, $block = 2000);
        if ($values) {
            $messageBufferChannel->push($values);
        }
    }
});

Goもしばらく書いてなかったのでだいぶ難航した。こんな感じで良かったんだったかな…

ちなみにSwooleをDockerで動かすには https://www.swoole.co.uk/docs/get-started/try-docker を参考にすれば良いのだけど、ここで紹介されているイメージにはphpredisが入っていないので

RUN pecl install redis && docker-php-ext-enable redis

をDockerfileに適当に追加してビルドしてる。

試しに書いてはみたものの、Swooleについての知識が薄くて実際にプロダクションで使うにはもっと調べないと不安で使えないな。

関連ドキュメント

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?