Edited at

Akka Stream によるバッチ処理開発のススメ


はじめに

自社の技術ブログ向けに執筆した草稿を Qiita に記載していたものを公開しています。規約違反等、問題がありましたら削除いたします。

この記事は株式会社フロムスクラッチの開発部門が運営する技術ブログ向けに執筆しています。

執筆者はログ基盤周りを中心に日々開発業務に当たっています、関口です。

弊社が運営するマーケティングクラウドサービス「b→dash」では Web サーバ周りを Rails で開発する一方で、 バッチ処理やツール群を golang、Java、Scala を用途や開発者のスキルに合わせて適宜選択して採用しています。

私自身は昨年の夏に Scala で実装されたバッチアプリケーションとしてのログ基盤を引き継ぎ、前Qは累積した負債を解決するためにリプレイスを計画し、その前段階とも言える新規のバッチ処理を Akka-Stream をバッチフレームワークとして採用し開発したので、そのアプリケーション開発での経験を少し書いていきたいと思います。

よろしくお願いします。


そもそもの課題について


ログ処理基盤

開発したアプリケーションは弊社が提供するレコメンド機能が生み出すレコメンド結果をログデータから取り込み Hive 、 Presto から参照可能な Orc ファイルのデータに変換するバッチアプリケーションです。

前身としてクライアントのサイトに仕込んだタグからログデータを飛ばして集積する Web ログの処理基盤があり、レコメンドログは概ね仕組みとしてはこれと同じ仕組みで出力されますが、配信メール、LINE、SMS といった他の種別のログと同様に Web とは処理系統を素にする事が決まっていました。

ところが、この Web ログの処理系統そのものが初期の想定を超えてアプリケーションの処理効率を落としており、そのリプレイスに先んじてレコメンドを新規のフレームワークでスケールしようという魂胆がありました。


課題概要

という訳で解決を目指した課題は以下の通り。

パフォーマンスに直接関わる課題が2点、保守の生産性に関わる課題が1点です。


  1. 出力データのデータ構造の悪さ

  2. そもそもの変換・集計処理が直列に実行される事による処理効率の悪さ

  3. 当初想定から要件が変更になった過程での大きなリファクタリングを避けた結果としての内部構造の見通しの悪さ

これらの課題のために、毎時に実行されるバッチ処理が月間 1000 万 PV に及ぶ某クライアントのアカウントでは非キャンペーン時でも毎時 60 分を超えてしまい翌日のための集計処理のための要件に深刻な影響を与えていました。

「1. 出力データのデータ構造の悪さ」がボトルネックとしては特に深刻で、主要なアカウントでは差分を集計した後の既存データとのマージ処理だけで1回のバッチ処理の三分の一に及び、根本的にはここを解決しなければ非機能要件をクリアできない事は明白でした。

「2. 変換・集計処理が直列に実行される事による処理効率の悪さ」はこれはこれで改善の余地が大きく、機能追加の過程で少し Future1 を利用した非同期処理に書き換えてやるだけで最大級のクライアントに至っては1回のバッチ処理にかかる非キャンペーン時の所要時間がざっくり10分程度軽減できたため、複数段階の集計処理を十分安全に非同期化する事ができれば大幅なパフォーマンス改善に繋がりそうです。

「3. 大きなリファクタリングを避けた結果としての内部構造の見通しの悪さ」は機能追加や問題発生時の検証作業に当たれる人間を限定してしまっており、ログ基盤周りの発展を遅延させていました。設計レベルのスケーラビリティが低く当初想定のアカウント、データビュー (サブアカウントに近い概念) といった大きなくくりを超えたスケールに対応できずためこれも解決すべき課題と言えます。


つまりは ?


  1. 出力データの構造を Orc ファイルという形式に合わせて再検討し

  2. アプリケーションの設計を十分抽象化してスケーラビリティを高め

  3. 集計過程を安全に非同期化する

パフォーマンス改善の必要があったというわけです。


Akka-Stream とは ?


採用にあたって

課題を解決するために利用を検討したバッチフレームワークが Akka-Stream です。

その他、同じ Scala を使うにしても Hadoop や Spark といったフレームワークも候補として検討はしましたが、レコメンドログの処理自体に必要な要件としては EMR を使った分散処理とするのはコストとして過剰になると判断しました。

Akka-Stream 自体はそもそも Spark の実装に用いられた Java / Scala 用のリアクティブストリームフレームワークです。経緯的にはその後 Akka-Stream の低レイヤーを実現していた Netty に Spark 側は置きかわり、一方 Akka-Stream はというとその低レイヤーの実装を Spray.io に変更しています。2

そういった経緯で作られてきたものなので基本的に Spark のような分散処理で実装したいものは Akka-Stream で実装する事は可能であるという前提がありました。アプリケーションの規模感や Spark の枠組みよりも柔軟な処理を書きたい場合の選択になるという予測で、今回の場合は処理内容的にもシンプルなので Spark を利用するメリットは薄めという所感でした。


サクッと説明する

では Akka-Stream 自体は何を可能にしてくれるフレームワークかと言いますと、処理系をストリーミング処理を構成するグラフコンポーネントの集合で表現することを可能にしてくれるものです。詳しくは既に様々なドキュメントが起されているのでそちらに頼りましょう。3

グラフコンポーネントは大別すると以下の三種から成り立ちます。


  • Source


    • 入力部

    • キューや InputStream 、イテレータなど様々な入力装置からメッセージを受けて処理系にメッセージを与える事ができる



  • Sink


    • 出力 (終端) 部

    • グラフの処理を完了させる

    • 結果を出力させるか (Future[A]) させずに完了のみを通知するか (Future[Done]) は任意



  • Flow


    • 中間処理部

    • Source から Sink を繋ぐ処理系統の主軸で、新しい Source や Sink を作成できる


      • Source + Flow = Source

      • Flow + Sink = Sink

      • Flow + Flow = Flow



    • I/O はメッセージに対して1対1に限らず、ブロードキャストやZIP、UNZIP、マージなど分岐、合流が容易に可能。



簡単にレコメンドログ処理に沿って言ってしまえば、 S3 にアウトプットしたアクセスログファイル (gz) をインプットとして Orc ファイルに書き出すもので、 S3 からのインプットを InputStream を入力にした Source 、Orc への書き出し処理を Sink にして、アクセスログの読み込みやバリデーション、変換処理を Flow に連ねる事になります。


どんなアーキテクチャで ?


もちろん ? クリーンアーキテクチャで

採用したアーキテクチャは現代ではマイクロサービスやドメイン駆動設計の需要に伴いもはや概念としては定着した感もあるクリーンアーキテクチャです。 4 5

シンプルなバッチ処理をクリーンアーキテクチャに寄せすぎてしまうのは逆に過剰かなといういう感覚もあったのですが、 弊社の Web サーバ自体 Rails でクリーンアーキテクチャ風味の設計を実現しているため基礎の相性は悪くありませんでした。

課題の一つである構造の見通しの悪さもビジネスロジックとインフラ系をはっきり区別する事で改善できる自信がありましたし、何より自分自身のキャッチアップの成果を測る機会として、クリーンアーキテクチャを採用しました。


4層構造


  • domain


    • models

    • services

    • repositories



  • application


    • usecases

    • services



  • infrastructure


    • repositories

    • gateway

    • utils



  • presentation


    • main

    • params



  • (utils)

レイヤはベースとして4層に分け、ビジネスロジックを極力抽象でドメイン層に記述し、具象をアプリケーション層に書いてインフラ層との中立を任せ、データの入出力の実態や実データからモデルへの変換処理などは極力インフラ層に吐き出すという形で記述しました。 presentation 層は今回はバッチ処理のためのパラメータをコマンドラインから渡して起動する事を前提としていたので CLI から実行するためのメイン関数周りの記述を集約しました。(ここに関しては大いに解釈が誤っているかもしれません。。。)

書ける限りの要素をドメイン層とインフラ層に寄せた事で、アプリケーション層とプレゼンテーション層は最低限の処理の取りまとめに集中する事が出来て、ドメイン層では境界付けられたコンテキスト6の外側を意識しないビジネスロジックの記述に、インフラ層ではデータの読み書きの実態を柔軟に書く事が出来た手応えがありました。


Akka Stream をどう使ったの?


主として

Akka-Stream は主にアプリケーション層に記述した Usecase とインフラ層での I/O 処理の記述に利用しました。 Usecase#execute のリターンは Runnable グラフの実行結果である Future です。7

クリーンアーキテクチャでは原則フレームワーク側の都合などをインフラ層で吸収するのが一つの哲学になるため、どこまで Akka-Stream のインタフェースを干渉させていいかはかなり迷ったのですが、そもそも Usecase をグラフ構築の取り纏め役と定めた時点でアプリケーション層に干渉することは明らかでしたので、まずはインフラ層、アプリケーション層までで止めるよう努めてみるのが良いだろうと考えました。

ドメイン層に抽象を置くリポジトリなどは具象を記述するインフラ層的には Source や Flow をリターンしたくなりそこそこに妥協をしてしまった部分がありますが、そういう部分こそアプリケーション層に書き出してしまう方がまとまっただろうなと反省しています。


役割として

Akka-Stream が果たした役割はドメイン層からアプリケーション層にかけて記述したビジネスロジックの流れを Flow に取り纏め、 I/O をインフラ層とやりとりする処理系統の制御・構築です。

ドメインサービスやアプリケーションサービスはまず基本的には Value/Entity オブジェクトをインプットとして何らかの結果をアウトプットする手続きモデルです。 Value/Entity オブジェクトそれ自体にサービスを呼び出すような機能は無いため、動線は別のクラスで引いてやる必要があります。その動線が Akka-Stream の Flow であり、動線の構築を担うのがアプリケーション層です。


リリースの後に

後に、と言っても実はまだまだ満たさなければならない機能があり俺たちの戦いはこれからだ!状態なのですが。。。

数値を出せないのが大変心苦しいところなのですが、アプリケーションをステージング環境にリリースして検証作業を実施してみたところ、IPO が Akka-Stream を利用して並列化した事で Web の処理系統と比較して安定してパフォーマンスを発揮できるようになりました。ボトルネックの確認やバグと思しき挙動の調査検証も設計上のフローが明確になった事で分かりやすく、調査ポイントが幾分絞りやすくなったと思っています。

プロジェクトを一から構築し直し再設計を適用、要所要所で用いるライブラリも自分自身で選定してリリースに持ち込めた事で、個人的には粗がありつつも満足度の高いアプリケーションになりました。ほぼほぼ独学に近い状態から1年以上 Scala によるログ基盤の開発を任されてきた一つの成果としての実感を得ることができ、その意味でフロムスクラッチという会社に感謝の気持ちがあります。





  1. Future/Promiseについて (ドワンゴさんの研修資料) 



  2. Migration Guide 2.1.x to 2.2.x (Akka 公式ドキュメント) 



  3. Akka Streams についての基礎概念 (Qiita記事 @xoyo24) 



  4. 実装クリーンアーキテクチャ (Qiita記事 @nrslib) 



  5. Clean ArchitectureでAPI Serverを構築してみる (Qiita記事 @hirotakan) 



  6. 境界付けられたコンテキスト 概念編 - ドメイン駆動設計用語解説 (Qiita記事 @little_hand_s) 



  7. charwork社 フロムスクラッチとしてではなくあくまで個人的に、ではありますが、 Akka-Stream を使ったアプリケーションの実装については chatwork 社の皆様にご意見を伺う機会があり、 Usecase のリターンを RunnableGraph にするのは一つの手だ、というものを含め様々なアドバイスを頂きました。今回は execute メソッドのリターンとしてグラフを返すのは違和感があったため Usecase 内で実行し結果を呼び出し側で取得できるように書きましたが、方針を検討する上で大いに参考にさせて頂きました。この場を借りて改めて謝辞を述べさせて頂きます。