シミュレーションを行うと時系列データの管理が問題になりますが、そのための簡単なライブラリを書いたので共有しておきます。
https://github.com/termoshtt/asink
Async sink wrapper for time-series data
使い方
crates.ioに上がってるのでCargoで指定するだけで使えます:
[dependencies]
asink = "0.1.1"
このライブラリではデータを順次流し込む先を Sink
と呼びます。Sinkはスレッドを一つ起動してチャンネル経由でもらったデータを保存していきます。
let sink = msgpack::MsgpackSink::from_str("test.msg");
let (s, th) = sink.run();
// execute simulation...
th.join().unwrap(); // Sinkのスレッドの終了を待つ
この s
は std::sync::mpscの Sender<T>
です。std::sync::mpsc
はその名の通り複数の生産者 (Multiple-Producer)から単一の消費者(Single-Consumer)にデータを送信できるチャンネルを提供します。送られたデータは容量無制限でFIFOに貯められ、Sink側のスレッドで消費されていきます。
送信されるデータの型 T
はT: Send + Serialize
を満たす型であれば何でも使えます。Serialize
はRust標準となった(?)シリアライズ方式であるserdeのtraitでstd
の標準的な型(Vec
等)の組み合わせで出来た型についてはあらかじめ実装されているうえ、自作の構造体についてもserde-deriveで簡単に実装できます:
extern crate serde;
#[macro_use]
extern crate serde_derive;
#[derive(Serialize)]
struct Doc {
time: f64,
data: Vec<f64>,
}
保存先のバックエンドとしてJSON, msgpack及びMongoDBをサポートしています。いずれもあらかじめスキーマを決める必要がないため、serde-jsonやrmp-serdeを経由してシリアライズができます。
正しJSONとmsgpackは受け取ったデータ毎にシリアライズするため、JSON lines
{"a": 1, "b": [1, 2, 3]}
{"a": 2, "b": [2, 2, 3]}
やmsgpack streamとしてシリアライズされます。MongoDBの場合にはbsonによってシリアライズされますが、MongoDBに保存するにはBSON::Document
である必要があるのでHashMap
である必要があり、T = Vec<f64>
のようなものは保存できません。上で定義したようなserde-deriveを使用する場合、構造体は自動的にHashMap
になるのでその点は問題ないと思います。
Simulationから保存に関するコードを分離
fn experiment(s: Sender<Doc>) {
for i in 0..5 {
// 計算のメインループ
let doc = Doc {
id: i,
data: vec![i as f64],
};
s.send(doc).unwrap(); // データをSinkに投げる
}
}
fn main() {
let sink = msgpack::MsgpackSink::from_str("test.msg");
let (s, th) = sink.run();
experiment(s);
th.join().unwrap();
}
このようにデータをどこに保存するか考えずにシミュレーションの本体を記述することが可能となります。