Posted at
SchooDay 5

Schooの非同期パイプライン処理について

More than 1 year has passed since last update.


はじめに

本投稿は、Schooの2016年末特別企画「Schoo advent calendar 2016」の5日目の記事になります。

エンジニア -> 法人営業 -> 新規事業立ち上げ <- いまここ

ということでエンジニアを離れてすでに半年以上が経過しておりますが、1年以上前のことを思い出せる範囲で書きたいと思います。

内容は今更感が否めないですが、2日目のSchooの録画配信の裏側でも登場してきた非同期パイプライン処理についてです。


Schooの非同期パイプライン処理について

Schooの非同期パイプライン処理はAmazon Simple Queue Service (SQS)を用いて実装されております。

下図、例としてメール送信処理の流れです。

スクリーンショット 2016-12-02 0.40.54.png


設計時のポイント


Workerの並列処理のプロセス管理(ディスパッチャ)


  • ディスパッチャはSQSからジョブを取ってきて各ワーカープロセスに割り振ります。

  • Preforkモデルでマルチプロセス処理にしました。


    • シングルプロセスで動作させた場合、重いジョブに引きづられて他のジョブの処理が遅延してしまいます。



signal
signal number
処理
内容

SIGINT
2
forced shutdown
子プロセスの処理の終了を待たずに子プロセスを強制的に終了し、親プロセスも終了する

SIGQUIT
3
graceful shutdown
子プロセスの処理が完了し次第終了する

SIGTERM
15
forced shutdown
子プロセスの処理の終了を待たずに子プロセスを強制的に終了し、新しい子プロセスをフォークして再開する


ショートポーリング or ロングポーリング


  • SQSではメッセージを取得する際にWaitTimeSecondsでリクエストタイムアウト時間を設定できます。

  • ロングポーリング


    • SQSへのリクエストのタイムアウト時間を長くして、リクエスト回数を減らします。

    • リクエスト中はそのプロセスがロックされます。


      • キューの種類ごとにプロセスが分かれている場合などは有効です。





  • ショートポーリング


    • SQSへのリクエストを頻繁にして、キューが作成されていないかチェックします。

    • リクエスト時間が短い分ロックされません。


      • 異なる種類のキューを横断的に一つのワーカープロセスが処理する場合は有効です。






スケールアウト


  • キューイングのスケールアウトはSQSに任せるので、ポイントはワーカー側のスケールアウトです。


    • SQSはほぼ無制限のスループットが提供されます。



  • キューの種類とワーカーは1対1にしました。


    • このワーカーが担当するキューはこの種類に限りますと定めた。

    • 複数のキーを1つのワーカーが担当してしまうと、各キーにストックされているジョブの処理の重さが異るため、スケールアウトしにくいです。

    • ワーカーを分けておけば、このキーのジョブは重いのでこのキーを処理するワーカーは別サーバにして、複数台並列に並べる等が容易になります。

    • また、ソースコードの管理が分割できます。


      • すべてのソースコードが無くても、そのジョブに必要なソースコードさえあれば処理できます。






可視性タイムアウト


  • SQSは可視性タイムアウトというものを設定できます。

  • 可視性タイムアウトとは簡単に言うと、キューのメッセージを取り出してもそのメッセージはキューから削除されず、一定秒数取得できなくするという機能です。

  • なぜこのような機能が必要かというと、キューからメッセージを受信する前に接続が切断されたり、コンポーネントで障害が発生してメッセージを正しく処理できない可能性があり、その際にリトライできるようにするためです。

  • なので、コンポーネントがメッセージを正常に処理できた場合にのみ、そのメッセージを明示的に削除するように実装します。


Dead Letter Queues


  • 参考

  • 何回リトライして失敗したら、そのジョブをエラーとしてキューに入れますよ、という設定ができます。


    • キューイングされたジョブの中で、正しいフォーマットではないジョブなどは、何回リトライしてもエラーが発生して処理できません。




ディスパッチャのルール定義


  • キューイングするメッセージに保存する情報としては、キューの名前とワーカーに渡す引数のみに定めました。


    • ディスパッチャはキューから取得したキュー名を基にワーカーを起動し、引数を渡します。



  • キューに対してワーカーが一意に定まるようにしました。


    • あるキューからジョブを取り出したら起動するワーカーが定まるようにしました。




ワーカーごとの設定


  • ワーカーの種類ごとに設定を変更できるようにしました。


    • ディスパッチャがフォークする子プロセス数

    • キューが空の場合の待機秒数

    • キューから1度に取得するメッセージ件数

    • SQSへのリクエストのタイムアウト秒数

    • 可視性タイムアウトの秒数




おわりに

頑張って思い出しながら書いてみました。

次書くことがあれば、もっと自分ならではのエンジニア以外の視点も含めた内容にしようと思います。

Schooでは以下の職種を募集しておりますので、少しで興味のある方は遊びに来てください!

* リードエンジニア https://www.wantedly.com/projects/47833

* エンジニア(サーバサイド) https://www.wantedly.com/projects/49282

* エンジニア(サービス開発) https://www.wantedly.com/projects/67488

* テクニカルディレクター https://www.wantedly.com/projects/72407

* デザイナー https://www.wantedly.com/projects/70697