Phoenix Channelで作る最先端Webアプリ - topic-subtopic編 - Qiita
Phoenix Channelで作る最先端Webアプリ - ETS編 - Qiita
Phoenix Channelで作る最先端Webアプリ - Fault Tolerance編 - Qiita
Phoenix Channelで作る最先端Webアプリ - 地図(Geo)拡張編 - Qiita
Phoenix Channelで作る最先端Webアプリ - DynamicSupervisor編 - Qiita
Phoenix Channelで作る最先端Webアプリ - Elixier Application編 - Qiita
Phoenix Channelで作る最先端Webアプリ - Reactチャット編 - Qiita
今回の技術的なポイントは以下の通りです。
1.DynamicSupervisorでElixir Applicationを動的に複数立ち上げる
2.React画面から入力したキーワードで、Elixir ApplicationはTweetsをリッスンする
今回は前回のPhoenix Channelで作る最先端Webアプリ - Elixier Application編 - Qiitaを改善していきます。
1.DynamicSupervisorで実現しようとしているもの
DynamicSupervisorで実現することは、ListenTweets.Listenerプロセスを動的に起動できるようにすることです。今まではListenTweets Applicationを起動するときに、ListenTweets.Listenerプロセスも起動していました。起動時のkey(引数)も固定で、プロセスも一個しか起動できませんでした。これを改善します。
以下にListenTweets Applicationを修正します。関数ListenTweets.Listenerをプロセスとして動的に起動するようにします。ListenTweets.Listenerプロセスは複数個、同時に起動可能です。それぞれが別のkey("ラーメン"とか)を起動時に与えられて、そのkeyでTwitterをリッスンします。またchannelにjoinするときのuser_nameもkeyとなります。
つまり、"ラーメン"というkeyでスタートされたListenTweets.Listenerは、"ラーメン"という名前でチャットに参加しつつ、"ラーメン"でTwitterを監視し、Tweetsを見つけたらチャットに書き込みます。同時に"宮沢りえ"でListenTweets.Listenerを起動することによって、"宮沢りえ"のTweetsもチャットに書き込まれます。
複数のブラウザ(React)同士のChannelチャットにElixirクライアント(Listener)を追加します。上での説明を図にすると以下のようになります。これらの構成要素はレガシーなHTTPではなく、基本的にWeb socketで通信を行っていることに注目してください。
React ---| |- Listener("ラーメン")
|--Phoenix Channel module --|- Listener("宮沢りえ")
React ---| |- Listener("森友")
(注意)Listenerは理論的に複数個可能ですが、実際にはTwitter Stream APIの制限で、2個までしか確認できませんでした。3個以上になるとStreamが機能しなくなりました。私のVM環境の制限かTwitterライブラリの制限か、Twitter APIの制限か、わかりません。しかし今回の記事の趣旨にはあまり関係はないです。
以下に実際の動作画面を示します。
またこの画像にように「ラーメン」と「宮沢りえ」の2個のListenerプロセスを起動しているときに、以下のようにiexシェル(phoenixを起動しているiex)からプロセスの稼働状況を確認できます。最後の「森友」はRegistry(後で説明)に登録されていないので、pidの問い合わせの段階でnilを返されます。
:sys.get_stateはGenServerの状態(state)を返してくれる関数です。Listenerプロセスのstateはkeyの文字列をそのまま入れて置いたことを思い出してください。
iex(21)> alias ListenTweets.{Listener,ListenerSupervisor}
[ListenTweets.Listener, ListenTweets.ListenerSupervisor]
iex(22)> Supervisor.count_children(ListenerSupervisor)
%{active: 2, specs: 2, supervisors: 0, workers: 2}
iex(25)> via=Listener.via_tuple("ラーメン")
{:via, Registry, {Registry.Listener, "ラーメン"}}
iex(26)> state_data = :sys.get_state(via)
"ラーメン"
iex(27)> pid=GenServer.whereis(via)
#PID<0.410.0>
iex(28)> Process.alive?(pid)
true
iex(35)> via=Listener.via_tuple("宮沢りえ")
{:via, Registry, {Registry.Listener, "宮沢りえ"}}
iex(36)> state_data = :sys.get_state(via)
"宮沢りえ"
iex(37)> pid=GenServer.whereis(via)
#PID<0.12979.0>
iex(38)> Process.alive?(pid)
true
iex(47)> via=Listener.via_tuple("森友")
{:via, Registry, {Registry.Listener, "森友"}}
iex(48)> pid=GenServer.whereis(via)
nil
2.ListenTweets Applicationの修正
これから構築していく、ListenTweets ApplicationのSupervisor Treeを以下に示します。
|- Listener("ラーメン")
Supervisor --- ListenerSupervisor -|- Listener("宮沢りえ")
|- Listener("森友")
まずapplication.exを変更します。子プロセスとしてRegistryとListenTweets.ListenerSupervisorを起動するようにします。Registryは動的に起動するプロセスを、pidで管理するのではなく、nameで管理するために使われます。DNSのような役割です。これによって"ラーメン"で起動したプロセスのpidをいちいち記録する必要はなくなり、"ラーメン"という名前でプロセスを停止できるようになります。
ListenTweets.ListenerSupervisorはListenTweets.Listenerを専用に管理するSupervisorです。ここでは動的に子プロセスを起動していくのでDynamicSupervisorというものを使います。ちょっと前まではstrategy: :simple_one_for_oneを使うことによって動的プロセスの起動を行っていたようですが、最近はDynamicSupervisorを使うようになっているようです。
defmodule ListenTweets.Application do
use Application
def start(_type, _args) do
# List all child processes to be supervised
children = [
{Registry, keys: :unique, name: Registry.Listener}, #追加
ListenTweets.ListenerSupervisor, #Listenerから変更
]
opts = [strategy: :one_for_one, name: ListenTweets.Supervisor]
Supervisor.start_link(children, opts)
end
end
以下にListenTweets.ListenerSupervisorの実装を示します。
先頭にuse DynamicSupervisor行をおいています。
start_link/1で起動されます。init/1がcallbackとなっています。strategy: :one_for_oneを指定していることに注意してください。ここまではListenTweets Application起動時に処理が行われます。
その後、任意のタイミングで、start_listener(key)を呼ぶことによって、Listenerをスタートします。引数keyを渡していることに注意してください。このListenerプロセスはkeyというnameで管理することになります。pid_from_key/1関数が、与えられたkeyからpidを返してくれます。ですからstop_listener(key)でkeyを元に、該当するプロセスを停止することが可能になります。
defmodule ListenTweets.ListenerSupervisor do
use DynamicSupervisor
alias ListenTweets.Listener
def start_link(_options), do:
DynamicSupervisor.start_link(__MODULE__, :no_args, name: __MODULE__)
def start_listener(key) do
spec = {Listener, key}
DynamicSupervisor.start_child(__MODULE__, spec)
end
def stop_listener(key) do
Supervisor.terminate_child(__MODULE__, pid_from_key(key))
end
def init(:no_args), do:
DynamicSupervisor.init(strategy: :one_for_one)
defp pid_from_key(key) do
key
|> Listener.via_tuple()
|> GenServer.whereis()
end
end
前回書いたListenTweets.Listenerを少し変更する必要があります。以下の4点です。
(1)child_specを変更します。
ListenTweets.ListenerSupervisorによって正しく再起動されるように、restart: :transient を指定します。これによりクラッシュしたときのみ再起動されるようになります。
(2)Registryのためにvia_tuple/3を定義します。
今回の使われ方が定番のようです。詳細はgoogleで。
(3)start_listenは無くしてinitですべて処理するようにしました。
ListenTweets.ListenerSupervisorのstart_listener/1ができたので、 ListenTweets.Listenerでタイミングを取る必要はなくなりました。
(4)user_name=keyでjoinするように変更しました
これに伴ってget_channel(socket)からget_channel(socket,key)二変更しました。
defmodule ListenTweets.Listener do
use GenServer, restart: :transient
#公開関数 start_link - プロセスの起動
def start_link(key) do
GenServer.start_link(__MODULE__, key, name: via_tuple(key)) #via_tupleを使う
end
# via_tuple
def via_tuple(name), do: {:via, Registry, {Registry.Listener, name}}
# start_linkのcallback
def init(key) do
channel =
get_pid()
|> get_socket()
|> get_channel(key)
if channel == :error do
IO.puts("error!!!")
else
PhoenixChannelClient.join(channel)
pid = spawn(fn ->
stream = ExTwitter.stream_filter(track: key)
for tweet <- stream do
IO.puts tweet.text
PhoenixChannelClient.push(channel, "new_msg", %{msg: tweet.text})
end
end)
end
{ :ok, key }
end
# 未使用
def handle_cast({ :update, new_key }, _current_key) do
{ :noreply, new_key }
end
# 未使用
def handle_info(mess, key) do
IO.inspect mess
{ :noreply, key }
end
defp get_pid do
res_pid = PhoenixChannelClient.start_link()
case res_pid do
{:ok, pid} -> pid
_ -> :error
end
end
defp get_socket (:error) do
:error
end
defp get_socket (pid) do
res_socket = PhoenixChannelClient.connect(pid,
host: "localhost",
port: 4000,
path: "/socket/websocket",
secure: false,
heartbeat_interval: 30_000)
case res_socket do
{:ok, socket} -> socket
_ -> :error
end
end
defp get_channel(:error, _key) do
:error
end
defp get_channel(socket,key) do
channel = PhoenixChannelClient.channel(socket, "room:lobby", %{user_name: key})
channel
end
end
3.ReactChat Application側の修正
今の状態でReactChat Applicationから、ListenTweets Applicationの拡張が正しく利用できることを確認します。これは簡単です。以下のコマンドで確認してください。
cd react_chat
mix deps.get
iex -S mix phx.server
iex> :application.which_applications
iex>alias ListenTweets.ListenerSupervisor
iex>ListenerSupervisor.start_listener("ラーメン")
iex>ListenerSupervisor.start_listener("宮沢りえ")
iex>ListenerSupervisor.stop_listener("ラーメン")
iex>ListenerSupervisor.stop_listener("宮沢りえ")
ListenTweets.ListenerSupervisorにstart_listener/1とstop_listener/1の機能がついたので、それを使うためのReact UIを変更し、channel moduleも拡張します。
以下のchannel moduleは、start_listener/1とstop_listener/1呼ぶためのAPIを拡張しただけです。new_keyとdel_keyのメッセージを加えました。
defmodule ReactChatWeb.RoomChannel do
#
alias ListenTweets.ListenerSupervisor
#
def handle_in("new_key", %{"msg" => msg}, socket) do
ListenerSupervisor.start_listener(msg)
{:reply, :ok, socket}
end
def handle_in("del_key", %{"msg" => msg}, socket) do
ListenerSupervisor.stop_listener(msg)
{:reply, :ok, socket}
end
#
次にReact UIでこの拡張に対応するためボタンを2つ増やします。
class Chat extends React.Component {
constructor() {
super();
this.state = {
#
inputKey: "",
inputKey0: "",
#
}
}
#
handleInputKey(event) {
this.setState({
inputKey: event.target.value
})
}
handleInputKey0(event) {
this.setState({
inputKey0: event.target.value
})
}
#
handleKey(event) {
event.preventDefault();
this.channel.push("new_key", {msg: this.state.inputKey})
this.setState({ inputKey: "" })
}
handleKey0(event) {
event.preventDefault();
this.channel.push("del_key", {msg: this.state.inputKey0})
this.setState({ inputKey0: "" })
}
#
<Paper style={style1}>
<form onSubmit={this.handleKey.bind(this)}>
<label>キーワード登録</label>
<TextField hintText="key" value = {this.state.inputKey} onChange
= {this.handleInputKey.bind(this)} />
<RaisedButton type="submit" primary={true} label="Submit" />
</form>
<form onSubmit={this.handleKey0.bind(this)}>
<label>キーワード削除</label>
<TextField hintText="key" value = {this.state.inputKey0} onChange = {this.handleInputKey0.bind(this)} />
<RaisedButton type="submit" primary={true} label="Submit" />
</form>
#
以上で終わりです。
4.修正ソースコードの全リスト
今回修正したソースの全リストを掲載します。
defmodule ListenTweets.Application do
# See https://hexdocs.pm/elixir/Application.html
# for more information on OTP Applications
@moduledoc false
use Application
def start(_type, _args) do
# List all child processes to be supervised
children = [
{Registry, keys: :unique, name: Registry.Listener}, #追加
ListenTweets.ListenerSupervisor, #Listenerから変更
]
# See https://hexdocs.pm/elixir/Supervisor.html
# for other strategies and supported options
opts = [strategy: :one_for_one, name: ListenTweets.Supervisor]
Supervisor.start_link(children, opts)
end
end
defmodule ListenTweets.ListenerSupervisor do
use DynamicSupervisor
alias ListenTweets.Listener
def start_link(_options), do:
DynamicSupervisor.start_link(__MODULE__, :no_args, name: __MODULE__)
def start_listener(key) do
spec = {Listener, key}
DynamicSupervisor.start_child(__MODULE__, spec)
end
def stop_listener(key) do
Supervisor.terminate_child(__MODULE__, pid_from_key(key))
end
def init(:no_args), do:
DynamicSupervisor.init(strategy: :one_for_one)
defp pid_from_key(key) do
key
|> Listener.via_tuple()
|> GenServer.whereis()
end
end
defmodule ListenTweets.Listener do
use GenServer, restart: :transient #child_spec
def start_link(key) do
GenServer.start_link(__MODULE__, key, name: via_tuple(key))
end
# via_tuple
def via_tuple(name), do: {:via, Registry, {Registry.Listener, name}}
def init(key) do
channel =
get_pid()
|> get_socket()
|> get_channel(key)
if channel == :error do
IO.puts("error!!!")
else
PhoenixChannelClient.join(channel)
pid = spawn(fn ->
stream = ExTwitter.stream_filter(track: key)
for tweet <- stream do
IO.puts tweet.text
PhoenixChannelClient.push(channel, "new_msg", %{msg: tweet.text})
end
end)
end
{ :ok, key }
end
def handle_cast({ :update, new_key }, _current_key) do
{ :noreply, new_key }
end
def handle_info(mess, key) do
IO.inspect mess
{ :noreply, key }
end
defp get_pid do
res_pid = PhoenixChannelClient.start_link()
case res_pid do
{:ok, pid} -> pid
_ -> :error
end
end
defp get_socket (:error) do
:error
end
defp get_socket (pid) do
res_socket = PhoenixChannelClient.connect(pid,
host: "localhost",
port: 4000,
path: "/socket/websocket",
secure: false,
heartbeat_interval: 30_000)
case res_socket do
{:ok, socket} -> socket
_ -> :error
end
end
defp get_channel(:error, _key) do
:error
end
defp get_channel(socket, key) do
channel = PhoenixChannelClient.channel(socket, "room:lobby", %{user_name: key})
channel
end
end
defmodule ReactChatWeb.RoomChannel do
use ReactChatWeb, :channel
alias ReactChatWeb.Presence
alias ListenTweets.ListenerSupervisor
def join("room:lobby", %{"user_name" => user_name}, socket) do
send(self(), {:after_join, user_name})
{:ok, socket}
end
def handle_in("new_msg", %{"msg" => msg}, socket) do
user_name = socket.assigns[:user_name]
broadcast(socket, "new_msg", %{msg: msg, user_name: user_name})
{:reply, :ok, socket}
end
def handle_in("new_key", %{"msg" => msg}, socket) do
ListenerSupervisor.start_listener(msg)
{:reply, :ok, socket}
end
def handle_in("del_key", %{"msg" => msg}, socket) do
ListenerSupervisor.stop_listener(msg)
{:reply, :ok, socket}
end
def handle_info({:after_join, user_name}, socket) do
push(socket, "presence_state", Presence.list(socket))
{:ok, _ref} = Presence.track(socket, user_name, %{online_at: now()})
{:noreply, assign(socket, :user_name, user_name)}
end
def terminate(_reason, socket) do
{:noreply, socket}
end
defp now do
System.system_time(:seconds)
end
end
import React from "react";
import {Socket, Presence} from "phoenix"
import RaisedButton from 'material-ui/RaisedButton';
import Paper from 'material-ui/Paper';
import Divider from 'material-ui/Divider';
import TextField from 'material-ui/TextField';
class Chat extends React.Component {
constructor() {
super();
this.state = {
isJoined: false,
inputUser: "",
inputMessage: "",
inputKey: "",
inputKey0: "",
messages: [],
presences: {}
}
}
handleInputUser(event) {
this.setState({
inputUser: event.target.value
})
}
handleInputMessage(event) {
this.setState({
inputMessage: event.target.value
})
}
handleInputKey(event) {
this.setState({
inputKey: event.target.value
})
}
handleInputKey0(event) {
this.setState({
inputKey0: event.target.value
})
}
handleJoin(event) {
event.preventDefault();
if(this.state.inputUser!="") {
// assets/js/socket.jsのデフォルトの定義と同じ
this.socket = new Socket("/socket", {params:
{token: window.userToken}
});
this.socket.connect();
this.channel = this.socket.channel("room:lobby", {user_name: this.state.inputUser});
// 現在のサーバの状態を初期状態として設定
this.channel.on('presence_state', state => {
let presences = this.state.presences;
presences = Presence.syncState(presences, state);
this.setState({ presences: presences })
console.log('state', presences);
});
// 初期状態からの差分を更新していく
this.channel.on('presence_diff', diff => {
let presences = this.state.presences;
presences = Presence.syncDiff(presences, diff);
this.setState({ presences: presences })
console.log('diff', presences);
});
this.channel.on("new_msg", payload => {
let messages = this.state.messages;
messages.push(payload)
this.setState({ messages: messages })
})
this.channel.join()
.receive("ok", response => { console.log("Joined successfully", response) })
.receive('error', resp => { console.log('Unable to join', resp); });
this.setState({ isJoined: true })
}
}
handleLeave(event) {
event.preventDefault();
this.socket.disconnect();
this.setState({ isJoined: false })
}
handleSubmit(event) {
event.preventDefault();
this.channel.push("new_msg", {msg: this.state.inputMessage})
this.setState({ inputMessage: "" })
}
handleKey(event) {
event.preventDefault();
this.channel.push("new_key", {msg: this.state.inputKey})
this.setState({ inputKey: "" })
}
handleKey0(event) {
event.preventDefault();
this.channel.push("del_key", {msg: this.state.inputKey0})
this.setState({ inputKey0: "" })
}
render() {
const style1 = { margin: '16px 32px 16px 16px', padding: '10px 32px 10px 26px',};
const style2 = { display: 'inline-block', margin: '1px 8px 1px 4px',};
let messages = this.state.messages.map((message, index) => {
return (
<div key={index}>
<p><strong>{message.user_name}</strong> > {message.msg}</p>
</div>
)
});
messages = messages.reverse();
let presences = [];
Presence.list(this.state.presences, (name, metas) => {
presences.push(name);
});
let presences_list = presences.map( (user_name, index) =>
<li key={index} style={style2}>{user_name}</li>
);
let form_jsx;
if(this.state.isJoined===false) {
form_jsx = (
<form onSubmit={this.handleJoin.bind(this)} >
<label>ユーザ名を指定してJoin</label>
<TextField hintText="ユーザ名" value = {this.state.inputUser} onChange = {this.handleInputUser.bind(this)} />
<RaisedButton type="submit" primary={true} label="Join" />
</form>
);
} else {
form_jsx = (
<div>
<Paper style={style1}>
<label>参加者 : {this.state.inputUser}</label>
<ul>
{presences_list}
</ul>
<div align="right">
<form onSubmit={this.handleLeave.bind(this)} >
<RaisedButton type="submit" primary={true} label="Leave" />
</form>
</div>
</Paper>
<Paper style={style1}>
<form onSubmit={this.handleKey.bind(this)}>
<label>キーワード登録</label>
<TextField hintText="key" value = {this.state.inputKey} onChange = {this.handleInputKey.bind(this)} />
<RaisedButton type="submit" primary={true} label="Submit" />
</form>
<form onSubmit={this.handleKey0.bind(this)}>
<label>キーワード削除</label>
<TextField hintText="key" value = {this.state.inputKey0} onChange = {this.handleInputKey0.bind(this)} />
<RaisedButton type="submit" primary={true} label="Submit" />
</form>
<form onSubmit={this.handleSubmit.bind(this)}>
<label>チャット</label>
<TextField hintText="Chat Text" value = {this.state.inputMessage} onChange = {this.handleInputMessage.bind(this)} />
<RaisedButton type="submit" primary={true} label="Submit" />
</form>
<Divider />
<br />
<div>
{messages}
</div>
</Paper>
</div>
);
}
return (
<div>
{form_jsx}
</div>
)
}
}
export default Chat