背景
Socket.IOでpub/subを実現したいのであれば、Node.jsでサーバーを立てるのが鉄板?と思いますが、異なるシステム間で連携したいこともありますよね。
ScalaでSocket.IO
図の様にセンサーで拾ったデータをScalaで書かれたサーバーに送っていますが、更新情報をサーバーからクライアント側にリアルタイムで伝えたい場合、Scalaのサーバーとクライアント間でwebsocketを使えば良いですが、クライアント側のブラウザの対応有無などを気にしたくないので今回はSocket.IOを使います。ScalaでSocket.IOに対応したライブラリが無いか探してみましたが、Play Frameworkのモジュールとして存在するのは確認できましたが、汎用的なライブラリは見つかりませんでした。
今回サーバー側はAkkaHTTPで書いており、別の方法で実現する必要が出てきました。
Redisのpub/subを利用する
図の様にRedisを仲介役として利用する方法を考えます。Node.js側は、以下の様に少しコードを足すだけで対応できます。
const io = require('socket.io')(3000);
const redisAdapter = require('socket.io-redis');
io.adapter(redisAdapter({ host: 'localhost', port: 6379 }));
Scala側はというと、Redisに自前で書きに行く(publish)必要があります。
Scala用のRedisクライアントについてまとめてくれていた記事を参考に選定します。
たけぞう瀕死ブログ
Sedisはブロッキングのみなので除外。残った「scala-redis」か「rediscala」が候補になります。
redisへ書き込むデータの形式は以下のようになります。
ここでは、data配列の第2要素に送りたいデータを設定します。
[
"emitter",
{
"type": 2,
"data": [
"イベント名",
{
"message": "Hello World!"
}
],
"nsp": "namespace"
},
{
"rooms": ["ルーム名"],
"flags": []
}
]
これをMessagePackでシリアライズしたバイナリデータをredisへ書き込みます。
ここで先程のライブラリのうち「scala-redis」はバイナリ書き込みのメソッドを持っていないため、「rediscala」一択となります。
ScalaでMessagePackのライブラリは、MessagePack本家が出している「msgpack-scala」を利用します。
msgpack-scala
このライブラリが少し曲者で上記のJSONを表現するコードは以下の様になります。
他の言語のライブラリだとメソッド一発でバイナリにしてくれたりするようですが、このライブラリには見当たりませんでした・・・
val out = new ByteArrayOutputStream
val packer = MessagePack.newDefaultPacker(out)
packer.packArrayHeader(3)
// set emitter
packer.packString("emitter")
// set type 2
packer.packMapHeader(3)
packer.packString("type")
packer.packString("2")
// set data
packer.packString("data")
packer.packArrayHeader(2)
// set event name
packer.packString("イベント名")
// set data body
packer.packMapHeader(1)
packer.packString("message")
packer.packString("Hello World!")
// set namespace
packer.packString("nsp")
packer.packString("/")
packer.packMapHeader(2)
packer.packString(roomName)
packer.packArrayHeader(1)
packer.packString("ルーム名")
packer.packString("flags")
packer.packMapHeader(0)
packer.flush()
packer.close()
out.toByteArray
ここで取得したバイナリデータをrediscalaを利用してRedisに書き込みます。
※RedisClientのコンストラクタはActorSystemをimplictで受け取るようになっているので、それぞれの環境に応じて用意して下さい。
val redis = RedisClient(server,port)
redis.publish("チャンネル名", バイナリデータ)
データ受信
クライアント側では、設定したイベント名でデータを受信することができます。
socketio.on('イベント名', function(msg){
console.log('receive:' + JSON.stringify(msg));
});
まとめ
Scalaでの紹介でしたが、他の言語でも同じ仕組みを利用することでRedisに書き込みができれば連携ができるようになるかと思います。
※今回参考にしたサイト
この方は同様のことをpythonで実現されていました。
pythonを使ってsocket.io-redisへイベントをemitする