大分前にpubsubの実装方法を記載しましたが、
それをさらに実装していく方法を紹介します。(Laravel meetup Tokyo vol.3の内容です)
laravel4 websocket push
laravel4+redis pub/sub
今回さらに実装を進めていく上で、通常のwebsocketの
クライアント->websocket->クライアント の流れではなく、
クライアント->API(redisへ送信)->subscriberのwebsocketサーバがpush->クライアント
という、Redisを中間として実装します。
利用するもは前回までのものを組み合わせたものです
"require": {
"php": ">=5.4.0",
"laravel/framework": "4.1.*",
"ext-zmq": "*",
"ext-phpiredis": "*",
"predis/predis-async": "dev-master",
"cboden/Ratchet": "0.3.*",
"react/zmq": "0.2.*",
}
前回実装したpubsubとそんなに大きくは変わりませんが、
以下のように実装してみます。
use Predis\Async\Client as AsyncClient;
class Async implements AsyncInterface {
/** @var array */
protected $connection = [];
/** @var \ZMQContext */
protected $context;
/**
* @param \ZMQContext $context
*/
public function __construct(\ZMQContext $context)
{
$this->connection = \Config::get('database.redis.default');
$this->context = $context;
}
/**
* @return \React\EventLoop\LoopInterface
* @throws \Exception
*/
public function async()
{
$client = new AsyncClient($this->connection);
$client->connect(function ($client) {
$redis = new AsyncClient($this->connection, $client->getEventLoop());
// subscribe channel
$client->pubsub(
\Config::get('pubsub.basic_channel'), function ($event) use ($redis) {
$socket = $this->context->getSocket(\ZMQ::SOCKET_PUSH, 'push');
$socket->connect(\Config::get('app.socket_connection'));
$socket->send($event->payload);
});
});
if(!$client->isConnected())
{
throw new \Exception("redis not connect", 500);
}
return $client->getEventLoop();
}
/** その他色々 **/
}
Predis\Async\Clientは内部でreact/event-loopを使って実装されています。
redisから送信されるメッセージを受けて、
ZMQを使ってpushします。
pubsubのチャンネルなどはここで指定するようにすれば、
指定したものだけを受信する様になります。
pushはRatchetを利用して実装します。
class Push implements WampServerInterface
{
protected $subscribedTopics = array();
public function onSubscribe(ConnectionInterface $conn, $topic)
{
if (!array_key_exists($topic->getId(), $this->subscribedTopics))
{
$this->subscribedTopics[$topic->getId()] = $topic;
}
}
/**
* interfaceに沿って他のメソッドも実装してください
*/
public function push($data)
{
$entryData = json_decode($data, true);
foreach($this->subscribedTopics as $topic)
{
$topic->broadcast($entryData);
}
}
}
クライアントへの送信の実装は上記のようなものします。
subscribe以外にも細かくしていする場合はbroadcastの場合に何かしら処理を加えると
良いのではないでしょうか
次にwebsocketサーバを実装します。
CLIで起動させますので、前回紹介したものを利用します。
public function __construct(AsyncInterface $loop, WampServerInterface $wamp)
{
parent::__construct();
$this->loop = $loop;
$this->wamp = $wamp;
}
/**
* @return void
*/
public function fire()
{
$loop = $this->loop->async();
$this->info('redis subscribe start');
$this->pull($loop, $this->wamp);
$webSock = new \React\Socket\Server($loop);
$webSock->listen($this->option('port'), '0.0.0.0');
$webServer = new \Ratchet\Server\IoServer(
new \Ratchet\Http\HttpServer(
new \Ratchet\WebSocket\WsServer(
new \Ratchet\Wamp\WampServer($this->wamp)
)
), $webSock);
$loop->run();
}
/**
* @param $loop \React\EventLoop\LoopInterface
* @param $wamp \Ratchet\Wamp\WampServerInterface
* @return void
*/
protected function pull($loop, $wamp)
{
$context = new \React\ZMQ\Context($loop);
$pull = $context->getSocket(\ZMQ::SOCKET_PULL);
$pull->bind(\Config::get('app.socket_connection'));
$pull->on('message', array($wamp, 'push'));
}
pushとpubsubは既に実装しましたので、websocketサーバだけの実装です。
これでコマンドを叩くとwebsocketサーバが起動し、
pubsubを利用することができるようになりました。
あとはAPI等を通じてpublishします。
$result = \Redis::connection('default')
->publish(\Config::get('pubsub.basic_channel'), json_encode($array));
javascriptも以前のものを利用するだけです。
コマンドでpublishを実行しても、
pubsubを利用して各クライアントへpushしますので、是非phpで実装してみてください。
細かい実装はGitHubで公開していますので、興味がありましたら参考にしてみてください。
laravel-websocket