LoginSignup
2
1

More than 5 years have passed since last update.

Phoenix Channelで作る最先端Webアプリ - DynamicSupervisor編

Last updated at Posted at 2018-03-19

Phoenix Channelで作る最先端Webアプリ - topic-subtopic編 - Qiita
Phoenix Channelで作る最先端Webアプリ - ETS編 - Qiita
Phoenix Channelで作る最先端Webアプリ - Fault Tolerance編 - Qiita
Phoenix Channelで作る最先端Webアプリ - 地図(Geo)拡張編 - Qiita
Phoenix Channelで作る最先端Webアプリ - DynamicSupervisor編 - Qiita
Phoenix Channelで作る最先端Webアプリ - Elixier Application編 - Qiita
Phoenix Channelで作る最先端Webアプリ - Reactチャット編 - Qiita

 今回の技術的なポイントは以下の通りです。

ポイント
1.DynamicSupervisorでElixir Applicationを動的に複数立ち上げる
2.React画面から入力したキーワードで、Elixir ApplicationはTweetsをリッスンする

 今回は前回のPhoenix Channelで作る最先端Webアプリ - Elixier Application編 - Qiitaを改善していきます。

1.DynamicSupervisorで実現しようとしているもの

 DynamicSupervisorで実現することは、ListenTweets.Listenerプロセスを動的に起動できるようにすることです。今まではListenTweets Applicationを起動するときに、ListenTweets.Listenerプロセスも起動していました。起動時のkey(引数)も固定で、プロセスも一個しか起動できませんでした。これを改善します。

 以下にListenTweets Applicationを修正します。関数ListenTweets.Listenerをプロセスとして動的に起動するようにします。ListenTweets.Listenerプロセスは複数個、同時に起動可能です。それぞれが別のkey("ラーメン"とか)を起動時に与えられて、そのkeyでTwitterをリッスンします。またchannelにjoinするときのuser_nameもkeyとなります。

 つまり、"ラーメン"というkeyでスタートされたListenTweets.Listenerは、"ラーメン"という名前でチャットに参加しつつ、"ラーメン"でTwitterを監視し、Tweetsを見つけたらチャットに書き込みます。同時に"宮沢りえ"でListenTweets.Listenerを起動することによって、"宮沢りえ"のTweetsもチャットに書き込まれます。

 複数のブラウザ(React)同士のChannelチャットにElixirクライアント(Listener)を追加します。上での説明を図にすると以下のようになります。これらの構成要素はレガシーなHTTPではなく、基本的にWeb socketで通信を行っていることに注目してください。

Channel-Client-Server
React ---|                           |- Listener("ラーメン")
         |--Phoenix Channel module --|- Listener("宮沢りえ")
React ---|                           |- Listener("森友")

(注意)Listenerは理論的に複数個可能ですが、実際にはTwitter Stream APIの制限で、2個までしか確認できませんでした。3個以上になるとStreamが機能しなくなりました。私のVM環境の制限かTwitterライブラリの制限か、Twitter APIの制限か、わかりません。しかし今回の記事の趣旨にはあまり関係はないです。

以下に実際の動作画面を示します。

image.png

 またこの画像にように「ラーメン」と「宮沢りえ」の2個のListenerプロセスを起動しているときに、以下のようにiexシェル(phoenixを起動しているiex)からプロセスの稼働状況を確認できます。最後の「森友」はRegistry(後で説明)に登録されていないので、pidの問い合わせの段階でnilを返されます。
 :sys.get_stateはGenServerの状態(state)を返してくれる関数です。Listenerプロセスのstateはkeyの文字列をそのまま入れて置いたことを思い出してください。

iex(21)> alias ListenTweets.{Listener,ListenerSupervisor}
[ListenTweets.Listener, ListenTweets.ListenerSupervisor]
iex(22)> Supervisor.count_children(ListenerSupervisor)
%{active: 2, specs: 2, supervisors: 0, workers: 2}

iex(25)> via=Listener.via_tuple("ラーメン")
{:via, Registry, {Registry.Listener, "ラーメン"}}
iex(26)> state_data = :sys.get_state(via)
"ラーメン"
iex(27)> pid=GenServer.whereis(via)
#PID<0.410.0>
iex(28)> Process.alive?(pid)
true

iex(35)> via=Listener.via_tuple("宮沢りえ")
{:via, Registry, {Registry.Listener, "宮沢りえ"}}
iex(36)> state_data = :sys.get_state(via)
"宮沢りえ"
iex(37)> pid=GenServer.whereis(via)
#PID<0.12979.0>
iex(38)> Process.alive?(pid)
true

iex(47)> via=Listener.via_tuple("森友")
{:via, Registry, {Registry.Listener, "森友"}}
iex(48)> pid=GenServer.whereis(via)
nil

2.ListenTweets Applicationの修正

 これから構築していく、ListenTweets ApplicationのSupervisor Treeを以下に示します。

ListenTweets
                                   |- Listener("ラーメン")
Supervisor --- ListenerSupervisor -|- Listener("宮沢りえ")
                                   |- Listener("森友")

 まずapplication.exを変更します。子プロセスとしてRegistryとListenTweets.ListenerSupervisorを起動するようにします。Registryは動的に起動するプロセスを、pidで管理するのではなく、nameで管理するために使われます。DNSのような役割です。これによって"ラーメン"で起動したプロセスのpidをいちいち記録する必要はなくなり、"ラーメン"という名前でプロセスを停止できるようになります。

 ListenTweets.ListenerSupervisorはListenTweets.Listenerを専用に管理するSupervisorです。ここでは動的に子プロセスを起動していくのでDynamicSupervisorというものを使います。ちょっと前まではstrategy: :simple_one_for_oneを使うことによって動的プロセスの起動を行っていたようですが、最近はDynamicSupervisorを使うようになっているようです。

lib/listen_tweets/application.ex
defmodule ListenTweets.Application do
  use Application

  def start(_type, _args) do
    # List all child processes to be supervised
    children = [
      {Registry, keys: :unique, name: Registry.Listener}, #追加
      ListenTweets.ListenerSupervisor, #Listenerから変更
    ]

    opts = [strategy: :one_for_one, name: ListenTweets.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

 以下にListenTweets.ListenerSupervisorの実装を示します。

 先頭にuse DynamicSupervisor行をおいています。

 start_link/1で起動されます。init/1がcallbackとなっています。strategy: :one_for_oneを指定していることに注意してください。ここまではListenTweets Application起動時に処理が行われます。

 その後、任意のタイミングで、start_listener(key)を呼ぶことによって、Listenerをスタートします。引数keyを渡していることに注意してください。このListenerプロセスはkeyというnameで管理することになります。pid_from_key/1関数が、与えられたkeyからpidを返してくれます。ですからstop_listener(key)でkeyを元に、該当するプロセスを停止することが可能になります。

lib/listen_tweets/listener_supervisor.ex
defmodule ListenTweets.ListenerSupervisor do
  use DynamicSupervisor

  alias ListenTweets.Listener

  def start_link(_options), do:
    DynamicSupervisor.start_link(__MODULE__, :no_args, name: __MODULE__)

  def start_listener(key) do
    spec = {Listener, key}
    DynamicSupervisor.start_child(__MODULE__, spec)
  end

  def stop_listener(key) do
    Supervisor.terminate_child(__MODULE__, pid_from_key(key))
  end

  def init(:no_args), do:
   DynamicSupervisor.init(strategy: :one_for_one)

  defp pid_from_key(key) do
    key
    |> Listener.via_tuple()
    |> GenServer.whereis()
  end
end

 前回書いたListenTweets.Listenerを少し変更する必要があります。以下の4点です。

(1)child_specを変更します。
 ListenTweets.ListenerSupervisorによって正しく再起動されるように、restart: :transient を指定します。これによりクラッシュしたときのみ再起動されるようになります。

(2)Registryのためにvia_tuple/3を定義します。
 今回の使われ方が定番のようです。詳細はgoogleで。

(3)start_listenは無くしてinitですべて処理するようにしました。
 ListenTweets.ListenerSupervisorのstart_listener/1ができたので、 ListenTweets.Listenerでタイミングを取る必要はなくなりました。

(4)user_name=keyでjoinするように変更しました
 これに伴ってget_channel(socket)からget_channel(socket,key)二変更しました。

lib/listen_tweets/listener.ex
defmodule ListenTweets.Listener do
  use GenServer, restart: :transient

  #公開関数 start_link - プロセスの起動
  def start_link(key) do
    GenServer.start_link(__MODULE__, key, name: via_tuple(key)) #via_tupleを使う
  end

  # via_tuple
  def via_tuple(name), do: {:via, Registry, {Registry.Listener, name}}


  # start_linkのcallback 
  def init(key) do
    channel =
        get_pid()
        |> get_socket()
        |> get_channel(key)

    if channel == :error do
      IO.puts("error!!!")
    else
       PhoenixChannelClient.join(channel)
       pid = spawn(fn ->
           stream = ExTwitter.stream_filter(track: key)
           for tweet <- stream do
             IO.puts tweet.text
             PhoenixChannelClient.push(channel, "new_msg", %{msg: tweet.text})
           end
       end)
    end
    { :ok, key }
  end

  # 未使用
  def handle_cast({ :update, new_key }, _current_key) do
    { :noreply, new_key }
  end

  # 未使用
  def handle_info(mess, key) do
    IO.inspect mess
    { :noreply, key }
  end

  defp get_pid do
    res_pid = PhoenixChannelClient.start_link()
    case res_pid do
      {:ok, pid} -> pid
      _ -> :error
    end
  end

  defp get_socket (:error) do
    :error
  end
  defp get_socket (pid) do
     res_socket = PhoenixChannelClient.connect(pid,
        host: "localhost",
        port: 4000,
        path: "/socket/websocket",
        secure: false,
        heartbeat_interval: 30_000)
      case res_socket do
        {:ok, socket} -> socket
        _ -> :error
      end
  end

  defp get_channel(:error, _key) do
    :error
  end
  defp get_channel(socket,key) do
    channel = PhoenixChannelClient.channel(socket, "room:lobby", %{user_name: key})
    channel
  end
end

3.ReactChat Application側の修正

 今の状態でReactChat Applicationから、ListenTweets Applicationの拡張が正しく利用できることを確認します。これは簡単です。以下のコマンドで確認してください。

cd react_chat
mix deps.get
iex -S mix phx.server

iex> :application.which_applications
iex>alias ListenTweets.ListenerSupervisor
iex>ListenerSupervisor.start_listener("ラーメン") 
iex>ListenerSupervisor.start_listener("宮沢りえ") 
iex>ListenerSupervisor.stop_listener("ラーメン") 
iex>ListenerSupervisor.stop_listener("宮沢りえ") 

 ListenTweets.ListenerSupervisorにstart_listener/1とstop_listener/1の機能がついたので、それを使うためのReact UIを変更し、channel moduleも拡張します。

 以下のchannel moduleは、start_listener/1とstop_listener/1呼ぶためのAPIを拡張しただけです。new_keyとdel_keyのメッセージを加えました。

lib/react_chat_web/channels/room_channel.ex
defmodule ReactChatWeb.RoomChannel do
#
  alias ListenTweets.ListenerSupervisor
#
  def handle_in("new_key", %{"msg" => msg}, socket) do
    ListenerSupervisor.start_listener(msg)
    {:reply, :ok, socket}
  end

  def handle_in("del_key", %{"msg" => msg}, socket) do
    ListenerSupervisor.stop_listener(msg)
    {:reply, :ok, socket}
  end
#

 次にReact UIでこの拡張に対応するためボタンを2つ増やします。

assets/js/Chat.js
class Chat extends React.Component {
  constructor() {
    super();
    this.state = {
#
      inputKey: "",
      inputKey0: "",
#
    }
  }
#
  handleInputKey(event) {
    this.setState({
      inputKey: event.target.value
    })
  }

  handleInputKey0(event) {
    this.setState({
      inputKey0: event.target.value
    })
  }
#
  handleKey(event) {
    event.preventDefault();
    this.channel.push("new_key", {msg: this.state.inputKey})
    this.setState({ inputKey: "" })
  }

  handleKey0(event) {
    event.preventDefault();
    this.channel.push("del_key", {msg: this.state.inputKey0})
    this.setState({ inputKey0: "" })
  }
#
           <Paper  style={style1}>
             <form onSubmit={this.handleKey.bind(this)}>
               <label>キーワード登録</label>&nbsp;&nbsp;&nbsp;&nbsp;
               <TextField hintText="key" value = {this.state.inputKey} onChange
= {this.handleInputKey.bind(this)} />&nbsp;&nbsp;&nbsp;&nbsp;
               <RaisedButton type="submit" primary={true} label="Submit" />
             </form>
             <form onSubmit={this.handleKey0.bind(this)}>
               <label>キーワード削除</label>&nbsp;&nbsp;&nbsp;&nbsp;
               <TextField hintText="key" value = {this.state.inputKey0} onChange = {this.handleInputKey0.bind(this)} />&nbsp;&nbsp;&nbsp;&nbsp;
               <RaisedButton type="submit" primary={true} label="Submit" />
             </form>
#

以上で終わりです。

4.修正ソースコードの全リスト

 今回修正したソースの全リストを掲載します。

lib/listen_tweets/application.ex
defmodule ListenTweets.Application do
  # See https://hexdocs.pm/elixir/Application.html
  # for more information on OTP Applications
  @moduledoc false

  use Application

  def start(_type, _args) do
    # List all child processes to be supervised
    children = [
      {Registry, keys: :unique, name: Registry.Listener}, #追加
      ListenTweets.ListenerSupervisor, #Listenerから変更
    ]

    # See https://hexdocs.pm/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :one_for_one, name: ListenTweets.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
lib/listen_tweets/listener_supervisor.ex
defmodule ListenTweets.ListenerSupervisor do
  use DynamicSupervisor

  alias ListenTweets.Listener

  def start_link(_options), do:
    DynamicSupervisor.start_link(__MODULE__, :no_args, name: __MODULE__)

  def start_listener(key) do
    spec = {Listener, key}
    DynamicSupervisor.start_child(__MODULE__, spec)
  end

  def stop_listener(key) do
    Supervisor.terminate_child(__MODULE__, pid_from_key(key))
  end

  def init(:no_args), do:
   DynamicSupervisor.init(strategy: :one_for_one)

  defp pid_from_key(key) do
    key
    |> Listener.via_tuple()
    |> GenServer.whereis()
  end
end
lib/listen_tweets/listener.ex
defmodule ListenTweets.Listener do
  use GenServer, restart: :transient  #child_spec

  def start_link(key) do
    GenServer.start_link(__MODULE__, key, name: via_tuple(key))
  end


  # via_tuple
  def via_tuple(name), do: {:via, Registry, {Registry.Listener, name}}


  def init(key) do
    channel =
        get_pid()
        |> get_socket()
        |> get_channel(key)

    if channel == :error do
      IO.puts("error!!!")
    else
       PhoenixChannelClient.join(channel)
       pid = spawn(fn ->
           stream = ExTwitter.stream_filter(track: key)
           for tweet <- stream do
             IO.puts tweet.text
             PhoenixChannelClient.push(channel, "new_msg", %{msg: tweet.text})
           end
       end)
    end
    { :ok, key }
  end

  def handle_cast({ :update, new_key }, _current_key) do
    { :noreply, new_key }
  end

  def handle_info(mess, key) do
    IO.inspect mess
    { :noreply, key }
  end


  defp get_pid do
    res_pid = PhoenixChannelClient.start_link()
    case res_pid do
      {:ok, pid} -> pid
      _ -> :error
    end
  end

  defp get_socket (:error) do
    :error
  end
  defp get_socket (pid) do
     res_socket = PhoenixChannelClient.connect(pid,
        host: "localhost",
        port: 4000,
        path: "/socket/websocket",
        secure: false,
        heartbeat_interval: 30_000)
      case res_socket do
        {:ok, socket} -> socket
        _ -> :error
      end
  end

  defp get_channel(:error, _key) do
    :error
  end
  defp get_channel(socket, key) do
    channel = PhoenixChannelClient.channel(socket, "room:lobby", %{user_name: key})
    channel
  end
end
lib/react_chat_web/channels/room_channel.ex
defmodule ReactChatWeb.RoomChannel do
  use ReactChatWeb, :channel
  alias ReactChatWeb.Presence
  alias ListenTweets.ListenerSupervisor

  def join("room:lobby", %{"user_name" => user_name}, socket) do
    send(self(), {:after_join, user_name})
    {:ok, socket}
  end


  def handle_in("new_msg", %{"msg" => msg}, socket) do
    user_name = socket.assigns[:user_name]
    broadcast(socket, "new_msg", %{msg: msg, user_name: user_name})
    {:reply, :ok, socket}
  end

  def handle_in("new_key", %{"msg" => msg}, socket) do
    ListenerSupervisor.start_listener(msg)
    {:reply, :ok, socket}
  end

  def handle_in("del_key", %{"msg" => msg}, socket) do
    ListenerSupervisor.stop_listener(msg)
    {:reply, :ok, socket}
  end

  def handle_info({:after_join, user_name}, socket) do
    push(socket, "presence_state", Presence.list(socket))
    {:ok, _ref} = Presence.track(socket, user_name, %{online_at: now()})
    {:noreply, assign(socket, :user_name, user_name)}
  end

  def terminate(_reason, socket) do
    {:noreply, socket}
  end

  defp now do
    System.system_time(:seconds)
  end
end
assets/js/Chat.js
import React from "react";
import {Socket, Presence} from "phoenix"
import RaisedButton from 'material-ui/RaisedButton';
import Paper from 'material-ui/Paper';
import Divider from 'material-ui/Divider';
import TextField from 'material-ui/TextField';

class Chat extends React.Component {
  constructor() {
    super();
    this.state = {
      isJoined: false,
      inputUser: "",
      inputMessage: "",
      inputKey: "",
      inputKey0: "",
      messages: [],
      presences: {}
    }
  }

  handleInputUser(event) {
    this.setState({
      inputUser: event.target.value
    })
  }

  handleInputMessage(event) {
    this.setState({
      inputMessage: event.target.value
    })
  }

  handleInputKey(event) {
    this.setState({
      inputKey: event.target.value
    })
  }

  handleInputKey0(event) {
    this.setState({
      inputKey0: event.target.value
    })
  }

  handleJoin(event) {
    event.preventDefault();
    if(this.state.inputUser!="") {

        // assets/js/socket.jsのデフォルトの定義と同じ
        this.socket = new Socket("/socket", {params:
          {token: window.userToken}
        });
        this.socket.connect();

        this.channel = this.socket.channel("room:lobby",  {user_name: this.state.inputUser});

        // 現在のサーバの状態を初期状態として設定
        this.channel.on('presence_state', state => {
          let presences = this.state.presences;
          presences = Presence.syncState(presences, state);
          this.setState({ presences: presences })
          console.log('state', presences);
        });
        // 初期状態からの差分を更新していく
        this.channel.on('presence_diff', diff => {
          let presences = this.state.presences;
          presences = Presence.syncDiff(presences, diff);
          this.setState({ presences: presences })
          console.log('diff', presences);
        });

        this.channel.on("new_msg", payload => {
          let messages = this.state.messages;
          messages.push(payload)
          this.setState({ messages: messages })
        })

        this.channel.join()
          .receive("ok", response => { console.log("Joined successfully", response) })
          .receive('error', resp => { console.log('Unable to join', resp); });

        this.setState({ isJoined: true })
    }
  }

  handleLeave(event) {
    event.preventDefault();
    this.socket.disconnect();
    this.setState({ isJoined: false })
  }


  handleSubmit(event) {
    event.preventDefault();
    this.channel.push("new_msg", {msg: this.state.inputMessage})
    this.setState({ inputMessage: "" })
  }

  handleKey(event) {
    event.preventDefault();
    this.channel.push("new_key", {msg: this.state.inputKey})
    this.setState({ inputKey: "" })
  }

  handleKey0(event) {
    event.preventDefault();
    this.channel.push("del_key", {msg: this.state.inputKey0})
    this.setState({ inputKey0: "" })
  }

  render() {
    const style1 = { margin: '16px 32px 16px 16px', padding: '10px 32px 10px 26px',};
    const style2 = { display: 'inline-block', margin: '1px 8px 1px 4px',};

    let messages = this.state.messages.map((message, index) => {
        return (
            <div key={index}>
              <p><strong>{message.user_name}</strong> > {message.msg}</p>
            </div>
        )
    });
    messages = messages.reverse();

    let presences = [];
    Presence.list(this.state.presences, (name, metas) => {
        presences.push(name);
    });
    let presences_list = presences.map( (user_name, index) =>
      <li key={index} style={style2}>{user_name}</li>
    );

    let form_jsx;
    if(this.state.isJoined===false) {
       form_jsx = (
        <form onSubmit={this.handleJoin.bind(this)} >
          <label>ユーザ名を指定してJoin</label>&nbsp;&nbsp;&nbsp;&nbsp;
          <TextField hintText="ユーザ名" value = {this.state.inputUser} onChange = {this.handleInputUser.bind(this)} />&nbsp;&nbsp;&nbsp;&nbsp;
          <RaisedButton type="submit" primary={true} label="Join" />
        </form>
       );
    } else {
       form_jsx = (
         <div>
           <Paper  style={style1}>
             <label>参加者 : {this.state.inputUser}</label>
             <ul>
               {presences_list}
             </ul>
             <div align="right">
               <form onSubmit={this.handleLeave.bind(this)} >
                 <RaisedButton type="submit" primary={true} label="Leave" />
               </form>
             </div>
           </Paper>
           <Paper  style={style1}>
             <form onSubmit={this.handleKey.bind(this)}>
               <label>キーワード登録</label>&nbsp;&nbsp;&nbsp;&nbsp;
               <TextField hintText="key" value = {this.state.inputKey} onChange = {this.handleInputKey.bind(this)} />&nbsp;&nbsp;&nbsp;&nbsp;
               <RaisedButton type="submit" primary={true} label="Submit" />
             </form>
             <form onSubmit={this.handleKey0.bind(this)}>
               <label>キーワード削除</label>&nbsp;&nbsp;&nbsp;&nbsp;
               <TextField hintText="key" value = {this.state.inputKey0} onChange = {this.handleInputKey0.bind(this)} />&nbsp;&nbsp;&nbsp;&nbsp;
               <RaisedButton type="submit" primary={true} label="Submit" />
             </form>
             <form onSubmit={this.handleSubmit.bind(this)}>
               <label>チャット</label>&nbsp;&nbsp;&nbsp;&nbsp;
               <TextField hintText="Chat Text" value = {this.state.inputMessage} onChange = {this.handleInputMessage.bind(this)} />&nbsp;&nbsp;&nbsp;&nbsp;
               <RaisedButton type="submit" primary={true} label="Submit" />
             </form>
             <Divider />
             <br />
             <div>
               {messages}
             </div>
           </Paper>
         </div>
      );
    }
    return (
      <div>
        {form_jsx}
      </div>
    )
  }
}
export default Chat
2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1