6
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

D言語でプライス受信をしてみる

Last updated at Posted at 2016-12-11

はじめに

投稿遅れてしまってごめんなさい……。

さて、普段実用性皆無のコードばかりD言語で書いてきたのですが、今年のACでは、実際上の問題におそろしく関わるコードを書いてみようと思います。

つまり です。ようするに FX です。

仕事では金融・証券系残業システム・エンジニアをやっているので債券とか外為とかにはそれなりに馴染みがあるのでした。せっかくなのでD言語でそれらを扱ってみようと思います。

まずはプライスを受信するところからです。

業者からプライスを受信する。

この業者に25万円入金すると、無料でREST APIによるプライスデータ受信が行えるようになります。ついでに取引も行えます。

(とりあえず試すだけならデモ口座で良いようです)

APIは、認証トークンをAuthorizationヘッダに付けてHTTPSリクエストを飛ばすだけの簡単なものです。結果はすべてJSONで返って来るようです。

D言語でとりあえず書く

D言語の標準ライブラリだけで真面目にプライス受信プログラムを書くと、下記のようになります。

price.d
/**
 * price receiver
 */
module price;

import std.algorithm : each, joiner, map;
import std.array : appender, array;
import std.conv : to;
import std.json : parseJSON;
import std.net.curl : byLineAsync, get, HTTP;
import std.process : environment;
import std.stdio : KeepTerminator, writeln;

/// プライス構造体
struct Price {
    string instrument; /// 通貨
    string time; /// 時刻
    string ask; /// 買値
    string bid; /// 売値
}

/// API発行構造体
struct FxApi {

    /// 各種URL・アクセスキーで初期化
    this(string apiUrl, string streamingUrl, string accountId, string accessToken) {
        this.apiUrl_ = apiUrl ~ API_VERSION;
        this.streamingUrl_ = streamingUrl ~ API_VERSION;
        this.accountId_ = accountId;

        // 認証トークン設定
        this.http_ = HTTP();
        this.http_.addRequestHeader(AUTH_HEADER, "Bearer " ~ accessToken);
    }

    /// 取引可能な通貨を返す
    string[] getInstruments() {
        auto url = apiUrl("instruments", ["fields": "instrument"]);
        auto json = parseJSON(get(url, http_));
        return json["instruments"].array.map!(e => e["instrument"].str).array;
    }

    /// 指定通貨のプライスを受信する
    void receivePrices(string[] instruments, void delegate(Price price) dg) {
        // ストリーミング受信の実行
        auto params = ["instruments": to!string(joiner(instruments, "%2C"))];
        auto url = streamingUrl("prices", params);

        // レスポンスは継続的に返って来るので、byLineAsyncを使用する必要がある。
        foreach(line; byLineAsync(url, KeepTerminator.no, '\x0a', 1024, http_)) {
            foreach(string key, value; parseJSON(line)) {
                // tick以外にheartbeatも来るので注意
                if(key != "tick") {
                    continue;
                }

                // プライス構造体に変換して処理を行う
                dg(Price(value["instrument"].str, value["time"].str, value["ask"].toString, value["bid"].toString));
            }
        }
    }

private:
    enum API_VERSION = "/v1/";
    enum AUTH_HEADER = "Authorization";

    /// アカウントIDパラメーター付きAPI用URLを生成する
    string makeUrl(string url, string apiName, string[string] params) {
        auto app = appender!string(url);
        app ~= apiName;
        app ~= "?accountId=";
        app ~= accountId_;
        foreach(k, v; params) {
            app ~= '&';
            app ~= k;
            app ~= '=';
            app ~= v;
        }
        return app.data;
    }

    /// API用URLの生成
    string apiUrl(string apiName, string[string] params = null) {
        return makeUrl(apiUrl_, apiName, params);
    }

    /// ストリーミング用URLの生成
    string streamingUrl(string apiName, string[string] params = null) {
        return makeUrl(streamingUrl_, apiName, params);
    }

    HTTP http_;
    string apiUrl_;
    string streamingUrl_;
    string accountId_;
}

/// メイン関数
void main() {
    immutable FX_API_HOST = environment["FX_API_HOST"];
    immutable FX_STREAMING_HOST  = environment["FX_STREAMING_HOST"];
    immutable FX_ACCOUNT_ID = environment["FX_ACCOUNT_ID"];
    immutable FX_ACCESS_TOKEN = environment["FX_ACCESS_TOKEN"];

    auto fxApi = FxApi(FX_API_HOST, FX_STREAMING_HOST, FX_ACCOUNT_ID, FX_ACCESS_TOKEN);
    auto instruments = fxApi.getInstruments();
    writeln(instruments);
    fxApi.receivePrices(instruments, price => writeln(price));
}

これで、環境変数に各種URL・アクセストークンを設定すればとりあえず受信できます。

問題点

いろいろ言いたいことはあるでしょうが、プライスを受信するにあたっての上記コードの問題点は下記の通りです。

  • プライス受信はほぼ1週間途切れることなく続く。受信時はなるべくGCアロケータからメモリを確保したくない。
    • D言語はGCがあまり効率的でないらしいので……。
    • ポインタとの相互運用やら開発リソースの問題
  • 毎回のJSON解析が明らかに遅そう。
  • そもそも1行1レコードと仮定していて大丈夫なのか。改行なしで連続送信されるような仕様に突然変わったらどうするか。

なるべく速く効率的に解析したい

上記の問題はすべてプライスデータの解析に関わる問題です。標準のstd.jsonよりも軽くて空間効率の良い方法があればそちらを採用したくなります。

必要な条件は下記の通りです。

  • 受信中にGCアロケーターやヒープをなるべく使用しない。
    • 起動時に1回だけなどならOK
    • 確保量が一定で増加しなければOK
  • ボトルネックにならない程度に速い。
    • いまどきなら、上記メモリ確保問題が解決されていればOKなはず。
  • 解析が柔軟に行えるよう。
    • 仕様変更があってもどうにかできる柔軟性が必要になる。

パーサーを書き始める

というわけで今年もパーサーを書いてしまいました……。

今回はランタイム限定で、解析中のメモリ消費量を極力抑えらえるよう工夫してあります。
また、セマンティックアクションの発生が全体成功事の1回のみ起こるようになっています。

自作ライブラリ使用前の準備

さて、ライブラリを使って解析を始める前にいくつか準備が必要です。

断片的な受信データをどう連続的に扱うか

受信データは往々にして下記のように来ます。

foreach(const(ubyte)[] chunk; byChunkAsync(url, 1024, 256, http)) {
    // 受信するごとに受信した分だけデータが渡される
    onReceive(chunk);
}

でも、解析するときは下記のように連続的にアクセスして解析したいです。

// sourceから文字を連続取得。場合によってはバックトラックも……。
parseJson(source);

この溝を埋めるには……終わりがあるデータの受信なら、とりあえずバッファなり中間ファイルに吐き出して連続アクセスすれば良いのですが、プライスデータは終わりがありません。ヒープもあまり使いたくないし、バッファにとりあえず突っ込む方法は今回は避けました。

この問題を解決するために、Fiberを使用します。D言語はちゃんと標準ライブラリにFiberがあるのでした。

// Fiberを内部で使用したGeneratorで、断続的な受信をRangeに見せかける
auto g = new Generator!(ubyte[])({
    foreach(const(ubyte)[] chunk; byChunkAsync(url, 1024, 256, http)) {
        // 呼び出し元にchunkを返す
        yield(chunk);
    }
});

// rangeとして使用可能
writeln(g.front);
g.popBack();
writeln(g.front);

さて、上記ではRangeの要素がubyte[]ですが、できればこれをubyteにしたいです。もちろん全体をjoinerすれば良いのですが、やっぱりヒープを触りたくありません。Rangeをflattenする仕組みが必要です。

標準で良いツールがなかったので、Rangeを返すRangeを要素のRangeにするFlattenRangeというものをkaisekiライブラリの中に作って見ました。

/// rangeのrangeをflattenしてrangeにする
struct FlattenRange(R) {
    static assert(isInputRange!(ElementType!R));
    alias InnerRange = ElementType!R;
    alias Element = ElementType!InnerRange;

    this(R range) {
        this.range_ = range;

        // 最初のrangeを参照する
        moveNextRange();
    }   

    @property {
        Element front()
        in {
            assert(!empty);
        } body {
            return currentRange_.front;
        }   
        bool empty() {return empty_;}
    }   

    void popFront()
    in {
        assert(!empty);
    } body {
        // 現在のrangeがまだ残っていたら次へ移動
        if(!currentRange_.empty) {
            currentRange_.popFront();
            if(!currentRange_.empty) {
                return;
            }   
        }

        // 次のrangeに移る
        range_.popFront();
        moveNextRange();
    }

private:
    /// 次のrangeに移る
    void moveNextRange() {
        empty_ = true;
        for(; !range_.empty; range_.popFront()) {
            currentRange_ = range_.front;
            if(!currentRange_.empty) {
                // 空でないrangeがあった
                this.empty_ = false;
                break;
            }
        }
    }

    bool empty_;
    R range_;
    InnerRange currentRange_;
}

auto flatten(R)(R range) {return FlattenRange!R(range);}

///
unittest {
    assert(flatten(["abc", "de", "f", ""]).equal("abcdef"));
    assert(flatten(["", "", "", ""]).empty);
    assert(flatten(["", "a", "b", "c"]).equal("abc"));
}

これで配列の配列がflattenされて配列になったりするようになりました。これを先ほどのGeneratorと組み合わせることにします。

パーサーの定義

kaiskiライブラリはいまのところとりあえず式テンプレートでパーサーが構成できます。

今回解析するデータはこんな感じです。

{"tick":{"instrument":"AAA_BBB","time":"2314-12-12T11:23:54.012345Z","bid":0.523,"ask":1230.793}}
{"heartbeat":{"time":"2314-12-12T11:23:54.123456Z"}}
{"tick":{"instrument":"AAA_BBB","time":"2314-12-12T11:23:54.012345Z","bid":0.523,"ask":1230.793}}
{"tick":{"instrument":"AAA_BBB","time":"2314-12-12T11:23:54.012345Z","bid":0.523,"ask":1230.793}}

これを解析するプライス受信パーサーは下記の通りです。

price.d
/// プライス情報を解析し、ファイルに出力する
void parsePrice(R)(Context!R context, ref File file) if(isInputRange!R) {
    alias ualpha = parseRange!('A', 'Z');
    alias cur = oneOrMore!(ualpha);
    alias num = oneOrMore!(parseRange!('0', '9'));
    alias dot = parseChar!'.';
    alias us = parseChar!'_';
    alias hy = parseChar!'-';
    alias cl = parseChar!':';
    alias t = parseChar!'T';
    alias z = parseChar!'Z';
    alias time = sequence!(num, hy, num, hy, num, t, num, cl, num, cl, num, dot, num, z);
    alias price = sequence!(num, dot, num);

    // 通過受信時
    alias instrument = action!(sequence!(cur, us, cur), (const(ubyte)[] match) {
        file.rawWrite(match);
    });

    // プライス時刻受信時
    alias priceTime = action!(time, (const(ubyte)[] match) {
        file.rawWrite(",");
        file.rawWrite(match);
    });

    // 売値受信時。切り出した文字列をそのまま出力する
    alias bid = action!(price, (const(ubyte)[] match) {
        file.rawWrite(",");
        file.rawWrite(match);
    });

    // 買値受信時。切り出した文字列をそのまま出力する
    alias ask = action!(price, (const(ubyte)[] match) {
        file.rawWrite(",");
        file.rawWrite(match);
        file.rawWrite("\n");
    });

    // 1レコード解析
    alias priceRecord = sequence!(
        parseString!`{"tick":{"instrument":"`, instrument,
        parseString!`","time":"`, priceTime,
        parseString!`","bid":`, bid,
        parseString!`,"ask":`, ask,
        parseString!`}}`);

    // heartbeatも一応解析。認識だけで何もしない。
    alias heartbeatRecord = sequence!(
        parseString!`{"heartbeat":{"time":"`, time, parseString!`"}}`);

    // プライスでもheartbeatでもない部分のスキップ
    alias skip = parseAny;

    // プライスかheartbeatを解析、認識できない文字はスキップ
    alias parse = zeroOrMore!(choice!(
        priceRecord, heartbeatRecord, skip));

    // 解析実行
    parse(context);
}

mainはこんな感じになります。

price.d
// 断片的なプライス受信をGeneratorでconst(ubyte)[]のrangeにする
auto r = new Generator!(const(ubyte)[])({
    fxApi.receivePrices(instruments, chunk => yield(chunk));
});

// flattenでconst(ubyte)[]のrangeをconst(ubyte)のrangeにする。
auto bytes = flatten(r);

// const(ubyte)のrangeを解析して標準出力に出力
context(bytes).parsePrice(stdout);

結果

とりあえずこれでプライスデータがCSVとして吐き出されるようになりました。

SGD_CHF,2016-12-09T21:59:58.111848Z,0.71,0.71169
SGD_HKD,2016-12-09T21:59:58.110851Z,5.41964,5.42785
SGD_JPY,2016-12-09T21:59:58.122502Z,80.518,80.701
TRY_JPY,2016-12-09T21:59:58.112651Z,33.091,33.234
USD_CAD,2016-12-09T21:59:58.114766Z,1.31769,1.3185
USD_CHF,2016-12-09T21:59:58.114886Z,1.01647,1.01739
USD_CNH,2016-12-09T21:59:58.115565Z,6.92501,6.92851
USD_CZK,2016-12-09T21:59:58.114235Z,25.53966,25.64215
USD_DKK,2016-12-09T21:59:58.113902Z,7.03929,7.04481
USD_HKD,2016-12-09T21:59:58.114817Z,7.75881,7.75971
USD_HUF,2016-12-09T21:59:58.115712Z,297.59,298.569
USD_INR,2016-12-09T21:59:58.114709Z,67.259,67.509
USD_JPY,2016-12-09T21:59:58.114305Z,115.28,115.36
USD_MXN,2016-12-09T21:59:58.114686Z,20.3682,20.419
USD_NOK,2016-12-09T21:59:58.114966Z,8.48711,8.52537
USD_PLN,2016-12-09T21:59:58.115077Z,4.20612,4.22341
USD_SAR,2016-12-09T21:59:58.115878Z,3.74625,3.75563
USD_SEK,2016-12-09T21:59:58.114925Z,9.16349,9.20267
USD_SGD,2016-12-09T21:59:58.115916Z,1.42961,1.43161
USD_THB,2016-12-09T21:59:58.115118Z,35.503,35.803
USD_TRY,2016-12-09T21:59:58.115740Z,3.47165,3.48209
USD_ZAR,2016-12-09T21:59:58.115394Z,13.76742,13.83379
ZAR_JPY,2016-12-09T21:59:58.116171Z,8.331,8.386

当初はJSONのまま受信していて、1日で数GBに達してしまっていたので、これならもう少しコンパクトにできそうです。
まだ週末に作ったので長時間受信は試していないですが、来週1週間受信して、BigQueryに突っ込むなりしていろいろ遊んでみようと思います。

終わり

システムトレード、まだプライス受信くらいしかやっていないのですが、すでにVPS代やら作業用カフェ代で大赤字です。このさきプライスデータをBigQueryに格納したりするとさらに費用が嵩みそうです。

でも、D言語をDockerで動かして通信やら計算をさせるのは楽しいので、趣味としてはなかなか面白いです。

余談ですが、DockerでD言語を動かすのはなかなか良いとようです。VMや余計なライブラリが不要なので、Dockerイメージがとてもコンパクトになります。

今回のプライス受信プロセスも、コンパイルのみホストマシンで行なって、libcurlのみ入れたalpineのDockerイメージで動かせます。

すみません、よく見たら自分でcentos:7使ってました(汗)

6
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?