概要
Spring boot(Webflux)で WebSocket クライアントを実装した時のメモ
今回は題材として bitFlyer Lightning Realtime API (JSON-RPC 2.0 over WebSocket) で Bitcoin の ticker データをリアルタイムに取得します。
JSON-RPC 2.0
JSON-RPC 2.0 は RPC(Remote Procedure Call)を実現するためのプロトコルの1つです。その名の通り、JSON形式でメッセージングを行います。
JSON-RPC 2.0では次のような JSON でリクエストを送信することで、外部のメソッドである add(20, 30)
を呼び出します。(リクエストの仕様はこちら)
{
"jsonrpc": "2.0",
"method": "add",
"params": [20, 30]
"id": 1
}
まずはこのリクエストを送信するための DTO を定義します。
@JsonInclude(JsonInclude.Include.NON_EMPTY)
data class JsonRPC2Request<T>(
@field:JsonProperty("jsonrpc")
val version: String = "2.0",
val method: String = "subscribe",
val params: T?,
val id: Int? = null
)
今回は簡単のため、実際に送信する値をデフォルト引数に与えています。 params
はジェネリクスを用いて様々な形式のリクエストを送信できるようにします。
bitFlyer Lightning Realtime API メッセージ定義
次に bitFlyer Lightning Realtime API で BTC の ticker を取得するために必要な params
を定義します。
// { "channel": "lightning_ticker_BTC_JPY" }
// 上記のような JSON を `subscribe` メソッドに渡す
data class TickerRequestParams(val channel: String) {
companion object {
fun of(productCode: String) = TickerRequestParams(String.format("lightning_ticker_%s", productCode))
}
}
subsctibe
リクエストに成功すると、接続している間 API 側から次のようなリクエストが配信されるようになります。(先ほど送信したリクエストと同じ形式です。)
{
...,
"method": "channelMessage",
"params": {
"channel": "..."
"message": {
// ticker 情報
}
},
...
}
それでは API 仕様書通りに ticker 情報を格納する DTO を実装します。
data class TickerMessage(
@field:JsonProperty("product_code")
val productCode: String,
val timestamp: String,
@field:JsonProperty("tick_id")
val tickId: Int,
@field:JsonProperty("best_bid")
val bestBid: Double,
@field:JsonProperty("best_ask")
val bestAsk: Double,
@field:JsonProperty("best_bid_size")
val bestBidSize: Double,
@field:JsonProperty("best_ask_size")
val bestAskSize: Double,
@field:JsonProperty("total_bid_depth")
val totalBidDepth: Double,
@field:JsonProperty("total_ask_depth")
val totalAskDepth: Double,
val ltp: Double,
val volume: Double,
@field:JsonProperty("volume_by_product")
val volumeByProduct: Double
)
class TickerSubscribeParams(
val channel: String,
val message: TickerMessage
)
API クライアント実装
いよいよ API クライアントを実装します。
今回は JSON-RPC 2.0 のプロトコルを実装する(channelMessage
メソッドを実装する)のでは無く、受け取ったリクエストメッセージから必要な情報を抜き出すのみになります。
先述したように、大まかな流れは次の通りです。
- API の
subsctibe
メソッドをコール - API から
channelMessage
メソッドのリクエストを受信 - 受信したリクエストから
TickerMessage
を取り出して返却
また、返り値は Kotlin っぽく Flow でデータを流すようにします。
@Repository
class TickerRepository {
companion object {
private val endpoint = URI("wss://ws.lightstream.bitflyer.com/json-rpc")
// Kotlin 用支援モジュールが登録された ObjectMapper
private val objectMapper = jacksonObjectMapper()
private val webSocketClient = ReactorNettyWebSocketClient()
}
fun stream(productCode: String): Flow<TickerMessage> {
val output = EmitterProcessor.create<TickerMessage>()
// `subscribe` リクエストメッセージを用意
val sessionMono = webSocketClient.execute(endpoint) { session ->
val request = JsonRPC2Request(params = TickerRequestParams.of(productCode))
val requestMessage = Mono.fromCallable {
session.textMessage(objectMapper.writeValueAsString(request))
}
// `subsctibe` メソッドをコールし、API からメッセージを取得する
// 今回は `channelMessage` メソッド呼び出しである等の検証は特にしない
session.send(requestMessage)
.thenMany(
session.receive()
.map(WebSocketMessage::getPayloadAsText)
.map { objectMapper.readValue<JsonRPC2Request<TickerSubscribeParams>>(it).params!!.message }
.subscribeWith(output)
.then()
)
.then()
}
return output.doOnSubscribe { sessionMono.subscribe() }
.asFlow()
}
}
サンプルアプリケーション
実装した API クライアントをテストするアプリケーションを実装します。
購読した ticker をログに出力するだけのアプリケーションになります。
@SpringBootApplication
class WebsocketClientWebfluxApplication
fun main(args: Array<String>) {
runApplication<WebsocketClientWebfluxApplication>(*args)
}
@Component
class TickerSubscriber(private val tickerRepository: TickerRepository) : CommandLineRunner {
companion object {
private val log = LoggerFactory.getLogger(this::class.java)
}
override fun run(vararg args: String?) {
runBlocking {
tickerRepository.stream("BTC_JPY")
.collect {
log.info(it.toString())
}
}
}
}
こちらを実行すると次のようなログが出力されます。
2020-06-16 00:11:25.709 INFO 39279 --- [ main] e.websocket.TickerSubscriber$Companion : TickerMessage(productCode=BTC_JPY, timestamp=2020-06-15T15:11:27.8618736Z, tickId=5091266, bestBid=991618.0, bestAsk=991942.0, bestBidSize=1.0, bestAskSize=0.3, totalBidDepth=1485.61131859, totalAskDepth=1823.18609568, ltp=991900.0, volume=8075.04333188, volumeByProduct=8075.04333188)
2020-06-16 00:11:26.528 INFO 39279 --- [ main] e.websocket.TickerSubscriber$Companion : TickerMessage(productCode=BTC_JPY, timestamp=2020-06-15T15:11:28.6021127Z, tickId=5091275, bestBid=991618.0, bestAsk=991941.0, bestBidSize=1.0, bestAskSize=0.30000013, totalBidDepth=1485.29131846, totalAskDepth=1823.01649568, ltp=991900.0, volume=8075.04333188, volumeByProduct=8075.04333188)
2020-06-16 00:11:27.360 INFO 39279 --- [ main] e.websocket.TickerSubscriber$Companion : TickerMessage(productCode=BTC_JPY, timestamp=2020-06-15T15:11:29.5142542Z, tickId=5091301, bestBid=991651.0, bestAsk=991941.0, bestBidSize=1.0, bestAskSize=0.4153, totalBidDepth=1482.99883576, totalAskDepth=1823.07705555, ltp=991900.0, volume=8075.04333188, volumeByProduct=8075.04333188)
2020-06-16 00:11:28.320 INFO 39279 --- [ main] e.websocket.TickerSubscriber$Companion : TickerMessage(productCode=BTC_JPY, timestamp=2020-06-15T15:11:30.472107Z, tickId=5091321, bestBid=991500.0, bestAsk=991941.0, bestBidSize=1.11, bestAskSize=0.15, totalBidDepth=1481.4661186, totalAskDepth=1823.18220555, ltp=991900.0, volume=8075.04333188, volumeByProduct=8075.04333188)
2020-06-16 00:11:29.135 INFO 39279 --- [ main] e.websocket.TickerSubscriber$Companion : TickerMessage(productCode=BTC_JPY, timestamp=2020-06-15T15:11:31.2883294Z, tickId=5091340, bestBid=991500.0, bestAsk=991939.0, bestBidSize=1.11, bestAskSize=0.25, totalBidDepth=1479.5440686, totalAskDepth=1823.10825568, ltp=991900.0, volume=8075.04333188, volumeByProduct=8075.04333188)
以上のようにして、WebSocket を利用した通信を行うことができました。
終わりに
ほぼ全てのソースコードは本エントリに記載しましたが、Github リポジトリも用意しました。
また WebSocket や JSON-RPC 2.0 については調べながら実装・執筆したため、間違い等ございましたらご指摘いただけると幸いです。