Scala
Akka
actor
Akka-Stream

Akka-StreamのRunnableGraphの構築パターン

More than 1 year has passed since last update.

Akka-Streamでアプリケーションを実装する場合、
SourceとFlowとSinkを組み合わせてRunnableGraphを構築し、runして実行する。

本記事ではAkka-Streamの部品のメインとなるSource, Flow, Sinkを、
Akka-ActorおよびActorPublisher, ActorSubscriberを使う場合/使わない場合でシンプルに実装してみる。

Actorを使わずにRunnableGraphを構築する

まず、Actorを使わないAkka-Streamなアプリケーションを実装する。
コード全体はこちら
AkkaStreamStandard.scala

今回取り扱うデータ型はこれだけ。

case class Message(value: String) extends AnyVal

このMessageを追加されたら加工しながら出力するgraphを実装する。

Source

後からデータを追加できるようにSourceQueueWithCompleteを使う。
Source.queue[T]で楽に作れて便利。

val source: Source[Message, SourceQueueWithComplete[Message]] =
  Source.queue[Message](100, OverflowStrategy.backpressure)

Flow

Messageを受け取って値を書き換えるものと、畳み込んでStringにする2つのFlow。
Flow[T].mapFlow[T].foldで実装する。

val flow: Flow[Message, Message, NotUsed] =
  Flow[Message].map { r => r.copy(value = s"(Mapped: ${r.value})") }

val accumulater: Flow[Message, String, NotUsed] =
  Flow[Message].fold("init") { (acc, rep) => s"$acc :: ${rep.value}" }

Sink

先程のFlowで作成したStringprintlnするだけの簡単なSink。

val sink: Sink[String, Future[Done]] = Sink.foreach[String] { println }

Graph

上で作ったSource, Flow, Sinkを組み合わせてRunnableGraphを構築する。
viatoでつなげるだけ。

// simple graph
val graph: RunnableGraph[SourceQueueWithComplete[Message]] =
  source via flow via accumulater to sink

実行

上のgraphrunを呼んだら手に入るqueueに対してofferでメッセージを詰めるとstreamを流れていく。
completeで終了。

val queue: SourceQueueWithComplete[Message] = graph.run()

queue offer Message("hello!") 
queue offer Message("100") 
queue offer Message("good") 
queue complete

実行するとこんなログが出る。

[info] init :: (Mapped: hello!) :: (Mapped: 100) :: (Mapped: good)

Actor{Publisher,Subscriber}を使ってRunnableGraphを構築する

コード全体はこちら
AkkaStreamPracWithActor.scala

使用するデータ型は、
先程のMessageと名前を変えたLetterqueue.completeに相当するメッセージとしてFinishを用意した。

case class Letter(value: String) extends AnyVal
case object Finish

Source

Source用のActorを実装

Sourceに使用するActorはActorPublisher[T]を実装すれば良い。
publishするメッセージはreceive内でonNext(msg)し、onCompleteで終了する。

class PublishActor extends ActorPublisher[Letter] {
  // publish [[Letter]] or OnComplete
  override def receive: Actor.Receive = {
    case s: String =>
      onNext(Letter(s"Nice: $s"))
    case i: Int =>
      onNext(Letter(s"Great: ${i * 100}"))
    case Finish =>
      onComplete()
  }
}

ActorPublisherを使ってSourceを作る

先程のPublishActorActorRefインスタンスをActorPublisher.applyPublisher[T]にしてから、
Source.fromPublisherでSourceに変換する

// publisher actor
val publishActorRef = system.actorOf(Props[PublishActor])

// source with actor
val source: Source[Letter, NotUsed] = {
  val publisher: Publisher[Letter] = ActorPublisher(publishActorRef)
  Source.fromPublisher(publisher)
}

Flow

Flow用のActorを実装

これは普通にActorをextendsした普通のActor。
返り値を使いたいのでsenderにメッセージを送る。

class FlowActor extends Actor {
  // subscribe and publish
  override def receive: Actor.Receive = {
    case Letter(msg) => sender() ! Letter(s"(Mapped: $msg)")
    case any        => println(s"??? => $any")
  }
}

Actorを使ってFlowを作る

StackOverFlowを参考に実装。
先程のFlowActorActorRefインスタンスにask(?)で得られるFuture[T]Flow.mapAsync[T]でFlowにするだけ。

// flow
val flow: Flow[Letter, Letter, NotUsed] = {
  import scala.concurrent.duration._
  implicit val timeout: Timeout = 1.second
  val flowActor = system.actorOf(Props[FlowActor])
  def flowWithActor(reply: Letter): Future[Letter] = (flowActor ? reply).mapTo[Letter]

  Flow[Letter].mapAsync[Letter](3)(flowWithActor)
}

// another flow without actor
val accumulater: Flow[Letter, String, NotUsed] =
  Flow[Letter].fold("init") { (acc, rep) => s"$acc :: ${rep.value}" }

Actorを使わないFlowもあわせて使ってみる。

Sink

Sink用のActorを実装

ActorSubscriberを実装したActorにする。
OnNextOnCompleteが送られてくるので、それをreceiveすればいい。

class SubscribeActor extends ActorSubscriber {
  override protected def requestStrategy: RequestStrategy = OneByOneRequestStrategy

  // just subscribe
  override def receive: Actor.Receive = {
    case OnNext(any) => println(s"subscribed: $any")
    case OnComplete  => println(s"finish process!")
  }
}

ActorSubscriberを使ってSinkを作る

先程のSubscribeActorActorRefActorSubscriber.applyしたものをSink.fromSubscriberでSourceにする。

// sink with actor
val sink: Sink[String, NotUsed] = {
  val printActor = system.actorOf(Props[SubscribeActor])
  Sink.fromSubscriber[String](ActorSubscriber[String](printActor))
}

RunnableGraph

これは特に変わらない。
viatoでSource, Flow, Sinkをつなげればいい。

// simple graph
val graph: RunnableGraph[NotUsed] = source via flow via accumulater to sink

実行

RunnableGraph#runする。
そして、Sourceの元となったpublishActorRefに対してメッセージをtellすればstreamを流れていく。
runして即座にメッセージを流すとIllegalStateExceptionが出てしまうのでとりあえずThread.sleepしてごまかしている。

graph.run
// wait preparing graph
Thread.sleep(100L)

publishActorRef ! "hello!"
publishActorRef ! 100
publishActorRef ! "good"
publishActorRef ! Finish

実行するとこんなログが出る。

[info] subscribed: init :: (Mapped: Nice: hello!) :: (Mapped: Great: 10000) :: (Mapped: Nice: good)

ActorRefを使ってRunnableGraphを構築する

上記ではActorPublisher/ActorSubscriberを使って実装したが、次はSource.actorRef, Sink.actorRefを使ってみる。
コード全体はこちら
AkkaStreamPracWithActorRef.scala

Source

Source.actorRefでSourceを作る。

// source with actorRef
val source: Source[Message, ActorRef] = Source.actorRef[Message](65536, OverflowStrategy.fail)

これだけ。
Sourceとして使用するActorRefインスタンスを用意する必要すらない。
第一引数はメッセージバッファのサイズで、第二引数はそれがoverflowした時の挙動。
ちなみにこの場合はBackPressureはサポートされていない。

Flow

先ほどと全く同じものを使うのでここでは省略。

Sink

Sink用のActor

まずSink用のActorを実装する。
ただprintlnするだけのシンプルなActor。

class SubscribeActor extends Actor {
  // just subscribe
  override def receive: Actor.Receive = {
    case Finish => println(s"finish process!")
    case any => println(s"subscribed: $any")
  }
}

Sink.actorRefでSinkを作る

先程のActorのActorRefインスタンスをSink.actorRefに渡すだけで良い。

// subscriber
val subscribeActor = system.actorOf(Props[SubscribeActor])
// sink with actorRef
val sink: Sink[Any, NotUsed] = Sink.actorRef(subscribeActor, onCompleteMessage = Finish)

第二引数のonCompleteMessageは、Sink内部のActorSubscriberに対してOnCompletetellされた時にこのActorRefに対してtellされる。
ActorRefSinkActor.scala

RunnableGraph

viatoでSource, Flow, Sinkをつなげる点は同じ。
ただ、RunnableGraph[Mat]MatActorRefになる。

// simple graph
val graph: RunnableGraph[ActorRef] = source via flow via accumulator to sink

実行

先程のgraphに対してrunを実行するとActorRefが返ってくる。
scala
val sourceActorRef: ActorRef = graph.run

これに対してメッセージをtellすればそのメッセージがSourceとしてStreamを流れていく。

// wait preparing graph
Thread.sleep(100L)

sourceActorRef ! Letter("hello!")
sourceActorRef ! Letter("100")
sourceActorRef ! Letter("good")

// force complete upstream source
sourceActorRef ! PoisonPill

今回は途中のFlowでfoldを使っているため、上流となるsourceActorRefを終了させることで次にstreamを無理やり流している。
実行結果は以下。

[info] subscribed: init :: (Mapped: hello!) :: (Mapped: 100) :: (Mapped: good)

まとめ

RunnableGraphの構築にActorを使わない, ActorPublisher/ActorSubscriber, ActorRefを使う3パターンで実装した。

Actorを使うやり方だとそもそもActorを多少知らないと使えない点ではコスト高にはなるが、
開発アーキテクチャのベースとしてActorを使用しているケースではStreamでも統一してActorを使えるというのは利点になるはず。

ただ、Konradさん(@ktosopl)StackOverFlow

Please don't use ActorPublisher and ActorSubscriber. They're too low level and you might end up implementing them in such a way that's violating the Reactive Streams specification.

と言っているのでActorPublisher/ActorSubscriberを使って実装するのは避けた方が良さそう。
なので、これらを使うくらいならActorRefを使うべきだが、
Akka-StreamがActorの存在をせっかく隠してくれているので明示的に使わずにすむならそれがベターかと。
必要のないものを無理に使わない、という当たり前の感想となってしまった。