MongoDB
rust
msgpack

時系列データを保存する

シミュレーションを行うと時系列データの管理が問題になりますが、そのための簡単なライブラリを書いたので共有しておきます。

https://github.com/termoshtt/asink
Async sink wrapper for time-series data

使い方

crates.ioに上がってるのでCargoで指定するだけで使えます:

Cargo.toml
[dependencies]
asink = "0.1.1"

このライブラリではデータを順次流し込む先を Sink と呼びます。Sinkはスレッドを一つ起動してチャンネル経由でもらったデータを保存していきます。

let sink = msgpack::MsgpackSink::from_str("test.msg");
let (s, th) = sink.run();
// execute simulation...
th.join().unwrap();  // Sinkのスレッドの終了を待つ

この sstd::sync::mpscSender<T> です。std::sync::mpscはその名の通り複数の生産者 (Multiple-Producer)から単一の消費者(Single-Consumer)にデータを送信できるチャンネルを提供します。送られたデータは容量無制限でFIFOに貯められ、Sink側のスレッドで消費されていきます。

送信されるデータの型 TT: Send + Serializeを満たす型であれば何でも使えます。SerializeはRust標準となった(?)シリアライズ方式であるserdeのtraitでstdの標準的な型(Vec等)の組み合わせで出来た型についてはあらかじめ実装されているうえ、自作の構造体についてもserde-deriveで簡単に実装できます:

extern crate serde;
#[macro_use]
extern crate serde_derive;

#[derive(Serialize)]
struct Doc {
    time: f64,
    data: Vec<f64>,
}

保存先のバックエンドとしてJSON, msgpack及びMongoDBをサポートしています。いずれもあらかじめスキーマを決める必要がないため、serde-jsonrmp-serdeを経由してシリアライズができます。
正しJSONとmsgpackは受け取ったデータ毎にシリアライズするため、JSON lines

{"a": 1, "b": [1, 2, 3]}
{"a": 2, "b": [2, 2, 3]}

やmsgpack streamとしてシリアライズされます。MongoDBの場合にはbsonによってシリアライズされますが、MongoDBに保存するにはBSON::Documentである必要があるのでHashMapである必要があり、T = Vec<f64>のようなものは保存できません。上で定義したようなserde-deriveを使用する場合、構造体は自動的にHashMapになるのでその点は問題ないと思います。

Simulationから保存に関するコードを分離

fn experiment(s: Sender<Doc>) {
    for i in 0..5 {
        // 計算のメインループ
        let doc = Doc {
            id: i,
            data: vec![i as f64],
        };
        s.send(doc).unwrap(); // データをSinkに投げる
    }
}

fn main() {
    let sink = msgpack::MsgpackSink::from_str("test.msg");
    let (s, th) = sink.run();
    experiment(s);
    th.join().unwrap();
}

このようにデータをどこに保存するか考えずにシミュレーションの本体を記述することが可能となります。