Phoenix Channelで作る最先端Webアプリ - topic-subtopic編 - Qiita
Phoenix Channelで作る最先端Webアプリ - ETS編 - Qiita
Phoenix Channelで作る最先端Webアプリ - Fault Tolerance編 - Qiita
Phoenix Channelで作る最先端Webアプリ - 地図(Geo)拡張編 - Qiita
Phoenix Channelで作る最先端Webアプリ - DynamicSupervisor編 - Qiita
Phoenix Channelで作る最先端Webアプリ - Elixier Application編 - Qiita
Phoenix Channelで作る最先端Webアプリ - Reactチャット編 - Qiita
今回の技術的なポイントは以下の通りです。
1.Elixir ApplicationのETSで状態を保存し、Fault Toleranceに対応する
今回はElixirに最初から組み込まれているDB(?)であるETS(Erlang Term Storage)を使ってみます。嬉しいことにETSは特別な設定とか無しに、すぐに使えます。
1.ETSでFault Toleranceに対応
ListenTweets.ListenerではTweetsの入力を待って、入力があったらブロードキャストを行うループがあります。for tweet <- stream do という文になりますが、ここでカウンターを仕掛けて、何番目の書き込みかもブロードキャストで渡したいと思います。試してみるとわかるのですが、このループ文をそのままにカウンター変数を導入することは難しいです。私の理解不足かもしれませんが、グローバル変数のようなものをカウンターとして、ここでインクリメントすることはできません。全てのデータはimmutableですから。
しかしETSを使うとこの問題は簡単にクリアーできます。しかもFault Toleranceに対応した形で実現できます。ETSはメモリーデータです。非常に高速で、システム全体で共有できます。通常、Elixirでは専用のGenServerを用意して状態をキープしますが、その必要もありません。以下に特徴のいくつかを列挙します。
・テーブルはglobal name(atom)でアクセスできます。
・テーブルは mutableです。
・複数のプロセスが同時にアクセス(read & write)できます。
・テーブルは全く分離されたメモリースペースに置かれ、出し入れ時にディープコピーされます。
・ガベージコレクターによる弊害はありません。上書きや削除時に直ぐに解放されます。
・テーブルを作成したプロセスが死ぬと、テーブルは回収されます。
・Cコードで実装されており、自分で実装するより、速く効率的です。
2.ETSの実装
それでは早速実装していきましょう。application.exでテーブルを作成します。:listener_stateと名付けます。:named_tableをオプションに含めることで、この名前を用いて直接テーブルにアクセスできます。:publicを指定しているので、全てのプロセスで読み/書きが可能です。
#
:ets.new(:listener_state, [:public, :named_table])
opts = [strategy: :one_for_one, name: ListenTweets.Supervisor]
Supervisor.start_link(children, opts)
end
end
次にlistener.exでこのテーブルを使います。「# msg_counterを取り出す」部分を見てください。テーブルにkeyの値が無ければ1を、あればその値をmsg_counterに設定します。「# msg_counterを更新する」を見てください。msg_counterを1インクリメントし、keyの新しい値としてテーブルを更新しています。
ETSに関する設定とかコーディングは以上で全てです。超簡単ですね。Fault Toleranceについても、再起動時にETSを値を取得しているので、カウンターも連続した値をキープしたまま再起動します。pidを意識的に比較しない限りは、クラッシュした事実にも気づきません。
#
def init(key) do
channel =
get_pid()
|> get_socket()
|> get_channel(key)
if channel == :error do
IO.puts("error!!!")
else
PhoenixChannelClient.join(channel)
pid = spawn(fn ->
stream = ExTwitter.stream_filter(track: key)
for tweet <- stream do
msg_counter = # msg_counterを取り出す
case :ets.lookup(:listener_state, key) do
[] -> 1
[{_key, state}] -> state
end
PhoenixChannelClient.push(channel, "new_msg", %{msg: tweet.text, geo: tweet.geo, counter: msg_counter})
# msg_counterを更新する
:ets.insert(:listener_state, {key, msg_counter+1})
end
end)
end
{ :ok, key }
end
#
上でPhoenixChannelClient.push/3でcounterも渡すように変更しました。全体のコードをシンプルにするために形式を合わせ、ブラウザのクライアントの方も、channel.push()でcounter:0を渡すように変更します。
#
handleSubmit(event) {
event.preventDefault();
this.channel.push("new_msg", {msg: this.state.inputMessage, geo:null, counter:0})
this.setState({ inputMessage: "" })
}
#
channel moduleの方も、counterの追加に対応します。
#
def handle_in("new_msg", %{"msg" => msg, "geo" => geo, "counter" => counter}, socket) do
user_name = socket.assigns[:user_name]
broadcast(socket, "new_msg", %{msg: msg, user_name: user_name, time: now(), geo: geo, counter: counter})
{:reply, :ok, socket}
end
#
クライアントでcounterを表示するようにします
#
return (
<List key={index}>
<ListItem
leftAvatar={<Avatar>{message.user_name}</Avatar>}
primaryText={unixTime2ymd(message.time)}
secondaryText={geo}
secondaryTextLines={2} >
【{message.counter}】 {message.msg}
</ListItem>
</List>
#
以上でソースコードは全て修正しました。
3.Fault Toleranceの確認
次にコマンドでReactChat Applicationを立ち上げます。ブラウザを開き「さんど」でjoinします。キーワード「らーめん」と「花見」を登録します。以下のように「らーめん」と「花見」でそれぞれ独立したカウンターが動作しているのがわかります。
それでは、Fault Toleranceの確認を行ってみます。iexシェルから以下のコマンドを打って、プロセスをクラッシュさせます。
alias ListenTweets.{Listener,ListenerSupervisor}
pid=ListenerSupervisor.pid_from_key("らーめん")
send pid, {:div,0}
以下がクラッシュ時の出力です。
iex(2)> [debug] INCOMING "new_msg" on "room:lobby" to ReactChatWeb.RoomChannel
Transport: Phoenix.Transports.WebSocket
Parameters: %{"counter" => 14, "geo" => nil, "msg" => "RT @ishibashitaka: お知らせ\nいつもご愛顧いただきましてありがとうございます。\nこのたび らーめん 鎹 は\n30年3月30日をもちまして、閉店することとなりました。\nこれまでの皆さまからのご支援、心より感謝申し上げます。\nなお、近日中に移転再開する予定でござい…"}
{"new_msg",
%{
"counter" => 14, <===※クラッシュ前
"geo" => nil,
"msg" => "RT @ishibashitaka: お知らせ\nいつもご愛顧いただきましてありがとうございます。\nこのたび らーめ ん 鎹 は\n30年3月30日をもちまして、閉店することとなりました。\nこれまでの皆さまからのご支援、心より感謝申し 上げます。\nなお、近日中に移転再開する予定でござい…",
"time" => 1521694170,
"user_name" => "らーめん"
}}
{"phx_reply", %{"response" => %{}, "status" => "ok"}}
alias ListenTweets.{Listener,ListenerSupervisor}
[ListenTweets.Listener, ListenTweets.ListenerSupervisor]
iex(3)> pid=ListenerSupervisor.pid_from_key("らーめん")
#PID<0.403.0>
iex(4)> send pid, {:div,0}
{:div, 0}
iex(5)> [error] GenServer {Registry.Listener, "らーめん"} terminating
** (ArithmeticError) bad argument in arithmetic expression
(listen_tweets) lib/listen_tweets/listener.ex:48: ListenTweets.Listener.handle_info/2
(stdlib) gen_server.erl:616: :gen_server.try_dispatch/4
(stdlib) gen_server.erl:686: :gen_server.handle_msg/6
(stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: {:div, 0}
State: "らーめん"
[info] JOIN "room:lobby" to ReactChatWeb.RoomChannel
Transport: Phoenix.Transports.WebSocket (1.0.0)
Serializer: Phoenix.Transports.WebSocketSerializer
Parameters: %{"user_name" => "らーめん"}
[info] Replied room:lobby :ok
{"phx_reply", %{"response" => %{}, "status" => "ok"}}
{"presence_state",
%{
"さんど" => %{
"metas" => [%{"online_at" => 1521693757, "phx_ref" => "bR+hgTs0Cfk="}]
}
}}
{"presence_diff",
%{
"joins" => %{
"らーめん" => %{
"metas" => [%{"online_at" => 1521694208, "phx_ref" => "GkLxy5uQLek="}]
}
},
"leaves" => %{}
}}
nil
iex(6)> [debug] INCOMING "new_msg" on "room:lobby" to ReactChatWeb.RoomChannel
Transport: Phoenix.Transports.WebSocket
Parameters: %{"counter" => 15, "geo" => nil, "msg" => "RT @tenshinmukai: 4月1日に「CLAMP of STARS」をご購 入いただいた方の特典会に関しましては、石橋のご協力のもと、都内の別の場所にて鎹の台湾もつ鍋をご提供できる形を検討しております。\nよろしくお願いします。。 #SDB https://t.co/mF…"}
{"new_msg",
%{
"counter" => 15, <===※クラッシュ後
"geo" => nil,
"msg" => "RT @tenshinmukai: 4月1日に「CLAMP of STARS」をご購入いただいた方の特典会に関しましては、石橋の ご協力のもと、都内の別の場所にて鎹の台湾もつ鍋をご提供できる形を検討しております。\nよろしくお願いします。。 #SDB https://t.co/mF…",
"time" => 1521694242,
"user_name" => "らーめん"
}}
{"phx_reply", %{"response" => %{}, "status" => "ok"}}
※クラッシュ前と※クラッシュ後のcounterは、14と15になっています。クリアーされずに、再起動時にETSから正しい値を取り出していることがわかります。
今回のケースは少し特殊ですが、通常はGenServerが保持しているstateの値を丸ごとETSに保管し、再起動時にリカバリーとしてETSからstateを戻す処理にすることが多いようです。
4.修正ソースコードの全リスト
今回修正したソースの全リストを掲載します。
defmodule ListenTweets.Application do
# See https://hexdocs.pm/elixir/Application.html
# for more information on OTP Applications
@moduledoc false
use Application
def start(_type, _args) do
# List all child processes to be supervised
children = [
{Registry, keys: :unique, name: Registry.Listener}, #追加
ListenTweets.ListenerSupervisor, #Listenerから変更
]
# See https://hexdocs.pm/elixir/Supervisor.html
# for other strategies and supported options
:ets.new(:listener_state, [:public, :named_table])
opts = [strategy: :one_for_one, name: ListenTweets.Supervisor]
Supervisor.start_link(children, opts)
end
end
defmodule ListenTweets.Listener do
use GenServer, restart: :transient #child_spec
# use GenServer, restart: :permanent #child_spec
# use GenServer, restart: :temporary #child_spec
def start_link(key) do
GenServer.start_link(__MODULE__, key, name: via_tuple(key))
end
# via_tuple
def via_tuple(name), do: {:via, Registry, {Registry.Listener, name}}
def init(key) do
channel =
get_pid()
|> get_socket()
|> get_channel(key)
if channel == :error do
IO.puts("error!!!")
else
PhoenixChannelClient.join(channel)
pid = spawn(fn ->
stream = ExTwitter.stream_filter(track: key)
for tweet <- stream do
msg_counter =
case :ets.lookup(:listener_state, key) do
[] -> 1
[{_key, state}] -> state
end
PhoenixChannelClient.push(channel, "new_msg", %{msg: tweet.text, geo: tweet.geo, counter: msg_counter})
:ets.insert(:listener_state, {key, msg_counter+1})
end
end)
end
{ :ok, key }
end
def handle_cast({ :update, new_key }, _current_key) do
{ :noreply, new_key }
end
def handle_info({ :div, a}, key) do
ans = 40 / a
IO.puts ("ans=#{ans}")
{ :noreply, key }
end
def handle_info(mess, key) do
IO.inspect mess
{ :noreply, key }
end
defp get_pid do
res_pid = PhoenixChannelClient.start_link()
case res_pid do
{:ok, pid} -> pid
_ -> :error
end
end
defp get_socket (:error) do
:error
end
defp get_socket (pid) do
res_socket = PhoenixChannelClient.connect(pid,
host: "localhost",
port: 4000,
path: "/socket/websocket",
secure: false,
heartbeat_interval: 30_000)
case res_socket do
{:ok, socket} -> socket
_ -> :error
end
end
defp get_channel(:error, _key) do
:error
end
defp get_channel(socket, key) do
channel = PhoenixChannelClient.channel(socket, "room:lobby", %{user_name: key})
channel
end
end
defmodule ReactChatWeb.RoomChannel do
use ReactChatWeb, :channel
alias ReactChatWeb.Presence
alias ListenTweets.ListenerSupervisor
def join("room:lobby", %{"user_name" => user_name}, socket) do
send(self(), {:after_join, user_name})
{:ok, socket}
end
def handle_in("new_msg", %{"msg" => msg, "geo" => geo, "counter" => counter}, socket) do
user_name = socket.assigns[:user_name]
broadcast(socket, "new_msg", %{msg: msg, user_name: user_name, time: now(), geo: geo, counter: counter})
{:reply, :ok, socket}
end
def handle_in("new_key", %{"msg" => msg}, socket) do
ListenerSupervisor.start_listener(msg)
{:reply, :ok, socket}
end
def handle_in("del_key", %{"msg" => msg}, socket) do
ListenerSupervisor.stop_listener(msg)
{:reply, :ok, socket}
end
def handle_info({:after_join, user_name}, socket) do
push(socket, "presence_state", Presence.list(socket))
{:ok, _ref} = Presence.track(socket, user_name, %{online_at: now()})
{:noreply, assign(socket, :user_name, user_name)}
end
def terminate(_reason, socket) do
{:noreply, socket}
end
defp now do
System.system_time(:seconds)
end
end
import React from "react";
import {Socket, Presence} from "phoenix"
import RaisedButton from 'material-ui/RaisedButton';
import Paper from 'material-ui/Paper';
import Divider from 'material-ui/Divider';
import TextField from 'material-ui/TextField';
import {Map, Marker, Popup, TileLayer, LayerGroup} from 'react-leaflet'
//import Paper from 'material-ui/Paper';
import {Tabs, Tab} from 'material-ui/Tabs';
import FontIcon from 'material-ui/FontIcon';
import {List, ListItem} from 'material-ui/List';
//import Divider from 'material-ui/Divider';
import Subheader from 'material-ui/Subheader';
import Avatar from 'material-ui/Avatar';
const max_messages_length =30;
function unixTime2ymd(intTime){
// var d = new Date( intTime );
var d = new Date( intTime * 1000 );
var year = d.getFullYear();
var month = d.getMonth() + 1;
var day = d.getDate();
var hour = ( '0' + d.getHours() ).slice(-2);
var min = ( '0' + d.getMinutes() ).slice(-2);
var sec = ( '0' + d.getSeconds() ).slice(-2);
return( year + '-' + month + '-' + day + ' ' + hour + ':' + min + ':' + sec );
}
class Chat extends React.Component {
constructor() {
super();
this.state = {
isJoined: false,
inputUser: "",
inputMessage: "",
inputKey: "",
inputKey0: "",
messages: [],
geo_messages: [],
presences: {},
lat: 40,
lng: -74,
zoom: 6,
}
}
handleInputUser(event) {
this.setState({
inputUser: event.target.value
})
}
handleInputMessage(event) {
this.setState({
inputMessage: event.target.value
})
}
handleInputKey(event) {
this.setState({
inputKey: event.target.value
})
}
handleInputKey0(event) {
this.setState({
inputKey0: event.target.value
})
}
handleJoin(event) {
event.preventDefault();
if(this.state.inputUser!="") {
// assets/js/socket.jsのデフォルトの定義と同じ
this.socket = new Socket("/socket", {params:
{token: window.userToken}
});
this.socket.connect();
this.channel = this.socket.channel("room:lobby", {user_name: this.state.inputUser});
// 現在のサーバの状態を初期状態として設定
this.channel.on('presence_state', state => {
let presences = this.state.presences;
presences = Presence.syncState(presences, state);
this.setState({ presences: presences })
console.log('state', presences);
});
// 初期状態からの差分を更新していく
this.channel.on('presence_diff', diff => {
let presences = this.state.presences;
presences = Presence.syncDiff(presences, diff);
this.setState({ presences: presences })
console.log('diff', presences);
});
this.channel.on("new_msg", payload => {
let messages = this.state.messages;
messages.push(payload)
if(messages.length > max_messages_length) {
messages.shift(); // 先頭を削除
}
this.setState({ messages: messages })
if( !!payload.geo && !!payload.geo.coordinates ) {
let geo_messages = this.state.geo_messages;
geo_messages.push(payload);
if(geo_messages.length > max_messages_length) {
geo_messages.shift(); // 先頭を削除
}
this.setState({ geo_messages: geo_messages })
}
})
this.channel.join()
.receive("ok", response => { console.log("Joined successfully", response) })
.receive('error', resp => { console.log('Unable to join', resp); });
this.setState({ isJoined: true })
}
}
handleLeave(event) {
event.preventDefault();
this.socket.disconnect();
this.setState({ isJoined: false })
}
handleSubmit(event) {
event.preventDefault();
this.channel.push("new_msg", {msg: this.state.inputMessage, geo:null, counter:0})
this.setState({ inputMessage: "" })
}
handleKey(event) {
event.preventDefault();
this.channel.push("new_key", {msg: this.state.inputKey})
this.setState({ inputKey: "" })
}
handleKey0(event) {
event.preventDefault();
this.channel.push("del_key", {msg: this.state.inputKey0})
this.setState({ inputKey0: "" })
}
render() {
const style1 = { margin: '16px 32px 16px 16px', padding: '10px 32px 10px 26px',};
const style2 = { display: 'inline-block', margin: '1px 8px 1px 4px',};
//--------------------------
let messages = this.state.messages.map((message, index) => {
let geo = "";
if( !!message.geo && !!message.geo.coordinates ) {
geo = JSON.stringify(message.geo.coordinates);
}
return (
<List key={index}>
<ListItem
leftAvatar={<Avatar>{message.user_name}</Avatar>}
primaryText={unixTime2ymd(message.time)}
secondaryText={geo}
secondaryTextLines={2} >
【{message.counter}】 {message.msg}
</ListItem>
</List>
)
});
messages = messages.reverse();
let geo_messages = this.state.geo_messages;
geo_messages = geo_messages.reverse();
let center_position = [this.state.lat, this.state.lng]; // 緯度,経度
if( geo_messages && geo_messages[0]) {
center_position = [ geo_messages[0].geo.coordinates[0], geo_messages[0].geo.coordinates[1] ];
}
let Markers = [];
for (let i = 0; i < geo_messages.length; i++) {
let tweet_position = [ geo_messages[i].geo.coordinates[0], geo_messages[i].geo.coordinates[1] ];
let tweet_coordinates = JSON.stringify(geo_messages[i].geo);
Markers.push(
<Marker position={tweet_position} key={i}>
<Popup>
<div>
<div>{unixTime2ymd(geo_messages[i].time)}</div>
<div>{geo_messages[i].msg}</div>
<div>{geo_messages[i].user_name}</div>
<div>{tweet_coordinates}</div>
</div>
</Popup>
</Marker>
)
}
//--------------------------
let presences = [];
Presence.list(this.state.presences, (name, metas) => {
presences.push(name);
});
let presences_list = presences.map( (user_name, index) =>
<li key={index} style={style2}>{user_name}</li>
);
let form_jsx;
if(this.state.isJoined===false) {
form_jsx = (
<form onSubmit={this.handleJoin.bind(this)} >
<label>ユーザ名を指定してJoin</label>
<TextField hintText="ユーザ名" value = {this.state.inputUser} onChange = {this.handleInputUser.bind(this)} />
<RaisedButton type="submit" primary={true} label="Join" />
</form>
);
} else {
form_jsx = (
<div>
<Paper style={style1}>
<label>参加者 : {this.state.inputUser}</label>
<ul>
{presences_list}
</ul>
<div align="right">
<form onSubmit={this.handleLeave.bind(this)} >
<RaisedButton type="submit" primary={true} label="Leave" />
</form>
</div>
</Paper>
<Paper style={style1}>
<form onSubmit={this.handleKey.bind(this)}>
<label>キーワード登録</label>
<TextField hintText="key" value = {this.state.inputKey} onChange = {this.handleInputKey.bind(this)} />
<RaisedButton type="submit" primary={true} label="Submit" />
</form>
<form onSubmit={this.handleKey0.bind(this)}>
<label>キーワード削除</label>
<TextField hintText="key" value = {this.state.inputKey0} onChange = {this.handleInputKey0.bind(this)} />
<RaisedButton type="submit" primary={true} label="Submit" />
</form>
<form onSubmit={this.handleSubmit.bind(this)}>
<label>チャット</label>
<TextField hintText="Chat Text" value = {this.state.inputMessage} onChange = {this.handleInputMessage.bind(this)} />
<RaisedButton type="submit" primary={true} label="Submit" />
</form>
<Divider />
<br />
<div>
{messages}
</div>
</Paper>
</div>
);
}
return (
<Tabs>
<Tab label="チャット タイムライン">
<br />
<h1>チャット タイムライン編</h1>
<div>
{form_jsx}
</div>
</Tab>
<Tab label="チャット マップ">
<br />
<h1>チャット マップ編</h1>
<Map center={center_position} zoom={this.state.zoom}>
<TileLayer attribution='© <a href="http://osm.org/copyright">OpenStreetMap</a> contributors' url='http://{s}.tile.osm.org/{z}/{x}/{y}.png' />
<LayerGroup>
{Markers}
</LayerGroup>
</Map>
</Tab>
</Tabs>
)
}
}
export default Chat
以上です。