
Using Akka Actors as a Source/Flow/Sink in Akka Streams

I wanted to combine Akka Actors with Akka Streams, so this post records a few simple usage examples.


Using an Akka Actor as an Akka Streams Source

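import akka.actor.{ActorRef, ActorSystem}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Flow, Sink, Source}

// Assumed setup (not shown in the original): run() needs an implicit materializer.
implicit val system: ActorSystem = ActorSystem("example")
implicit val materializer: ActorMaterializer = ActorMaterializer()

val simpleSource = Source.actorRef[Int](bufferSize = 10, OverflowStrategy.dropHead)

val simpleSink = Sink.foreach[Int](number => println(s"my number is $number"))

val myActor: ActorRef = simpleSource.to(simpleSink).run()

myActor ! 100
myActor ! 200
myActor ! akka.actor.Status.Success("Stream processing has completed.")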

Output

my number is 100
my number is 200
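To check what the materialized ActorRef actually delivers, the elements can be collected instead of printed. The following is a minimal sketch, assuming the Akka 2.5-style API used in this post (ActorMaterializer and the two-argument Source.actorRef); the object name ActorSourceDemo, the method run, and the system name are made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object ActorSourceDemo {
  def run(): Seq[Int] = {
    // Hypothetical system name; any name works.
    implicit val system: ActorSystem = ActorSystem("actor-source-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val source = Source.actorRef[Int](bufferSize = 10, OverflowStrategy.dropHead)

      // Keep.both keeps the ActorRef from the source and the Future from Sink.seq.
      val (ref, collected) = source.toMat(Sink.seq[Int])(Keep.both).run()

      ref ! 100
      ref ! 200
      // Status.Success completes the stream after buffered elements are emitted.
      ref ! akka.actor.Status.Success("done")

      Await.result(collected, 5.seconds)
    } finally system.terminate()
  }
}
```

Calling ActorSourceDemo.run() should return the two numbers in the order they were sent. Keep.both is used because to(...) alone would keep only the ActorRef and discard the Future that signals completion.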


Using an Akka Actor as an Akka Streams Flow

// Define an actor like the following
import akka.actor.{Actor, Props}

class MyActor extends Actor {
  override def receive: Receive = {
    case number: Int => // On receiving an Int message, reply with a String
      sender() ! s"Received number is $number"
  }
}

val myActor = system.actorOf(Props[MyActor], "myActor")

import akka.util.Timeout
import scala.concurrent.duration._
implicit val timeout: Timeout = Timeout(5.seconds)

val simpleFlow = Flow[Int].ask[String](parallelism = 5)(myActor)
val simpleSource = Source(1 to 10)
val simpleSink = Sink.foreach[String](println)

simpleSource.via(simpleFlow).to(simpleSink).run()

Output

Received number is 1

Received number is 2
Received number is 3
Received number is 4
Received number is 5
Received number is 6
Received number is 7
Received number is 8
Received number is 9
Received number is 10
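The output arrives in input order even with parallelism = 5, because ask is built on the same ordered machinery as mapAsync. As a rough sketch of that idea (not the library's actual implementation, which additionally watches the actor and fails the stream if it terminates), the flow can be approximated with mapAsync and the ask pattern. The names AskFlowDemo, Echo, askViaMapAsync, and run are hypothetical.

```scala
import akka.NotUsed
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

object AskFlowDemo {
  // Same behavior as MyActor in this section.
  class Echo extends Actor {
    override def receive: Receive = {
      case number: Int => sender() ! s"Received number is $number"
    }
  }

  // Hand-rolled approximation of Flow[Int].ask[String](parallelism)(ref):
  // up to `parallelism` asks are in flight, and results are emitted in input order.
  def askViaMapAsync(ref: ActorRef, parallelism: Int)(
      implicit timeout: Timeout): Flow[Int, String, NotUsed] =
    Flow[Int].mapAsync(parallelism)(n => (ref ? n).mapTo[String])

  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("ask-flow-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    implicit val timeout: Timeout = Timeout(5.seconds)
    try {
      val echo = system.actorOf(Props[Echo], "echo")
      val result = Source(1 to 3)
        .via(askViaMapAsync(echo, parallelism = 5))
        .runWith(Sink.seq[String])
      Await.result(result, 5.seconds)
    } finally system.terminate()
  }
}
```

If a reply does not arrive within the timeout, the ask Future fails and the stream fails with it, so the timeout should be chosen with the actor's worst-case response time in mind.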


Using an Akka Actor as an Akka Streams Sink

// Define the messages received by the Akka Actor used as the Sink

object MyInit
object MyAck
object MyComplete
case class Fail(throwable: Throwable)

// Define an actor like the following
class MyActor extends Actor {
  override def receive: Receive = {
    case MyInit =>
      println("Stream initialized")
      sender() ! MyAck // Reply with an Ack
    case MyComplete =>
      println("Stream completed")
      sender() ! MyAck // Reply with an Ack
    case Fail(ex) =>
      println(s"Stream failed: $ex")

    case message =>
      println(s"MyActor received message: $message")
      sender() ! MyAck
  }
}

val simpleSource = Source(1 to 10)

val myActor = system.actorOf(Props[MyActor], "myActor")

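// With actorRefWithAck, backpressure is applied: the next element is sent
// only after the actor replies with ackMessage.
// (Akka 2.6 deprecates this method in favor of Sink.actorRefWithBackpressure.)
val simpleSink = Sink.actorRefWithAck[Int](
  myActor,
  onInitMessage = MyInit,
  onCompleteMessage = MyComplete,
  ackMessage = MyAck,
  onFailureMessage = throwable => Fail(throwable)
)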

simpleSource.to(simpleSink).run()

Output

Stream initialized

MyActor received message: 1
MyActor received message: 2
MyActor received message: 3
MyActor received message: 4
MyActor received message: 5
MyActor received message: 6
MyActor received message: 7
MyActor received message: 8
MyActor received message: 9
MyActor received message: 10
Stream completed
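Because to(simpleSink).run() returns immediately, the caller has no direct way to know when the actor has seen the last element. One way to observe the end of the handshake is to have the actor complete a Promise when the completion message arrives. This is a minimal sketch under that assumption; AckSinkDemo, Collector, run, and the message names Init, Ack, Complete, and Failed are hypothetical and not part of the original example.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object AckSinkDemo {
  // Protocol messages for this sketch.
  case object Init
  case object Ack
  case object Complete
  final case class Failed(cause: Throwable)

  // Collects every element and hands the full list to `done` when the stream ends.
  class Collector(done: Promise[Vector[Int]]) extends Actor {
    private var seen = Vector.empty[Int]

    override def receive: Receive = {
      case Init =>
        sender() ! Ack // ready for the first element
      case n: Int =>
        seen = seen :+ n
        sender() ! Ack // ready for the next element
      case Complete =>
        done.trySuccess(seen)
      case Failed(ex) =>
        done.tryFailure(ex)
    }
  }

  def run(): Vector[Int] = {
    implicit val system: ActorSystem = ActorSystem("ack-sink-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val done = Promise[Vector[Int]]()
      val collector = system.actorOf(Props(new Collector(done)))
      val sink = Sink.actorRefWithAck[Int](
        collector,
        onInitMessage = Init,
        ackMessage = Ack,
        onCompleteMessage = Complete,
        onFailureMessage = (ex: Throwable) => Failed(ex)
      )
      Source(1 to 10).to(sink).run()
      Await.result(done.future, 5.seconds)
    } finally system.terminate()
  }
}
```

Since each element is sent only after the previous Ack, the Collector sees the elements one at a time and in order, and Complete arrives only after the last element has been acknowledged.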