2
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

PHPでバッチ処理を並列で行う方法

Last updated at Posted at 2021-06-27

はじめに

直近の記事を書くにあたり、ダミーデータが大量に必要でした。
1500万レコードほどのデータを用意したかったので、PHPでバッチを並列処理できるライブラリを探しました。
Packagistや各種記事をあさり、実際にいろいろ試してみたところ「qxsch/WorkerPool」というライブラリが一番お手軽にできましたので、サンプルコードや調査結果などを交えつつ記事にしたいと思います。

直近記事

動作確認環境

  • PHP 8.0
  • Laravel 8.0
  • MySQL 8.0

※下位環境でも動作する場合がございます

計測

時間の都合上、150万レコードの挿入で計測時間を比較しました。
Eloquentを使った一番シンプルなインサートではあまりにも遅すぎるため、計測はマルチインサートとの比較としました。

通常の方法

できる限り高速化させるためにマルチインサートで入れ込みます。
Laravelでは下記の方法でマルチインサートが可能です。

DB::table('dummy_users')->insert($insert_records);
// 150万record
$insert_records = [];
for ($i = 0; $i < 10; $i++) {
    for ($j = 0; $j < 15; $j++) {
        $time1 = Carbon::today('Asia/Tokyo');
        $time2 = Carbon::now('Asia/Tokyo');
        $time3 = Carbon::now('Asia/Tokyo');
        $time4 = Carbon::now('Asia/Tokyo');
        $time5 = Carbon::now('Asia/Tokyo');

        $attributes = [];
        $attributes['name'] = 'hogehoge';
        $attributes['session_token'] = 'fugefuge';
        $attributes['user_token'] = 'token';
        $attributes['sub_name'] = 'ながれぼし';
        $attributes['ios_gold'] = mt_rand(0, 9999);
        $attributes['android_gold'] = mt_rand(0, 9999);

        $add_day = mt_rand(0, 50);
        $attributes['last_login_datetime'] = $time5->addDays($add_day);
        $attributes['updated_at_timestamp'] = $time1->addDays($add_day)->timestamp;
        $attributes['updated_at_timestamp2'] = $time2->addDays($add_day)->timestamp;
        $attributes['created_at'] = $time3->addDays($add_day);
        $attributes['updated_at'] = $time4->addDays($add_day);

        $insert_records[] = $attributes;
    }
}

for ($i = 0; $i < 10000; $i++) {
    echo $i."\n";
    DB::table('dummy_users')->insert($insert_records);
}
// 150万レコードの計測結果
mysql> select count(*) from dummy_users;
+----------+
| count(*) |
+----------+
|  1500000 |
+----------+

time php artisan command:create_dummy_user
real    4m51.665s ← 約5分

1500万レコードではこの10倍、実運用ではもっと量が多かったり、複数テーブルを同時に作ったりする必要もあるので、5分でも遅すぎますね。

並列処理

では並列処理ではどれくらいか、計測してみました。

// 150万レコードの計測結果
mysql> select count(*) from dummy_users;
+----------+
| count(*) |
+----------+
|  1500000 |
+----------+

time php artisan command:create_dummy_user
real    0m51.103s ← 約50秒

並列プロセス数によっても時間は変わってくると思いますが、劇的に短縮することはできました!
以下、その導入方法についてです。

使用ライブラリ

ライブラリの導入

composer require qxsch/worker-pool

必要な拡張モジュール

該当ライブラリは拡張モジュールを必要となるため、足りない拡張モジュールがあった場合には適時導入します。
Dcokerを使っている場合は、下記のコマンドで追加が簡単に可能です。

// サンプル
docker-php-ext-install pcntl

// 導入されたかどうかの確認
php -i | grep pcntl
→ enabledとなっていればOKです

psコマンドの導入

当該ライブラリは複数プロセスによって並列処理を実現しています。
デバッグのためにはプロセスを表示するコマンド「ps」を利用します。
Dockerのコンテナに入っていない場合には、下記コマンドで追加可能です。

apt install procps -y

サンプルコード

<?php

namespace App\Console\Commands\DummyUserCreate;

use Carbon\Carbon;
use DB;
use QXS\WorkerPool\Semaphore;
use QXS\WorkerPool\WorkerInterface;

/**
 * Class CreateDummyUserWorker.
 */
class CreateDummyUserWorker implements WorkerInterface
{
    /**
     * ワーカーが別のプロセスに分岐した後に実行
     *
     * @param  \QXS\WorkerPool\Semaphore  $semaphore  the semaphore to run synchronized tasks
     * @throws \Exception in case of a processing Error an Exception will be thrown
     */
    public function onProcessCreate(Semaphore $semaphore): void
    {
        // 開始したことを示すメッセージ
        echo "\t[".getmypid()."] has been created.\n";

        // それぞれのプロセスでmt_randの値が異なるようにする
        [$sec1, $sec2] = explode(' ', microtime());
        mt_srand((float) $sec2 + ((float) $sec1 * 100000));
    }

    /**
     * ワーカープロセスが破棄される前に実行
     *
     * @throws \Exception in case of a processing Error an Exception will be thrown
     */
    public function onProcessDestroy(): void
    {
        // 終了したことを示すメッセージ
        echo "\t[".getmypid()."] will be destroyed.\n";
    }

    /**
     * run the work
     *
     * @param  mixed  $input  the data, that the worker should process
     * @throws \Exception in case of a processing Error an Exception will be thrown
     */
    public function run($input): void
    {
        $insert_records = [];
        for ($i = 0; $i < 10; $i++) {
            for ($j = 0; $j < 15; $j++) {
                $time1 = Carbon::today('Asia/Tokyo');
                $time2 = Carbon::now('Asia/Tokyo');
                $time3 = Carbon::now('Asia/Tokyo');
                $time4 = Carbon::now('Asia/Tokyo');
                $time5 = Carbon::now('Asia/Tokyo');

                $attributes = [];
                $attributes['name'] = 'hogehoge';
                $attributes['session_token'] = 'fugefuge';
                $attributes['user_token'] = 'token';
                $attributes['sub_name'] = 'ながれぼし';
                $attributes['ios_gold'] = mt_rand(0, 9999);
                $attributes['android_gold'] = mt_rand(0, 9999);

                $add_day = mt_rand(0, 50);
                $attributes['last_login_datetime'] = $time5->addDays($add_day);
                $attributes['updated_at_timestamp'] = $time1->addDays($add_day)->timestamp;
                $attributes['updated_at_timestamp2'] = $time2->addDays($add_day)->timestamp;
                $attributes['created_at'] = $time3->addDays($add_day);
                $attributes['updated_at'] = $time4->addDays($add_day);

                $insert_records[] = $attributes;
            }
        }

        for ($i = 0; $i < 20; $i++) {
            DB::table('dummy_users')->insert($insert_records);
        }
    }
}
<?php

namespace App\Console\Commands\DummyUserCreate;

use Illuminate\Console\Command;
use QXS\WorkerPool\WorkerPool;

/**
 * Class CreateDummyUser.
 */
class CreateDummyUser extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'command:create_dummy_user';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'create dummy user';

    /**
     * Execute the console command.
     */
    public function handle()
    {
        // 初期設定
        $wp = new WorkerPool();
        $wp->setWorkerPoolSize(64)
            ->disableSemaphore()
            ->create(new CreateDummyUserWorker());

        // タスクの実行
        for ($i = 0; $i < 500; $i++) {
            $wp->run($i);
        }

        // すべてのタスク完了を待つ
        $wp->waitForAllWorkers();
    }
}

おまけ知識

上記のコードはプロセスへのタスクが完了次第、同じプロセスを使いまわし次の処理を実行します。
もし、メモリオーバーなどを懸念し同じプロセスを使い回さない場合は、全プロセスの完了を待って一度開放処理を入れるようにしましょう。

/**
 * Execute the console command.
 */
public function handle()
{
    for ($count = 0; $count < 10; $count++) {
        // 初期設定
        $wp = new WorkerPool();
        $wp->setWorkerPoolSize(64)
            ->disableSemaphore()
            ->create(new CreateDummyUserWorker());

        // タスクの実行
        for ($i = 0; $i < 50; $i++) {
            $wp->run($i);
        }

        // すべてのタスク完了を待つ
        $wp->waitForAllWorkers();

        // 開放処理
        $wp->destroy();
    }
}

まとめ

Laravelではキュー(ジョブ)を使った並列処理がありますが、当記事のように複数プロセスでバッチ処理を走らせることで並列処理を実現することも可能です。
遅延を極力なくしてコマンドを並列処理したい場合は当記事の方法は有効な手段の一つになると思います。

関連記事

2
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?