0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Spring Webflux による WebSocket クライアント

Posted at

概要

Spring boot(Webflux)で WebSocket クライアントを実装した時のメモ
今回は題材として bitFlyer Lightning Realtime API (JSON-RPC 2.0 over WebSocket) で Bitcoin の ticker データをリアルタイムに取得します。

JSON-RPC 2.0

JSON-RPC 2.0 は RPC(Remote Procedure Call)を実現するためのプロトコルの1つです。その名の通り、JSON形式でメッセージングを行います。
JSON-RPC 2.0では次のような JSON でリクエストを送信することで、外部のメソッドである add(20, 30) を呼び出します。(リクエストの仕様はこちら

{
  "jsonrpc": "2.0",
  "method": "add",
  "params": [20, 30]
  "id": 1
}

まずはこのリクエストを送信するための DTO を定義します。

JsonRPC2Request.kt
@JsonInclude(JsonInclude.Include.NON_EMPTY)
data class JsonRPC2Request<T>(
    @field:JsonProperty("jsonrpc")
    val version: String = "2.0",
    val method: String = "subscribe",
    val params: T?,
    val id: Int? = null
)

今回は簡単のため、実際に送信する値をデフォルト引数に与えています。 params はジェネリクスを用いて様々な形式のリクエストを送信できるようにします。

bitFlyer Lightning Realtime API メッセージ定義

次に bitFlyer Lightning Realtime API で BTC の ticker を取得するために必要な params を定義します。

TickerRequestParams.kt
// { "channel": "lightning_ticker_BTC_JPY" } 
// 上記のような JSON を `subscribe` メソッドに渡す
data class TickerRequestParams(val channel: String) {

    companion object {
        fun of(productCode: String) = TickerRequestParams(String.format("lightning_ticker_%s", productCode))
    }
}

subsctibe リクエストに成功すると、接続している間 API 側から次のようなリクエストが配信されるようになります。(先ほど送信したリクエストと同じ形式です。)

{
  ...,
  "method": "channelMessage",
  "params": {
    "channel": "..."
    "message": {
      // ticker 情報
    }
  },
  ...
}

それでは API 仕様書通りに ticker 情報を格納する DTO を実装します。

TickerMessage.kt
data class TickerMessage(
    @field:JsonProperty("product_code")
    val productCode: String,
    val timestamp: String,
    @field:JsonProperty("tick_id")
    val tickId: Int,
    @field:JsonProperty("best_bid")
    val bestBid: Double,
    @field:JsonProperty("best_ask")
    val bestAsk: Double,
    @field:JsonProperty("best_bid_size")
    val bestBidSize: Double,
    @field:JsonProperty("best_ask_size")
    val bestAskSize: Double,
    @field:JsonProperty("total_bid_depth")
    val totalBidDepth: Double,
    @field:JsonProperty("total_ask_depth")
    val totalAskDepth: Double,
    val ltp: Double,
    val volume: Double,
    @field:JsonProperty("volume_by_product")
    val volumeByProduct: Double
)
TickerSubscribeParams.kt
class TickerSubscribeParams(
    val channel: String,
    val message: TickerMessage
)

API クライアント実装

いよいよ API クライアントを実装します。
今回は JSON-RPC 2.0 のプロトコルを実装する(channelMessage メソッドを実装する)のでは無く、受け取ったリクエストメッセージから必要な情報を抜き出すのみになります。
先述したように、大まかな流れは次の通りです。

  1. API の subsctibe メソッドをコール
  2. API から channelMessage メソッドのリクエストを受信
  3. 受信したリクエストから TickerMessage を取り出して返却

また、返り値は Kotlin っぽく Flow でデータを流すようにします。

TickerRepository.kt
@Repository
class TickerRepository {

    companion object {
        private val endpoint = URI("wss://ws.lightstream.bitflyer.com/json-rpc")
        // Kotlin 用支援モジュールが登録された ObjectMapper
        private val objectMapper = jacksonObjectMapper()
        private val webSocketClient = ReactorNettyWebSocketClient()
    }

    fun stream(productCode: String): Flow<TickerMessage> {
        val output = EmitterProcessor.create<TickerMessage>()

        // `subscribe` リクエストメッセージを用意
        val sessionMono = webSocketClient.execute(endpoint) { session ->
            val request = JsonRPC2Request(params = TickerRequestParams.of(productCode))
            val requestMessage = Mono.fromCallable {
                session.textMessage(objectMapper.writeValueAsString(request))
            }

            // `subsctibe` メソッドをコールし、API からメッセージを取得する
            // 今回は `channelMessage` メソッド呼び出しである等の検証は特にしない
            session.send(requestMessage)
                .thenMany(
                    session.receive()
                        .map(WebSocketMessage::getPayloadAsText)
                        .map { objectMapper.readValue<JsonRPC2Request<TickerSubscribeParams>>(it).params!!.message }
                        .subscribeWith(output)
                        .then()
                )
                .then()
        }

        return output.doOnSubscribe { sessionMono.subscribe() }
            .asFlow()
    }
}

サンプルアプリケーション

実装した API クライアントをテストするアプリケーションを実装します。
購読した ticker をログに出力するだけのアプリケーションになります。

WebsocketClientWebfluxApplication.kt
@SpringBootApplication
class WebsocketClientWebfluxApplication

fun main(args: Array<String>) {
    runApplication<WebsocketClientWebfluxApplication>(*args)
}

@Component
class TickerSubscriber(private val tickerRepository: TickerRepository) : CommandLineRunner {

    companion object {
        private val log = LoggerFactory.getLogger(this::class.java)
    }

    override fun run(vararg args: String?) {
        runBlocking {
            tickerRepository.stream("BTC_JPY")
                .collect {
                    log.info(it.toString())
                }
        }
    }
}

こちらを実行すると次のようなログが出力されます。

2020-06-16 00:11:25.709  INFO 39279 --- [           main] e.websocket.TickerSubscriber$Companion   : TickerMessage(productCode=BTC_JPY, timestamp=2020-06-15T15:11:27.8618736Z, tickId=5091266, bestBid=991618.0, bestAsk=991942.0, bestBidSize=1.0, bestAskSize=0.3, totalBidDepth=1485.61131859, totalAskDepth=1823.18609568, ltp=991900.0, volume=8075.04333188, volumeByProduct=8075.04333188)
2020-06-16 00:11:26.528  INFO 39279 --- [           main] e.websocket.TickerSubscriber$Companion   : TickerMessage(productCode=BTC_JPY, timestamp=2020-06-15T15:11:28.6021127Z, tickId=5091275, bestBid=991618.0, bestAsk=991941.0, bestBidSize=1.0, bestAskSize=0.30000013, totalBidDepth=1485.29131846, totalAskDepth=1823.01649568, ltp=991900.0, volume=8075.04333188, volumeByProduct=8075.04333188)
2020-06-16 00:11:27.360  INFO 39279 --- [           main] e.websocket.TickerSubscriber$Companion   : TickerMessage(productCode=BTC_JPY, timestamp=2020-06-15T15:11:29.5142542Z, tickId=5091301, bestBid=991651.0, bestAsk=991941.0, bestBidSize=1.0, bestAskSize=0.4153, totalBidDepth=1482.99883576, totalAskDepth=1823.07705555, ltp=991900.0, volume=8075.04333188, volumeByProduct=8075.04333188)
2020-06-16 00:11:28.320  INFO 39279 --- [           main] e.websocket.TickerSubscriber$Companion   : TickerMessage(productCode=BTC_JPY, timestamp=2020-06-15T15:11:30.472107Z, tickId=5091321, bestBid=991500.0, bestAsk=991941.0, bestBidSize=1.11, bestAskSize=0.15, totalBidDepth=1481.4661186, totalAskDepth=1823.18220555, ltp=991900.0, volume=8075.04333188, volumeByProduct=8075.04333188)
2020-06-16 00:11:29.135  INFO 39279 --- [           main] e.websocket.TickerSubscriber$Companion   : TickerMessage(productCode=BTC_JPY, timestamp=2020-06-15T15:11:31.2883294Z, tickId=5091340, bestBid=991500.0, bestAsk=991939.0, bestBidSize=1.11, bestAskSize=0.25, totalBidDepth=1479.5440686, totalAskDepth=1823.10825568, ltp=991900.0, volume=8075.04333188, volumeByProduct=8075.04333188)

以上のようにして、WebSocket を利用した通信を行うことができました。

終わりに

ほぼ全てのソースコードは本エントリに記載しましたが、Github リポジトリも用意しました。
また WebSocket や JSON-RPC 2.0 については調べながら実装・執筆したため、間違い等ございましたらご指摘いただけると幸いです。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?