Elixir
Phoenix
channel
DynamicSupervisor

Phoenix Channelで作る最先端Webアプリ - DynamicSupervisor編

Phoenix Channelで作る最先端Webアプリ - topic-subtopic編 - Qiita
Phoenix Channelで作る最先端Webアプリ - ETS編 - Qiita
Phoenix Channelで作る最先端Webアプリ - Fault Tolerance編 - Qiita
Phoenix Channelで作る最先端Webアプリ - 地図(Geo)拡張編 - Qiita
Phoenix Channelで作る最先端Webアプリ - DynamicSupervisor編 - Qiita
Phoenix Channelで作る最先端Webアプリ - Elixier Application編 - Qiita
Phoenix Channelで作る最先端Webアプリ - Reactチャット編 - Qiita

 今回の技術的なポイントは以下の通りです。

ポイント
1.DynamicSupervisorでElixir Applicationを動的に複数立ち上げる
2.React画面から入力したキーワードで、Elixir ApplicationはTweetsをリッスンする

 今回は前回のPhoenix Channelで作る最先端Webアプリ - Elixier Application編 - Qiitaを改善していきます。

1.DynamicSupervisorで実現しようとしているもの

 DynamicSupervisorで実現することは、ListenTweets.Listenerプロセスを動的に起動できるようにすることです。今まではListenTweets Applicationを起動するときに、ListenTweets.Listenerプロセスも起動していました。起動時のkey(引数)も固定で、プロセスも一個しか起動できませんでした。これを改善します。

 以下にListenTweets Applicationを修正します。関数ListenTweets.Listenerをプロセスとして動的に起動するようにします。ListenTweets.Listenerプロセスは複数個、同時に起動可能です。それぞれが別のkey("ラーメン"とか)を起動時に与えられて、そのkeyでTwitterをリッスンします。またchannelにjoinするときのuser_nameもkeyとなります。

 つまり、"ラーメン"というkeyでスタートされたListenTweets.Listenerは、"ラーメン"という名前でチャットに参加しつつ、"ラーメン"でTwitterを監視し、Tweetsを見つけたらチャットに書き込みます。同時に"宮沢りえ"でListenTweets.Listenerを起動することによって、"宮沢りえ"のTweetsもチャットに書き込まれます。

 複数のブラウザ(React)同士のChannelチャットにElixirクライアント(Listener)を追加します。上での説明を図にすると以下のようになります。これらの構成要素はレガシーなHTTPではなく、基本的にWeb socketで通信を行っていることに注目してください。

Channel-Client-Server
React ---|                           |- Listener("ラーメン")
         |--Phoenix Channel module --|- Listener("宮沢りえ")
React ---|                           |- Listener("森友")

(注意)Listenerは理論的に複数個可能ですが、実際にはTwitter Stream APIの制限で、2個までしか確認できませんでした。3個以上になるとStreamが機能しなくなりました。私のVM環境の制限かTwitterライブラリの制限か、Twitter APIの制限か、わかりません。しかし今回の記事の趣旨にはあまり関係はないです。

以下に実際の動作画面を示します。

image.png

 またこの画像にように「ラーメン」と「宮沢りえ」の2個のListenerプロセスを起動しているときに、以下のようにiexシェル(phoenixを起動しているiex)からプロセスの稼働状況を確認できます。最後の「森友」はRegistry(後で説明)に登録されていないので、pidの問い合わせの段階でnilを返されます。
 :sys.get_stateはGenServerの状態(state)を返してくれる関数です。Listenerプロセスのstateはkeyの文字列をそのまま入れて置いたことを思い出してください。

iex(21)> alias ListenTweets.{Listener,ListenerSupervisor}
[ListenTweets.Listener, ListenTweets.ListenerSupervisor]
iex(22)> Supervisor.count_children(ListenerSupervisor)
%{active: 2, specs: 2, supervisors: 0, workers: 2}

iex(25)> via=Listener.via_tuple("ラーメン")
{:via, Registry, {Registry.Listener, "ラーメン"}}
iex(26)> state_data = :sys.get_state(via)
"ラーメン"
iex(27)> pid=GenServer.whereis(via)
#PID<0.410.0>
iex(28)> Process.alive?(pid)
true

iex(35)> via=Listener.via_tuple("宮沢りえ")
{:via, Registry, {Registry.Listener, "宮沢りえ"}}
iex(36)> state_data = :sys.get_state(via)
"宮沢りえ"
iex(37)> pid=GenServer.whereis(via)
#PID<0.12979.0>
iex(38)> Process.alive?(pid)
true

iex(47)> via=Listener.via_tuple("森友")
{:via, Registry, {Registry.Listener, "森友"}}
iex(48)> pid=GenServer.whereis(via)
nil

2.ListenTweets Applicationの修正

 これから構築していく、ListenTweets ApplicationのSupervisor Treeを以下に示します。

ListenTweets
                                   |- Listener("ラーメン")
Supervisor --- ListenerSupervisor -|- Listener("宮沢りえ")
                                   |- Listener("森友")

 まずapplication.exを変更します。子プロセスとしてRegistryとListenTweets.ListenerSupervisorを起動するようにします。Registryは動的に起動するプロセスを、pidで管理するのではなく、nameで管理するために使われます。DNSのような役割です。これによって"ラーメン"で起動したプロセスのpidをいちいち記録する必要はなくなり、"ラーメン"という名前でプロセスを停止できるようになります。

 ListenTweets.ListenerSupervisorはListenTweets.Listenerを専用に管理するSupervisorです。ここでは動的に子プロセスを起動していくのでDynamicSupervisorというものを使います。ちょっと前まではstrategy: :simple_one_for_oneを使うことによって動的プロセスの起動を行っていたようですが、最近はDynamicSupervisorを使うようになっているようです。

lib/listen_tweets/application.ex
defmodule ListenTweets.Application do
  use Application

  def start(_type, _args) do
    # List all child processes to be supervised
    children = [
      {Registry, keys: :unique, name: Registry.Listener}, #追加
      ListenTweets.ListenerSupervisor, #Listenerから変更
    ]

    opts = [strategy: :one_for_one, name: ListenTweets.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

 以下にListenTweets.ListenerSupervisorの実装を示します。

 先頭にuse DynamicSupervisor行をおいています。

 start_link/1で起動されます。init/1がcallbackとなっています。strategy: :one_for_oneを指定していることに注意してください。ここまではListenTweets Application起動時に処理が行われます。

 その後、任意のタイミングで、start_listener(key)を呼ぶことによって、Listenerをスタートします。引数keyを渡していることに注意してください。このListenerプロセスはkeyというnameで管理することになります。pid_from_key/1関数が、与えられたkeyからpidを返してくれます。ですからstop_listener(key)でkeyを元に、該当するプロセスを停止することが可能になります。

lib/listen_tweets/listener_supervisor.ex
defmodule ListenTweets.ListenerSupervisor do
  use DynamicSupervisor

  alias ListenTweets.Listener

  def start_link(_options), do:
    DynamicSupervisor.start_link(__MODULE__, :no_args, name: __MODULE__)

  def start_listener(key) do
    spec = {Listener, key}
    DynamicSupervisor.start_child(__MODULE__, spec)
  end

  def stop_listener(key) do
    Supervisor.terminate_child(__MODULE__, pid_from_key(key))
  end

  def init(:no_args), do:
   DynamicSupervisor.init(strategy: :one_for_one)

  defp pid_from_key(key) do
    key
    |> Listener.via_tuple()
    |> GenServer.whereis()
  end
end

 前回書いたListenTweets.Listenerを少し変更する必要があります。以下の4点です。

(1)child_specを変更します。
 ListenTweets.ListenerSupervisorによって正しく再起動されるように、restart: :transient を指定します。これによりクラッシュしたときのみ再起動されるようになります。

(2)Registryのためにvia_tuple/3を定義します。
 今回の使われ方が定番のようです。詳細はgoogleで。

(3)start_listenは無くしてinitですべて処理するようにしました。
 ListenTweets.ListenerSupervisorのstart_listener/1ができたので、 ListenTweets.Listenerでタイミングを取る必要はなくなりました。

(4)user_name=keyでjoinするように変更しました
 これに伴ってget_channel(socket)からget_channel(socket,key)二変更しました。

lib/listen_tweets/listener.ex
defmodule ListenTweets.Listener do
  use GenServer, restart: :transient

  #公開関数 start_link - プロセスの起動
  def start_link(key) do
    GenServer.start_link(__MODULE__, key, name: via_tuple(key)) #via_tupleを使う
  end

  # via_tuple
  def via_tuple(name), do: {:via, Registry, {Registry.Listener, name}}


  # start_linkのcallback 
  def init(key) do
    channel =
        get_pid()
        |> get_socket()
        |> get_channel(key)

    if channel == :error do
      IO.puts("error!!!")
    else
       PhoenixChannelClient.join(channel)
       pid = spawn(fn ->
           stream = ExTwitter.stream_filter(track: key)
           for tweet <- stream do
             IO.puts tweet.text
             PhoenixChannelClient.push(channel, "new_msg", %{msg: tweet.text})
           end
       end)
    end
    { :ok, key }
  end

  # 未使用
  def handle_cast({ :update, new_key }, _current_key) do
    { :noreply, new_key }
  end

  # 未使用
  def handle_info(mess, key) do
    IO.inspect mess
    { :noreply, key }
  end

  defp get_pid do
    res_pid = PhoenixChannelClient.start_link()
    case res_pid do
      {:ok, pid} -> pid
      _ -> :error
    end
  end

  defp get_socket (:error) do
    :error
  end
  defp get_socket (pid) do
     res_socket = PhoenixChannelClient.connect(pid,
        host: "localhost",
        port: 4000,
        path: "/socket/websocket",
        secure: false,
        heartbeat_interval: 30_000)
      case res_socket do
        {:ok, socket} -> socket
        _ -> :error
      end
  end

  defp get_channel(:error, _key) do
    :error
  end
  defp get_channel(socket,key) do
    channel = PhoenixChannelClient.channel(socket, "room:lobby", %{user_name: key})
    channel
  end
end

3.ReactChat Application側の修正

 今の状態でReactChat Applicationから、ListenTweets Applicationの拡張が正しく利用できることを確認します。これは簡単です。以下のコマンドで確認してください。

cd react_chat
mix deps.get
iex -S mix phx.server

iex> :application.which_applications
iex>alias ListenTweets.ListenerSupervisor
iex>ListenerSupervisor.start_listener("ラーメン") 
iex>ListenerSupervisor.start_listener("宮沢りえ") 
iex>ListenerSupervisor.stop_listener("ラーメン") 
iex>ListenerSupervisor.stop_listener("宮沢りえ") 

 ListenTweets.ListenerSupervisorにstart_listener/1とstop_listener/1の機能がついたので、それを使うためのReact UIを変更し、channel moduleも拡張します。

 以下のchannel moduleは、start_listener/1とstop_listener/1呼ぶためのAPIを拡張しただけです。new_keyとdel_keyのメッセージを加えました。

lib/react_chat_web/channels/room_channel.ex
defmodule ReactChatWeb.RoomChannel do
#
  alias ListenTweets.ListenerSupervisor
#
  def handle_in("new_key", %{"msg" => msg}, socket) do
    ListenerSupervisor.start_listener(msg)
    {:reply, :ok, socket}
  end

  def handle_in("del_key", %{"msg" => msg}, socket) do
    ListenerSupervisor.stop_listener(msg)
    {:reply, :ok, socket}
  end
#

 次にReact UIでこの拡張に対応するためボタンを2つ増やします。

assets/js/Chat.js
class Chat extends React.Component {
  constructor() {
    super();
    this.state = {
#
      inputKey: "",
      inputKey0: "",
#
    }
  }
#
  handleInputKey(event) {
    this.setState({
      inputKey: event.target.value
    })
  }

  handleInputKey0(event) {
    this.setState({
      inputKey0: event.target.value
    })
  }
#
  handleKey(event) {
    event.preventDefault();
    this.channel.push("new_key", {msg: this.state.inputKey})
    this.setState({ inputKey: "" })
  }

  handleKey0(event) {
    event.preventDefault();
    this.channel.push("del_key", {msg: this.state.inputKey0})
    this.setState({ inputKey0: "" })
  }
#
           <Paper  style={style1}>
             <form onSubmit={this.handleKey.bind(this)}>
               <label>キーワード登録</label>&nbsp;&nbsp;&nbsp;&nbsp;
               <TextField hintText="key" value = {this.state.inputKey} onChange
= {this.handleInputKey.bind(this)} />&nbsp;&nbsp;&nbsp;&nbsp;
               <RaisedButton type="submit" primary={true} label="Submit" />
             </form>
             <form onSubmit={this.handleKey0.bind(this)}>
               <label>キーワード削除</label>&nbsp;&nbsp;&nbsp;&nbsp;
               <TextField hintText="key" value = {this.state.inputKey0} onChange = {this.handleInputKey0.bind(this)} />&nbsp;&nbsp;&nbsp;&nbsp;
               <RaisedButton type="submit" primary={true} label="Submit" />
             </form>
#

以上で終わりです。

4.修正ソースコードの全リスト

 今回修正したソースの全リストを掲載します。

lib/listen_tweets/application.ex
defmodule ListenTweets.Application do
  # See https://hexdocs.pm/elixir/Application.html
  # for more information on OTP Applications
  @moduledoc false

  use Application

  def start(_type, _args) do
    # List all child processes to be supervised
    children = [
      {Registry, keys: :unique, name: Registry.Listener}, #追加
      ListenTweets.ListenerSupervisor, #Listenerから変更
    ]

    # See https://hexdocs.pm/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :one_for_one, name: ListenTweets.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
lib/listen_tweets/listener_supervisor.ex
defmodule ListenTweets.ListenerSupervisor do
  use DynamicSupervisor

  alias ListenTweets.Listener

  def start_link(_options), do:
    DynamicSupervisor.start_link(__MODULE__, :no_args, name: __MODULE__)

  def start_listener(key) do
    spec = {Listener, key}
    DynamicSupervisor.start_child(__MODULE__, spec)
  end

  def stop_listener(key) do
    Supervisor.terminate_child(__MODULE__, pid_from_key(key))
  end

  def init(:no_args), do:
   DynamicSupervisor.init(strategy: :one_for_one)

  defp pid_from_key(key) do
    key
    |> Listener.via_tuple()
    |> GenServer.whereis()
  end
end
lib/listen_tweets/listener.ex
defmodule ListenTweets.Listener do
  use GenServer, restart: :transient  #child_spec

  def start_link(key) do
    GenServer.start_link(__MODULE__, key, name: via_tuple(key))
  end


  # via_tuple
  def via_tuple(name), do: {:via, Registry, {Registry.Listener, name}}


  def init(key) do
    channel =
        get_pid()
        |> get_socket()
        |> get_channel(key)

    if channel == :error do
      IO.puts("error!!!")
    else
       PhoenixChannelClient.join(channel)
       pid = spawn(fn ->
           stream = ExTwitter.stream_filter(track: key)
           for tweet <- stream do
             IO.puts tweet.text
             PhoenixChannelClient.push(channel, "new_msg", %{msg: tweet.text})
           end
       end)
    end
    { :ok, key }
  end

  def handle_cast({ :update, new_key }, _current_key) do
    { :noreply, new_key }
  end

  def handle_info(mess, key) do
    IO.inspect mess
    { :noreply, key }
  end


  defp get_pid do
    res_pid = PhoenixChannelClient.start_link()
    case res_pid do
      {:ok, pid} -> pid
      _ -> :error
    end
  end

  defp get_socket (:error) do
    :error
  end
  defp get_socket (pid) do
     res_socket = PhoenixChannelClient.connect(pid,
        host: "localhost",
        port: 4000,
        path: "/socket/websocket",
        secure: false,
        heartbeat_interval: 30_000)
      case res_socket do
        {:ok, socket} -> socket
        _ -> :error
      end
  end

  defp get_channel(:error, _key) do
    :error
  end
  defp get_channel(socket, key) do
    channel = PhoenixChannelClient.channel(socket, "room:lobby", %{user_name: key})
    channel
  end
end
lib/react_chat_web/channels/room_channel.ex
defmodule ReactChatWeb.RoomChannel do
  use ReactChatWeb, :channel
  alias ReactChatWeb.Presence
  alias ListenTweets.ListenerSupervisor

  def join("room:lobby", %{"user_name" => user_name}, socket) do
    send(self(), {:after_join, user_name})
    {:ok, socket}
  end


  def handle_in("new_msg", %{"msg" => msg}, socket) do
    user_name = socket.assigns[:user_name]
    broadcast(socket, "new_msg", %{msg: msg, user_name: user_name})
    {:reply, :ok, socket}
  end

  def handle_in("new_key", %{"msg" => msg}, socket) do
    ListenerSupervisor.start_listener(msg)
    {:reply, :ok, socket}
  end

  def handle_in("del_key", %{"msg" => msg}, socket) do
    ListenerSupervisor.stop_listener(msg)
    {:reply, :ok, socket}
  end

  def handle_info({:after_join, user_name}, socket) do
    push(socket, "presence_state", Presence.list(socket))
    {:ok, _ref} = Presence.track(socket, user_name, %{online_at: now()})
    {:noreply, assign(socket, :user_name, user_name)}
  end

  def terminate(_reason, socket) do
    {:noreply, socket}
  end

  defp now do
    System.system_time(:seconds)
  end
end
assets/js/Chat.js
import React from "react";
import {Socket, Presence} from "phoenix"
import RaisedButton from 'material-ui/RaisedButton';
import Paper from 'material-ui/Paper';
import Divider from 'material-ui/Divider';
import TextField from 'material-ui/TextField';

class Chat extends React.Component {
  constructor() {
    super();
    this.state = {
      isJoined: false,
      inputUser: "",
      inputMessage: "",
      inputKey: "",
      inputKey0: "",
      messages: [],
      presences: {}
    }
  }

  handleInputUser(event) {
    this.setState({
      inputUser: event.target.value
    })
  }

  handleInputMessage(event) {
    this.setState({
      inputMessage: event.target.value
    })
  }

  handleInputKey(event) {
    this.setState({
      inputKey: event.target.value
    })
  }

  handleInputKey0(event) {
    this.setState({
      inputKey0: event.target.value
    })
  }

  handleJoin(event) {
    event.preventDefault();
    if(this.state.inputUser!="") {

        // assets/js/socket.jsのデフォルトの定義と同じ
        this.socket = new Socket("/socket", {params:
          {token: window.userToken}
        });
        this.socket.connect();

        this.channel = this.socket.channel("room:lobby",  {user_name: this.state.inputUser});

        // 現在のサーバの状態を初期状態として設定
        this.channel.on('presence_state', state => {
          let presences = this.state.presences;
          presences = Presence.syncState(presences, state);
          this.setState({ presences: presences })
          console.log('state', presences);
        });
        // 初期状態からの差分を更新していく
        this.channel.on('presence_diff', diff => {
          let presences = this.state.presences;
          presences = Presence.syncDiff(presences, diff);
          this.setState({ presences: presences })
          console.log('diff', presences);
        });

        this.channel.on("new_msg", payload => {
          let messages = this.state.messages;
          messages.push(payload)
          this.setState({ messages: messages })
        })

        this.channel.join()
          .receive("ok", response => { console.log("Joined successfully", response) })
          .receive('error', resp => { console.log('Unable to join', resp); });

        this.setState({ isJoined: true })
    }
  }

  handleLeave(event) {
    event.preventDefault();
    this.socket.disconnect();
    this.setState({ isJoined: false })
  }


  handleSubmit(event) {
    event.preventDefault();
    this.channel.push("new_msg", {msg: this.state.inputMessage})
    this.setState({ inputMessage: "" })
  }

  handleKey(event) {
    event.preventDefault();
    this.channel.push("new_key", {msg: this.state.inputKey})
    this.setState({ inputKey: "" })
  }

  handleKey0(event) {
    event.preventDefault();
    this.channel.push("del_key", {msg: this.state.inputKey0})
    this.setState({ inputKey0: "" })
  }

  render() {
    const style1 = { margin: '16px 32px 16px 16px', padding: '10px 32px 10px 26px',};
    const style2 = { display: 'inline-block', margin: '1px 8px 1px 4px',};

    let messages = this.state.messages.map((message, index) => {
        return (
            <div key={index}>
              <p><strong>{message.user_name}</strong> > {message.msg}</p>
            </div>
        )
    });
    messages = messages.reverse();

    let presences = [];
    Presence.list(this.state.presences, (name, metas) => {
        presences.push(name);
    });
    let presences_list = presences.map( (user_name, index) =>
      <li key={index} style={style2}>{user_name}</li>
    );

    let form_jsx;
    if(this.state.isJoined===false) {
       form_jsx = (
        <form onSubmit={this.handleJoin.bind(this)} >
          <label>ユーザ名を指定してJoin</label>&nbsp;&nbsp;&nbsp;&nbsp;
          <TextField hintText="ユーザ名" value = {this.state.inputUser} onChange = {this.handleInputUser.bind(this)} />&nbsp;&nbsp;&nbsp;&nbsp;
          <RaisedButton type="submit" primary={true} label="Join" />
        </form>
       );
    } else {
       form_jsx = (
         <div>
           <Paper  style={style1}>
             <label>参加者 : {this.state.inputUser}</label>
             <ul>
               {presences_list}
             </ul>
             <div align="right">
               <form onSubmit={this.handleLeave.bind(this)} >
                 <RaisedButton type="submit" primary={true} label="Leave" />
               </form>
             </div>
           </Paper>
           <Paper  style={style1}>
             <form onSubmit={this.handleKey.bind(this)}>
               <label>キーワード登録</label>&nbsp;&nbsp;&nbsp;&nbsp;
               <TextField hintText="key" value = {this.state.inputKey} onChange = {this.handleInputKey.bind(this)} />&nbsp;&nbsp;&nbsp;&nbsp;
               <RaisedButton type="submit" primary={true} label="Submit" />
             </form>
             <form onSubmit={this.handleKey0.bind(this)}>
               <label>キーワード削除</label>&nbsp;&nbsp;&nbsp;&nbsp;
               <TextField hintText="key" value = {this.state.inputKey0} onChange = {this.handleInputKey0.bind(this)} />&nbsp;&nbsp;&nbsp;&nbsp;
               <RaisedButton type="submit" primary={true} label="Submit" />
             </form>
             <form onSubmit={this.handleSubmit.bind(this)}>
               <label>チャット</label>&nbsp;&nbsp;&nbsp;&nbsp;
               <TextField hintText="Chat Text" value = {this.state.inputMessage} onChange = {this.handleInputMessage.bind(this)} />&nbsp;&nbsp;&nbsp;&nbsp;
               <RaisedButton type="submit" primary={true} label="Submit" />
             </form>
             <Divider />
             <br />
             <div>
               {messages}
             </div>
           </Paper>
         </div>
      );
    }
    return (
      <div>
        {form_jsx}
      </div>
    )
  }
}
export default Chat