5
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Elixir: データ処理パイプラインを効率化するツールBroadway

Last updated at Posted at 2019-04-18

Broadwayは、Elixirでデータ処理パイプラインを効率化するためのオープンソースのツールです。同時並行かつマルチステージでデータを統合し、データ処理のパイプラインが構築できます。Amazon SQSやRabbitMQなどのさまざまなソースから、データが利用できるということです。

おもな機能

Broadwayにより、同時並行のGenStageトポロジーを定める負担が減り、シンプルな設定APIで同時生成や同時処理、バッチの扱いなどが自動的に定義できます。データの取得や処理にかかる時間とコストがともに下げられるのです。

  • バックプレッシャー - GenStageを活用して、上流のソースから必要なイベントだけを受け取るので、パイプラインがあふれません。

  • 自動通知 - パイプラインの終わりやエラーが起こったときには、自動的にメッセージが送られます。

  • バッチ処理 - バッチ処理が組み込まれており、メッセージをサイズや時間によってグループ化できます。Amazon SQSのようにバッチでメッセージが効率的に処理できるシステムでは、時間とコストの両面から重要です。

  • データ損失を最小限にする耐障害性 - データの損失は最小限に抑えられる設計です。 プロデューサーは他のパイプラインから分離され、障害が起きた場合には自動的にサブスクライブし直されます。それに対して、ユーザーコールバックはステートレスです。そのため、エラーはローカル処理できます。予期しないバグに対しては、ダウンストリームコンポーネントのみが再起動されます。データの損失を防ぐためです。

  • グレースフル(graceful)シャットダウン - VMと統合してグレースフルシャットダウンができます。Broadwayを監視ツリーの一部として起動することにより、VMがシャットダウンしたときは、必ずすべてのイベントをフラッシュすることが保証されます。

  • 組み込みテスト - 組み込みテストのAPIが予め備わっているので、テストメッセージをパイプラインで簡単にプッシュでき、イベントを正しく処理したことが確かめられます。

  • パーティショニング - 動的パーティションにもとづいて、メッセージがバッチ処理できます。たとえば、パイプラインにuser_idやメールアドレスなどに応じたバッチが必要だとします。その場合は、Broadway.Message.put_batch_key/2を呼び出せばよいのです。

  • Rate-limiting (未実装)

  • Statistics/Metrics (未実装)

  • Back-off (未実装)

インストール

mix.exsの依存(deps)に:broadwayを加えてください。

def deps do
  [
    {:broadway, "~> 0.1.0"}
  ]
end

Broadwayビヘイビアを使う

つぎのふたつを定義します。

  1. パイプラインの設定
  2. Broadwayビヘイビアを実装するモジュール

Broadwayプロセスを開始するには、モジュールでuse Broadwayを呼び出して、start_link関数を定めます。GitHubの「A quick example: SQS integration」に掲げられているのは以下の例です。依存関係にはBroadwaySQSが加えられ、SQS認証情報を設定したものとされています。

defmodule MyBroadway do
  use Broadway

  alias Broadway.Message

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producers: [
        sqs: [
          module: {BroadwaySQS.Producer, queue_name: "my_queue"}
        ]
      ],
      processors: [
        default: [stages: 50]
      ],
      batchers: [
        s3: [stages: 5, batch_size: 10, batch_timeout: 1000]
      ]
    )
  end

  def handle_message(_processor_name, message, _context) do
    message
    |> Message.update_data(&process_data/1)
    |> Message.put_batcher(:s3)
  end

  def handle_batch(:s3, messages, _batch_info, _context) do
    # Send batch of messages to S3
  end

  defp process_data(data) do
    # Do some calculations, generate a JSON representation, process images.
  end
end

Broadwayモジュールを定めたら、アプリケーション監視ツリーの子として{MyBroadway, []}というかたちでに加えてください。

参考

5
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?