今回の目的
前回はTCP/IPの実装法と動作確認を行いました.
今回は, 前回のコードを拡張して複数ユーザが参加可能な簡単なチャットアプリを作ってみたいと思います
なお, 今回作成したコードはこちらのリポジトリにコミットされています
構想
チャットなので, 一人の発言者からのメッセージを参加者全員にリアルタイムで配布したいのですが,
メッセージの受信と送信を一つのプロセスで行う形のままでは, 受信待ちのたびに処理がブロックされてしまい
リアルタイムにメッセージを送信することができません.
そこで, 受信と送信を並列化し, メッセージの送信を受信状況に関わらず行えるようにします.
イメージ図は以下の感じです.
いわゆるオブザーバパターンというやつですね.
では, 早速実装していきましょう!
実装
Client
コード
package main
import (
"fmt"
"net"
"os"
"bufio"
)
func main() {
connection, error := net.Dial("tcp", "localhost:10000");
if error != nil {
panic(error);
}
defer connection.Close()
go waitMessage(connection);
sendMessage(connection);
}
func sendMessage(connection net.Conn) {
fmt.Print("> ");
stdin := bufio.NewScanner(os.Stdin)
if stdin.Scan() == false {
fmt.Println("Ciao ciao!");
return;
}
_, error := connection.Write([]byte(stdin.Text()));
if error != nil {
panic(error);
}
sendMessage(connection)
}
func waitMessage(connection net.Conn) {
var response = make([]byte, 4 * 1024);
_, error := connection.Read(response);
if (error != nil) {
panic(error);
}
fmt.Printf("Server> %s \n", response);
waitMessage(connection)
}
解説
クライアントのコードは, メッセージの受信と送信を並列化している以外
前回と変わりません
go waitMessage(connection);
sendMessage(connection);
処理を並列に行いたい場合はgo構文を使って処理をgoroutine化するだけでOK.
簡単ですね.
Server
コード
package chat
import (
"net"
)
type NotificationType int
const (
Message NotificationType = iota
Join
Defect
)
// Observerへの通知を意味する構造体
type Notification struct {
Type NotificationType
ClientId int
Connection net.Conn
Message string
}
package chat
import (
"net"
)
type Receiver struct {
Id int
Connection net.Conn
Observer chan<- Notification
}
func (receiver Receiver) Start() {
receiver.Observer <- Notification{ Type: Join, ClientId: receiver.Id, Connection:receiver.Connection }
receiver.WaitMessage();
}
func (receiver Receiver) WaitMessage() {
var buf = make([]byte, 1024);
n, error := receiver.Connection.Read(buf);
if (error != nil) {
receiver.Observer <- Notification{ Type: Defect, ClientId: receiver.Id}
return;
}
receiver.Observer <- Notification{ Type: Message, ClientId: receiver.Id, Message: string(buf[:n])}
receiver.WaitMessage();
}
package chat
import (
"net"
"fmt"
)
type Observer struct {
Senders []Sender
Subject <-chan Notification
}
func (observer Observer) WaitNotice() {
notice := <-observer.Subject
switch notice.Type {
case Message:
for i := range observer.Senders {
observer.Senders[i].SendMessage(notice.Message);
}
break;
case Join:
observer.Senders = appendSender(notice.ClientId, notice.Connection, observer.Senders);
fmt.Printf("Client %d join, now menber count is %d\n", notice.ClientId, len(observer.Senders));
break;
case Defect:
observer.Senders = removeSender(notice.ClientId, observer.Senders);
fmt.Printf("Client %d defect, now menber count is %d\n", notice.ClientId, len(observer.Senders));
break;
default:
}
observer.WaitNotice();
}
func appendSender(senderId int, connection net.Conn, senders []Sender) []Sender {
return append(senders, Sender{ Id: senderId, Connection: connection})
}
func removeSender(senderId int, senders []Sender) []Sender {
var find = -1;
for i := range senders {
if (senders[i].Id == senderId) {
find = i;
break;
}
}
if (find == -1) {
return senders;
}
return append(senders[:find], senders[find+1:]...);
}
package chat
import (
"net"
)
type Sender struct {
Id int
Connection net.Conn
}
func (sender Sender) SendMessage(message string) {
var buf = []byte(message);
_, error := sender.Connection.Write(buf)
if error != nil {
panic(error);
}
}
package main
import (
"fmt"
"net"
"./chat"
)
func main() {
listener, error := net.Listen("tcp", "localhost:10000");
if error != nil {
panic(error);
}
fmt.Println("Server running at localhost:10000");
var channel = make(chan chat.Notification);
var observer chat.Observer = chat.Observer{ Senders: make([]chat.Sender, 0, 5), Subject: channel};
go observer.WaitNotice();
waitClient(listener, 0, observer, channel);
}
func waitClient(listener net.Listener, sequence int, observer chat.Observer, channel chan chat.Notification) {
connection, error := listener.Accept();
if error != nil {
panic(error);
}
var receiver chat.Receiver = chat.Receiver{ Id: sequence, Connection: connection, Observer: channel};
go receiver.Start();
waitClient(listener, sequence + 1, observer, channel);
}
解説
メッセージの流れの上流から解説ということで, まずはReceiverから.
処理はStartメソッドを呼び出されると同時に, Observerに新たなメンバー入室の通知を行います
func (receiver Receiver) Start() {
receiver.Observer <- Notification{ Type: Join, ClientId: receiver.Id, Connection:receiver.Connection }
receiver.WaitMessage();
}
並列化した処理間でのメッセージのやり取りは, メッセージの送信側受信側で共有する
Channel型の変数を使って行います.
ここで使っている, receiver.ObserverもChannelで, あとで解説するmain関数の中で生成されています.
Receiverはクライアントからのメッセージを受信するたびに, Observerにメッセージ受信の通知を行い, 再び受信待ち状態にもどります
receiver.Observer <- Notification{ Type: Message, ClientId: receiver.Id, Message: string(buf[:n])}
receiver.WaitMessage();
また, 通信切断を検知したら, メンバー退室の通知を行い, 処理を終了します.
n, error := receiver.Connection.Read(buf);
if (error != nil) {
receiver.Observer <- Notification{ Type: Defect, ClientId: receiver.Id}
return;
}
次はObserver.
Observerは, waitNotice関数の中で通知を待ち,
func (observer Observer) WaitNotice() {
notice := <-observer.Subject
受け取った通知の種別によってメッセージの送信, あるいはメンバーの追加/削除を行います
switch notice.Type {
case Message:
for i := range observer.Senders {
observer.Senders[i].SendMessage(notice.Message);
}
break;
case Join:
observer.Senders = appendSender(notice.ClientId, notice.Connection, observer.Senders);
fmt.Printf("Client %d join, now menber count is %d\n", notice.ClientId, len(observer.Senders));
break;
case Defect:
observer.Senders = removeSender(notice.ClientId, observer.Senders);
fmt.Printf("Client %d defect, now menber count is %d\n", notice.ClientId, len(observer.Senders));
break;
default:
}
最後に, waitNotice関数を呼び出し, 再び通知待ち状態に戻ります.
observer.WaitNotice();
}
基本的にObserverはサーバプログラムが生きてる限り走り続ける想定なので, ループから抜ける処理はありません.
そしてSender.
func (sender Sender) SendMessage(message string) {
var buf = []byte(message);
_, error := sender.Connection.Write(buf)
if error != nil {
panic(error);
}
}
受け取った文字列をコネクション経由で送信するだけなので, 特に言うことはありません.
今回の設計ではメッセージの送信は並列化していないので, 引数はchannel経由ではなく
直接引数として渡しています.
最後にmain関数の説明をします.
main関数では, net#Listen関数でチャット参加待ち用のポートを確保したのち, Reciver-Observer間のChannelを生成し
Observer#WaitNoticeをgoroutineとして走らせます
listener, error := net.Listen("tcp", "localhost:10000");
if error != nil {
panic(error);
}
fmt.Println("Server running at localhost:10000");
var channel = make(chan chat.Notification);
var observer chat.Observer = chat.Observer{ Senders: make([]chat.Sender, 0, 5), Subject: channel};
go observer.WaitNotice();
その後, 自身はチャットへの参加者待ちループに入ります
waitClient(listener, 0, observer, channel);
}
func waitClient(listener net.Listener, sequence int, observer chat.Observer, channel chan chat.Notification) {
connection, error := listener.Accept();
if error != nil {
panic(error);
}
var receiver chat.Receiver = chat.Receiver{ Id: sequence, Connection: connection, Observer: channel};
go receiver.Start();
waitClient(listener, sequence + 1, observer, channel);
}
残件
基本的な動きはできるようになりましたが
- エラーハンドリング
- サバクラ間でメッセージ送受信以外の通信を行うためのプロトコル設計
- 複数チャットルームへの対応
などといった拡張まだまだ考えられます.
この辺についてはおいおい考えていこうかと思います.