PHP Promiseで複数のソケットを同時に捌く


PHP Promise とは :question:

半分宣伝になってしまいますが、 PHP Promiseはマルチスレッディングを用いて動作する pthreads のラッパーライブラリです :cheese:

主に、JavaScriptの Promise ライクに動作することを目指しており、現在も開発が続けられています :airplane:

今後は async/await の実装や、CLIベースではない環境でも動作するミドルウェアの開発などもする予定です :upside_down:


さっそく ソケットを捌いてみる :stars:

Dockerで軽く pthreads を扱える環境を用意します。

FROM centos:7

# Setup
RUN yum -y install epel-release wget
RUN cd /tmp && wget http://jp2.php.net/get/php-7.2.7.tar.gz/from/this/mirror -O php-7.2 && tar zxvf php-7.2

# Dependencies installation
RUN yum -y install git gcc gcc-c++ make libxml2-devel libicu-devel openssl-devel

# PHP installation
RUN cd /tmp/php-7.2.7 && \
./configure --enable-maintainer-zts --enable-pcntl --enable-intl --enable-zip --enable-pdo --enable-sockets --with-openssl && \
make && \
make install

# pthreads installation
RUN yum -y install autoconf
RUN cd /tmp && git clone https://github.com/krakjoe/pthreads.git && cd pthreads && \
phpize && \
./configure && \
make && \
make install

# Add an extension to php.ini
RUN echo extension=pthreads.so >> /usr/local/lib/php.ini

composerでライブラリをインストールします。

$ composer require php-promise/promise:dev-master

下記のようなコードを書いてみます。

require_once __DIR__ . '/vendor/autoload.php';

use Promise\Exceptions\PromiseException;
use Promise\Processors\Rejecter;
use Promise\Processors\Resolver;
use Promise\Promise;

$port = 8000;

$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);

// 待機状態にしたい!
socket_set_option($socket, SOL_SOCKET, SO_REUSEADDR, 1);
socket_bind($socket, 0, $port);
socket_listen($socket);

$clients = [$socket];

while (true) {
$read = $write = $except = $clients;
if (!socket_select($read, $write, $except, 0)) {
continue;
}
try {
// Promise のインスタンスを作り、実行する
new Promise(function (Resolver $resolve, Rejecter $reject, $socket) {
$waiting = mt_rand(1, 6);
printf("Wait {$waiting}s \n");
$client = socket_accept($socket);
socket_write($client, "Hello World\n");
sleep($waiting);
}, $socket);
} catch (PromiseException $e) {

}
}

curlでマルチスレッディングになっているかテストしてみます。

$ curl -XGET localhost:8000 & curl -XGET localhost:8000 & curl -XGET localhost:8000

上記のように叩いた後、サーバー側は

Wait 2s

Wait 1s
Wait 4s

クライアント側は

[1] 11379

[2] 11380
Hello World
Hello World
Hello World

上記のようになっており、同時に処理されていることがわかります。


PHP Promiseの他の提供している機能について :metal:

Promiseには Promise::all($promises) が用意されており、複数のタスクを並列処理にて処理可能です。


$promises = [
new Promise(function (Resolver $resolve, Rejecter $reject) { sleep(3); }),
new Promise(function (Resolver $resolve, Rejecter $reject) { sleep(5); }),
new Promise(function (Resolver $resolve, Rejecter $reject) { sleep(2); }),
new Promise(function (Resolver $resolve, Rejecter $reject) { sleep(1); }),
new Promise(function (Resolver $resolve, Rejecter $reject) { sleep(4); }),
];

Promise::all($promises)->then(function () {
echo "5秒経ちました。\n";
});

すべて同時に並列処理として扱われるので上記だと、5秒待っている間にすべてのタスクを実行することが可能です。