はじめに
クラウドネイティブという言葉が流行っていますね。昨日は@chiyoppy がMicroservisesとクラウドについての記事を書いてくれました。クラウドでWebサービスを設計する上で大切な事が簡潔にまとまった良い記事だと思います。今回はそれを受けて、クラウドネイティブなWebSocketアプリケーションをどのように作っていくかを考えて書こうと思います。昨日の記事ではアプリケーションについて、HTTPはステートレスだからすごい!スケールできる!みたいな事が書かれていましたが、今回の記事ではHTTPよりスケールしにくいと思われるWebSocketのアプリケーションについて考えてみます。
ちなみに僕は2015新卒で、仕事ではSwiftを使ってiOSアプリを書いてます。 それと今日誕生日です。プレゼントお待ちしています。
また僕は業務ではSwiftでiOSアプリを書いている事もあり、この記事はドワンゴで行っている業務とは全く関係がありません。
今回この話題を取り上げた理由は、ドワンゴの新卒何人かでだいたい週1回位で行っている勉強会で、現在クラウドネイティブとMicroservisesを勉強しているからです。それらを勉強している中で、 サーバー自体に状態を持ちやすいWebSocketアプリケーションの作り方 はどうすればいいんだ?という疑問が湧きました。この記事では小さいチャットアプリケーションを作りながらクラウドで動くWebSocketアプリケーションの設計を考えていきます。
クラウドネイティブ
クラウドネイティブは簡単に言うと、クラウド上で動かす事を前提にアプリケーションを設計することです。クラウド上で動かすためにコンテナ化や自動デプロイ、オートスケーリングやアプリケーションの設計も考えなくてはいけません。この記事ではクラウドで動かすアプリケーションの設計を考えます。その他の事については昨日chiyoppyがいろいろまとめてくれてるはずなので、今回は省きます。
この記事ではAWSで動かす事を仮定して書くので、AWS用語をどんどん使っていきます。
クラウドネイティブアプリケーションに求められる要件
クラウドで動くアプリケーションを作るためにはクラウドの特性を考える必要があります。その代表的なものとしてコンポーネントに信頼性がない事があります。EC2は突如として落ちるかもしれないという前提があるため、アプリケーションに求められる要件として
- EC2に状態を持たない
- EC2のインスタンスは突然消える可能性がある
- インスタンスがいつ破棄されても大丈夫な設計
- 高可用性(対障害性)
- 常に2つ以上のインスタンスを配置し、フェイルオーバーできるように
- 複数AZにインスタンスを配置
- AZがまるごと落ちても自動的にフェイルオーバーしたい
このようなものがあります。HTTPの場合はステートレスでスケールするアプリケーションを求められています。今回作成するWebSocketアプリケーションでも似たような考え方で
- 状態を外に押し出す
- スケールアウト可能な構成を目指す
- 接続中のコネクションにおいて障害が起きた時にユーザーが意識することなくフェイルオーバーする
事を目指していきます。3つ目はHTTPの場合は考える必要のない、WebSocketならではのポイントです。
今回目指すWebSocketアプリケーションは、簡単に言うと、2つのAZに2つのインスタンスA,Bを配置して、Aを手動で落とした時にユーザーがアプリケーションが落ちたと感じなければ良い という状態になることを目指します。WebSocketアプリケーションでは、コネクションを保持するので、サーバーが落ちてしまって切れたコネクションをうまく繋ぎ直す必要があります。Aのコネクションが自動的にBにフェイルオーバーされて、その間にまた新しいAが自動的に起動されることで、落ちないアプリケーション(落ちてるけどユーザーが気づかないアプリケーション)を作成することが目的です。今回は、太字で書いた部分を実現することを目標にします。
まずはクラウドネイティブを意識しないで書いてみる
ひとまず何も意識せずにアプリを書いてみましょう。今回はサンプルアプリケーションとして、チャットを採用しました。この状態のコードはこちらです。
https://github.com/kouki-dan/CloudChat/tree/basic-chat
起動すると、テキストフィールドが出てきます。そこに文字を打つと他のユーザーにも同期されるという、まさしくチャットです。
コードの解説
このクラスがWebSocket通信を担当しています。
class ChatSocketHandler(tornado.websocket.WebSocketHandler):
# クラス変数になる(ChatSocketHandler.xxxでアクセス、クラスメソッド内ではcls.xxxでアクセス可能)
waiters = set() # 今接続しているユーザーの集合
cache = [] # チャット履歴
def open(self): # WebSocketが開かれた時に呼ばれるメソッド
ChatSocketHandler.waiters.add(self)
self.write_message({"chats": ChatSocketHandler.cache}) # 接続時に履歴を送信
def on_close(self): # WebSocket切断時に呼ばれるメソッド
ChatSocketHandler.waiters.remove(self)
@classmethod
def update_cache(cls, chat):
cls.cache.append(chat) # 履歴を追加
@classmethod
def send_updates(cls, chat): # 全ユーザーにチャットをbroadcast
for waiter in cls.waiters:
try:
waiter.write_message(chat)
except:
logging.error("Error sending message", exc_info=True)
def on_message(self, message): # WebSocket経由でメッセージを受信した時に呼ばれるメソッド
parsed = tornado.escape.json_decode(message)
chat = {
"body": parsed["body"],
}
ChatSocketHandler.update_cache(chat)
ChatSocketHandler.send_updates({
"chats": [chat]
})
Python読めない方はごめんなさい。感じてください。Pythonは比較的読みやすい言語なので感じれるはずです。
さて、やっていることは、履歴の保持と、チャットが送られた時にbroadcastすることの2つです。この状態でも単体で動かす分には動くのですが、クラウド上で動かすことを考えると今の状態だと以下のような問題点があります
- アプリケーションが状態を持っている
- cache(チャットの履歴)をメモリ上に持っている
- インスタンスが落ちた時にデータが消えてしまう
- 複数インスタンスで同期が取れていない
- インスタンスが複数あった時に接続しているインスタンスに接続しているユーザーにしかチャットが送られなくなってしまう
- スケールアウトしない
- インスタンスが落ちた時に再接続されない
まずは、アプリケーションに状態(cache)を持たせないようにしてみましょう。 今回はRedisを使い、cacheをRedisのlistに格納するように変更します。
AWSのElastiCacheを使うと、複数のAZから接続できるRedisを簡単に使うことができるので便利です。
アプリケーションに状態を持たないチャット
今までメモリ上にチャットのデータをcacheという変数に保存していました。それをRedisに持たせる事で、EC2が落ちてもデータを失わない構成に変えてみましょう。実装した結果は以下になります
https://github.com/kouki-dan/CloudChat/tree/cache-in-redis
diffを見てみましょう。
class ChatSocketHandler(tornado.websocket.WebSocketHandler):
waiters = set()
- cache = [] # CacheはRedisに保存するため、削除しました
+ redis = Redis(decode_responses=True) # ↑のcacheをredis上に保存するためのインスタンスです
def open(self):
ChatSocketHandler.waiters.add(self)
# Cacheを直接参照していたものを、メソッド経由で取得するように変更しました
- self.write_message({"chats": ChatSocketHandler.cache})
+ self.write_message({"chats": ChatSocketHandler.get_caches()})
def on_close(self):
ChatSocketHandler.waiters.remove(self)
@classmethod
def update_cache(cls, chat):
# メモリ上ではなく、Redisに保存するように変更
- cls.cache.append(chat)
+ chat_id = cls.redis.incr("nextChatId") # chatのIDをRedisから取得
+ redis_chat_key = "chat:{}".format(chat_id) # 保存先のkeyを作成 (chat:1, chat:2 のようになる)
+
+ cls.redis.set(redis_chat_key, json.dumps(chat)) # Redisには文字列しかsetできないため、JSON文字列として保存
+ cls.redis.rpush("chats", redis_chat_key) # チャットのリストにチャットのキーを追加
+
+ @classmethod
+ def get_caches(cls):
+ # Redisからチャット一覧を取得するメソッド
+ chat_ids = cls.redis.lrange("chats", 0, -1) # チャット全てを取得
+ chats = []
+ for chat_id in chat_ids:
+ chat = json.loads(cls.redis.get(chat_id)) # JSON文字列からPythonで扱う辞書型に変換
+ chats.append(chat)
+ return chats # 辞書型の配列を返す
Pythonが読めない方のために、必要なことは概ねコメントに書きました。
投稿部分と取得部分にRedisを使う事で、EC2が落ちてもデータが失われない構成になっています。また、データをインスタンスの外に配置することの副作用として、複数インスタンスで同一のデータを参照できるというメリットも生まれました。これでスケールアウト可能な構成に一歩近づきました。
実際に複数のインスタンスを起動してみます。
この写真ではポートを変えて2つのアプリケーションを起動してみました。以前のアプリケーションではインスタンス間でチャットの内容は同期されませんでしたが、データをRedisに保存する事で同じ内容が表示されます。しかしこれではまだスケールアウトが完璧ではありません。起動時にチャット履歴の状態は同期されますが、リアルタイムなメッセージ送受信がインスタンス間で同期されていないため、以下の画像のようにlocalhost:8888にチャットを送っても、localhost:8889には届かない結果になってしまいます。逆も同じです。
次はこれを解決し、完全にスケールアウトが可能な構成を考えてみましょう。
スケールアウトするWebSocketアプリケーション
スケールアウトさせるためには、あるインスタンスで起きたイベントを他のインスタンスに通知する必要があります。上の例では、localhost:8888で起きたチャットが投稿されたイベントをlocalhost:8889に通知する仕組みが整っていなかったため、ユーザーが参照するデータに不整合が生じてしまいました。これを解決するためにもRedisを使うことができます。Redisに用意されているPubSubの仕組みを利用して、スケールアウト可能なチャットに進化させてみましょう。
PubSubとは
PubSubとは、Publish(Pub)とSubscribe(Sub)を表したもので、プロセス間やアプリ間の通信で使われます。あるコネクションでイベントをPublishする事ができ、そのイベントは事前にSubscribeしてあるコネクションにイベントが通知されます。
スケールアウトするチャット
インスタンス間の通信をPubSubを使うことでスケールアウトできるように改良していきます。これを実装したコードは以下になります
https://github.com/kouki-dan/CloudChat/tree/pubsub-chat
さぁ、diffを見てみましょう
+class Listener(threading.Thread):
+ def __init__(self, r):
+ threading.Thread.__init__(self)
+ self.redis = r
+ self.pubsub = self.redis.pubsub()
+ self.pubsub.subscribe(["chats"])
+ def work(self, item):
+ if item["type"] == "message":
+ chat = json.loads(item["data"])
+ ChatSocketHandler.send_updates(chat)
+ def run(self):
+ for item in self.pubsub.listen():
+ if item['data'] == "KILL":
+ self.pubsub.unsubscribe()
+ else:
+ self.work(item)
+
まず、Subscribeするためのクラスを作成しています。Subscribeをメインスレッドで扱うと、メインスレッドをロックしてしまうため、この部分はマルチスレッドで扱います。マルチスレッドのためにPython標準モジュールのthreadingモジュールを利用しています。
このクラスでは、メッセージがPublishされてきた時に、ChatSocketHandler.send_updates(chat)
を発行しています。これにより、各インスタンスに接続されたユーザー全員にチャットが送信されたというイベントを送ることができます。
次にチャット本体部分を見てみましょう
class ChatSocketHandler(tornado.websocket.WebSocketHandler):
waiters = set()
redis = Redis(decode_responses=True)
+ # Listenerを追加
+ client = Listener(redis)
+ client.start()
# ......
# ......
def on_message(self, message):
parsed = tornado.escape.json_decode(message)
chat = {
"body": parsed["body"],
}
ChatSocketHandler.update_cache(chat)
- ChatSocketHandler.send_updates({
- "chats": [chat]
- })
-
+ ChatSocketHandler.redis.publish("chats", json.dumps({
+ "chats": [chat]
+ }))
変更点は、今まで直接send_updates
を呼んでいた部分を、redisのpublishに変更しただけです。これにより、以下のように通信が行われます。(図作ろうと思ったんですが、ツール力の無さに断念しました。いつか図を書くかもしれないです。TODO)
ユーザー
↓ (チャットをWebSocketで送信)
各インスタンス
↓ (Publish)
Redis
↓ (Subscribe済みのインスタンスへ通知)
インスタンスA, インスタンスB,....(全てのインスタンス)
↓ (各インスタンスに接続しているユーザーへチャットメッセージをWebSocketで送信)
ユーザー
これにより、インスタンス間で完全に同期が取れた、スケールアウト可能なチャットアプリケーションが完成しました。アプリケーション部分はひとまずこれで完成です。クラウドネイティブなアプリケーションを目指すために足りない部分は、障害発生時のフェイルオーバーです。これはサーバー側ではどうしようもないのでクライアントから再接続しに行くことにしましょう。
WebSocketアプリケーションのフェイルオーバー
これまでにスケールアウト可能なWebSocketアプリケーションを作成してきました。一つのインスタンスでは信頼性が担保されないクラウドを利用する上で、障害が発生した時にどのようにフェイルオーバーするかが重要になります。これは、AB2つのインスタンスが起動されていたとすると、Aのインスタンスが落ちた時にAにつながっていたコネクションをすべてBのインスタンスに繋ぎ直す事で実現できます。
今まで通り、まずはこれを実装したURLから書いていきます。
https://github.com/kouki-dan/CloudChat/tree/reconnect-for-failover
今までは接続の度にすべてのチャットをサーバーから送信していたのですが、再接続時はチャットの情報は必要ないためサーバー側コードも少し変更しています。再接続に対応したクライアントコードを見てみましょう。
window.addEventListener("load", function() {
// URLが開かれた時の接続
connect(function() {
// エラー時のハンドラー
alert("connection error");
}, closeFunction, function() {
// 接続時のハンドラー
// 初回接続時は、チャット履歴を要求する
socket.send(JSON.stringify({"type":"command", "command":"requestFirstChat"}))
});
// WebSocketに接続する関数
function connect(errorFunction, closeFunction, openFunction) {
var url = "ws://" + location.host + "/chatsocket";
socket = new WebSocket(url);
// | メッセージを受けた時は共通の処理だが
socket.onmessage = function(event) {
receiveChat(JSON.parse(event.data)["chats"]) // DOMに反映する関数
}
// | エラー等の処理は、初回と再接続で異なるので、引数で与えられるようにする
socket.onclose = closeFunction;
socket.onerror = errorFunction;
socket.onopen = openFunction;
}
// EC2が落ちる等してコネクションが閉じられた時の関数
function closeFunction(event) {
var retry = 0;
setTimeout(function() {
var callee = arguments.callee; // setTimeoutで繰り返し自分自身を呼ぶため
connect(function() {
// エラー時の関数
retry++;
// エラーは3回まで、自分を呼び直すことで再接続を試みる
if(retry < 3) {
setTimeout(function() {
callee();
}, 3000);
} else {
alert("connection error");
}
}, null /* closeはこの段階では入れない */, function() {
// エラーでcloseした時もcloseは呼ばれてしまうため、接続成功時にcloseを入れる。
//
socket.onclose = closeFunction;
});
}, 1000);
};
// ........
当初想定していた倍のコード量になってしまいました。再接続なんて繋ぎ直すだけじゃん?と軽く考えていたのですが、初回起動時と再接続時の違い、再接続の回数等、コードを書いていくうちに書かなければいけない事が次々とでてきました。
これでついに、コネクション確立後、おもむろにctrl+cでアプリケーションを落とすと、別のインスタンスに再接続しにいくアプリケーションの完成です。実際に想定した通りに動かすためにはロードバランサーやDNSの設定やチューニングも必要になるのですが、ここではアプリケーションの実装だけに話を留めることにします。
長々とコードを書いてきましたが、今回目標としていたアプリケーションはこれで完成になります。今回作成したアプリケーションでは再接続中に送信されたメッセージは失われてしまうので完全に完成とはいえないのですが、ひとまず完成ということにしておきます。送信メッセージについてはクライアントにキャッシュしておく。受信メッセージについてはクライアントに最後に送信したメッセージIDをクライアントから受け取り、受信できなかった分のメッセージをサーバーに再送してもらう、等と言った実装方法が考えられます。興味がある方は書いてみてください、そしてPRください。ということで、これでこの記事は終わりです。お疲れ様でした。
実際に動くコードはgithubにtagを付けてリンクを張っているので、そちらを参照していただければと思います。
まとめ
ここまで、何も考えずに書いたチャットをクラウドで高可用性を保ちながら動くアプリケーションに進化させていく過程を見てきました。僕自信もコードを書きながらこの記事を書いていたので、最後にインスタンスを落とした時の再接続ができるところまで書けてホッとしました。
しかし、実際に動かすアプリケーションを書くとなると考える事はたくさんあります。セッションもそうですし、述べたとおり、ここで作ったチャットも実は完全ではありません。チャットの場合で言うと、ダイレクトメッセージを送る。なんて処理も難しそうです。WebSocketをクラウドで動かすことはとても考えることがあり大変ですね。たまたま個人的にWebSocketアプリケーションを書く機会があるので、ここで書いた内容を使って 実際にクラウドネイティブを意識してWebSocketを使ったリアルタイム通信アプリケーションを作成してみた
なんて記事もかけたらいいなと思います。
あと、この記事で一番大事なことは最初にも書きましたが、僕の誕生日は今日 ということです。お祝いメッセージ、プレゼントお待ちしております。
こちらも読みたい
参考にしたURL、読みたいと思ってるURLなどなどです
- Migrating to Cloud-Native Application
- AWS Well-Architected Framework
- Building Microservises
- Microservices とクラウド、それからオンプレミスの可能性について新卒Webエンジニア向けにまとめてみる(昨日のアドベントカレンダーの記事)
明日は
明日はひろっぴー(@about_hiroppy)が 何か を書いてくれるみたいですね。 何か とは何でしょう。楽しみです。