C++
Qt
concurrent
qt5
Threading

Qt Concurrentのヌヌメ

はじめに

こんにちは。今回はQt Concurrentの紹介をします。コード多めの長文で独りよがりなので、苦手な方はブラウザのBackボタンをどうぞ。

概要

Qt ConcurrentはQtのマルチスレッド関連の高レベルAPIです。Qt Concurrentを使用することで、ミューテックスやセマフォといった低レベルAPIを意識せずに、スケール可能なアプリケーションを容易に作成することができます。

...と、Qt Concurrentの説明というと大体こんなカンジになると思うんですが、その割にQt Concurrentさんは扱いが低い気がしています。試しにQt Creatorの「ようこそ」-「サンプル」で検索窓に"thread"と入れると、Qt Concurrentのサンプルは一つも出てきません(Qt Creator 4.5.1)。これはいかんよ!みんなもっとQt Concurrent使おうよ!幸せになろうよ!

そんなわけでQt Concurrentを褒めたたえる記事を書くことにしました。記事の中で書いているサンプルプログラムはGitHubにあげてあります。

マルチスレッドAPIの比較

Qtでマルチスレッド処理を実装する場合、以下の3つのAPIからいずれかを選択する事になると思います。

API
低レベル
QThreadクラス
QThreadPoolクラス

高レベル
Qt Concurrentモジュール

高レベルAPIを使用した方がプログラミングがラクになることが多いですが、低レベルAPIでなければできないこともあります。比較すると大体以下のようになります。

API できること 低レベルAPIに比べてできないこと
QThread

スレッド実行(前|後)処理のカスタマイズ

イベントディスパッチャのカスタマイズ

QThreadPool QThreadインスタンスの(生成|破棄)の管理

スレッド実行優先度の制御

イベントディスパッチャのカスタマイズ

QtConcurrent QRunnableクラス実装の高度なラッピング

↑の内容

タスク実行優先度の制御

使用するQThreadPoolの指定(map/filterの場合)

実装コードがどのように違ってくるのか、以降サンプルプログラムで説明していきます。

サンプルの説明

Runボタンで10個のタスクをそれぞれWorkerクラスに実行させ、その進捗を表示するGUIプログラムを考えます。RunボタンとCancelボタンが有り、タスク実行中はCancelボタンのみ有効、タスクが実行していない時はその逆になるようにします。

worker.h
#ifndef WORKER_H
#define WORKER_H
#include <QtCore/QObject>
#include <QtCore/QThread>
#include <QtCore/QDebug>
#include <atomic>

class Worker : public QObject {
    Q_OBJECT
public:
    explicit Worker(QObject *parent = Q_NULLPTR) : QObject{parent} {qDebug() << "new" << this;}
    ~Worker() Q_DECL_OVERRIDE {qDebug() << "delete" << this;}
signals:
    void progressChanged(int done);
    void finished();
public slots:
    void doWork() {
        // work few seconds
        for (int i = 0; i <= 100; ++i) {
            if (canceled_) break;
            QThread::msleep(rand() * 10 / RAND_MAX);
            emit progressChanged(i);
        }
        emit finished();
    }
    void cancel(){canceled_ = true; qDebug() << "canceled" << this;}
private:
    std::atomic_bool canceled_ = false;
};
#endif // WORKER_H

サンプル1) 全部メインスレッド

まずはマルチスレッド処理を考えずにベタっと書いてみます。

threading.pro
QT += widgets
HEADERS += worker.h
SOURCES += main.cpp
main.cpp
#include <QtWidgets/QApplication>
#include <QtWidgets/QSlider>
#include <QtWidgets/QVBoxLayout>
#include <QtWidgets/QHBoxLayout>
#include <QtWidgets/QPushButton>
#include "worker.h"

class MainWindow : public QWidget {
    Q_OBJECT
public:
    MainWindow(QWidget *parent = Q_NULLPTR) : QWidget{parent}{
        auto mainLayout = new QVBoxLayout{};
        setLayout(mainLayout);
        auto buttonLayout = new QHBoxLayout{};
        auto runButton = new QPushButton{QStringLiteral("&Run")};
        connect(runButton, &QPushButton::clicked, this, &MainWindow::run);
        connect(this, &MainWindow::runningChanged, runButton, &QPushButton::setDisabled);
        buttonLayout->addWidget(runButton);
        cancelButton_ = new QPushButton{QStringLiteral("&Cancel")};
        connect(this, &MainWindow::runningChanged, cancelButton_, &QPushButton::setEnabled);
        buttonLayout->addWidget(cancelButton_);
        mainLayout->addLayout(buttonLayout);
        auto progressLayout = new QVBoxLayout{};
        for (int i = 0; i < 10; ++i) {
            auto bar = new QSlider{Qt::Horizontal};
            bar->setRange(0, 100);
            bars_ << bar;
            progressLayout->addWidget(bar);
        }
        mainLayout->addLayout(progressLayout);
        emit runningChanged(false);
    }
signals:
    void runningChanged(bool running);
public slots:
    void run() {
        emit runningChanged(true);
        for (int i = 0; i < 10; ++i) bars_[i]->setValue(0);

        for (int i = 0; i < 10; ++i) {
            auto bar = bars_[i];
            bar->setValue(0);
            Worker worker;
            connect(&worker, &Worker::progressChanged, bar, &QSlider::setValue);
            connect(cancelButton_, &QPushButton::clicked, &worker, &Worker::cancel);
            worker.doWork();
        }
        emit runningChanged(false);
    }
private:
    QPushButton *cancelButton_;
    QList<QSlider*> bars_;
};

int main(int argc, char *argv[]) {
    QApplication a(argc, argv);
    MainWindow w;
    w.show();
    return a.exec();
}

#include "main.moc"

「Run」ボタンをクリックするとrunスロットでタスクを開始しますが、10個のタスクを実行している間はメインスレッドがブロックされているためGUIは応答しません。Cancelボタンも効きません。

サンプル2) QThreadを使う

サンプル1のようにGUIがフリーズしてしまう現象は、QThreadを使ってタスクを別スレッドで実行することで回避できます。main.cppを以下のように書き換えます。

main.cpp
// (中略)1と同じ
#include <QtCore/QThread> // [1] QThreadヘッダをincludeします
#include "worker.h"

class MainWindow : public QWidget {
    Q_OBJECT
public:
    MainWindow(QWidget *parent = Q_NULLPTR) : QWidget{parent}{
// (中略)1と同じ
        for (int i = 0; i < 10; ++i) {
            auto bar = new QSlider{Qt::Horizontal};
            bar->setRange(0, 100);
            bars_ << bar;
            progressLayout->addWidget(bar);
        }
        mainLayout->addLayout(progressLayout);
        emit runningChanged(false);
    }
    /* [6] アプリケーション終了時にはすべてのスレッドを
       安全に停止させてやる必要があります。 */
    ~MainWindow() Q_DECL_OVERRIDE {
        emit canceled();
        for (auto thread : threads_) thread->wait();
    }
signals:
    void runningChanged(bool running);
    void canceled();
public slots:
    void run() {
        emit runningChanged(true);
        for (int i = 0; i < 10; ++i) bars_[i]->setValue(0);

        for (int i = 0; i < 10; ++i) {
            auto bar = bars_[i];
            bar->setValue(0);
            auto worker = new Worker{};
            connect(worker, &Worker::progressChanged, bar, &QSlider::setValue);
            /* [4] キャンセル処理は元のコードのままではcancelスロット呼び出しが
               ワーカースレッドのキューに入ってしまい、ワーカースレッドのイベントループへ
               制御が戻ってくるまで処理されません。cancelを直接関数コールするか、
               DirectConnectionを指定してconnectする必要があります。 */
            connect(cancelButton_, &QPushButton::clicked, worker, &Worker::cancel, Qt::DirectConnection); // [4]
            connect(worker, &Worker::finished, worker, &Worker::deleteLater);
            /* [2] タスクを実行するワーカースレッド(QThreadインスタンス)を準備し、
               WorkerをそのスレッドにmoveToThreadします。
               これ以降のQtフレームワーク側からのスロット呼び出しは
               ワーカースレッドで行われます。
               ワーカースレッドが開始したらWorkerが処理を開始し、Workerが処理を
               終了したらリソースを解放するようにそれぞれconnectしておきます。
               準備が出来たらワーカースレッドをstartします。 */
            auto thread = new QThread{};
            worker->moveToThread(thread);
            connect(thread, &QThread::started, worker, &Worker::doWork);
            connect(worker, &Worker::destroyed, thread, &QThread::quit);
            connect(thread, &QThread::finished, thread, &QThread::deleteLater);
            /* [3] QThreadではスレッドの実行優先度を指定できます。
               start後はsetPriorityで実行優先度を変更することができます。
               ただしlinuxでは実行優先度の設定は無視されるそうです。 */
            thread->start(QThread::LowestPriority);

            /* [5] 実行中のスレッドを保持しておき、全てのスレッドが停止したのを確認して
               ボタンの有効/無効を切り替える必要があります。 */
            connect(this, &MainWindow::canceled, worker, &Worker::cancel, Qt::DirectConnection);
            connect(thread, &QThread::finished, this, [this](){
                threads_.removeOne(qobject_cast<QThread*>(sender()));
                if (threads_.isEmpty()) emit runningChanged(false);
            });
        }
    }
private:
    QPushButton *cancelButton_;
    QList<QSlider*> bars_;
    QList<QThread*> threads_;
};

int main(int argc, char *argv[]) {
    QApplication a(argc, argv);
    MainWindow w;
    w.show();
    return a.exec();
}

#include "main.moc"

さて、一気にめんどくさくなりました。

上のコードではタスクが多くなると物理スレッドの数を超えてスレッドを生成してしまうため、スレッド切替のオーバーヘッドで全体の処理にかかる時間が増大してしまいます。QThreadの数を制限してWorkerを適切に振り分けてやる必要があるでしょう。
QThreadとWorkerのインスタンスを先に全部作ってしまうので、Workerが大きなリソースを扱う場合にはそこも悩みです。

また、スレッドの生成/破棄はコストが大きいため、小さなタスクが頻繁に発生する場合にはこのコードでは非効率です。かと言って事前に必要な数のQThreadインスタンスを生成してstartしておくと、今度は不要なリソースが確保される上に、ワーカースレッドのイベントループのコストが常にかかることになります。

サンプル3) QThreadPoolを使う

QThreadPoolはQThreadの生成破棄を管理してくれるクラスです。QRunnableを継承したクラスのrun関数を別スレッドで実行することができます。良く書かれるサンプルコードは以下のようなものです。

runnable_wack_sample.cpp
class MyWorker : public QRunnable {
public:
    void run() Q_DECL_OVERRIDE {
        doProcess1();
        doProcess2();
        doProcess3();
    }
// (中略)
}
void func() {
    QThreadPool::globalInstance()->start(new MyWorker{});
}

実際には上のコードではWorkerがQRunnableにガッツリ依存してしまうのでちょっとダサいです。QRunnable継承クラスの実装はWorkerを走らせるだけにした方がいいでしょう。

runnable_sample.cpp
class MyWorker {
public:
    void doWork() {
        doProcess1();
        doProcess2();
        doProcess3();
    }
// (中略)
}
class WorkerRunner : public QRunnable {
public:
    void run() Q_DECL_OVERRIDE {
        MyWorker worker;
        worker.doWork();
    }
}

とはいえ、走らせるWorker毎にQRunnable継承クラスを用意するのは面倒でつまらない作業です。それならWorkerを抽象化したクラスを作ろうか、それともRunnerをテンプレートクラスにしてdoWorkだけを共通化しようか。C++11以降ならもっといい方法があります。Workerのメンバーにstd::functionを持たせて、テンプレートパラメータパックを使えば汎用的なRunnerクラスを作れるはずです。

template_runner.cpp
template <typename ...Args>
class Runner : public QRunnable {
public:
    Runner(std::function<void(Args...)> func, Args... args) : func{std::bind(func, args...)} {}
    void run() Q_DECL_OVERRIDE {func();}
private:
    std::function<void()> func;
};
void func() {
    QThreadPool::globalInstance()->start(new Runner<>{[]{
        Worker worker;
        worker.doWork();
    }});
}

手元の環境ではテンプレートパラメータ推定がうまくいかないのでちょっとコンストラクタ呼び出し時の<>指定がダサいですが、とにかくこれでラムダでもメンバー関数でも何でもQThreadPoolに渡せるようになります。もうちょっと工夫すれば実行終了後に戻り値を取れるようなクラスを作ることだって簡単です。

...そろそろ「おいおいちょっと待ってくれよ」という声が聞こえてきそうな気がします。そう、それこそが(ほぼ)QtConcurrentRunの正体ですよね。QtConcurrentについては次の章で説明するとして、ここでは前章のコードをQThreadPool(と上の怪しげなRunnerクラス)を使って書き直しておきます。

main.cpp
// (中略)1と同じ
#include <QtCore/QThreadPool>
#include <QtCore/QRunnable>
#include "worker.h"

template <typename ...Args>
class NandemoRunner : public QRunnable {
public:
    NandemoRunner(std::function<void(Args...)> func, Args... args) : func{std::bind(func, args...)} {}
    void run() Q_DECL_OVERRIDE {func();}
private:
    std::function<void()> func;
};

class MainWindow : public QWidget {
    Q_OBJECT
public:
    MainWindow(QWidget *parent = Q_NULLPTR) : QWidget{parent}{
// (中略)1と同じ
        /* [2] setMaximumThreadCountでQThreadの最大インスタンス数を設定できます。
           既定値はシステムの論理スレッド数になっているのであまり指定することは
           無いと思います。setExpiryTimeoutではQThreadインスタンスを破棄するまでの
           タイムアウト時間を設定できます。ここでは300msec新しいタスクが
           投入されなければインスタンスを破棄するように設定しています。 */
        threadPool.setMaxThreadCount(2);
        threadPool.setExpiryTimeout(300);
        /* [3] cancel時にQThreadPoolのclearを呼ぶようにしています。
           clear呼び出しでQThreadPoolは実行待ちのRunnerを全てキューから削除し、
           autoDelete(デフォルトはtrue)のインスタンスであればdeleteしてくれます。 */
        connect(this, &MainWindow::canceled, &threadPool, &QThreadPool::clear);
        for (int i = 0; i < 10; ++i) {
            auto bar = new QSlider{Qt::Horizontal};
            bar->setRange(0, 100);
            bars_ << bar;
            progressLayout->addWidget(bar);
        }
        mainLayout->addLayout(progressLayout);
        emit runningChanged(false);
    }
    ~MainWindow() Q_DECL_OVERRIDE {
        emit canceled();
    }
signals:
    void runningChanged(bool running);
    void canceled();
public slots:
    void run() {
        emit runningChanged(true);
        for (int i = 0; i < 10; ++i) bars_[i]->setValue(0);

        /* [4] QThreadの例に比べて、QThreadインスタンスとWorkerインスタンスの
           生成/破棄を自分で管理する必要がなくなりました。Runnerはrunの上書きと
           std::functionしかもっていないのでほとんどリソースを消費しません。
           Workerは実際に処理が走っている分しかインスタンスが生成されません。
           QThreadインスタンスはQThreadPoolのstart呼び出し時に必要があれば
           必要な数だけ生成され、Runnerが全て終了した後300msec新しいRunnerが投入
           されなければ適切に破棄されます。

           ※ NandemoRunnerの使い方を示すために上は引数有り、下は引数無し(ラムダ
              でキャプチャ)を使って書いています。 */
        for (int i = 0; i < 10; ++i) {
            auto bar = bars_[i];
            bar->setValue(0);
            threadPool.start(new NandemoRunner<MainWindow*, QSlider*>{[](MainWindow *window, QSlider *bar){
                Worker worker;
                worker.connect(&worker, &Worker::progressChanged, bar, &QSlider::setValue);
                worker.connect(window, &MainWindow::canceled, &worker, &Worker::cancel, Qt::DirectConnection);
                worker.doWork();
            }, this, bar}, i);
            /* [6] QThreadPoolの最後の引数ではRunnerの優先度を指定できます。
               スレッドの実行優先度ではないことに注意が必要です。ここでは、優先度
               0~9のRunnerに対して最初の2つ(maxThreadCountまで)は投入した順
               (0, 1)、それ以降は優先度の高い順(9, 8, ... , 2)にRunnerが
               キューから取り出されて実行されます。
               (実行すればスライダーの動きで確認できます) */

            /* [5] 全てのタスクが終了したかどうかは、QThreadPool::waitForDoneを
               別のスレッドで待ち受けることにしました。 */
            QThreadPool::globalInstance()->start(new NandemoRunner<>{[this]{
                    threadPool.waitForDone();
                    emit runningChanged(false);
                }
            });
        }
    }
private:
    QPushButton *cancelButton_;
    QList<QSlider*> bars_;
    /* [1] ここではQThreadPool::globalInstanceを使わずにローカルのインスタンスを
       用意しています。QThreadPoolは自分が管理しているQThreadが全て終了するまで
       デストラクタでwaitしてくれるので、Workerが他のstaticでないリソースに
       アクセスする場合は、それよりも先に削除されるローカルな
       QThreadPoolインスタンスを使用する方が都合が良い場合が多いです。 */
    QThreadPool threadPool;
};

int main(int argc, char *argv[]) {
    QApplication a(argc, argv);
    MainWindow w;
    w.show();
    return a.exec();
}

#include "main.moc"

どうでしょうか。QThreadを使った場合に比べて随分プログラマーが考慮しなければならない事項が減ったと思います。スレッド実行優先度を制御しなければならない特殊な場合を除けば、QThreadよりもQThreadPoolを使った方が効率良くバグの少ないコードが書けそうな気がしませんか。

サンプル4) QtConcurrentを使おう!

ここまでお付き合いくださいましてありがとうございます、ようやく本題です。QtConcurrentを使えば、前章のようなテンプレートなRunnerクラスを頑張って自分で作らなくても、もっと簡単なAPIでQThreadPoolを使った並列処理ができるようになります。

main.cpp(抜粋)
/*
 * threading.proに Qt += concurrent を追加
 * <QtConcurrent/QtConcurrentRun> ヘッダをincludeしておく
 */
            // [1] Runnerクラスは必要無くなりました。
            QtConcurrent::run(&threadPool, [](MainWindow *window, QSlider *bar){
                Worker worker;
                worker.connect(&worker, &Worker::progressChanged, bar, &QSlider::setValue);
                worker.connect(window, &MainWindow::canceled, &worker, &Worker::cancel, Qt::DirectConnection);
                worker.doWork();
            }, this, bar);
            // [2] globalInstanceを使う場合はThreadPoolを指定する必要がありません。
            QtConcurrent::run([this]{
                threadPool.waitForDone();
                emit runningChanged(false);
            });

QThreadPoolのAPIを使う場合に比べてRunnerの優先度を指定できないという制限はありますが、Runnerクラスを自分で作ってメンテナンスする必要が無い、というのは大きいですよね!

...よね?

別のThreadPoolを使って全てのタスクの終了をwaitするところがイマイチですが、タスクをQListに入れられる、かつQThreadPoolのglobalInstanceを使っても良い場合はQtConcurrentMapが使えます。(そうじゃない場合は上のthreadPool.waitForDoneを使う方法より、自分でstartの時にインクリメントしてfinishedでデクリメントする整数メンバを使う方が良いんじゃないかなぁ)

main.cpp
// (中略)1と同じ
#include <QtConcurrent/QtConcurrentMap>
#include <QtCore/QFutureWatcher>
#include <QtCore/QFutureSynchronizer>
#include "worker.h"

class MainWindow : public QWidget {
    Q_OBJECT
public:
    MainWindow(QWidget *parent = Q_NULLPTR) : QWidget{parent}{
// (中略)1と同じ
        for (int i = 0; i < 10; ++i) {
            auto bar = new QSlider{Qt::Horizontal};
            bar->setRange(0, 100);
            bars_ << bar;
            progressLayout->addWidget(bar);
        }
        mainLayout->addLayout(progressLayout);
        emit runningChanged(false);
    }
    ~MainWindow() Q_DECL_OVERRIDE {
        emit canceled();
    }
signals:
    void runningChanged(bool running);
    void canceled();
public slots:
    void run() {
        emit runningChanged(true);
        for (int i = 0; i < 10; ++i) bars_[i]->setValue(0);

        auto watcher = new QFutureWatcher<void>{};
        // [1] 全タスクの終了はQFutureWatcherのfinishedシグナルで受け取れます。
        connect(watcher, &QFutureWatcher<void>::finished, this, [this, watcher]{
            watcher->deleteLater();
            emit runningChanged(false);
        });
        connect(this, &MainWindow::canceled, watcher, &QFutureWatcher<void>::cancel);
        watcher->setFuture(QtConcurrent::map(bars_, [this](QSlider *bar){
            Worker worker;
            worker.connect(&worker, &Worker::progressChanged, bar, &QSlider::setValue);
            worker.connect(this, &MainWindow::canceled, &worker, &Worker::cancel, Qt::DirectConnection);
            worker.doWork();
        }));
        // [2] 全スレッド終了の待ち受けはQFutureSynchronizerに任せるのが良いでしょう。
        sync.addFuture(watcher->future());
    }
private:
    QPushButton *cancelButton_;
    QList<QSlider*> bars_;
    QFutureSynchronizer<void> sync;
};

int main(int argc, char *argv[]) {
    QApplication a(argc, argv);
    MainWindow w;
    w.show();
    return a.exec();
}

#include "main.moc"

おわりに

はじめに にも書いたとおりQThreadPoolはスレッド実行優先度を、QtConcurrentRunはタスクの優先度を、QtConcurrentMap/QtConcurrentFilterはQThreadPoolインスタンスを、それぞれ指定できないという制限があります。ただ、その制限内ではより効率良くコードが書けるので、可能であればどんどんQtConcurrentを使えばいいんじゃないかな、と思っています。この記事を読んだ方も同じ気持ちになって頂けたなら幸いです。

上の制限は、QThreadPoolのコンストラクタ引数にスレッド実行優先度を指定できるようになれば良いしQtConcurrentRunの引数にタスク優先度取得用の関数オブジェクトを指定できるようになっても良いと思うし、QtConcurrentMap/QtConcurrentFilterの引数にQThreadPoolを渡せるようになっても良いと思います。このあたりは少しずつでもパッチを書いてみます。

長文にお付き合いくださいましてありがとうございました。