LoginSignup
1
1

More than 5 years have passed since last update.

リソースを確保して読み込んで開放する Source を作る

Last updated at Posted at 2016-05-24

リソースを確保(例えばファイルをオープン)して、読み込んで開放するというライフサイクルを持つ Source を定義したかった。
Akka stream (2.4.6) でやってみた。

GraphStage を定義してやるとこんな感じ?

package com.example

import akka.NotUsed
import akka.stream.scaladsl.Source
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
import akka.stream.{Attributes, Outlet, SourceShape}

import scala.collection.immutable

class ResourceStage[R](acquire: () => R, release: R => Unit) extends GraphStage[SourceShape[R]] {

  val out = Outlet[R]("ResourceStage.out")

  override def shape: SourceShape[R] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      var resource: Option[R] = None
      var pulled: Boolean = false

      override def preStart() = {
        // リソース確保
        resource = Some(acquire())
        pulled = false
      }

      override def postStop() = {
        // リソース解放
        resource.foreach(release)
      }

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          // 最初に pull されたときにリソースを push
          // 二度目にリソースを要求されたらストリームを終わる
          if (pulled) {
            complete(out)
          } else {
            pulled = true
            resource.foreach(push(out, _))
          }
        }
      })
    }
}

object ResourceStage {
  def bracket[A, B](acquire: () => A)(release: A => Unit)(f: A => immutable.Iterable[B]): Source[B, NotUsed] = {
    Source.fromGraph(new ResourceStage(acquire, release))
      .mapConcat(f)
  }

  def iteratorBracket[A, B](acquire: () => A)(release: A => Unit)(f: A => Iterator[B]): Source[B, NotUsed] = {
    bracket(acquire)(release) { resource =>
      new immutable.Iterable[B] {
        override def iterator: Iterator[B] = f(resource)
      }
    }
  }
}

使うときはこんな感じ。

// ファイルを開いて、行を読み込み、ファイルを閉じる
ResourceStage.iteratorBracket(
    () => new BufferedReader(new FileReader(new File("LICENSE")))
  )(_.close()) { buf =>
    Iterator.continually(buf.readLine()).takeWhile(_ != null)
  }
  .runForeach(println)
1
1
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1