5
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

【Rust】bitFlyer Lightning APIのWebSocketクライアントの実装

Last updated at Posted at 2023-05-22

はじめに

取引を自動化したいユーザーとAPI

 仮想通貨取引所では日々取引が行われています。取引を行うユーザーの目的の一つにその売買の価格差による収益を得ることがあります。年に一度のような長期的な売買を行うユーザーもいれば、秒単位で売買を繰り返す短期的なユーザーもいます。
 取引を行うには、まず現在価格などの情報を知ることも必要です。情報を取得したり取引したりする方法として一般的なのは、取引所がWebで公開しているプラットフォームにブラウザからアクセスする方法です。一方で、プログラムを用いて取引所が公開するAPIを経由して情報を取得したり取引したりする方法もあります。高頻度にあるいは正確に取引を行いたいユーザーにとってAPIを利用した自動化は有用です。
 ブラウザが取引所の情報を表示できるのも結局はAPIを経由して情報を取得しているからですが、ありがたいことにそのAPIを公開している取引所は多いです。余談ですが、FXや株の取引所は基本公開していない印象です。

WebSocketなAPIを公開する取引所

 ユーザーはこのAPIの窓口に対して情報を要求したり、取引の注文を行うプログラムを書いたりすることでそれらの処理を自動化することができます。
 また、昨今はWebSocketという通信方式を用いたAPIを公開する取引所もあり、ますますユーザーは高頻度に通信できるようになっています。一般的なAPI通信ではHTTPリクエストとそのレスポンスの往復を都度繰り返しますが、WebSocketでは一度HTTP通信で接続を確立できればHTTPリクエストを再送信せずとも双方向に情報のやり取りが可能です。都度再接続しなくてよい分コストが少ないので、よりリアルタイム性を求められる取引所のAPIとしても相性がよいと考えられます。

The goal of this technology is to provide a mechanism for browser-based
applications that need two-way communication with servers that does
not rely on opening multiple HTTP connections (e.g., using
XMLHttpRequest or <iframe>s and long polling).

The WebSocket Protocol (RFC6455)

Rustでリアルタイムに情報取得してみる

 WebSocketで情報公開している取引所は複数ありますが、bitFlyerはその一つです。bitFlyerではSocket.IOJSON-RPCの2つの方法のエンドポイントを用意しています。

Realtime API - bitFlyer Lightning

 今回はbitFlyer LightningのRealtime API(Socket.IO)経由でFX_BTC_JPYのPUBLIC CHANNELSの情報をリアルタイムに取得するようなWebSocketクライアントをRustで実装したので、これについて書いていきます。
 ちなみに、Rustに限らず様々な言語でWebSocketクライアントの実装が可能です。私が過去にPythonで作成したbitbank用のソースコードもGitHubで公開していますので関心があればご覧ください。

開発環境

Windows11 + VSCode + Rust(バージョン1.68)

クレート バージョン 説明
another-rxrust 0.0.45 リアクティブプログラミング用ライブラリ1
rust_socketio 0.4.1 Socket.IOを扱うためのライブラリ
serde 1.0.158 シリアライズ / デシリアライズのためのライブラリ
serde_json 1.0.96

実装

models.rs
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct PriceSize {
    pub price: f64,
    pub size: f64,
}

#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct Board {
    pub mid_price: f64,
    pub asks: Vec<PriceSize>,
    pub bids: Vec<PriceSize>,
}

#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct Execution {
    pub id: u32,
    pub side: String,
    pub price: f64,
    pub size: f64,
    pub exec_date: String,
    pub buy_child_order_acceptance_id: String,
    pub sell_child_order_acceptance_id: String,
}

#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct Ticker {
    pub product_code: String,
    pub state: String,
    pub timestamp: String,
    pub tick_id: u32,
    pub best_bid: f64,
    pub best_ask: f64,
    pub best_bid_size: f64,
    pub best_ask_size: f64,
    pub total_bid_depth: f64,
    pub total_ask_depth: f64,
    pub market_bid_size: f64,
    pub market_ask_size: f64,
    pub ltp: f64,
    pub volume: f64,
    pub volume_by_product: f64,
}
bitflyer_socketio.rs
use models::{Board, Execution, Ticker, SocketData}

use another_rxrust::{prelude::Observable, subjects::subjects::Subject};
use rust_socketio::{ClientBuilder, Payload, TransportType};
use serde::{Deserialize, Serialize};
use serde_json::from_str;
use std::time::Duration;

pub struct BitFlyerSocketIo<'a> {
    pub executions: Observable<'a, Vec<Execution>>,
    pub board: Observable<'a, Board>,
    pub snapshot: Observable<'a, Board>,
    pub ticker: Observable<'a, Ticker>,
}

impl<'a> BitFlyerSocketIo<'a> {
    pub fn activate() -> Self {

        fn on_message<'a, T: Clone + Sync + Send + Deserialize<'a>>(
            subject: &Subject<'a, T>,
            payload: Payload,
        ) {
            if let Payload::String(message) = payload {
                let message: T = from_str(&message).unwrap();
                subject.next(message);
            }
        }

        let subject_executions = Subject::new();
        let subject_board = Subject::new();
        let subject_snapshot = Subject::new();
        let subject_ticker = Subject::new();

        let client = ClientBuilder::new("https://io.lightstream.bitflyer.com")
            .transport_type(TransportType::Websocket)
            .on("lightning_executions_FX_BTC_JPY", |payload, _| {
                on_message(&subject_executions, payload)
            })
            .on("lightning_board_FX_BTC_JPY", |payload, _| {
                on_message(&subject_board, payload)
            })
            .on("lightning_board_snapshot_FX_BTC_JPY", |payload, _| {
                on_message(&subject_snapshot, payload)
            })
            .on("lightning_ticker_FX_BTC_JPY", |payload, _| {
                on_message(&subject_ticker, payload)
            })
            .connect()
            .unwrap();

        let wait_connect_seconds = Duration::from_secs(3);
        std::thread::sleep(wait_connect_seconds);

        for channel in [
            "lightning_executions_FX_BTC_JPY",
            "lightning_board_FX_BTC_JPY",
            "lightning_board_snapshot_FX_BTC_JPY",
            "lightning_ticker_FX_BTC_JPY",
        ] {
            client.emit("subscribe", channel).unwrap();
        }

        Self {
            executions: subject_executions.observable(),
            board: subject_board.observable(),
            snapshot: subject_snapshot.observable(),
            ticker: subject_ticker.observable(),
        }
    }
}

説明

models.rs

 今回はbitFlyerのPUBLIC CHANNELSの情報を取得するので、それらについて必要な構造体をbitFlyerの仕様に基づいて定義します。
 また、APIから取得したメッセージをJSON文字列からオブジェクトに変換するために、serdeserde_jsonというクレートを使います。

PUBLIC CHANNELS

{pair}は今回はFX_BTC_JPYです。

チャンネル 説明
lightning_board_snapshot_{pair} 板情報のスナップショット
lightning_board_{pair} 板情報の差分
lightning_ticker_{pair} Ticker
lightning_executions_{pair} 約定

定義した構造体

struct 説明
PriceSize 価格と量の情報を持つ構造体
Board 板情報のスナップショット差分を表す構造体
Execution 約定を表す構造体
Ticker Tickerの情報を表す構造体
SocketData 各データに取得時刻を付加した構造体

bitflyer_socketio.rs

定義した構造体

struct 説明
BitFlyerSocketIo bitFlyerのSocket.IO用エンドポイントから取得したメッセージをnextするObservableを、チャンネルごとに用意する構造体

 RustのWebSocketクライアント用のクレートは複数ありますが、wsなど現在はうまく動かないものもあります2。今回はSocket.IO公式が掲載しているクレートのrust-socketioを使います。また、WebSocketのメッセージ受信は非同期的であり、リアクティブプログラミング(RxRust)との相性も良いと考え、そのためのクレートanother-rxrustを使います。

メッセージを受信して扱うための処理

bitFlyerの場合、Socket.IOでメッセージを受信するためには、次のような手続きが必要になります。

  1. メッセージを受信した際の処理を定義しておく
  2. エンドポイントに接続のリクエストをする
  3. メッセージを受信したいチャンネルに対して"subscribe"というメッセージを送信する

これら手続きをBitFlyerSocketIoactivate()メソッドに記述しています。

1. メッセージを受信した際の処理を定義しておく

 メッセージの受信時にそのJSON文字列をオブジェクトにデシリアライズする関数on_message()を定義します。ソースコードは上で書いたものと同じです。

        fn on_message<'a, T: Clone + Sync + Send + Deserialize<'a>>(
            subject: &Subject<'a, T>,
            payload: Payload,
        ) {
            if let Payload::String(message) = payload {
                let message: T = from_str(&message).unwrap();
                subject.next(message);
            }
        }
2. エンドポイントに接続のリクエストをする

 まずチャンネルごとにSubjectを用意します。そしてrust-socketio::ClientBuilderに必要な情報を設定したものを用意し、bitFlyerに接続を試みます。そのあと接続が完了するのを3秒間待ちます。

        let subject_executions = Subject::new();
        let subject_board = Subject::new();
        let subject_snapshot = Subject::new();
        let subject_ticker = Subject::new();

        let client = ClientBuilder::new("https://io.lightstream.bitflyer.com")
            .transport_type(TransportType::Websocket)
            .on("lightning_executions_FX_BTC_JPY", |payload, _| {
                on_message(&subject_executions, payload)
            })
            .on("lightning_board_FX_BTC_JPY", |payload, _| {
                on_message(&subject_board, payload)
            })
            .on("lightning_board_snapshot_FX_BTC_JPY", |payload, _| {
                on_message(&subject_snapshot, payload)
            })
            .on("lightning_ticker_FX_BTC_JPY", |payload, _| {
                on_message(&subject_ticker, payload)
            })
            .connect()
            .unwrap();

        let wait_connect_seconds = Duration::from_secs(3);
        std::thread::sleep(wait_connect_seconds);
3. メッセージを受信したいチャンネルに対して"subscribe"というメッセージを送信する

 チャンネルごとに"subscribe"というメッセージをemit()します。最後に、チャンネルごとに用意したSubjectObservableに置き換え、BitFlyerSocketIo構造体として返します(SubjectObservableなどはリアクティブプログラミングでの用語です。RxRustだと情報が少ないので、詳しくはRxJsなどで調べていただけるとよいかと思います)。

        for channel in [
            "lightning_executions_FX_BTC_JPY",
            "lightning_board_FX_BTC_JPY",
            "lightning_board_snapshot_FX_BTC_JPY",
            "lightning_ticker_FX_BTC_JPY",
        ] {
            client.emit("subscribe", channel).unwrap();
        }

        Self {
            executions: subject_executions.observable(),
            board: subject_board.observable(),
            snapshot: subject_snapshot.observable(),
            ticker: subject_ticker.observable(),
        }

 今回はチャンネルごとのObservableをもつBitFlyerSocketIoのインスタンスを返すようにしたので、実際に利用する際は次のようにそれらをsubscribe()して扱うことになります。

main_example.rs
use bitflyer_socketio::BitFlyerSocketIo;

fn main() {
    fn next(message) { /* messageを表示するなどの処理 */ }
    fn error(e) {/* 省略 */}
    fn complete() {/* 省略 */}

    let socket = BitFlyerSocketIo::activate();
    socket.ticker.subscribe(next, error, complete);
    loop {}
}

おわりに

 今回はbitFlyerのWebSocketクライアントをRustで実装したことについて書きました。実装する際はRustで書く前にPostmanなどを使ってAPIの挙動を確認するとよいです。
 これまで自動売買Botなどを開発する際はPythonを主に使ってきたのですが、ラズパイで収集しているデータ数はとうに1億点を超え、大量のデータを一度に扱うとなると限界を感じることもありました。そのため、比較的処理速度が速いRustは魅力的で、今後のBot開発でも使っていきたいと考えています。
 一方でRustについての情報はPythonと比べると少ないと感じます。実際、WebSocket用のライブラリとしてrust-socketio以外にもwstungstenitetokioなどある中でどれを使えばいいのか最初はわからず時間がかかりました。今後はより情報が増えてくれるとありがたいですね。
 売買に関しての責任はとれませんが、この記事が参考になれば幸いです。

  1. https://qiita.com/terukazu/items/6c1195adc2d01d7012c8

  2. https://stackoverflow.com/questions/69933869/how-can-i-make-a-rust-websocket-client

5
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?