LoginSignup
6
2

More than 5 years have passed since last update.

Akka ActorをAkka StreamのSource/Flow/Sinkとして使う

Last updated at Posted at 2019-02-24

AkkaのActorをAkka Streamと組み合わせて使いたかったので、簡易的な使い方を記載しています。

Akka ActorをAkka StreamのSourceとして使う

val simpleSource = Source.actorRef[Int](bufferSize = 10, OverflowStrategy.dropHead)
val simpleSink = Sink.foreach[Int](number => println(s"my number is $number"))

val myActor: ActorRef = simpleSource.to(simpleSink).run()

myActor ! 100
myActor ! 200
myActor ! akka.actor.Status.Success("Stream処理が完了しました。")

実行結果

myActor ! 100
myActor ! 200

Akka ActorをAkka StreamのFlowとして使う

// 以下のようなActorを定義する
class MyActor extends Actor {
  override def receive: Receive = {
    case number: Int => // Intのメッセージを受け取ったらStringで返す
      sender() ! s"Received number is $number"
  }
}
val myActor = system.actorOf(Props[MyActor], "myActor")

import akka.util.Timeout
import scala.concurrent.duration._
implicit val timeout = Timeout(5 seconds)

val simpleFlow = Flow[Int].ask[String](parallelism = 5)(myActor)
val simpleSource = Source(1 to 10)
val simpleSink = Sink.foreach[String](println)

simpleSource.via(simpleFlow).to(simpleSink).run()

実行結果

Received number is 1
Received number is 2
Received number is 3
Received number is 4
Received number is 5
Received number is 6
Received number is 7
Received number is 8
Received number is 9
Received number is 10

Akka ActorをAkka StreamのSinkとして使う

// Sinkとして使うAkka Actorが受け取るメッセージを定義
object MyInit
object MyAck
object MyComplete
case class Fail(throwable: Throwable)

// 以下のようなActorを定義する
class MyActor extends Actor {
  override def receive: Receive = {
    case MyInit =>
      println("Stream initialized")
      sender() ! MyAck // Ackを返す
    case MyComplete =>
      println("Stream completed")
      sender() ! MyAck // Ackを返す
    case Fail(ex) =>
      println(s"Stream failed :ex")

    case message =>
      println(s"MyActor received message: $message")
      sender() ! MyAck
  }
}
val simpleSource = Source(1 to 10)

val myActor = system.actorOf(Props[MyActor], "myActor")

// actorRefWithAckを使うとバックプレッシャーが働く
val simpleSink = Sink.actorRefWithAck[Int](
  myActor,
  onInitMessage = MyInit,
  onCompleteMessage = MyComplete,
  ackMessage = MyAck,
  onFailureMessage = throwable => Fail(throwable)
)

simpleSource.to(simpleSink).run()

実行結果

Stream initialized
MyActor received message: 1
MyActor received message: 2
MyActor received message: 3
MyActor received message: 4
MyActor received message: 5
MyActor received message: 6
MyActor received message: 7
MyActor received message: 8
MyActor received message: 9
MyActor received message: 10
Stream completed
6
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
2