リソースを確保(例えばファイルをオープン)して、読み込んで開放するというライフサイクルを持つ Source を定義したかった。
Akka stream (2.4.6) でやってみた。
GraphStage を定義してやるとこんな感じ?
package com.example
import akka.NotUsed
import akka.stream.scaladsl.Source
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
import akka.stream.{Attributes, Outlet, SourceShape}
import scala.collection.immutable
class ResourceStage[R](acquire: () => R, release: R => Unit) extends GraphStage[SourceShape[R]] {
val out = Outlet[R]("ResourceStage.out")
override def shape: SourceShape[R] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
var resource: Option[R] = None
var pulled: Boolean = false
override def preStart() = {
// リソース確保
resource = Some(acquire())
pulled = false
}
override def postStop() = {
// リソース解放
resource.foreach(release)
}
setHandler(out, new OutHandler {
override def onPull(): Unit = {
// 最初に pull されたときにリソースを push
// 二度目にリソースを要求されたらストリームを終わる
if (pulled) {
complete(out)
} else {
pulled = true
resource.foreach(push(out, _))
}
}
})
}
}
object ResourceStage {
def bracket[A, B](acquire: () => A)(release: A => Unit)(f: A => immutable.Iterable[B]): Source[B, NotUsed] = {
Source.fromGraph(new ResourceStage(acquire, release))
.mapConcat(f)
}
def iteratorBracket[A, B](acquire: () => A)(release: A => Unit)(f: A => Iterator[B]): Source[B, NotUsed] = {
bracket(acquire)(release) { resource =>
new immutable.Iterable[B] {
override def iterator: Iterator[B] = f(resource)
}
}
}
}
使うときはこんな感じ。
// ファイルを開いて、行を読み込み、ファイルを閉じる
ResourceStage.iteratorBracket(
() => new BufferedReader(new FileReader(new File("LICENSE")))
)(_.close()) { buf =>
Iterator.continually(buf.readLine()).takeWhile(_ != null)
}
.runForeach(println)