Scala
Akka
Akka-Stream

AkkaStreamで動的にSourceやSinkを切り替える

AkkaStreamはSource, Flow, Sinkを組み合わせることで、柔軟に要求を達成できるように設計されています。
しかし、既存のSourceSinkを使い回してStreamを構成・実行つつ、状況によってそれを付け替えたい要求が出てきた際に、少し凝ったことをする必要が出てきました。
今回は例として、流れてきたログをS3にアップロードしつつ、時間単位・日付単位でファイル名を切り替えるようなユースケースを想定して実装してみたいと思います。

ユースケースと問題点の整理

まず、ログを単一のS3Objectとしてアップロードするようなシンプルなユースケースを考えてみます。これだけならさほど難しくもなく、alpakkaのS3Connector等を利用することで容易に実現できます。ここではalpakkaのS3Connectorを採用したものとして進めます。

アップロード先のS3ObjectのObjectKey(とBucketName)を指定して、S3にアップロードすることが可能です。(型の変換等は本質ではないので省略します)
スクリーンショット 2018-06-17 18.38.58.png

ログのような時系列データを扱う際は、日付別や時間別に区切ってパーティショニングを行う要望がよく生じます。となると、動的にObjectKeyを変更してアップロードするよう設計されていない(その設計思想自体はもちろん正しい)現状のSinkだと、実現できそうにありません。
やりたいのはこういうStreamです。
スクリーンショット 2018-06-17 19.00.34.png

独自の要求を達成するためだけに、既存の資産を使えないとなるとかなり厳しいです。なんとか自前のSinkの中で、既存のSinkを使う方法はないでしょうか。

SubSource, SubSinkを使って内部にStreamを構築する

GraphStageのInternalAPIとして、SubSourceOutlet, SubSinkInletというものが提供されています。
これらはそれぞれ、SourceにあたるSubSource, SinkにあたるSubSinkを内部的に持っており、push, pull, grabを呼び出すことでそれらに要素を流したり、受け取ったりすることが可能です。また、GraphStageLogicの実装も要求し、ストリームの完了処理を呼び出すcompleteのタイミングも制御できるので、不要になるまでSinkとつなぎっぱなしにできます。

今回の場合、既存のS3ConnectorのSinkSubSourceOutletSourceをつないでおき、親のSinkに入ってきた要素をSubSourceOutletに流します。これにより、自前のSourceSinkの中に、既存のSourceSinkを取り入れて実装することが可能になります。
スクリーンショット 2018-06-17 20.25.48.png

これで既存の資産を流用する方法はわかりました。動的にSink(S3へのSink)を切り替えるには、必要なタイミングで実行中のStreamを完了させ、再度Streamを構築してそちらに要素を流していけば良さそうです。

指定の時間単位で区切ってS3にアップロードする

では、実装してみます。今回の実装で注意しないといけない点として、大きく2つあります。

  1. 状況に応じてS3へのストリーム処理を完了させ、次のストリーム処理を開始する必要がある
  2. 挙動の主軸となるのはSubSourceであり、Sink側のpullcompleteSubSource側で制御する必要がある

1つ目については、現在流れているSubSourceをステートとして持ち続け、完了させるとともに破棄・新たなSubSourceに入れ替えるというステートフルさが生む複雑性として、
2つ目については、Sink自身の振る舞いがSink自身からは分離され、見通しの悪いコードになってしまいます。

これらの複雑性を踏まえた上で、実装の一例を下記に示します。
(それぞれ、注意点に対応する部分をコメントしています)

class DynamicS3Sink[T](
  partitionBy      : ChronoUnit
)(implicit val actorSystem: ActorSystem, materializer: Materializer) extends GraphStage[SinkShape[ByteString]] {

  private val inlet: Inlet[ByteString] = Inlet("DynamicS3Sink.in")

  private val s3Client = new S3Client(S3Settings())

  override def shape: SinkShape[ByteString] = SinkShape(inlet)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      var subSource: SubSourceOutlet[ByteString] = createSubSource
      var partitionStart: ZonedDateTime = ZonedDateTime.now().truncatedTo(ChronoUnit.HOURS)

      private def createSubSource: SubSourceOutlet[ByteString] = {
        val subSource = new SubSourceOutlet[ByteString]("DynamicS3Sink.SubSource.out")
        subSource.setHandler(() => {
          // 2. Sinkのpull等の挙動をSubSourceで管理する
          if (isClosed(inlet)) {
            completeStage()
          } else if (isNextPartition) {
            sinkRestart()
          } else {
            pull(inlet)
          }
        })
        subSource
      }

      private def createS3Sink: Sink[ByteString, Future[MultipartUploadResult]] =
        s3Client.multipartUpload("logs", s"${partitionStart.toEpochSecond.toString}.log")

      private def isNextPartition: Boolean = {
        val now = ZonedDateTime.now().truncatedTo(partitionBy)
        now.isAfter(partitionStart.truncatedTo(partitionBy))
      }

      // 1. 処理中のSubSourceの完了とステートの上書き
      private def sinkRestart(): Unit = {
        subSource.complete()

        subSource = createSubSource
        partitionStart = ZonedDateTime.now().truncatedTo(partitionBy)

        Source.fromGraph(
          subSource.source
        ).to(
          createS3Sink
        ).run()(subFusingMaterializer)
      }

      override def preStart(): Unit = {
        sinkRestart()
      }

      setHandler(inlet, new InHandler {
        override def onPush(): Unit = {
          val elem = grab(inlet)
          subSource.push(elem)
        }
      })

    }
}

コードを読むとpushpullSubSourceおよびSinkcompleteの管理等が煩雑になってきて辛い部分が多いです...
現状のS3用のSinkを外部から注入するようにすればlocalに書き出したりもできるので、もっと拡張性の高い実装方法はあると思います。

まとめ

最近色々AkkaStreamで実装してみて感じるのですが、凝ったストリームロジックを組むと大抵ステートフルな実装になっていきます。時系列のデータを集計したり、前の要素に応じて振る舞いを変えたり...気を抜くとどんどんテストしづらくなっていきます。
単体テストしやすいStreamの設計手法についても何か知見が得られれば、記事にしてみたいと思います。