27
27

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Laravel webscoket(push & pubsub) 後編

Last updated at Posted at 2014-04-24

大分前にpubsubの実装方法を記載しましたが、
それをさらに実装していく方法を紹介します。(Laravel meetup Tokyo vol.3の内容です)
laravel4 websocket push
laravel4+redis pub/sub

今回さらに実装を進めていく上で、通常のwebsocketの
クライアント->websocket->クライアント の流れではなく、
クライアント->API(redisへ送信)->subscriberのwebsocketサーバがpush->クライアント
という、Redisを中間として実装します。

利用するもは前回までのものを組み合わせたものです

composer.json
"require": {
    "php": ">=5.4.0",
    "laravel/framework": "4.1.*",
    "ext-zmq": "*",
    "ext-phpiredis": "*",
    "predis/predis-async": "dev-master",
    "cboden/Ratchet": "0.3.*",
    "react/zmq": "0.2.*",
}

前回実装したpubsubとそんなに大きくは変わりませんが、
以下のように実装してみます。


use Predis\Async\Client as AsyncClient;

class Async implements AsyncInterface {

    /** @var array */
    protected $connection = [];
    /** @var \ZMQContext */
    protected $context;

    /**
     * @param \ZMQContext $context
     */
    public function __construct(\ZMQContext $context)
    {
        $this->connection = \Config::get('database.redis.default');
        $this->context = $context;
    }

    /**
     * @return \React\EventLoop\LoopInterface
     * @throws \Exception
     */
    public function async()
    {
        $client = new AsyncClient($this->connection);
        $client->connect(function ($client) {
            $redis = new AsyncClient($this->connection, $client->getEventLoop());
            // subscribe channel
            $client->pubsub(
                \Config::get('pubsub.basic_channel'), function ($event) use ($redis) {
                $socket = $this->context->getSocket(\ZMQ::SOCKET_PUSH, 'push');
                $socket->connect(\Config::get('app.socket_connection'));
                $socket->send($event->payload);
            });
        });

        if(!$client->isConnected())
        {
            throw new \Exception("redis not connect", 500);
        }
        return $client->getEventLoop();
    }
/** その他色々 **/
}

Predis\Async\Clientは内部でreact/event-loopを使って実装されています。
redisから送信されるメッセージを受けて、
ZMQを使ってpushします。
pubsubのチャンネルなどはここで指定するようにすれば、
指定したものだけを受信する様になります。

pushはRatchetを利用して実装します。

class Push implements WampServerInterface
{
    protected $subscribedTopics = array();

    public function onSubscribe(ConnectionInterface $conn, $topic)
    {
        if (!array_key_exists($topic->getId(), $this->subscribedTopics))
        {
	    $this->subscribedTopics[$topic->getId()] = $topic;
	}
    }
    /**
     * interfaceに沿って他のメソッドも実装してください
     */ 

    public function push($data)
    {
        $entryData = json_decode($data, true);

        foreach($this->subscribedTopics as $topic)
        {
            $topic->broadcast($entryData);
        }
    }
}

クライアントへの送信の実装は上記のようなものします。
subscribe以外にも細かくしていする場合はbroadcastの場合に何かしら処理を加えると
良いのではないでしょうか

次にwebsocketサーバを実装します。
CLIで起動させますので、前回紹介したものを利用します。

PubSubCommand.php
    public function __construct(AsyncInterface $loop, WampServerInterface $wamp)
    {
        parent::__construct();
        $this->loop = $loop;
        $this->wamp = $wamp;
    }

    /**
     * @return void
     */
    public function fire()
    {
        $loop = $this->loop->async();
        $this->info('redis subscribe start');
        $this->pull($loop, $this->wamp);

        $webSock = new \React\Socket\Server($loop);
        $webSock->listen($this->option('port'), '0.0.0.0');

        $webServer = new \Ratchet\Server\IoServer(
            new \Ratchet\Http\HttpServer(
                new \Ratchet\WebSocket\WsServer(
                    new \Ratchet\Wamp\WampServer($this->wamp)
                )
            ), $webSock);

        $loop->run();
    }

    /**
     * @param $loop \React\EventLoop\LoopInterface
     * @param $wamp \Ratchet\Wamp\WampServerInterface
     * @return void
     */
    protected function pull($loop, $wamp)
    {
        $context = new \React\ZMQ\Context($loop);
        $pull = $context->getSocket(\ZMQ::SOCKET_PULL);
        $pull->bind(\Config::get('app.socket_connection'));
        $pull->on('message', array($wamp, 'push'));
    }

pushとpubsubは既に実装しましたので、websocketサーバだけの実装です。
これでコマンドを叩くとwebsocketサーバが起動し、
pubsubを利用することができるようになりました。

あとはAPI等を通じてpublishします。

$result = \Redis::connection('default')
    ->publish(\Config::get('pubsub.basic_channel'), json_encode($array));

javascriptも以前のものを利用するだけです。
コマンドでpublishを実行しても、
pubsubを利用して各クライアントへpushしますので、是非phpで実装してみてください。
細かい実装はGitHubで公開していますので、興味がありましたら参考にしてみてください。
laravel-websocket

27
27
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
27
27

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?