MessagePackはTreasure Dataの古橋さんを中心として提案・実装されているデータのシリアライズフォーマットで、fluentdが互いにデータ通信をする際のフォーマットとしてよく知られていると思います。個人的にはこれをデータ分析をする際のデータの保存に活用することが多く、この記事はその手法の紹介になります。
なお、記事はなるべく正確に書くよう努めていますが、データ分析などについてはわりと素人なのでご指摘・コメントなどは歓迎です。
MessagePackを使って何をするのか
やりたいことは、分析をするために取得したデータをMessagePack形式にしてファイルに保存し、分析時にそのデータを読み出す、それだけです。
通常、データ分析などをする場合はデータベースなどに一度格納してから分析を実施するのが王道だと思われますが、以下のようなケースではMessagePack形式での保存・読み込みメリットがあると考えます。
MessagePackを使うと嬉しい場合
スキーマが不明確なデータを扱う場合
例えばあるシステムから出力されるデータが辞書形式のJSONだとして、レコードごとに含まれているキーが違ったり、値として入るものの形式がまちまちだったり、辞書形式がくるのか配列形式が来るのかわからなかったり、といったことがあります。さらにキーや値が変わるとしてもそういったスキーマというか仕様がちゃんと定義されているのであればいいですが、仕様すら無く現物のデータから推定するしかない、という場合もあります(あった)。
このような場合、MessagePackは辞書や配列などの階層構造もほぼそのまま変換して使うことができるので、ひとまず何も考えずにファイルに突っ込んでおく、ということが容易にできます。データ分析のためにデータを取得する場合、データのI/Oがボトルネックになることが少なくないので、ひとまずファイル形式でローカルに保存しておくと後から試行錯誤がやりやすくなります。
一時的にデータを保存したい場合や反復的にデータの保存・読み込みを繰り返したい場合
データ分析と一言で言ってもいろいろですが、十分な仮説が立てられない段階でデータをこねくり回しながら全体像を把握し仮説を立てる「探索型データ分析」が必要なことも少なくありません。このようなときは都度データを処理しやすい形式に変換したり、必要なデータだけを抜き出して保存したり、というような作業を繰りかえします。その際には「ちょっとこの属性も追加したいな」とか「リストじゃなくて辞書型で保存したほうがよさそう」など、データの形式についても試行錯誤することがあるのではと思います。
このようにデータ形式を柔軟に変更しながら処理をまわしていく場合、データを処理するコード以外の場所で改めてスキーマを定義する(例えばSQLでテーブルを作る、など)とコード側を変えたときなどにいちいち不整合がおこって大変煩わしい思いをすることがあります。一方でMessagePack形式でデータを保存しながらやる場合、もちろん書き出しと読み込みの双方でデータ形式を一致させる必要はありますが、作業は最低限ですみます。
(とは言いつつある程度ちゃんとコメントなどを残しておかないと、数カ月後にそのコードとかを自分で見直してもさっぱり意味がわからない、というようなことが起きたりするわけですが…)
データを高速に保存・読み込み(ただし全文)する必要がある場合
ミドルウェアとしてのDBは単純にデータを保存するだけでなく、キーを設定したり信頼性の保証をしたりなど様々な機能を提供しているため、単純にディスクに書き込む処理以上のオーバーヘッドがかかります。これは負荷分散などのテクニックによって解決するかもしれませんが、単純に「ちょっとデータを保存しておくだけ」という用途であれば、MessagePackの形式に変換した上で直接ファイルとして書き込めば手間もかかりませんし、ファイル書き込み・読み込みの性能がほぼそのまま適用されます。
ただし、信頼性などはファイルへの書き込みと同等のものになりますし、読み込み時は全データを読み込むことが前提となります。
MessagePackを使うと嬉しくない場合
一方、当然ながらMessagePack形式で保存してデータ分析をするのが不向きな場合もあります。
- スキーマがはっきりしているデータや、もともとDBなどで整理されているデータを使う場合
- おとなしくDBが提供してくれる機能を使うべき
- 検索や集計など、キーやインデックスを作成することの恩恵が大きい分析をする場合
- データを読み込む場合は基本的に全検索するためデータ量に対して処理が線形に増える
- 体感的にはMessagePackで保存するデータは100万件程度が上限の目安。それ以上になるならまじめにDBへの格納などを考えるべき
- 複数人で同じデータをあつかう場合
- 今回は1つのファイルの読み書きを前提としているので、ロックなどは考えてられていない
- データに対して機密性、完全性、可用性をしっかり提供する場合
- 単一ファイルに書き出す場合、レコードごとにアクセスを制御するというようなことはできない
- 思わずファイルを消してしまった、ということも容易に起こりうるのでそのあたりをよく考慮する必要あり
代替案
以下の技術が代替手段として考えられます。状況に合わせて選択してください。
CSV
データがほぼ固定長のカラム形式で階層構造になっていないのであればCSV(ないしTSV)を使う方が良いでしょう。しかし、可変長の要素や階層構造がふくまれる場合はMessagePackを使ったほうが楽です。
MongoDB
ドキュメント指向DBでスキーマを定義せずともinsertできるため、とりあえずデータを保存するという点からは同じことができます。ただinsertの性能は3,500 insert/sec あまりらしく、MessagePackで書き込む場合はディスクに直接書き込むだけなので、disk I/Oの性能がそのままでるMessagePackでの保存の方が圧倒的に速くなります。ただし後からキーなどを作成する用途があるならMongoDBの方が向いているでしょう。
JSON, BSON
処理速度やデータサイズでMessagePackとくらべて不利(参照)とのことです。またJSONは1つのデータセグメント(例えば1ファイル)内に複数のオブジェクトをいれようと思うとijsonのような段階的に読み出すというモジュールなどを使わないと一括でparseしないといけないのでデータの件数が多くなると貧弱なマシンでは処理が厳しくなります。一方でijsonのような書き方だとコードが煩雑になってしまうため、個人的には1つのデータセグメント内に連続してデータを格納ができ素直にとりだしができる形式のほうが楽だと考えます。
Protocol Buffers
シリアライズ技術の一つとして有名なProtocol Buffersですが、処理するコード側でスキーマを定義してあげる必要があるため、スキーマが不明確なデータを扱うには手間がかかります。ソフトウェア間でのインターフェースとしてのデータシリアライズとして見ると、スキーマを規程するので便利だと考えられますが、どのようなスキーマのデータがくるかわからないようなケースでは扱いが難しくなります。
サンプルコード
公式ページに充実した解説があるので多くを語る必要はないのですが、ファイルへの書き込み・読み込みだけにしぼってサンプルコードを紹介します。コードはgithubにも置いてあります。
全てのケースで以下のようなデータを data.msg
ファイルに書き込み・読み込みするものとします。
{
"name": "Alice",
"age": 27,
"hist": [5, 3, 1]
}
{
"name": "Bob",
"age": 33,
"hist": [4, 5]
}
Python
パッケージ msgpack-python
が必要です。
インストール
$ pip install msgpack-python
書き込みサンプルコード
# coding: UTF-8
import msgpack
obj1 = {
"name": "Alice",
"age": 27,
"hist": [5, 3, 1]
}
obj2 = {
"name": "Bob",
"age": 33,
"hist": [4, 5]
}
with open('data.msg', 'w') as fd:
fd.write(msgpack.packb(obj1))
fd.write(msgpack.packb(obj2))
読み込みサンプルコード
# coding: UTF-8
import msgpack
for msg in msgpack.Unpacker(open('data.msg', 'rb')):
print msg
Ruby
パッケージ msgpack
が必要です。
$ gem install msgpack
書き込みサンプルコード
# -*- coding: utf-8 -*-
require "msgpack"
obj1 = {
"name": "Alice",
"age": 27,
"hist": [5, 3, 1]
}
obj2 = {
"name": "Bob",
"age": 33,
"hist": [4, 5]
}
File.open("data.msg", "w") do |file|
file.write(obj1.to_msgpack)
file.write(obj2.to_msgpack)
end
読み込みサンプルコード
# -*- coding: utf-8 -*-
require "msgpack"
File.open("data.msg") do |file|
MessagePack::Unpacker.new(file).each do |obj|
puts obj
end
end
Node
いくつかメジャーなMessagePack用ライブラリがありますが、今回は msgpack-lite
を使ったコードにします。
$ npm install msgpack-lite
書き込みサンプルコード
const fs = require('fs');
const msgpack = require('msgpack-lite');
const obj1 = {
name: "Alice",
age: 27,
hist: [5, 3, 1]
};
const obj2 = {
name: "Bob",
age: 33,
hist: [4, 5]
};
fs.open('data.msg', 'w', (err, fd) => {
fs.writeSync(fd, msgpack.encode(obj1));
fs.writeSync(fd, msgpack.encode(obj2));
});
読み込みサンプルコード
const fs = require('fs');
const msgpack = require('msgpack-lite');
var rs = fs.createReadStream('data.msg');
var ds = msgpack.createDecodeStream();
rs.pipe(ds).on('data', (msg) => {
console.log(msg);
});
C++
msgpackc
ライブラリが必要です。macOSの場合はbrewでインストールできます。
$ brew install msgpack
書き込みサンプルコード
#include <msgpack.hpp>
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
int main(int argc, char *argv[]) {
int fd = open("data.msg", O_WRONLY | O_CREAT, 0600);
msgpack::sbuffer buf1, buf2;;
msgpack::packer<msgpack::sbuffer> pk1(&buf1), pk2(&buf2);
pk1.pack_map(3);
pk1.pack("name"); pk1.pack("Alice");
pk1.pack("age"); pk1.pack(27);
pk1.pack("hist");
pk1.pack_array(3);
pk1.pack(5); pk1.pack(3); pk1.pack(1);
write(fd, buf1.data(), buf1.size());
pk2.pack_map(3);
pk2.pack("name"); pk2.pack("Bob");
pk2.pack("age"); pk2.pack(33);
pk2.pack("hist");
pk2.pack_array(2);
pk2.pack(4); pk2.pack(5);
write(fd, buf2.data(), buf2.size());
close(fd);
return 0;
}
読み込みサンプルコード
#include <msgpack.hpp>
#include <sys/types.h>
#include <unistd.h>
#include <fcntl.h>
int main(int argc, char *argv[]) {
static const size_t BUFSIZE = 4; // あえてバッファサイズ小さめに
int rc;
char buf[BUFSIZE];
int fd = open("data.msg", O_RDONLY);
msgpack::unpacker unpkr;
while (0 < (rc = read(fd, buf, sizeof(buf)))) {
unpkr.reserve_buffer(rc);
memcpy(unpkr.buffer(), buf, rc);
unpkr.buffer_consumed(rc);
msgpack::object_handle result;
while (unpkr.next(result)) {
const msgpack::object &obj = result.get();
if (obj.type == msgpack::type::MAP) {
printf("{\n");
msgpack::object_kv* p(obj.via.map.ptr);
for(msgpack::object_kv* const pend(obj.via.map.ptr + obj.via.map.size);
p < pend; ++p) {
std::string key;
p->key.convert(key);
if (key == "name") {
std::string value;
p->val.convert(value);
printf(" %s: %s,\n", key.c_str(), value.c_str());
}
if (key == "age") {
int value;
p->val.convert(value);
printf(" %s: %d,\n", key.c_str(), value);
}
if (key == "hist") {
msgpack::object arr = p->val;
printf (" %s, [", key.c_str());
for (int i = 0; i < arr.via.array.size; i++) {
int value;
arr.via.array.ptr[i].convert(value);
printf("%d, ", value);
}
printf ("],\n");
}
}
printf("}\n");
}
result.zone().reset();
}
}
return 0;
}
ちなみにmsgpack::object形式は ostream
(std::cout
など)に放り込むと勝手にフォーマットを整形して表示してくれますが、プログラム的に値を取り出すために上記のような面倒な手順をサンプルとして記載しています。