AkkaのActorをAkka Streamと組み合わせて使いたかったので、簡易的な使い方を記載しています。
Akka ActorをAkka StreamのSourceとして使う
val simpleSource = Source.actorRef[Int](bufferSize = 10, OverflowStrategy.dropHead)
val simpleSink = Sink.foreach[Int](number => println(s"my number is $number"))
val myActor: ActorRef = simpleSource.to(simpleSink).run()
myActor ! 100
myActor ! 200
myActor ! akka.actor.Status.Success("Stream処理が完了しました。")
実行結果
myActor ! 100
myActor ! 200
Akka ActorをAkka StreamのFlowとして使う
// 以下のようなActorを定義する
class MyActor extends Actor {
override def receive: Receive = {
case number: Int => // Intのメッセージを受け取ったらStringで返す
sender() ! s"Received number is $number"
}
}
val myActor = system.actorOf(Props[MyActor], "myActor")
import akka.util.Timeout
import scala.concurrent.duration._
implicit val timeout = Timeout(5 seconds)
val simpleFlow = Flow[Int].ask[String](parallelism = 5)(myActor)
val simpleSource = Source(1 to 10)
val simpleSink = Sink.foreach[String](println)
simpleSource.via(simpleFlow).to(simpleSink).run()
実行結果
Received number is 1
Received number is 2
Received number is 3
Received number is 4
Received number is 5
Received number is 6
Received number is 7
Received number is 8
Received number is 9
Received number is 10
Akka ActorをAkka StreamのSinkとして使う
// Sinkとして使うAkka Actorが受け取るメッセージを定義
object MyInit
object MyAck
object MyComplete
case class Fail(throwable: Throwable)
// 以下のようなActorを定義する
class MyActor extends Actor {
override def receive: Receive = {
case MyInit =>
println("Stream initialized")
sender() ! MyAck // Ackを返す
case MyComplete =>
println("Stream completed")
sender() ! MyAck // Ackを返す
case Fail(ex) =>
println(s"Stream failed :ex")
case message =>
println(s"MyActor received message: $message")
sender() ! MyAck
}
}
val simpleSource = Source(1 to 10)
val myActor = system.actorOf(Props[MyActor], "myActor")
// actorRefWithAckを使うとバックプレッシャーが働く
val simpleSink = Sink.actorRefWithAck[Int](
myActor,
onInitMessage = MyInit,
onCompleteMessage = MyComplete,
ackMessage = MyAck,
onFailureMessage = throwable => Fail(throwable)
)
simpleSource.to(simpleSink).run()
実行結果
Stream initialized
MyActor received message: 1
MyActor received message: 2
MyActor received message: 3
MyActor received message: 4
MyActor received message: 5
MyActor received message: 6
MyActor received message: 7
MyActor received message: 8
MyActor received message: 9
MyActor received message: 10
Stream completed