GuzzleHttp は非同期リクエストを送信できる
- PHPはシングルスレッド
- GuzzleHttp/Promiseクラスの存在
- Promise/A+準拠インターフェース
一体、どのように実現しているのか?
GuzzleHttp がマルチスレッドというわけではない
既に試している人がいたので、引用する。
https://qiita.com/tadsan/items/63b8d84193498b1c6191
thenの処理でsleepすると次の処理が待たされることが確認されている。
php_curl 並列処理メソッド
See: https://www.php.net/manual/ja/ref.curl.php
主要な処理を抜粋
- curl_multi_add_handle
- curl_multi_exec
- curl_multi_info_read
- curl_multi_remove_handle
GuzzleHttp はAPI
- php_curlに並列処理の関数がある
- GuzzleHttp は php_curl の並列処理を利用しているだけ
それにしても、どのように実装したら非同期(のように)処理できるのか?
ストリームを利用すれば…
例えば次のように、ファイルハンドルを複数開いて順番にreadするループを作る。
$fps = [];
for ($i = 0; $i < 3; $i++) {
$fps[] = fopen("tmp/$i", 'r');
}
while ($fps) {
foreach ($fps as $i => $fp) {
rewind($fp);
$line = fgets($fp);
if (preg_match('/finish/', $line)) {
unset($fps[$i]);
}
}
}
ファイルに別のプロセスから書き込めばそれらしい動きができる。
GuzzleHttp の標準的な並列処理のコード
$concurrency = 3;
$url = 'http://localhost/wait-random.php';
$generator = function () use ($concurrency, $url) {
for ($i = 0; $i < $concurrency * 2; $i++) {
yield new Request('GET', $url, ['Accept' => 'application/json']);
}
};
$pool = new Pool(new Client(), $generator(), [
'concurrency' => $concurrency,
'fulfilled' => fn (Response $_, int $index) => trace(...),
'rejected' => fn ($_, int $index) => trace(...),
]);
$pool->promise()->wait();
調査を始める
https://github.com/MingSiro71/guzzle-async-code-reading
以下のファイルが入っています。
- PHP環境を立ち上げるDockerの設定ファイル
- GuzzleHttpのコードにトレース出力を組み込んだもの
- GuzzleHttpの非同期処理を実行するスクリプト
- ファイルハンドルで非同期らしいことをするサンプル
ログを表示するメソッド
function trace(string $message): void
{
$backtrace = debug_backtrace();
echo ($backtrace[1]['class'] ?? 'root')
. '::'
. ($backtrace[1]['function'] ?? '\Closuer')
. " $message [" . Carbon::now()->format('H:i:s.u') . ']'
. PHP_EOL;
}
リクエストを受ける側の処理
$delay = rand(500000, 999000);
usleep($delay);
echo json_encode([
'status' => 'OK.',
'delay' => (string) $delay/1000 . 'ms',
]);
GuzzleHttp のコードに関数を差し込んでトレースを書き出す
root::\Closuer start tracing code. [07:52:03.957759]
GuzzleHttp\Pool::__construct Initialize "EachPromise" with sendAsync generator. [07:52:03.959862]
GuzzleHttp\Pool::promise Call promise of "each" of the pool. [07:52:03.959974]
GuzzleHttp\Handler\CurlMultiHandler::__invoke Start. [07:52:03.961380]
GuzzleHttp\Handler\CurlMultiHandler::addRequest Add an entry to "handles". Keys in "handles": [41] [07:52:03.961484]
GuzzleHttp\Handler\CurlMultiHandler::addRequest Hand a request to curl. [07:52:03.961490]
GuzzleHttp\Handler\CurlMultiHandler::__invoke Start. [07:52:03.961563]
GuzzleHttp\Handler\CurlMultiHandler::addRequest Add an entry to "handles". Keys in "handles": [41,59] [07:52:03.961586]
GuzzleHttp\Handler\CurlMultiHandler::addRequest Hand a request to curl. [07:52:03.961590]
GuzzleHttp\Handler\CurlMultiHandler::__invoke Start. [07:52:03.961625]
GuzzleHttp\Handler\CurlMultiHandler::addRequest Add an entry to "handles". Keys in "handles": [41,59,76] [07:52:03.961644]
GuzzleHttp\Handler\CurlMultiHandler::addRequest Hand a request to curl. [07:52:03.961648]
GuzzleHttp\Handler\CurlMultiHandler::execute Start. [07:52:03.961659]
...
GuzzleHttp の構成要素
GuzzleHttpの並列リクエストを理解するための概念
- wait: Promiseを解決を待っている状態にする、つまりPromiseの実行
- wait function: Promiseが実行されると行われる処理
- onflulfilled: wait function正常終了時のイベントフック、thenで登録する
- onrejected: wait function異常終了時のイベントフック、thenで登録する
- state: Promiseの状態、Pending, Fulfiled, Rejectedで表す
- task: 1つのリクエストを行う処理
- aggregate: taskを統合した処理全体のPromise
- concurrency: 並列処理数
- pending: aggregateに含まれるPromiseのうち実行中のもの
- concurrencyと同数以下の数スタックされる
- queue: イベントフックを処理するためのキュー、taskは取り扱わない
Poolの役割
- 送信するリクエスト配列を管理する
- taskをClientに渡していく
- 正確には渡す処理を生成するジェネレータをEachPromiseに渡す
EachPromiseの役割
- Poolから受け取ったジェネレータとaggregate・pendingを管理するクラス
- 自身もPromiseインターフェースを持つ
- concurrencyの分だけtaskをジェネレータからpendingに移す
- その際、taskをPromiseにし、ジェネレータを進める処理をtaskのフックに登録する
- 解決されたtaskが次のtaskをpendingに詰める動きをキックするように仕向ける
- 全taskがResoleveまたはRejectされたらaggregateを解決する
TaskQueueは何をするのか
- TaskQueue はstaticでqueueを公開する
- どこからでもシングルトンなキューに処理を追加できる
- どこからでもrunでき、キューの中身を先頭から一括実行できる
Promiseはtaskやaggregateに使われる
- Promise/A+インターフェースを実現するため、クロージャをラップする役割
- waitするとwait functionのクロージャを実行する
- thenでonFlulfilledとonRejectedをいくつでも登録できる
- resoleveされることでonFulfilledを実行する
- wait functionが解決したかPromise自体は認識できない
- 正確には、onFulfilledが実行されるようにqueueに処理を詰める
- rejectされると、同様にonRejectedを実行する
つまり、wait function, onFlulfilled, onRejected, stateをセットで保存する容器。外から仕様に沿うように取り扱ってやる必要がある。
CurlMultiHandlerが全体をハンドリング
- __invokeされると、addRequestをコールしてphp_curlにリクエストを登録する
- executeは1回起動するとPoolに渡した全てのtaskを処理するまでtickをループする
- tickはqueue::runとphp_curlからの応答チェックを行う
- 応答の分だけレスポンスに対応するtaskのフックをqueueに登録する
- queue::runして自身が登録したフックを実行する
- queueからtaskのフックが実行されるとEachPromiseでジェネレータが前に進む
トレース
初回のリクエスト登録とexecute
tick tack tick tack...
ユーザ定義フックの実行とpendingの補充
繰り返す
最後のレスポンスが返ってきた時の動き
感想
率直なところ設計が複雑に感じられる。ある程度コードを繰り返し読んだが、すぐに理解が曖昧になるくらい記憶しにくい。
CurlMultiHandlerがcurlのアダプターとPromiseの処理エンジンの一部を兼務しているからではないか。
しかし、PromiseはCurlMultiHandlerでしか使わないので、Promiseの処理エンジンを切り離してCurlMultiHandlerよりも下のレイヤーに配置しにくい為だと思われる。
非同期の仕組みが標準装備されている言語は大変ありがたいものだと実感できる。