Line-delimited JSON
NDJSON, LDJ, JDJSON, JSONLなどとも呼ばれることがある、1行1JSONのテキストことです。Dockerのjson-logや、jqがデフォルトで受け付ける入力としても有名ですね。
この形式のよいところは、対応しているツールが多いために、フォーマット変換を自前でやらなくても本質的な処理だけをパイプで繋げられる点です。
パイプラインの一部となるツールを自分で実装する場合は、入力と出力にこの形式を採用できないか検討すると、ツールの再利用性が高まります。
Ref: https://en.wikipedia.org/wiki/JSON_streaming#Line-delimited_JSON
bethos
様々なプロトコルから/へのIN/OUTをつないで、Fan-in/outなど様々なパターンでルーティングすることができるstream multiplexer serviceと呼ばれるアプリケーション。
- Amazon (S3, SQS)
- File
- HTTP(S)
- Kafka
- MQTT
- Nanomsg
- NATS
- NATS Streaming
- NSQ
- RabbitMQ (AMQP 0.91)
- Redis
- Stdin/Stdout
- ZMQ4
入出力にそれぞれ標準入力と標準出力を指定できるので、bash pipelineの一部として利用することもできる。
標準入力を利用する場合、1行1メッセージのセマンティクス以外に、1行1パート・空行がメッセージの区切り・・・というマルチパートのセマンティクスも利用できて便利。
sourceapp | benthos -c ./config.yaml
mirocule
SDK and CLI for spawning streaming stateless HTTP microservices in multiple programming languages
20以上のプログラミング言語で書かれたスニペットをHTTPサービスにラップするツール、とでも言えばよいだろうか。
実は標準入力からリクエストを受けることができるので、出力を標準出力にしておけばPipelineの一部としても利用できる。
tail -f ReadMe.md | microcule --stream=true ./examples/services/streams/echo.js
Linux Pipeline Programmingの文脈では、
- Webアプリケーションに仕立て上げたPipelineをHTTPでつなぐことで、PipelineをステートレスなWebアプリケーションとして水平スケールさせられる
という点が面白い。
hook.io
miroculeベースのWebhook as a Serviceのためのツール。
Linux Pipeline Programmingの文脈だと、
ベースとなるMiroculeが与える特性に加えて
- Webアプリケーションに仕立て上げたPipelineをHTTPでつなぐことで、PipelineをステートレスなWebアプリケーションとして水平スケールさせられる
- その管理ツール・コンソールを提供してくれる
という点が面白い。
esbulk
1行1 JSON形式の入力をElasticsearchにバルクロード(index)してくれるツール。パイプラインの終端におけば、入力を変換したのちESに効率的にロードするパイプラインが組める。
cat docs.ndjson | esbulk -index example file.ldj
bashful
Bashコマンドを並列に、または選択的に実行し、実行経過・結果をヒューマンフレンドリーな形式でリアルタイム表示してくれるツール。
Linux Pipeline Programmingの文脈では、複数のPipelineから構成されるDurableでないWorkflowを、人間が実行する場合の良いインターフェースになる。
ref: https://github.com/wagoodman/bashful
小ネタ
inotify...
(いまさらだが)ローカルファイルの変更検知をこの文脈で気軽にやる方法はいまだにinotifyなので、覚えておいて損はない
参考: https://superuser.com/questions/181517/how-to-execute-a-command-whenever-a-file-changes
bashでcommand-line flags...
(こちらもいまさらだが)bashスクリプト2引数以上とるようになってきたら2つ目以降はオプションとみなせるケースも多い。
そういう場合はコマンドラインフラグを活用する。
参考: https://stackoverflow.com/questions/192249/how-do-i-parse-command-line-arguments-in-bash
seq & gnuparallel & xargs ...
(いまさらだが)bashスクリプト内で気軽にmap/reduceスタイルの並列処理を行いたい場合、この3つの組み合わせはイディオムとして覚えておくと便利。
例えば、以下の例では1-10番の入力を並列に処理して、結果を番号順に連結する。
seq 1 10 | parallel './dosomething foo/bar/{} > intermediate-{}.json'
seq 1 10 | xargs -I{} cat intermediae-{}.json > result.ndjson
未来
pachyderm
Kubernetes上で「ファイルを読み書きする任意のLinuxプログラム」を分散実行させてパイプラインをつくることができる、Pachydermというプラットフォームがある。
PachydermのIssueでは、以下のようなFeature Requestがでている
これらができるようになると、Linux Pipelineをコンテナ化して分散実行することができ、さらにLinux Pipelineが気軽に、高速に大量データのストリーミング処理に応用できる範囲が増えると思われる。
Argo
Kubernetes上で「コンテナ化されたアプリケーションのワークフロー」を実現するための、Argoというシステムがある。
Pipelineを「ファイルを読んでSTDIN経由でパイプラインに入力し、パイプラインの出力をファイルに書き出すようなコンテナ」でラップしてやれば、一応Linxu Pipelineから構成されるWorkflowをKubernetes上で実行することができる。
ただし、仕組み上、Argo単体で低レイテンシーなストリーミング処理はできないので、多くのケースでLinux Pipelineに求められると思われる「低レイテンシーなストリーミング処理」というユースケースには合わなさそうではある。