Edited at

Laravel webscoket(push & pubsub) 後編

More than 5 years have passed since last update.

大分前にpubsubの実装方法を記載しましたが、

それをさらに実装していく方法を紹介します。(Laravel meetup Tokyo vol.3の内容です)

laravel4 websocket push

laravel4+redis pub/sub

今回さらに実装を進めていく上で、通常のwebsocketの

クライアント->websocket->クライアント の流れではなく、

クライアント->API(redisへ送信)->subscriberのwebsocketサーバがpush->クライアント

という、Redisを中間として実装します。

利用するもは前回までのものを組み合わせたものです


composer.json

"require": {

"php": ">=5.4.0",
"laravel/framework": "4.1.*",
"ext-zmq": "*",
"ext-phpiredis": "*",
"predis/predis-async": "dev-master",
"cboden/Ratchet": "0.3.*",
"react/zmq": "0.2.*",
}

前回実装したpubsubとそんなに大きくは変わりませんが、

以下のように実装してみます。


use Predis\Async\Client as AsyncClient;

class Async implements AsyncInterface {

/** @var array */
protected $connection = [];
/** @var \ZMQContext */
protected $context;

/**
* @param \ZMQContext $context
*/

public function __construct(\ZMQContext $context)
{
$this->connection = \Config::get('database.redis.default');
$this->context = $context;
}

/**
* @return \React\EventLoop\LoopInterface
* @throws \Exception
*/

public function async()
{
$client = new AsyncClient($this->connection);
$client->connect(function ($client) {
$redis = new AsyncClient($this->connection, $client->getEventLoop());
// subscribe channel
$client->pubsub(
\Config::get('pubsub.basic_channel'), function ($event) use ($redis) {
$socket = $this->context->getSocket(\ZMQ::SOCKET_PUSH, 'push');
$socket->connect(\Config::get('app.socket_connection'));
$socket->send($event->payload);
});
});

if(!$client->isConnected())
{
throw new \Exception("redis not connect", 500);
}
return $client->getEventLoop();
}
/** その他色々 **/
}

Predis\Async\Clientは内部でreact/event-loopを使って実装されています。

redisから送信されるメッセージを受けて、

ZMQを使ってpushします。

pubsubのチャンネルなどはここで指定するようにすれば、

指定したものだけを受信する様になります。

pushはRatchetを利用して実装します。

class Push implements WampServerInterface

{
protected $subscribedTopics = array();

public function onSubscribe(ConnectionInterface $conn, $topic)
{
if (!array_key_exists($topic->getId(), $this->subscribedTopics))
{
$this->subscribedTopics[$topic->getId()] = $topic;
}
}
/**
* interfaceに沿って他のメソッドも実装してください
*/

public function push($data)
{
$entryData = json_decode($data, true);

foreach($this->subscribedTopics as $topic)
{
$topic->broadcast($entryData);
}
}
}

クライアントへの送信の実装は上記のようなものします。

subscribe以外にも細かくしていする場合はbroadcastの場合に何かしら処理を加えると

良いのではないでしょうか

次にwebsocketサーバを実装します。

CLIで起動させますので、前回紹介したものを利用します。


PubSubCommand.php

    public function __construct(AsyncInterface $loop, WampServerInterface $wamp)

{
parent::__construct();
$this->loop = $loop;
$this->wamp = $wamp;
}

/**
* @return void
*/

public function fire()
{
$loop = $this->loop->async();
$this->info('redis subscribe start');
$this->pull($loop, $this->wamp);

$webSock = new \React\Socket\Server($loop);
$webSock->listen($this->option('port'), '0.0.0.0');

$webServer = new \Ratchet\Server\IoServer(
new \Ratchet\Http\HttpServer(
new \Ratchet\WebSocket\WsServer(
new \Ratchet\Wamp\WampServer($this->wamp)
)
), $webSock);

$loop->run();
}

/**
* @param $loop \React\EventLoop\LoopInterface
* @param $wamp \Ratchet\Wamp\WampServerInterface
* @return void
*/

protected function pull($loop, $wamp)
{
$context = new \React\ZMQ\Context($loop);
$pull = $context->getSocket(\ZMQ::SOCKET_PULL);
$pull->bind(\Config::get('app.socket_connection'));
$pull->on('message', array($wamp, 'push'));
}


pushとpubsubは既に実装しましたので、websocketサーバだけの実装です。

これでコマンドを叩くとwebsocketサーバが起動し、

pubsubを利用することができるようになりました。

あとはAPI等を通じてpublishします。

$result = \Redis::connection('default')

->publish(\Config::get('pubsub.basic_channel'), json_encode($array));

javascriptも以前のものを利用するだけです。

コマンドでpublishを実行しても、

pubsubを利用して各クライアントへpushしますので、是非phpで実装してみてください。

細かい実装はGitHubで公開していますので、興味がありましたら参考にしてみてください。

laravel-websocket