Broadwayは、Elixirでデータ処理パイプラインを効率化するためのオープンソースのツールです。同時並行かつマルチステージでデータを統合し、データ処理のパイプラインが構築できます。Amazon SQSやRabbitMQなどのさまざまなソースから、データが利用できるということです。
おもな機能
Broadwayにより、同時並行のGenStage
トポロジーを定める負担が減り、シンプルな設定APIで同時生成や同時処理、バッチの扱いなどが自動的に定義できます。データの取得や処理にかかる時間とコストがともに下げられるのです。
-
バックプレッシャー -
GenStage
を活用して、上流のソースから必要なイベントだけを受け取るので、パイプラインがあふれません。 -
自動通知 - パイプラインの終わりやエラーが起こったときには、自動的にメッセージが送られます。
-
バッチ処理 - バッチ処理が組み込まれており、メッセージをサイズや時間によってグループ化できます。Amazon SQSのようにバッチでメッセージが効率的に処理できるシステムでは、時間とコストの両面から重要です。
-
データ損失を最小限にする耐障害性 - データの損失は最小限に抑えられる設計です。 プロデューサーは他のパイプラインから分離され、障害が起きた場合には自動的にサブスクライブし直されます。それに対して、ユーザーコールバックはステートレスです。そのため、エラーはローカル処理できます。予期しないバグに対しては、ダウンストリームコンポーネントのみが再起動されます。データの損失を防ぐためです。
-
グレースフル(graceful)シャットダウン - VMと統合してグレースフルシャットダウンができます。Broadwayを監視ツリーの一部として起動することにより、VMがシャットダウンしたときは、必ずすべてのイベントをフラッシュすることが保証されます。
-
組み込みテスト - 組み込みテストのAPIが予め備わっているので、テストメッセージをパイプラインで簡単にプッシュでき、イベントを正しく処理したことが確かめられます。
-
パーティショニング - 動的パーティションにもとづいて、メッセージがバッチ処理できます。たとえば、パイプラインに
user_id
やメールアドレスなどに応じたバッチが必要だとします。その場合は、Broadway.Message.put_batch_key/2
を呼び出せばよいのです。 -
Rate-limiting (未実装)
-
Statistics/Metrics (未実装)
-
Back-off (未実装)
インストール
mix.exs
の依存(deps
)に:broadway
を加えてください。
def deps do
[
{:broadway, "~> 0.1.0"}
]
end
Broadway
ビヘイビアを使う
つぎのふたつを定義します。
- パイプラインの設定
-
Broadway
ビヘイビアを実装するモジュール
例
Broadwayプロセスを開始するには、モジュールでuse Broadway
を呼び出して、start_link
関数を定めます。GitHubの「A quick example: SQS integration」に掲げられているのは以下の例です。依存関係にはBroadwaySQSが加えられ、SQS認証情報を設定したものとされています。
defmodule MyBroadway do
use Broadway
alias Broadway.Message
def start_link(_opts) do
Broadway.start_link(__MODULE__,
name: __MODULE__,
producers: [
sqs: [
module: {BroadwaySQS.Producer, queue_name: "my_queue"}
]
],
processors: [
default: [stages: 50]
],
batchers: [
s3: [stages: 5, batch_size: 10, batch_timeout: 1000]
]
)
end
def handle_message(_processor_name, message, _context) do
message
|> Message.update_data(&process_data/1)
|> Message.put_batcher(:s3)
end
def handle_batch(:s3, messages, _batch_info, _context) do
# Send batch of messages to S3
end
defp process_data(data) do
# Do some calculations, generate a JSON representation, process images.
end
end
Broadwayモジュールを定めたら、アプリケーション監視ツリーの子として{MyBroadway, []}
というかたちでに加えてください。
参考
- Plataformatec blog「Announcing Broadway」
- GitHub「Broadway」
- 公式ドキュメント「Broadway behaviour」