この記事はAWS Advent Calandarの21日目の記事です。
これは個人のメモ/意見であり、私の所属する組織を代表するものではありません。また、記事中のコード片はすべてMITライセンスにて提供されるものとします。
サマリと背景
テーマとしては、Amazon Kinesisとのプログラマとしての付き合い方について考えてみたという話。「Kinesisでリアルタイム分析だ!」とか「IoTだ!」とか「ストリーム処理だ!」みたいな感じにマジックワードが飛び交っていて、実際の実装の話があまり世に出てこないのに違和感を感じたのがこの記事を書こうと思った発端。
違和感を書き連ねるだけだとアレなので、Kinesisをもっとプログラマフレンドリにするにはどうしたらいいんだろう、というのを考えてみた。
あと、RxJSを触りたかったというのもある笑
Amazon Kinesis
2013年のRe:Inventで発表されたサービスで、おおまかに以下のような特徴を持っている
- 高い信頼性とスケーラビリティを持ったストリームデータの受け取りと保存
- 受け取ったデータをPartition Keyによってシャッフル&ソートしてコンシューマーに渡してくれる
- MapReduceモデルに当てはめると、リアルタイム案なMapperのような動きをしてくれる
- IoTやリアルタイム分析の文脈の中で取り上げられることが多い
現時点で感じていること: まだまだインフラサービスだよね
見出しの通りなのだが、Kinesisはインフラの文脈で語られていることが多い。機能や性能など、Whatについては多く語られているのだが、どう使うのかというHowについてはぼんやりとしている。もっとアプリケーションの話をしたいんだけど、じゃあそこに足りないものは何なのか。
いろいろあると思うが、サービスをうまく抽象化してプログラムの一モデルとして取り扱うための抽象化ライブラリが足りていないのでは、とう点について今回は考えたい。
例えば下記のようにひとつのShardを持ったKinesis Streamからデータを受け取って処理をするConsumerプロセスをEC2上にひとつ起動する、という非常に単純なことをやるにしても
必要な作業手順としては
- Kinesisのストリームをつくる
- Consumerのコードを書く
- EC2を起動して必要なランタイムやライブラリをインストールする
- ConsumerのコードをEC2にデプロイ
- Shard-0を狙ってConsumer-0を起動する
となってくる。アプリケーションを書きたいのに、実はやっていることのほとんどがインフラの準備という状態になってしまっていて、ちょっとめんどくさいよね。Shardが複数になってきたり、Streamが多段になってきたりするとこの手間は指数関数的に増えていくわけだし。「アプリケーション書いたらあとはKinesisやEC2、その上で動くコードはよしなにデプロイしてくれる」みたいな状態にしていきたいなーと思っている。
インフラじゃなくてモデルやオブジェクトとして扱おう。フレームワークでサービスを隠蔽しよう
じゃあどうしたらいいのという話。
例えばAmazonから提供されているKinesis Client Library(以下、KCL)(Java, Python)というものがある。今回はKCLとは別路線を考えたいと思っているが、その前にKCLが提供してくれる/してくれないことを整理すると以下のとおり。
KCLが提供してくれること
- 直接Kinesisを触る部分の隠蔽。IRecordProcessorというクラスのInitialize、ProcessRecords、Shutdownという3つのインターフェイスを実装するだけでOK。
- チェックポイント処理の隠蔽。CheckpointerというクラスのCheckpointというメソッドを呼ぶだけでDynamoDBに「どこまで処理をしたのか」という冒険の書を作ってくれる
- 横方向のスケールとコーディネーション。KinesisのStreamが複数になってきた時に、KCLは起動するConsumerのプロセス数を自動調整してくれる。また、複数のホストでKCLが起動していても、前述のDynamoDBのテーブルを使って処理の重複などが起こらないようなコーディネーションをしてくれる。
KCLが提供してくれないこと
- 縦方向(ステップ)の連続性は自分で考慮しなければならない。KCLを多段にするときはKCLアプリケーションを複数作ってそれぞれ別物として扱う必要がある。
- デプロイも自分で面倒を見なければならない(AWSのビッグデータ番長のIan MeyersがElastic Beanstalkでデプロイしようぜ!といういいブログポストをしてくれているので興味ある方はそちらを是非。)
感覚的にいうと「マッシブなShard数/流量を持ったStreamをConsumeするアプリケーションをいかに楽に実装/運用できるようにするか」というのがKCLのコンセプトのように思う。
一方今回は「縦に長いパイプラインをいかに楽に実装してデプロイできるか」というのを考えたい。HadoopのMapReduceみたいな感じで「一枚のファイルに連続する処理ステップを書いたらあとはクラスタ上でコードが動く。しかも横方向のスケールも考慮してくれるぜ」みたいな感じのものがほしい。
ここから先は、実際にそんなかんじのフレームワークを書くにあたって(リリースできるかどうかはわかりません)のブレインストーミングをしてみる。言語はnode.jsで。
リアクティブプログラミング: Kinesisはデータストリーム
Kinesisは受け取ったデータを(Shuffle & Sortしたうえで)後続のConsumerにストリームのようなイメージで渡してくれる。なのでConsumerの実装は、基本的にはGoFのObserverパターンのような実装にするのが自然だと考えられる。リアクティブ!
RxJSの話。
それで、リアクティブなプログラミングモデルを提供するフレームワークを書くにあたって、今回はRxJSをいろいろ調べてみた。
JavaScriptにはPromiseやJSDeffered、RxJSなど、データストリームを処理するためのフレームワークがいろいろある。以前、社内で行われたハッカソンではPromiseっぽいものを自分で実装してこの仕組に近いものを作ってみた。今回はRxJSに興味があったので(遅っ!)RxJSベースで考えてみる。
RxJSの正式な名前はThe Reactive Extentions for JavaScriptで、Microsoft Open Technologies, Incによってホストされているプロジェクト。
このライブラリを使うことによってFRP(Funcitonal Reactive Programming)を簡単に実現できますよというのがウリなのだが、Functional ProgrammingもReactive Programmingもちゃんと語れるほど知識があるわけではないのでこれはまたこんどまとめたい。(いろいろドキュメントは読み漁ってみた。)
で、RxJSを使うとこんな感じにイベント処理が書ける。(サンプルコードは上記githubプロジェクトページのREADMEより引用)
var source = getStockData();
source
.filter(function (quote) {
return quote.price > 30;
})
.map(function (quote) {
return quote.price;
})
.forEach(function (price) {
console.log('Prices higher than $30: $' + price);
});
KinesisのStreamをどう隠蔽するか
こんな感じにコードが書ける。
var stream1 = new ObservableStream({stream: 'stream-1'});
stream1.Subscribe(
function(data){
doSomeWork(data)
}
);
これによりKinesisを直接扱うというよりはRxJSのコードを書く流れでKinesisを扱えるようになる。
ノード内での横方向のスケール
先ほどの擬似コードではKinesisの隠蔽をしただけで他にも課題が残っている。では、まず横方向のスケールをどう扱えばいいのか。イメージとしては下記のような状況。
で、僕としてはこんな感じにしてやればいいのかなと思っている。
このアプローチであれば先ほどとコードを変える必要はなく、フレームワーク側での処理を単純なEvent Loopの生成から、子プロセスをForkした上でそれぞれEvent Loopを生成、という処理に書き換えてやればよさそう。そうすれば取り扱うShardが増えても問題なさげ。
var stream1 = new ObservableStream({stream: 'stream-1'});
stream1.Subscribe(
function(data){
doSomeWork(data)
}
);//実はここでFork処理が挟まっている
ノードをまたいだ横方向のスケール
先ほどはひとつのノードの中でConsumerプロセスを増やす方法を考えたが、今度はノードをまたいでプロセスを増やす方法を考えたい。こういう感じ。
事前に定義されたリソースプールから、必要な分だけリソース(ノード)をshiftしてきてタスクに割り当ててくれる、YARNみたいなサービスがほしいなぁと思っていたところにちょうどECS(Elastic Container Service)がプレビューリリースされたので、これをプロビジョナに使えば以下のようなイメージにできそう。もちろん、EC2のAPIを使って実装することもできるが、リソースプールの仕組みを実装するのが手間(?)なのでECSが手っ取り早そう。あと、勉強不足でアレだけど、YARNそのものもこういう用途に使えたりするんだろうか。
ただし、この実装方法だと、実際のConsumerプロセスをManagerから直接Forkするわけではないので、関数の渡し方に工夫の必要が出てくる。どうしたらいいんだろう・・・ 関数をtoString()しちゃう・・?w
という課題はあるもののこの時点でもまだアプリケーションコードは下記のままで動くように作れるはず。
var stream1 = new ObservableStream({stream: 'stream-1'});
stream1.Subscribe(
function(data){
doSomeWork(data)
}
);//実はここでリソースプロビジョン処理が挟まっている
stream1.run(); //リソースプロビジョンがあるので、ヨーイドンのトリガーがあったほうがよさげ
縦方向のスケール
ここまでは横方向のスケールについて考えてきたが、次に縦方向のスケール、というかKinesisを挟んだ処理のチェインのさせ方について考えたい。のだが、今回はだいぶ長くなってきたので次回じっくり考えることにする。
いまのところは
var stream1 = new ObservableStream({stream: 'stream-1'});
var stream2 = new ObservableStream({stream: 'stream-2'});
stream1 // Stream1
.Subscribe( // Task1
function(data){
doSomeWork(data)
}
)
.toStream(stream2) // Stream2
.Subscribe( // Task2
function(data){
doAnotherWork(data)
}
).run();
みたいな感じに、RxJSの書き方の中で実現できればいいなぁと思っている。しかし、toStream()でKinesisを挟み、そこを分岐点にして前後の関数を別々にTaskとしてComposeしてノードに配る、のは大変そう、というかかなりトリッキーな実装になってしまいそう。なので下記のようなApache StormのTopologyっぽい書き方が割りと現実的かも、と現時点では思っている。
var stream1 = new ObservableStream({stream: 'stream-1'});
var stream2 = new ObservableStream({stream: 'stream-2'});
var tasks = new ChainedTasks([
{
stream: stream1,
task: function(){
this.source.Subscribe(function(data){
doSomeWork(data)
}
}
},
{
stream: stream2,
task: function(){
this.source.Subscribe(function(data){
doAnotherWork(data)
}
}
}
]);
tasks.run();
これならTaskをステップごとに切り分けてComposeがしやすそう。RxJSの書き方からはかけ離れてしまうけれども。
ローカルのテスタビリティ
あとはローカルでのテスタビリティが非常に大事だと思っている。実際に書いたコードを、分散環境にデプロイしないと試せないというのは非常にアレなので、以下のようにできるといいかな。
tasks.run({
kinesis: 'local', //本物のKinesisを使うかMockするかのフラグ
distributed: 'local' //ノード分散をするかlocalhostだけで動かすかのフラグ
});
まとめ
まだブレインストーミングの途中なのでまとまっているわけではないが、このポストのまとめとしては、Kinesisをもっとプログラマフレンドリに使うためのライブラリについて考えてみた、という話でした。もうちょっと考えを進めていって、冬休みの宿題的に実装・・・・したいなぁ。
あと、今回はKinesisを話の出発点にしているのでKinesisベースで話をしているが、LambdaやSWFも抽象化できるようにしてもよさげ。