PHPでStream Consumingしたい、ということでRedis Streamsを使って試してみたという記録です。Swooleを使ってサーバー1台でスケールするのかも試してみました。
関連ドキュメントをざっと読んで試してみた程度の内容なので間違っている箇所などあるかもしれません。
Redis Streams
Redis5で実装されたStreaming API. Kafkaインスパイアな作りになっている。
PHPからRedis Streamsの使用
phpredisの4.2でサポートされた模様。以前は、導入が楽だという理由でpredisを使うことが多かったけど、predisではサポートされていない。predisは完全に開発が止まってそう。
Redis Server
試すだけならDockerでいいですね。
docker run -d --rm -p 6379:6379 redis:5.0.9
一応Streams APIが動くか確認したかったのでredis-cli
など使って
$ redis-cli
> XADD mystream * key value
> XREAD COUNT 1 STREAMS mystream 0-0
1) 1) "mystream"
2) 1) 1) "1596783209134-0"
2) 1) "key"
2) "value"
みたいな結果になることを確認する。コマンドが複雑で手で打つと(私は)だいたい何回かエラーになるのでコピペする方が楽だと思う。
XADD, XREADのコマンドの詳細はredisのサイトを見てください。
Redis StreamsのConsumerにはメッセージングシステムは無く、Pub/SubのようにSubscribeしておけばProduceされたことを通知される、という挙動にはならない。
Consumer
ざっくりこんな感じのコードで動いた。
<?php
$redis = new Redis();
$redis->connect('redis-server', 6379);
$consumer = new Consumer($redis);
while (true) {
$values = $consumer->consume();
foreach ($values[$streamName] ?? [] as $consumed) {
foreach ($consumed as $key => $value) {
print_r($value);
}
}
}
class Consumer {
private $redis;
public function __construct(Redis $redis) {
$this->redis = $redis;
$this->createGroupIfNotExists();
}
private function createGroupIfNotExists() {
// XREADではなくXREADGROUPを使うため、事前にConsumerGroupが存在することを保証する
// わざわざxInfoで確認してるけど、XGROUPにはGROUPがなかったときに作成する機能もあり
// ライブラリもサポートしているので素直にxGroupのMKSTREAMオプションを有効にする方が良いでしょう
$groupInfo = $this->redis->xInfo('GROUP', 'mystream', 'mygroup');
if (!$groupInfo) {
$this->redis->xGroup('CREATE', 'mystream', 'mygroup', '>');
}
// これと等価
// '>' の意味は概要を後述しますが、Redis StreamsのIntroductionを読むと良いです。
// $this->redis->xGroup('CREATE', 'mystream', 'mygroup', '>', $mkStream = true);
}
public function consume() {
return $this->redis->xReadGroup('group1', 'consumer1', ['mystream' => '>'], $count = 10, $block = 2000);
}
}
なぜXREAD
ではなくXREADGROUP
を使うかというと、スケーラビリティのためと、単純にどこまでのIDを読み取ったか管理するのがめんどくさいから。XREAD
にはRedis側にどこのキーまで読んだか、というオフセットを記録するような仕組みが無いようなので、Consumer単位で自動的に未取得部分のレコードを取得したい場合はXREADGROUP
を使用する必要がありそう。
$this->redis->xReadGroup('group1', 'consumer1', ['mystream' => '>'], $count = 10, $block = 2000);
consume
メソッドのこの行の意味は
Consumer Group group1
の Consumer consumer1
で、mystream
というキーのストリームから、group1で未取得のレコードを最大10件取得する。すべてのレコードを取得済みの場合、レコードが追加されることを最大2,000msec待つ。
という挙動になる。このうち、 未取得のレコードを最大10件取得する
を実現するために ['mystream' => '>']
と指定をする。
スケーラビリティ
Consumerがスケールするのかを見てみる。
KafkaだとConsumerをスケールさせるためにはTopicにパーティションを設定し、Consumer GroupにConsumerを追加したり、Fan Outさせたりするのだと思う(実運用したことが無いので自信は無い)。
一方Redis Streamsにはこういうパーティションの概念は無さそう。しかし、XREADGROUP
の '>'
はGROUPに対しての値なので、GROUPにConsumerを追加してXREADGROUP
を叩けば自動的にそれぞれのConsumerで取得できる。リバランスも気にする必要は無い。
サーバー台数(プロセス数)が増えてBLOCKのリクエストが溜まったときのレイテンシーどうなんだろう。READリクエストがキューに積まれて順番に配信されるのかな。特定のConsumerが特定のパーティションに紐づくわけでもないのでホットスポットも考えなくて良い、となるのかな。
とりあえずは単純にサーバー台数(プロセス数)を増やせばスケールできるのだと思うが、別プロセスで動かすとなるとConsumerの名前決めがめんどくさそう。名前はGROUP内でユニークにしろという制約があるので、マジメにやると大変そう。同じ名前にしても動きはするので、consumerの状態を見るとか、メトリクスを管理するとか運用上の理由が大きいのかな。
Swoole
Redis Streamsにはいわゆるメッセージングシステムが無く、BLOCKを使いながら同期的に処理できるのでPlain-PHPと相性がいいんじゃないか?と思うけど、Swooleを使って単一プロセスでスケールできるか試してみる。
方針として、XREADGROUP
は単一プロセスで十分なスループットだという前提の上で、値を取得したらConsumerのワーカー部分を並行で稼働できようにしている。XREADGROUP
が追いつかないようになったらRedisへのリクエストも多重化すれば良さそう。
コードはこんな感じになった。
<?php
$messageBufferChannel = new Co\Channel(3);
// Worker
Co\run(function () use ($messageBufferChannel) {
// Worker内の処理を多重度を制限するためのChannel
$concurrencyCapChannel = new Co\Channel(5);
while (true) {
$values = $messageBufferChannel->pop();
if ($values) {
$concurrencyCapChannel->push(true);
go(function() use ($values, $concurrencyCapChannel) {
echo "got values " . count($values['mystream']) . "\n";
$concurrencyCapChannel->pop();
});
}
}
});
// Consumer
Co\run(function () use ($messageBufferChannel) {
$redis = new Redis();
$redis->connect('redis-server', 6379);
while (true) {
$values = $redis->xReadGroup('group1', 'consumer10', ['mystream' => '>'], $count = 10, $block = 2000);
if ($values) {
$messageBufferChannel->push($values);
}
}
});
Goもしばらく書いてなかったのでだいぶ難航した。こんな感じで良かったんだったかな…
ちなみにSwooleをDockerで動かすには https://www.swoole.co.uk/docs/get-started/try-docker を参考にすれば良いのだけど、ここで紹介されているイメージにはphpredisが入っていないので
RUN pecl install redis && docker-php-ext-enable redis
をDockerfileに適当に追加してビルドしてる。
試しに書いてはみたものの、Swooleについての知識が薄くて実際にプロダクションで使うにはもっと調べないと不安で使えないな。