前回からの続き。Actorがメッセージを受け取ってからの動きを見ていく。メインとなるのはUntypedActorに定義されている抽象メソッドonReceiveの実装である。
ChatRoom.java
package models;
import play.mvc.*;
import play.libs.*;
import play.libs.F.*;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;
import akka.actor.*;
import static akka.pattern.Patterns.ask;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import java.util.*;
import static java.util.concurrent.TimeUnit.*;
/**
* A chat room is an Actor.
*/
public class ChatRoom extends UntypedActor {
// Default room.
static ActorRef defaultRoom = Akka.system().actorOf(Props.create(ChatRoom.class));
// Create a Robot, just for fun.
static {
new Robot(defaultRoom);
}
/**
* Join the default room.
*/
public static void join(final String username, WebSocket.In<JsonNode> in, WebSocket.Out<JsonNode> out) throws Exception{
// Send the Join message to the room
String result = null;
result = (String)Await.result(ask(defaultRoom,new Join(username, out), 1000), Duration.create(1, SECONDS));
if("OK".equals(result)) {
// For each event received on the socket,
in.onMessage(new Callback<JsonNode>() {
public void invoke(JsonNode event) {
// Send a Talk message to the room.
defaultRoom.tell(new Talk(username, event.get("text").asText()), null);
}
});
// When the socket is closed.
in.onClose(new Callback0() {
public void invoke() {
// Send a Quit message to the room.
defaultRoom.tell(new Quit(username), null);
}
});
} else {
// Cannot connect, create a Json error.
ObjectNode error = Json.newObject();
error.put("error", result);
// Send the error to the socket.
out.write(error);
}
}
// Members of this room.
Map<String, WebSocket.Out<JsonNode>> members = new HashMap<String, WebSocket.Out<JsonNode>>();
public void onReceive(Object message) throws Exception {
if(message instanceof Join) {
// Received a Join message
Join join = (Join)message;
// Check if this username is free.
if(members.containsKey(join.username)) {
getSender().tell("This username is already used", getSelf());
} else {
members.put(join.username, join.channel);
notifyAll("join", join.username, "has entered the room");
getSender().tell("OK", getSelf());
}
} else if(message instanceof Talk) {
// Received a Talk message
Talk talk = (Talk)message;
notifyAll("talk", talk.username, talk.text);
} else if(message instanceof Quit) {
// Received a Quit message
Quit quit = (Quit)message;
members.remove(quit.username);
notifyAll("quit", quit.username, "has left the room");
} else {
unhandled(message);
}
}
// Send a Json event to all members
public void notifyAll(String kind, String user, String text) {
for(WebSocket.Out<JsonNode> channel: members.values()) {
ObjectNode event = Json.newObject();
event.put("kind", kind);
event.put("user", user);
event.put("message", text);
ArrayNode m = event.putArray("members");
for(String u: members.keySet()) {
m.add(u);
}
channel.write(event);
}
}
// -- Messages
public static class Join {
final String username;
final WebSocket.Out<JsonNode> channel;
public Join(String username, WebSocket.Out<JsonNode> channel) {
this.username = username;
this.channel = channel;
}
}
public static class Talk {
final String username;
final String text;
public Talk(String username, String text) {
this.username = username;
this.text = text;
}
}
public static class Quit {
final String username;
public Quit(String username) {
this.username = username;
}
}
}
onReceiveメソッド
メッセージ用のオブジェクトを引数で受け取る。
- Join
- Talk
- Quit
のメッセージは、対応するクラスが予め定義してあるため、ここではどのクラスのインスタンスが渡されたかを判断し、分岐処理を行っている。
Joinの場合
渡されたユーザ名が既にRoom内に存在していた場合は、Joinメッセージの送り主(getSender()で取得。この場合はaskで作成されたPromiseActorRef)に、「既にあるよ」という旨のメッセージと自分自身(defaultRoom)の参照を渡して返している。
新規ユーザだった場合は、defaultRoomのメンバーにそのユーザ名を追加する。また、notifyAllメソッドにより、ルームに登録されているすべてのWebSocketの出力チャンネルへ、「参加した」旨のJSONオブジェクトを渡している。これによりクライアント側のonMessageが反応し、「誰々が入ってきたよ」と表示が更新される。
メッセージの送り主のask(PromiseActorRef)には「入室OK」の旨のメッセージを返す。以降の流れは前回見た通りである。
Talkの場合
notifyAllメソッドにより、ルームに登録されているすべてのWebSocketの出力チャンネルへ、「発言」のJSONオブジェクトを渡している。これによりクライアント側のonMessageが反応し、発言者と発言内容がセットで更新される。
Quitの場合
defaultRoomのメンバー一覧からユーザ名を除去。notifyAllメソッドにより、ルームに登録されているすべてのWebSocketの出力チャンネルへ、「退出した」旨のJSONオブジェクトを渡している。これによりクライアント側のonMessageが反応し、「誰々が退出したよ」と表示が更新される。
Robot.java
ひと通り見終わった。最後にRobot.javaを見てみる。Room内で定期的につぶやいているbotである。
package models;
import play.*;
import play.mvc.*;
import play.libs.*;
import scala.concurrent.duration.*;
import akka.actor.*;
import com.fasterxml.jackson.databind.JsonNode;
import static java.util.concurrent.TimeUnit.*;
public class Robot {
public Robot(ActorRef chatRoom) {
// Create a Fake socket out for the robot that log events to the console.
WebSocket.Out<JsonNode> robotChannel = new WebSocket.Out<JsonNode>() {
public void write(JsonNode frame) {
Logger.of("robot").info(Json.stringify(frame));
}
public void close() {}
};
// Join the room
chatRoom.tell(new ChatRoom.Join("Robot", robotChannel), null);
// Make the robot talk every 30 seconds
Akka.system().scheduler().schedule(
Duration.create(30, SECONDS),
Duration.create(30, SECONDS),
chatRoom,
new ChatRoom.Talk("Robot", "I'm still alive"),
Akka.system().dispatcher(),
/** sender **/ null
);
}
}
ChatRoom.javaの冒頭、static初期化ブロックでインスタンス化されているオブジェクトである。
defaultRoomにJoinオブジェクトを渡して入室しようとしているが、ここでJoinオブジェクトにフェイクのWebSocket出力チャンネルを渡している。botの出力チャンネルに出力依頼が来ると、ロガーを使ってログを吐き出すよう、処理をOverrideしているわけである。この場合WebSocket本来のWebSocket通信は全く行われていない。ちょっと強引やな。
同様に、ルームへの書き込みも、クライアント側からのWebSocket通信ではなく、サーバでAkkaのスケジューラを使って定期的にdefaultRoomにメッセージを送っているだけである。
まとめ
Playframework付属の'websocket-chat'を4回に渡って見て来た。
ごちゃごちゃしていたが、WebSocketとAkkaのActorを巧みに組み合わせて処理が行われていることがわかった。
クライアントとサーバ間ではWebSocketのinとoutで通信、サーバ内ではAkkaのActorシステムを使用し、ask、tellで送ってonReceiveで受け取る、というメッセージのやりとりが行われている。
根幹部分が理解できたので、これを自分なりに拡張してもっとそれらしいチャットシステムを構築してみよう!
Playframework付属のサンプル'websocket-chat'を見てみる(1/4)
Playframework付属のサンプル'websocket-chat'を見てみる(2/4)
Playframework付属のサンプル'websocket-chat'を見てみる(3/4)