PHP
MySQL
ReactPHP

PHP非同期処理チュートリアル: MySQLのクエリ処理を非同期化する

More than 3 years have passed since last update.

前回、ReactPHPを利用して非同期処理を書くための準備をしました。

今回は、いよいよ実践的な非同期処理を書いていきたいと思います。

で、今回の非同期化処理のターゲットは、MySQLのクエリ投げです。


MySQL(DB)の非同期化


動機

アプリケーションで頻繁に発生するI/Oといえば、やはりDBアクセスではないでしょうか?このDBアクセスさえ非同期化してしまえば、他に非同期化するべきものは殆ど無いかもしれません。

ファイルアクセスはストリームパッケージを使ってしまえば、それでいいかなと思います。

正直いってしまうと、非同期化したい処理がDBアクセスくらいしかなかったのです。。。

で、ちょうど非同期化を実装しやすいmysqliを使って見ようと考えたのです。


やり方を決める

さて、DBアクセスを非同期処理化するにしても、どうやってやればいいのでしょうか。

探してみると、ストリームを使ってMySQLの非同期処理を実現している方がいました。

https://github.com/bixuehujin/reactphp-mysql

node.jsのmysqlパッケージを元に、自前でプロトコル書いてるみたいです。

こりゃ私のレベルじゃ手が出ません。。。

ここは安直な感じが否めませんが、タイマー使って周期的に結果の有無を確認することにしましょう。


mysqliの非同期処理を使う

mysqliと言うよりはmysqlndに非同期クエリの仕組みがあるので、こいつを使わせてもらいましょう。

http://fukaoi.org/2012/03/31/php_mysqli


mysqli::query

mysqliのメソッドqueryの第2引数に定数MYSQLI_ASYNCを指定することで、結果が返るのを待たずに、次の処理に移行できます


mysqli::poll

現在の接続の状態を問い合わせます。

ここでは、結果が返ってきたかどうかを調べます。


mysqli::reap_async_query

非同期クエリの結果の取り出しを行います。

これらの仕組みは元はDBへの接続を並列して行うために作られたものですが、今回は非同期処理作成のために使うことにします。


実装方針

これで使用するものは揃ったので、方針を決めちゃいましょう


  1. 非同期クエリをmysqli::queryを使って投げる

  2. ループ上で定期的に接続の状態をmysqli::pollを使って確認する

  3. 結果が返ってきていたらmysqli::reap_async_queryで結果セットを取り出し、コールバックに渡す

こんな感じでイケルと思います。


実装

実装結果をGistにおいておきます

https://gist.github.com/niisan-tokyo/dd61d54d670e0702d955

ここではMysqlWrapperというクラスを作って非同期処理の仕組みを実装しています。


処理概要

主要な処理を解説します。


query

非同期でクエリを投げます

    public function query($sql, $callback = null)

{
$this->removeAllListeners();
$this->connection->query($sql, MYSQLI_ASYNC);
$that = $this;

$this->timer = $this->loop->addPeriodicTimer(self::INTERVAL, function () use ($that) {
$that->checkPolling();
});

if ($callback === null) {
return;
}

$this->on('success', $callback);

}

非同期処理を投げた後、結果の返却を定期的に問い合わせるように、$loopにその処理を登録しています。

$that = $thisはコンテキストとかどうなるかわからないので、エイリアスを作っているだけです。

コールバックが指定されていれば、successイベントにその処理を登録します。


checkPolling

    private function checkPolling()

{
$read = $error = $reject = [$this->connection];
if (! \mysqli::poll($read, $error, $reject, 0, 1)) {
return;
}

if ($res = $this->connection->reap_async_query()) {
$this->emit('success', [$res]);
} else {
$this->emit('error');
}

$this->loop->cancelTimer($this->timer);

}

定期的問い合わせ処理の中身となります。

mysqli::pollは静的メソッドとして働くので、こんな書き方になっています。

http://php.net/manual/ja/mysqli.poll.php

pollの待ち時間を1マイクロ秒にしています。

イベントループの方でもう問い合わせ周期を決めているので、pollで結果の返りを待つ必要はありません。

というより、ここで待ち時間をある程度設定してしまうと、処理が滞る可能性があります。

イベントループはシングルスレッド処理なので、一つでも待ち時間の発生する同期処理があると、後に控える処理全てがつっかえてしまうからです。

このループ処理内では一つの接続についてしか問い合わせをしないので、結果の返りを検知したら、その接続から結果をreap_async_queryを使って取り出します。

あとは、結果セットを持たせてイベントを発火($this->emit('success', [$res]))させればいいだけ、となります。


使ってみる

使い方は簡単です。

$loop = React\EventLoop\Factory::create();

MysqlWrapper::setDefaultLoop($loop);
$config = [
'host' => '192.168.33.40',
'user' => 'root',
'pass' => 'password',
'db' => 'async_test'
];

$con = new MysqlWrapper($config);

$con->query('SELECT * FROM test WHERE name="testNo4444"', function ($res) {
$res_ass = $res->fetch_assoc();
echo $res_ass['name'] . PHP_EOL;
});

$loop->run();

こんな感じです。

ちなみに、テスト用のテーブルにはidとnameだけのテストテーブルを用意し、各nameフィールドにはtestNo{id}という文字列を入れてあります。

10000レコードほど入れてあります。


検証


非同期処理であることの検証

さて、この実装が非同期処理であるかどうかは次のようにして確かめることができます。

$loop = React\EventLoop\Factory::create();

MysqlWrapper::setDefaultLoop($loop);
$config = [
'host' => '192.168.33.40',
'user' => 'root',
'pass' => 'password',
'db' => 'async_test'
];

$connects = [];

//ここから検証
for ($num = 1; $num < 10; $num++) {
$connect = new MysqlWrapper($config);
$rand_time = rand(1, 2);
$sql = 'SELECT * FROM test WHERE name="testNo' . $num . '"';
$connect->query('SELECT SLEEP(' . $rand_time . ')', function () use ($sql, $connect) {
$connect->query($sql, function ($res) {
$res_ass = $res->fetch_assoc();
echo $res_ass['id'] . PHP_EOL;
});
});

$connects[] = $connect;
}

$loop->run();

これは9個の接続を作成し、各々の接続で簡単なクエリを非同期に投げているものです。

ここで、各接続はmysql上で1秒か2秒待つことになります。ですが、非同期処理を使っているおかげで、他の処理に影響を与えません。

実際、これをphpで実行すると、1秒毎2秒後にまとまって結果が表示されると思います。

また、出力される内容も実行するごとにその順番がバラバラなのがわかります。

この処理結果により、たしかに非同期処理が実現できていることが確認できます。


同期処理化

ここまでやったら、今度は同期処理の文法に持って行きたいですよね。

やり方は以前に紹介したとおりです。

そうです、Promiseを使います。

<?php

require '../vendor/autoload.php';

use NiisanTokyo\As2sm\MysqlWrapper;
use NiisanTokyo\As2sm\As2sm;
use React\Promise\Deferred;

$loop = React\EventLoop\Factory::create();
MysqlWrapper::setDefaultLoop($loop);

// queryの返却値をPromiseにして返す
class MysqlPromise extends MysqlWrapper
{

public function query($sql)
{
$defer = new Deferred();
parent::query($sql);

$this->on('success', function($res) use ($defer) {
$defer->resolve($res);
});

return $defer->promise();
}
}

//メイン処理
As2sm::wrap(function(){
$con = new MysqlPromise([
'host' => '192.168.33.40',
'user' => 'root',
'pass' => 'password',
'db' => 'async_test'
]);

$time = microtime(true);
$otime = $time;

//クエリの処理時間と、適当な結果を返す
for ($num = 0; $num < 20; $num++) {
$rand = rand(1, 10000);
$res = (yield $con->query('SELECT * FROM test WHERE name="testNo' . $rand . '"'));
$data = $res->fetch_assoc();
echo '経過時間: ' . (microtime(true) - $time) . '[sec], 結果: ' . $data['name'];
echo PHP_EOL;

$time = microtime(true);
}

echo '総時間: ' . (microtime(true) - $otime) . '[sec]' . PHP_EOL;
});

$loop->run();

結果はこんな感じです

経過時間: 0.033907890319824[sec], 結果: testNo3708

経過時間: 0.0039401054382324[sec], 結果: testNo5413
...
経過時間: 0.0050308704376221[sec], 結果: testNo9618
経過時間: 0.0039379596710205[sec], 結果: testNo5960
経過時間: 0.0046210289001465[sec], 結果: testNo8060
総時間: 0.12896704673767[sec]

慣れれば簡単です。


WEBサーバ

最後に、WEBサーバで使ってみましょう

WEBサーバの立て方は

http://qiita.com/niisan-tokyo/items/4b129cf5b2baba80e02a

を見てください。

$app = function ($request, $response) {

As2sm::wrap(function () use ($response) {
$con = new MysqlPromise([
'host' => '192.168.33.40',
'user' => 'root',
'pass' => 'password',
'db' => 'async_test'
]);

$time = microtime(true);
$otime = $time;
$output = '<html><header><meta http-equiv="Content-Type" content="text/html; charset=UTF-8"></head><body>';
for ($num = 0; $num < 20; $num++) {
$rand = rand(1, 10000);
$res = (yield $con->query('SELECT * FROM test WHERE name="testNo' . $rand . '"'));
$data = $res->fetch_assoc();
$output .= '経過時間: ' . (microtime(true) - $time) . '[sec], 結果: ' . $data['name'];
$output .= '<br />';

$time = microtime(true);
}

$output .= '総時間: ' . (microtime(true) - $otime) . '[sec]<br /></body></html>';
$response->writeHead(200, []);
$response->end($output);
$con->disconnect();
});
};

先ほどのコードをWEBサーバ用に書き換えました。

ちなみに、ここでちょっとだけハマりました。

最後にdisconnectで強制的にMySQLとの接続を切っています。

ReactのWEBサーバはプロセスが死なないので、オブジェクトのガベージコレクタが働くまでしばらく接続が残り続けてしまうようです。

で、接続を繰り返すと、MySQLサーバの接続上限に引っかかったらしく、エラーをすごい勢いで吐き出していました。

性能は・・・あまり出ません。

というのも、私のローカル環境のMySQLが貧弱で同時接続数が多すぎると対応できなくなったのです。

特に、今回の検索条件は、なにげにall検索なので、負荷がかかりまくるようで、接続数が増えると飛躍的にレスポンスが遅くなりました。

この辺はテーブル設計と、DBサーバ自体の性能で何とかするしか無いところですね。


まとめ

PHPで非同期処理を書くというのは、ある意味冒険のようなところがあります。

しかし、決して不可能ではないし、順をおっていけば割と簡単に書けるものです。

また、ReactPHPは、WEBサーバにかぎらず、非同期処理の支援をしてくれる便利なツールなので、流行ってくれるといいなぁとか思います。


参考資料

https://github.com/bixuehujin/reactphp-mysql

PHPの非同期クエリで並行処理をやってみる - Fukaoi.org

PHPはReactで非同期処理対応のWEBサーバを構築する