LoginSignup
3
3

More than 5 years have passed since last update.

Playframework付属のサンプル'websocket-chat'を見てみる(3/4)

Last updated at Posted at 2014-08-08

前回の続き。今回はサーバ側のWebSocketを見てみる。

Application.java

Application.java
public static WebSocket<JsonNode> chat(final String username) {

    return new WebSocket<JsonNode>() {
        // Called when the Websocket Handshake is done.
        public void onReady(WebSocket.In<JsonNode> in, WebSocket.Out<JsonNode> out){

            // Join the chat room.
            try { 
                ChatRoom.join(username, in, out);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    };
}

クライアントにWebSocketを返すところで、無名クラスを定義している。このクラスにはonReadyというメソッドがあるが、これはハンドシェイクが完了した時点で実行されるとのこと。おそらく最初に一回呼ばれるのだろうね。引数にWebSocketの入力、出力を受け取り、中でやっていることと言えば、ユーザ名と入力、出力を渡しChatRoomjoinというメソッドを呼び出しているだけである。早速ChatRoom.javajoinメソッドを見てみよう。

ChatRoom.java

ChatRoom.java
package models;

import play.mvc.*;
import play.libs.*;
import play.libs.F.*;

import scala.concurrent.Await;
import scala.concurrent.duration.Duration;
import akka.actor.*;
import static akka.pattern.Patterns.ask;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.ArrayNode;



import java.util.*;

import static java.util.concurrent.TimeUnit.*;

/**
 * A chat room is an Actor.
 */
public class ChatRoom extends UntypedActor {

    // Default room.
    static ActorRef defaultRoom = Akka.system().actorOf(Props.create(ChatRoom.class));

    // Create a Robot, just for fun.
    static {
        new Robot(defaultRoom);
    }

    /**
     * Join the default room.
     */
    public static void join(final String username, WebSocket.In<JsonNode> in, WebSocket.Out<JsonNode> out) throws Exception{

        // Send the Join message to the room
        String result = (String)Await.result(ask(defaultRoom,new Join(username, out), 1000), Duration.create(1, SECONDS));

        if("OK".equals(result)) {

            // For each event received on the socket,
            in.onMessage(new Callback<JsonNode>() {
               public void invoke(JsonNode event) {

                   // Send a Talk message to the room.
                   defaultRoom.tell(new Talk(username, event.get("text").asText()), null);

               } 
            });

            // When the socket is closed.
            in.onClose(new Callback0() {
               public void invoke() {

                   // Send a Quit message to the room.
                   defaultRoom.tell(new Quit(username), null);

               }
            });

        } else {

            // Cannot connect, create a Json error.
            ObjectNode error = Json.newObject();
            error.put("error", result);

            // Send the error to the socket.
            out.write(error);

        }

    }

    // Members of this room.
    Map<String, WebSocket.Out<JsonNode>> members = new HashMap<String, WebSocket.Out<JsonNode>>();

    public void onReceive(Object message) throws Exception {

        if(message instanceof Join) {

            // Received a Join message
            Join join = (Join)message;

            // Check if this username is free.
            if(members.containsKey(join.username)) {
                getSender().tell("This username is already used", getSelf());
            } else {
                members.put(join.username, join.channel);
                notifyAll("join", join.username, "has entered the room");
                getSender().tell("OK", getSelf());
            }

        } else if(message instanceof Talk)  {

            // Received a Talk message
            Talk talk = (Talk)message;

            notifyAll("talk", talk.username, talk.text);

        } else if(message instanceof Quit)  {

            // Received a Quit message
            Quit quit = (Quit)message;

            members.remove(quit.username);

            notifyAll("quit", quit.username, "has left the room");

        } else {
            unhandled(message);
        }

    }

    // Send a Json event to all members
    public void notifyAll(String kind, String user, String text) {
        for(WebSocket.Out<JsonNode> channel: members.values()) {

            ObjectNode event = Json.newObject();
            event.put("kind", kind);
            event.put("user", user);
            event.put("message", text);

            ArrayNode m = event.putArray("members");
            for(String u: members.keySet()) {
                m.add(u);
            }

            channel.write(event);
        }
    }

    // -- Messages

    public static class Join {

        final String username;
        final WebSocket.Out<JsonNode> channel;

        public Join(String username, WebSocket.Out<JsonNode> channel) {
            this.username = username;
            this.channel = channel;
        }

    }

    public static class Talk {

        final String username;
        final String text;

        public Talk(String username, String text) {
            this.username = username;
            this.text = text;
        }

    }

    public static class Quit {

        final String username;

        public Quit(String username) {
            this.username = username;
        }

    }

}

joinメソッド、最初の一文はこれである。

ChatRoom.java
String result = (String)Await.result(
    ask(defaultRoom,new Join(username, out), 1000), 
    Duration.create(1, SECONDS));

Await.resultaskDuration.create、どれも見慣れないが、それぞれ
Await.resultscala.concurrent.Await.result
askakka.pattern.Patterns.ask
Duration.createscala.concurrent.duration.Duration.create
のことである。ちょっとAPIリファレンスを見てみる。

scala.concurrent.Await.result→API

def result[T](awaitable: Awaitable[T], atMost: Duration): T
Await and return the result (of type T) of an Awaitable.

処理待ちして、Awaitableの結果(T)を返す、と言っている。
引数にはAwaitable(T)とDurationを受け取ると言っている。

akka.pattern.Patterns.ask→API

続いてask

 def ask (actor: ActorRef, message: Any, timeoutMillis: Long): Future[AnyRef]

Java API for `akka.pattern.ask`: Sends a message asynchronously and returns a Future holding the eventual reply message; this means that the target actor needs to send the result to the sender reference provided. The Future will be completed with an akka.actor.AskTimeoutException after the given timeout has expired; this is independent from any timeout applied while awaiting a result for this future (i.e. in Await.result(..., timeout)).

メッセージを非同期で送り、結果、応答メッセージを含んだFutureを受け取ると言っている。
引数にはActorRefとメッセージ(Any)とLongを受け取る、と言っている。

FutureAwaitableインターフェイスを実装しているようなので、上記Await.resultの第一引数になれたのだね。

ChatRoom.java
ask(defaultRoom,new Join(username, out), 1000)

をもう少し見てみると、
defaultRoom→第1引数ActorRef
new Join(username, out)→第2引数Any(Javaでいう'Object')のメッセージ
1000→第3引数Longのタイムアウト
となっている。

defaultRoomというのはChatRoomクラスの頭の方に定義されているstaticなActorRef型の変数である。

ChatRoom.java
static ActorRef defaultRoom = Akka.system().actorOf(Props.create(ChatRoom.class));

Akka.systemplay.libs.Akka.system
actorOfakka.actor.ActorSystem.actorOf
Props.createakka.actor.Props.create
これらをそれぞれ見ていく。

play.libs.Akka.system()→API

public static akka.actor.ActorSystem system()

    Retrieve the application Akka Actor system. 

akka.actor.ActorSystemを取ってくると言っている。

akka.actor.ActorSystem.actorOf→API

 def actorOf (props: Props): ActorRef

Create new actor as child of this context and give it an automatically generated name (currently similar to base64-encoded integer count, reversed and with “$” prepended, may change in the future).

このコンテキスト(?)の子として新しいアクターを作成し、自動で名前をつけ、ActorRefを返すと言っている。
Props(以下で説明)を引数として受け取ると言っている。

akka.actor.Props.create→API

 static Props   create(java.lang.Class<?> clazz, java.lang.Object... args)
          Java API: create a Props given a class and its constructor arguments.

Classクラスのオブジェクトを受け取り、Propsを作成すると言っている。

いったんまとめ

joinメソッド、最初の一文のこれで

ChatRoom.java
String result = (String)Await.result(
    ask(defaultRoom,new Join(username, out), 1000), 
    Duration.create(1, SECONDS));

Actor(ChatRoom)にJoin型のメッセージを非同期で送信し、処理結果をFuture型で受け取って(ask)、Awaitable型で受け取って(result)、String型に変換している。
端的に言うと、チャットのハブとなっているChatRoomのオブジェクト(Actor)に、入室の旨を伝え、反応を待っているわけである。
Joinは自分で定義したクラスで、usernameとWebSocketの出力チャンネルが入っている。
ちなみに、askに定義してあるタイムアウトに達しても応答が返らない場合、akka.actor.AskTimeoutExceptionが発生する。Await.resultの第2引数で定義してあるタイムアウトに達しても応答が返らない場合、java.util.concurrent.TimeoutExceptionが発生する。微妙に違うので注意。

続・ChatRoom.java(joinメソッド)

ChatRoom.java
if("OK".equals(result)) {

    // For each event received on the socket,
    in.onMessage(new Callback<JsonNode>() {
       public void invoke(JsonNode event) {

           // Send a Talk message to the room.
           defaultRoom.tell(new Talk(username, event.get("text").asText()), null);

       } 
    });

    // When the socket is closed.
    in.onClose(new Callback0() {
       public void invoke() {

           // Send a Quit message to the room.
           defaultRoom.tell(new Quit(username), null);

       }
    });

} else {

    // Cannot connect, create a Json error.
    ObjectNode error = Json.newObject();
    error.put("error", result);

    // Send the error to the socket.
    out.write(error);

}

ChatRoomアクターに入室を問い合わせ、すでにユーザ名が使われているなどでエラーになった場合、WebSocketの出力チャンネルにエラー用のJSONを書き出している。これでクライアント側のWebSocketのonmessageが反応する。

入室が無事OKだった場合は、WebSocketの入力用チャンネルにin.onmessageでクライアントからメッセージを受信した時のハンドラ、in.onCloseでWebSocketが閉じられた時のハンドラをplay.libs.FCallbackの無名クラスで登録している。

onMessage時は、defaultRoomtellメソッドで、Talkというオブジェクト(チャットのテキストを表す)を、onClose時はQuitというオブジェクトを渡している。


さて、askで送られてきたJoinも、tellで送られてきたTalkQuitも、すべてChatRoom.javaonReceiveで受け止めて処理を行うのだが・・・長くなったのでまた次回


Playframework付属のサンプル'websocket-chat'を見てみる(1/4)
Playframework付属のサンプル'websocket-chat'を見てみる(2/4)
Playframework付属のサンプル'websocket-chat'を見てみる(4/4)

3
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
3