前回の続き。今回はサーバ側のWebSocketを見てみる。
Application.java
public static WebSocket<JsonNode> chat(final String username) {
return new WebSocket<JsonNode>() {
// Called when the Websocket Handshake is done.
public void onReady(WebSocket.In<JsonNode> in, WebSocket.Out<JsonNode> out){
// Join the chat room.
try {
ChatRoom.join(username, in, out);
} catch (Exception ex) {
ex.printStackTrace();
}
}
};
}
クライアントにWebSocketを返すところで、無名クラスを定義している。このクラスにはonReady
というメソッドがあるが、これはハンドシェイクが完了した時点で実行されるとのこと。おそらく最初に一回呼ばれるのだろうね。引数にWebSocketの入力、出力を受け取り、中でやっていることと言えば、ユーザ名と入力、出力を渡しChatRoom
のjoin
というメソッドを呼び出しているだけである。早速ChatRoom.java
のjoinメソッド
を見てみよう。
ChatRoom.java
package models;
import play.mvc.*;
import play.libs.*;
import play.libs.F.*;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;
import akka.actor.*;
import static akka.pattern.Patterns.ask;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import java.util.*;
import static java.util.concurrent.TimeUnit.*;
/**
* A chat room is an Actor.
*/
public class ChatRoom extends UntypedActor {
// Default room.
static ActorRef defaultRoom = Akka.system().actorOf(Props.create(ChatRoom.class));
// Create a Robot, just for fun.
static {
new Robot(defaultRoom);
}
/**
* Join the default room.
*/
public static void join(final String username, WebSocket.In<JsonNode> in, WebSocket.Out<JsonNode> out) throws Exception{
// Send the Join message to the room
String result = (String)Await.result(ask(defaultRoom,new Join(username, out), 1000), Duration.create(1, SECONDS));
if("OK".equals(result)) {
// For each event received on the socket,
in.onMessage(new Callback<JsonNode>() {
public void invoke(JsonNode event) {
// Send a Talk message to the room.
defaultRoom.tell(new Talk(username, event.get("text").asText()), null);
}
});
// When the socket is closed.
in.onClose(new Callback0() {
public void invoke() {
// Send a Quit message to the room.
defaultRoom.tell(new Quit(username), null);
}
});
} else {
// Cannot connect, create a Json error.
ObjectNode error = Json.newObject();
error.put("error", result);
// Send the error to the socket.
out.write(error);
}
}
// Members of this room.
Map<String, WebSocket.Out<JsonNode>> members = new HashMap<String, WebSocket.Out<JsonNode>>();
public void onReceive(Object message) throws Exception {
if(message instanceof Join) {
// Received a Join message
Join join = (Join)message;
// Check if this username is free.
if(members.containsKey(join.username)) {
getSender().tell("This username is already used", getSelf());
} else {
members.put(join.username, join.channel);
notifyAll("join", join.username, "has entered the room");
getSender().tell("OK", getSelf());
}
} else if(message instanceof Talk) {
// Received a Talk message
Talk talk = (Talk)message;
notifyAll("talk", talk.username, talk.text);
} else if(message instanceof Quit) {
// Received a Quit message
Quit quit = (Quit)message;
members.remove(quit.username);
notifyAll("quit", quit.username, "has left the room");
} else {
unhandled(message);
}
}
// Send a Json event to all members
public void notifyAll(String kind, String user, String text) {
for(WebSocket.Out<JsonNode> channel: members.values()) {
ObjectNode event = Json.newObject();
event.put("kind", kind);
event.put("user", user);
event.put("message", text);
ArrayNode m = event.putArray("members");
for(String u: members.keySet()) {
m.add(u);
}
channel.write(event);
}
}
// -- Messages
public static class Join {
final String username;
final WebSocket.Out<JsonNode> channel;
public Join(String username, WebSocket.Out<JsonNode> channel) {
this.username = username;
this.channel = channel;
}
}
public static class Talk {
final String username;
final String text;
public Talk(String username, String text) {
this.username = username;
this.text = text;
}
}
public static class Quit {
final String username;
public Quit(String username) {
this.username = username;
}
}
}
joinメソッド
、最初の一文はこれである。
String result = (String)Await.result(
ask(defaultRoom,new Join(username, out), 1000),
Duration.create(1, SECONDS));
Await.result
、ask
、Duration.create
、どれも見慣れないが、それぞれ
Await.result
は scala.concurrent.Await.result
、
ask
は akka.pattern.Patterns.ask
、
Duration.create
は scala.concurrent.duration.Duration.create
のことである。ちょっとAPIリファレンスを見てみる。
scala.concurrent.Await.result→API
def result[T](awaitable: Awaitable[T], atMost: Duration): T
Await and return the result (of type T) of an Awaitable.
処理待ちして、Awaitableの結果(T)を返す、と言っている。
引数にはAwaitable(T)とDurationを受け取ると言っている。
akka.pattern.Patterns.ask→API
続いてask
def ask (actor: ActorRef, message: Any, timeoutMillis: Long): Future[AnyRef]
Java API for `akka.pattern.ask`: Sends a message asynchronously and returns a Future holding the eventual reply message; this means that the target actor needs to send the result to the sender reference provided. The Future will be completed with an akka.actor.AskTimeoutException after the given timeout has expired; this is independent from any timeout applied while awaiting a result for this future (i.e. in Await.result(..., timeout)).
メッセージを非同期で送り、結果、応答メッセージを含んだFuture
を受け取ると言っている。
引数にはActorRef
とメッセージ(Any
)とLong
を受け取る、と言っている。
Future
はAwaitableインターフェイス
を実装しているようなので、上記Await.result
の第一引数になれたのだね。
ask(defaultRoom,new Join(username, out), 1000)
をもう少し見てみると、
defaultRoom
→第1引数ActorRef
new Join(username, out)
→第2引数Any(Javaでいう'Object')
のメッセージ
1000
→第3引数Long
のタイムアウト
となっている。
defaultRoomというのはChatRoomクラスの頭の方に定義されているstaticなActorRef型の変数である。
static ActorRef defaultRoom = Akka.system().actorOf(Props.create(ChatRoom.class));
Akka.system
→ play.libs.Akka.system
actorOf
→ akka.actor.ActorSystem.actorOf
Props.create
→ akka.actor.Props.create
これらをそれぞれ見ていく。
play.libs.Akka.system()→API
public static akka.actor.ActorSystem system()
Retrieve the application Akka Actor system.
akka.actor.ActorSystemを取ってくると言っている。
akka.actor.ActorSystem.actorOf→API
def actorOf (props: Props): ActorRef
Create new actor as child of this context and give it an automatically generated name (currently similar to base64-encoded integer count, reversed and with “$” prepended, may change in the future).
このコンテキスト(?)の子として新しいアクターを作成し、自動で名前をつけ、ActorRefを返すと言っている。
Props(以下で説明)を引数として受け取ると言っている。
akka.actor.Props.create→API
static Props create(java.lang.Class<?> clazz, java.lang.Object... args)
Java API: create a Props given a class and its constructor arguments.
Classクラスのオブジェクトを受け取り、Propsを作成すると言っている。
いったんまとめ
join
メソッド、最初の一文のこれで
String result = (String)Await.result(
ask(defaultRoom,new Join(username, out), 1000),
Duration.create(1, SECONDS));
Actor(ChatRoom)にJoin
型のメッセージを非同期で送信し、処理結果をFuture
型で受け取って(ask)、Awaitable
型で受け取って(result)、String型に変換している。
端的に言うと、チャットのハブとなっているChatRoomのオブジェクト(Actor)に、入室の旨を伝え、反応を待っているわけである。
Join
は自分で定義したクラスで、username
とWebSocketの出力チャンネルが入っている。
ちなみに、ask
に定義してあるタイムアウトに達しても応答が返らない場合、akka.actor.AskTimeoutException
が発生する。Await.result
の第2引数で定義してあるタイムアウトに達しても応答が返らない場合、java.util.concurrent.TimeoutException
が発生する。微妙に違うので注意。
続・ChatRoom.java(joinメソッド)
if("OK".equals(result)) {
// For each event received on the socket,
in.onMessage(new Callback<JsonNode>() {
public void invoke(JsonNode event) {
// Send a Talk message to the room.
defaultRoom.tell(new Talk(username, event.get("text").asText()), null);
}
});
// When the socket is closed.
in.onClose(new Callback0() {
public void invoke() {
// Send a Quit message to the room.
defaultRoom.tell(new Quit(username), null);
}
});
} else {
// Cannot connect, create a Json error.
ObjectNode error = Json.newObject();
error.put("error", result);
// Send the error to the socket.
out.write(error);
}
ChatRoomアクターに入室を問い合わせ、すでにユーザ名が使われているなどでエラーになった場合、WebSocketの出力チャンネルにエラー用のJSONを書き出している。これでクライアント側のWebSocketのonmessage
が反応する。
入室が無事OKだった場合は、WebSocketの入力用チャンネルにin.onmessage
でクライアントからメッセージを受信した時のハンドラ、in.onClose
でWebSocketが閉じられた時のハンドラをplay.libs.F
のCallback
の無名クラスで登録している。
onMessage
時は、defaultRoom
のtell
メソッドで、Talk
というオブジェクト(チャットのテキストを表す)を、onClose
時はQuit
というオブジェクトを渡している。
さて、ask
で送られてきたJoin
も、tell
で送られてきたTalk
もQuit
も、すべてChatRoom.java
のonReceive
で受け止めて処理を行うのだが・・・長くなったのでまた次回!
Playframework付属のサンプル'websocket-chat'を見てみる(1/4)
Playframework付属のサンプル'websocket-chat'を見てみる(2/4)
Playframework付属のサンプル'websocket-chat'を見てみる(4/4)