Posted at

AkkaStreamで動的にSourceやSinkを切り替える

More than 1 year has passed since last update.

AkkaStreamはSource, Flow, Sinkを組み合わせることで、柔軟に要求を達成できるように設計されています。

しかし、既存のSourceSinkを使い回してStreamを構成・実行つつ、状況によってそれを付け替えたい要求が出てきた際に、少し凝ったことをする必要が出てきました。

今回は例として、流れてきたログをS3にアップロードしつつ、時間単位・日付単位でファイル名を切り替えるようなユースケースを想定して実装してみたいと思います。


ユースケースと問題点の整理

まず、ログを単一のS3Objectとしてアップロードするようなシンプルなユースケースを考えてみます。これだけならさほど難しくもなく、alpakkaのS3Connector等を利用することで容易に実現できます。ここではalpakkaのS3Connectorを採用したものとして進めます。

アップロード先のS3ObjectのObjectKey(とBucketName)を指定して、S3にアップロードすることが可能です。(型の変換等は本質ではないので省略します)

スクリーンショット 2018-06-17 18.38.58.png

ログのような時系列データを扱う際は、日付別や時間別に区切ってパーティショニングを行う要望がよく生じます。となると、動的にObjectKeyを変更してアップロードするよう設計されていない(その設計思想自体はもちろん正しい)現状のSinkだと、実現できそうにありません。

やりたいのはこういうStreamです。

スクリーンショット 2018-06-17 19.00.34.png

独自の要求を達成するためだけに、既存の資産を使えないとなるとかなり厳しいです。なんとか自前のSinkの中で、既存のSinkを使う方法はないでしょうか。


SubSource, SubSinkを使って内部にStreamを構築する

GraphStageのInternalAPIとして、SubSourceOutlet, SubSinkInletというものが提供されています。

これらはそれぞれ、SourceにあたるSubSource, SinkにあたるSubSinkを内部的に持っており、push, pull, grabを呼び出すことでそれらに要素を流したり、受け取ったりすることが可能です。また、GraphStageLogicの実装も要求し、ストリームの完了処理を呼び出すcompleteのタイミングも制御できるので、不要になるまでSinkとつなぎっぱなしにできます。

今回の場合、既存のS3ConnectorのSinkSubSourceOutletSourceをつないでおき、親のSinkに入ってきた要素をSubSourceOutletに流します。これにより、自前のSourceSinkの中に、既存のSourceSinkを取り入れて実装することが可能になります。

スクリーンショット 2018-06-17 20.25.48.png

これで既存の資産を流用する方法はわかりました。動的にSink(S3へのSink)を切り替えるには、必要なタイミングで実行中のStreamを完了させ、再度Streamを構築してそちらに要素を流していけば良さそうです。


指定の時間単位で区切ってS3にアップロードする

では、実装してみます。今回の実装で注意しないといけない点として、大きく2つあります。


  1. 状況に応じてS3へのストリーム処理を完了させ、次のストリーム処理を開始する必要がある

  2. 挙動の主軸となるのはSubSourceであり、Sink側のpullcompleteSubSource側で制御する必要がある

1つ目については、現在流れているSubSourceをステートとして持ち続け、完了させるとともに破棄・新たなSubSourceに入れ替えるというステートフルさが生む複雑性として、

2つ目については、Sink自身の振る舞いがSink自身からは分離され、見通しの悪いコードになってしまいます。

これらの複雑性を踏まえた上で、実装の一例を下記に示します。

(それぞれ、注意点に対応する部分をコメントしています)

class DynamicS3Sink[T](

partitionBy : ChronoUnit
)(implicit val actorSystem: ActorSystem, materializer: Materializer) extends GraphStage[SinkShape[ByteString]] {

private val inlet: Inlet[ByteString] = Inlet("DynamicS3Sink.in")

private val s3Client = new S3Client(S3Settings())

override def shape: SinkShape[ByteString] = SinkShape(inlet)

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
var subSource: SubSourceOutlet[ByteString] = createSubSource
var partitionStart: ZonedDateTime = ZonedDateTime.now().truncatedTo(ChronoUnit.HOURS)

private def createSubSource: SubSourceOutlet[ByteString] = {
val subSource = new SubSourceOutlet[ByteString]("DynamicS3Sink.SubSource.out")
subSource.setHandler(() => {
// 2. Sinkのpull等の挙動をSubSourceで管理する
if (isClosed(inlet)) {
completeStage()
} else if (isNextPartition) {
sinkRestart()
} else {
pull(inlet)
}
})
subSource
}

private def createS3Sink: Sink[ByteString, Future[MultipartUploadResult]] =
s3Client.multipartUpload("logs", s"${partitionStart.toEpochSecond.toString}.log")

private def isNextPartition: Boolean = {
val now = ZonedDateTime.now().truncatedTo(partitionBy)
now.isAfter(partitionStart.truncatedTo(partitionBy))
}

// 1. 処理中のSubSourceの完了とステートの上書き
private def sinkRestart(): Unit = {
subSource.complete()

subSource = createSubSource
partitionStart = ZonedDateTime.now().truncatedTo(partitionBy)

Source.fromGraph(
subSource.source
).to(
createS3Sink
).run()(subFusingMaterializer)
}

override def preStart(): Unit = {
sinkRestart()
}

setHandler(inlet, new InHandler {
override def onPush(): Unit = {
val elem = grab(inlet)
subSource.push(elem)
}
})

}
}

コードを読むとpushpullSubSourceおよびSinkcompleteの管理等が煩雑になってきて辛い部分が多いです...

現状のS3用のSinkを外部から注入するようにすればlocalに書き出したりもできるので、もっと拡張性の高い実装方法はあると思います。


まとめ

最近色々AkkaStreamで実装してみて感じるのですが、凝ったストリームロジックを組むと大抵ステートフルな実装になっていきます。時系列のデータを集計したり、前の要素に応じて振る舞いを変えたり...気を抜くとどんどんテストしづらくなっていきます。

単体テストしやすいStreamの設計手法についても何か知見が得られれば、記事にしてみたいと思います。