LoginSignup
2
2

More than 5 years have passed since last update.

Cluster Clientを使って、双方向にコミュニケーションをするサーバーを作る

Last updated at Posted at 2015-02-11

標準のCluster Clientは単方向通信

askパターンを使って応答を得る、ということができない。

// これはできない。
val client = system.actorOf(ClusterClient.props(initialContacts = initialContacts))
client ? ClusterClient.Send("/ping", "ping")

どうするか?

Wikipediaより

メッセージを送信する相手のアクターはアドレスによって指定される(これをアクターの「メールアドレス」とも呼ぶ)。結果として、アクターはアドレスのあるアクターとのみ通信可能であり、他のアクターのアドレスは以下のような方法で獲得される:
1. 受信したメッセージ内にアドレスが書いてある。  ★これを採用
2. そのアクターが何らかの方法で既に相手のアドレスを知っている。★これを採用 
3. そのアクターは生成したアクターである。
アクターモデルは、アクター自体およびアクター間の計算の本質的並行性を特徴とし、メッセージ内にアクターのアドレスを含め、相互のやりとりは到着順が保証されない直接的非同期メッセージパッシングのみである。

Cluster間の通信

  • メッセージの送り手 Cluster Client
  • メッセージの受け手 Cluster Receptionist

異なるActorSystem間でメッセージをやり取りする場合、受け手のActorSystemにCluster Receptionistを用意しなければならない。
今回は双方向なので、両方のActorSystemにCluster Receptionistを用意する。

プロトタイプの構成

 (actor system1)             (actor system2)
 cluster-client1       ----> cluster-receptionist2
 cluster-receptionist1 <---- cluster-client2

両方のActorSystemにCluster ClientとCluster Receptionistを用意する。

プロトタイプ(Ping Pong Server)のソースコード


import com.typesafe.config.ConfigFactory

import akka.actor.{Actor, ActorLogging, ActorPath, ActorSystem, Props, UnhandledMessage, actorRef2Scala}
import akka.contrib.pattern.{ClusterClient, ClusterReceptionistExtension}

object BiCommunicateCluster extends App {
  val Array(host, port, systemName, targetHost, targetPort, targetName, _*) = args
  val confText = s"""
akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  cluster {
  //  seed-nodes = []
    seed-nodes = ["akka.tcp://${systemName}@${host}:${port}"]
  }
  extensions = ["akka.contrib.pattern.ClusterReceptionistExtension"]
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = ${host}
      port = ${port}
    }
  }
  contrib.cluster.client {
    mailbox {
      mailbox-type = "akka.dispatch.UnboundedDequeBasedMailbox"
      stash-capacity = 1000
    }
  }
  akka.contrib.cluster.receptionist {
    name = receptionist
    number-of-contacts = 3
    response-tunnel-receive-timeout = 30s
  }
  // loglevel = "DEBUG"
}
"""
  val conf = ConfigFactory.load(ConfigFactory.parseString(confText))
  val system = ActorSystem(systemName, conf)
  // 相手側
  val targetPath = s"akka.tcp://${targetName}@${targetHost}:${targetPort}/user/receptionist"
  val pinger = system.actorOf(Pinger.props(targetPath), "ping")
  val ponger = system.actorOf(Ponger.props(targetPath), "pong")
  ClusterReceptionistExtension(system).registerService(ponger)// Pongは公開する
  Thread.sleep(10000L)
  pinger ! Protocols.SendPing
}

/**
 * メッセージ
 */
object Protocols {
  case object SendPing
  case class Ping(path: ActorPath)
  case class Pong(path: ActorPath)
}

// Ping
object Pinger {
  def props(targetAddress: String) = Props(classOf[Pinger], targetAddress)
}

class Pinger(receptionistAddress: String) extends Actor with ActorLogging {
  import Protocols._
  log.info(s"Pinger will send to ${context.actorSelection(receptionistAddress)}")
  val client = context.actorOf(ClusterClient.props(initialContacts =
    Set(receptionistAddress).map(context.actorSelection(_))))

  def receive = {
    case SendPing =>
      log.info("Received SendPing")
      val pongReceiver = context.actorOf(Props[PongReceiver])
      log.info(s"Pong Receiver's path = ${pongReceiver.path}")
      ClusterReceptionistExtension(context.system).registerService(pongReceiver)
      client ! ClusterClient.Send("/user/pong", Ping(pongReceiver.path), false)
  }
}

// Pong
object Ponger {
  def props(receptionistAddress: String) = Props(classOf[Ponger], receptionistAddress)
}

class Ponger(receptionistAddress: String) extends Actor with ActorLogging {
  import Protocols._
  log.info(s"Ponger will respond to ${context.actorSelection(receptionistAddress)}")
  val client = context.actorOf(ClusterClient.props(initialContacts =
    Set(receptionistAddress).map(context.actorSelection(_))))

  def receive = {
    case Ping(path) =>
      log.info(s"Ping from ${path}.")
      client ! ClusterClient.Send(path.toStringWithoutAddress, Pong(self.path), false)
  }
}

// PingerがPingしてからPongを受け取る人
// メッセージを受け取ったら死ぬ
// メッセージの受信をPingとは並列にしたいため、別Actorにする。
class PongReceiver extends Actor with ActorLogging {
  import Protocols._
  def receive = {
    case Pong(address) =>
      log.info(s"Pong from ${address}")
      context stop self
  }
}

動作確認

> scala BiCommunicateCluster xxx.xxx.xxx.xxx 3005 Test2 yyy.yyy.yyy.yyy 3000 Test1 &
> scala BiCommunicateCluster yyy.yyy.yyy.yyy 3000 Test1 xxx.xxx.xxx.xxx 3005 Test2 &

Received SendPing // Test1#Pinger
Pong Receiver's path = akka://Test1/user/ping/$b// Test1#Pinger
Pong from akka://Test2/user/pong // Test1#PongReceiver
Ping from akka://Test2/user/ping/$b. // Test1#Ponger

2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2