LoginSignup
14
17

More than 5 years have passed since last update.

Akka+Scala: Event Streamを使ってメッセージをPub/Subする

Last updated at Posted at 2015-08-20

Akka Actorでは、system.eventStream.publishsystem.eventStream.subscribeを使うことで、簡単にPublish-Subscriber Modelでメッセージの送受信ができるようになります。お手軽です。

package playground.actor.example8

import akka.actor._

class Publisher extends Actor with ActorLogging {
  override def receive: Receive = {
    case Publish(message) =>
      log.info("Publish: {}", message)
      // Publishする
      context.system.eventStream.publish(message)
    case any @ _ =>
      log.warning("Unknown message: {}", any)
  }

  override def preStart(): Unit = log.info("Actor is starting")
}

class Subscriber extends Actor with ActorLogging {
  override def receive: Receive = {
    case Text(message) =>
      log.info("Received text: {}", message)
    case Image(filename) =>
      log.info("Received image: {}", filename)
    case any @ _ =>
      log.warning("Unknown message: {}", any)
  }

  override def preStart(): Unit = {
    log.info("Actor is starting")
    // 起動時にSubscribeするメッセージを指定する
    context.system.eventStream.subscribe(self, classOf[Message])
  }

  override def postStop(): Unit = {
    log.info("Actor has stopped")
  }
}

case class Publish(content: Message)

trait Message
case class Text(message: String) extends Message
case class Image(filename: String) extends Message

object EventStream extends App {
  val system = ActorSystem("example")

  val publisher = system.actorOf(Props(classOf[Publisher]), "publisher")
  val subscriber = system.actorOf(Props(classOf[Subscriber]), "subscriber")

  publisher ! Publish(Text("Hello"))
  publisher ! Publish(Image("smile.png"))
  Thread.sleep(10L)

  // Subscriberが死ぬと、PublisherのSubscriberリストから除外される
  subscriber ! PoisonPill
  Thread.sleep(10L)

  // これはSubscriberに届かず、デッドレターにもならない(デッドレターはタイミング次第でなる場合がある)
  publisher ! Publish(Text("Hello again"))
  Thread.sleep(10L)

  system.shutdown()
}

実行すると、次のような出力になります。

[akka://example/user/publisher] Actor is starting
[akka://example/user/subscriber] Actor is starting
[akka://example/user/publisher] Publish: Text(Hello)
[akka://example/user/subscriber] Received text: Hello
[akka://example/user/publisher] Publish: Image(smile.png)
[akka://example/user/subscriber] Received image: smile.png
[akka://example/user/subscriber] Actor has stopped
[akka://example/user/publisher] Publish: Text(Hello again)

こちらのサンプルコードはsuin/scala-playgroundをチェックアウトすると試すことが出来ます。

git clone https://github.com/suin/scala-playground.git
cd scala-playground
./activator
runMain playground.actor.example8.EventStream
14
17
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14
17