7
7

More than 5 years have passed since last update.

PHP で Server::Starter を使ってみた

Last updated at Posted at 2014-05-02

さっき Server::Starter というものを知ったので PHP でも使ってみました(そもそも PHP で TCP サーバなんて作らないというツッコミは無しの方向でお願いしますm(__)m)。

Server::Starter をインストール

Server::Starter をインストールします。

自分でも、いつ、どうやって入れたのか覚えていないのですが cpanm というコマンドが使えたので cpanm で入れました。

Test::SharedFork は、理由はわかりませんがこいつがいないと Server::Starter のインストールに失敗したので入れました。

$ cpanm Test::SharedFork
$ cpanm Server::Starter

PHP のコード

次のようになりました。

はじめに書いていたコードは例として適切ではなかったので、シングルプロセス・シングルスレッドで複数の接続を捌くように書き換えました。

このコードでは TERM/INT/HUP シグナルを受けてもすぐには終了せず、listen ソケットだけ閉じてすべての接続が切れるのを待ってから終了するようになっています。

server.php
<?php
(new Server())->main();

class Server
{
    private $pid;
    private $listen;
    private $port;
    private $sockets = [];
    private $buffers = [];

    public function __construct()
    {
        $this->pid = getmypid();
        list ($this->port, $this->listen) = $this->parseServerStarterEnv();
        $this->sockets[(int)$this->listen] = $this->listen;
    }

    private function log($id, $message)
    {
        if ($id === null)
        {
            echo "[$this->pid] $message\n";
        }
        else
        {
            echo "[$this->pid($id)] $message\n";
        }
    }

    private function parseServerStarterEnv()
    {
        $pair = getenv('SERVER_STARTER_PORT');

        if (strlen($pair) === 0)
        {
            throw new \RuntimeException("!!! getenv");
        }

        list ($port, $desc) = explode("=", $pair);

        if (!ctype_digit($port) || !ctype_digit($desc))
        {
            throw new \RuntimeException("!!! env $pair");
        }

        $fp = fopen("php://fd/$desc", "r+");

        if ($fp === false)
        {
            throw new \RuntimeException("!!! fopen");
        }

        $listen = socket_import_stream($fp);

        if ($listen == false)
        {
            throw new \RuntimeException("!!! import");
        }

        return [$port, $listen];
    }

    public function main()
    {
        pcntl_signal(SIGTERM, function () { $this->sigterm(); });
        pcntl_signal(SIGHUP, function () { $this->sigterm(); });
        pcntl_signal(SIGINT, function () { $this->sigterm(); });

        $this->log(null, "listen $this->port");

        while (count($this->sockets))
        {
            $this->process();
            pcntl_signal_dispatch();
        }
    }

    private function sigterm()
    {
        $this->log(null, "close listen socket");

        foreach ($this->sockets as $socket)
        {
            if ($socket === $this->listen)
            {
                $this->close($this->listen);
            }
        }
    }

    private function process()
    {
        list ($r, $w) = $this->select();

        foreach ($w as $socket)
        {
            $this->write($socket);
        }

        foreach ($r as $socket)
        {
            if ($socket === $this->listen)
            {
                $this->accept($socket);
            }
            else
            {
                $this->read($socket);
            }
        }
    }

    private function select()
    {
        $r = $this->sockets;
        $w = [];
        $e = [];

        foreach ($this->buffers as $id => $buf)
        {
            if (strlen($buf))
            {
                $w[$id] = $this->sockets[$id];
            }
        }

        $ret = @socket_select($r, $w, $e, null);

        if ($ret === false)
        {
            $errno = socket_last_error();

            if ($errno !== SOCKET_EINTR)
            {
                $errstr = socket_strerror($errno);
                throw new \RuntimeException("!!! select ... $errstr ($errno)");
            }
            return [[], []];
        }

        return [$r, $w];
    }

    private function accept($socket)
    {
        $peer = socket_accept($socket);

        if ($peer === false)
        {
            throw new \RuntimeException("!!! accept");
        }

        $id = (int)$peer;
        $this->log($id, "accept");

        socket_set_nonblock($peer);

        $this->sockets[$id] = $peer;
        $this->buffers[$id] = '';
    }

    private function write($socket)
    {
        $id = (int)$socket;
        $buf = &$this->buffers[$id];

        $len = socket_write($socket, $buf, strlen($buf));

        if ($len === false)
        {
            $this->log($id, "!!! write");
            $this->close($socket);
        }
        else
        {
            $buf = substr($buf, $len);
        }
    }

    private function read($socket)
    {
        $id = (int)$socket;
        $tmp = socket_read($socket, 4096);

        if ($tmp === false)
        {
            $this->log($id, "!!! read");
            $this->close($socket);
        }

        if (strlen($tmp) === 0)
        {
            $this->log($id, "goodbye");
            $this->close($socket);
        }
        else
        {
            $this->buffers[$id] .= strtoupper($tmp);
            //$this->buffers[$id] .= ucfirst($tmp);

            $tmp = trim($tmp);
            $this->log($id, "recv $tmp");
        }
    }

    private function close($socket)
    {
        $id = (int)$socket;
        socket_close($socket);
        unset($this->sockets[$id]);
        unset($this->buffers[$id]);
    }
}

実験

start_server コマンドで起動します。

ターミナルその1
$ start_server --port 1234 -- php server.php
start_server (pid:2325) starting now...
starting new worker 2326
[2326] listen 1234

別のターミナルを開いて 1234 ポートに接続して "abc" と入力すると大文字になってエコーされます。

ターミナルその2
$ nc localhost 1234
abc
ABC

read メソッドの中のコメントを入れ替えて、先頭の文字だけが大文字になるように改修します。

server.php
            //$this->buffers[$id] .= strtoupper($tmp);
            $this->buffers[$id] .= ucfirst($tmp);

既に起動しているプロセスには影響しないので "xyz" と入力しても全部大文字になります。

ターミナルその2
$ nc localhost 1234
abc
ABC
xyz
XYZ

別のターミナルを開いて Server::Starter に HUP シグナルを送ります。

ターミナルその3
$ kill -HUP 2325

Server::Starter が新しいプロセスを起動し、古いプロセスに TERM シグナルを送ります。

古いプロセスは TERM シグナルを受けてもすぐには終了せず、listen ソケットだけ閉じた後、すべての接続が切れるまで待っています。

ターミナルその1
received HUP (num_old_workers=0)
spawning a new worker (num_old_workers=0)
starting new worker 2390
[2390] listen 1234
new worker is now running, sending TERM to old workers:2326
sleep 0 secs
killing old workers
[2326] close listen socket

改めて 1234 ポートに接続すると新しいプロセスに接続されるため "abc" と入力すると先頭だけ大文字になります(古いプロセスではもう accept していないためです)。

ターミナルその3
$ nc localhost 1234
abc
Abc

はじめに接続していた方も、まだ古いコードのまま生きています。

ターミナルその2
$ nc localhost 1234
abc
ABC
xyz
XYZ
ore
ORE

ターミナルその2で Ctrl+C で接続を切ると古いコードのプロセスは終了します。

ターミナルその2
$ nc localhost 1234
abc
ABC
xyz
XYZ
ore
ORE
^C
$ 
ターミナルその1
old worker 2326 died, status:0
7
7
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
7