QtConcurrentでマルチスレッドに挑戦

  • 17
    Like
  • 2
    Comment
More than 1 year has passed since last update.

QtConcurrent入門

Qtおやつ部のhermit4です。カレンダー3日目の本日は、@yuntan_t さんが体調を崩されたということで、急遽ピンチヒッター参加です。実は引き受けるつもりで1日に登録したのですが、2日に仕事仲間の忘年会というのをすっかり失念してまして、ウーロン茶片手に下ネタおっさんトークを聞きながらカキカキしています。

本日は、@yuntan_t さんの当初の計画の通りマルチスレッド系ということで、Qtのマルチスレッドの中でもハイレベルでお手軽実行なQtConcurrentについて簡単にご紹介したいと思います。

なお、@hermit4は、仕事においてC++11がまだ利用できない環境下でQtを利用しているため、C++11は使っていません。書き方が古すぎる的な件については、ごめんなさいとだけ言っておきます。今回はboostとか他のライブラリリンクもしていません。Qtの開発環境があれば動かせるようにしています。Qtのバージョンは5.3.1で検証しましたが、Qt4.8系でも動くのではないかと思います(近日中に動作を確認します)。

また、@hermit4は、文系出身で体系だってプログラミングを学んだ事がないので、書物やWeb、あるいは使ってみながら感覚的に身につけた知識となります。理解が間違ってる、使い方がおかしい等あれば、どしどしご指摘下さい。

マルチスレッドについて

プログラムを学ぶにはいきなり作ってみるというのが一番だと思うのですが、念のためにマルチタスク・マルチプロセス・マルチスレッドについて少し記述しておきます。そんなのどうでもいいやって思う方は、さっくりQtConcurrentに挑戦して下さい。

シングルタスクとマルチタスク

ノイマン型CPUは、フェッチ・デコード・実行という一連の流れを同時に1つだけ実行出来るように作られています。この特性に従い、同時に動作するプログラムも1つだけであったシステムをシングルタスクと呼んでいます。今は昔、PCを立ち上げるとBASICやMS-DOSが動いていた時代のお話です。

その後、CPUの処理速度の向上と周辺機器との動作速度の乖離により、CPUの空き時間が多くなると、OS等によりCPUが処理するプログラムを短い時間で切り替えて、見かけ上並列に動いているかのように見せる技術が向上しました。同時に複数のプログラムが動作している(かのような状態を持つ)システムをマルチタスクと呼びます。

マルチタスクの実現手段の一つとして、プログラム毎にアドレス空間等のリソースを割り当てた「プロセス」という処理単位で管理・スケジューリングする機構をマルチプロセスと呼びます。現在では、一部の組込みOS等を除き、多くのOSはマルチプロセスのシステムとなっています。

シングルスレッドとマルチスレッド

マルチプロセスでは、メモリ空間、アドレス空間といったリソースを割り当てたプロセスとしてプログラムを実行しますが、基本的にプロセスの処理は、順次実行されていきます。手続き型の場合は関数、オブジェクト指向の場合はオブジェクトという、人に理解のおよぶ単位で分断して実装されたコードもコンピュータに与えられると、解きほぐされた1本の糸につながれたような処理の羅列にすぎません。この1本の糸の流れをシングルスレッドと呼びます。

シングルスレッドのシステムではプロセスは順次実行されます。それが例えばGUIアプリケーションの場合、重い処理を実行中は描画が止まる事になります。この辺りについては、明日詳しく語る事として、この解決策は、1つのプロセスの1本の糸を途中で分割し、複数を並列実行する(ように見せる)ことで、回避できます。この実現がマルチスレッドとなります。

マルチスレッドは、1つのプロセス内においてデータ領域(ヒープ領域および静的領域)やディスクリプタを共有したまま、スタックとプログラムカウンタを異にして並列動作させる仕組みとなります。

マルチスレッドの現在

ここまで、同時実行については少し含みを持たせた記述をしてきました。ノイマン型CPUは、同時に1つの命令だけを実行するもので、過去においては、同時に動いているように見せてきたわけですが、このCPUコアを複数持たせることで、同時に演算可能なCPUがマルチコアのプロセッサが現在は主流となっています。現在ではスマートフォンのような端末でもマルチコアの時代に突入しており、見かけ上だけではなく、実際に並列に演算が行えるのです。

CPUのリソースを十分に使い、より高速て快適な動作を提供するためには、マルチスレッドプログラミングが欠かせない知識となるでしょう。

Qtのマルチスレッド対応

QtはC++で実装されたフレームワークでありネイティブアプリケーションを実装する場合、C++で実装する事になりますが、C++では、長い間言語としてスレッドをサポートしていませんでした。

また、マルチスレッドの実現方法は、プラットフォーム毎に異なる場合も多く、POSIX互換のpthread等である程度の互換性はあるものの、完全な互換性は保たれてきませんでした。Qtはクロスプラットフォーム向けのGUIフレームワークとして発展してきたため、当然マルチスレッドについてもプラットフォーム間の差分を吸収するマルチスレッドフレームワークを内包しています。

QtConcurrentとは

QtConcurrentは、マルチスレッドのためのクラスの中でも、特にハイレベルでのマルチスレッドの実現手段であり、難しい排他処理や同期処理を簡略化して提供しています。

もちろん、ローレベルなQThread等のクラスに比べると柔軟性には欠けますが、一定の条件下でマルチスレッドを利用した場合には、有効な実現手段となることでしょう。

そこで、本日は、QtConcurrentについて、その利用方法を見て行きたいと思います。

QtConcurrentを有効にするには

QtConcurrentは、モジュールとしては、concurrentとなります。.proファイルに、以下の記述を追加してqmakeを行うことで利用可能となります。

QT += concurrent

QtConcurrentは、QtConcurrent namespaceで提供されるいくつかの関数と、実行結果の制御インターフェースとなるクラス、それらのコントロールに利用するUtility的なクラスで構成されています。

QtConcurrent::map

  • QFuture map(const Sequence & sequence, MapFunction function)
  • QFuture map(ConstIterator begin, ConstIterator end, MapFunction function)

map関数は、QListやQVectorといったコンテナのitemに対して、一定の処理を並列に行い値を更新する事ができます。並列数は、CPUのコア数等に応じて自動的に調整されます。

#include <QCoreApplication>
#include <QtConcurrent>
#include <QVector>
#include <QDebug>

struct Increment
{
    Increment() : i_(0) {}
    int operator()() { return i_++; }

    int i_;
};

struct Calc
{
    typedef void result_type;

    Calc(const int& a, const int&  b) :a_(a), b_(b) {}
    result_type operator()(int& x) { x = a_*x+b_; }

    int a_;
    int b_;
};

int main(int argc, char *argv[])
{
    QCoreApplication app(argc, argv);    
    QVector<int> data(100);

    // vectorを0-99で設定する
    std::generate(data.begin(), data.end(), Increment());
    foreach (int i, data) {
        qDebug() << i;
    }

    int a = 12;
    int b = 3;
    qDebug() << " --- Calc " << a << " * data[n] + " << b;
    QFuture<void> ret = QtConcurrent::map(data,Calc(a,b));
    ret.waitForFinished();
    foreach (int i, data) {
        qDebug() << i;
    }

    return 0;
}

上記のコード自体は、マルチスレッドにするのもバカバカしい軽い処理のコードで、単に0から99までのデータが設定されたQVectorを data[i] = 12 * data[i] + 3 する計算を並列実行させるコードです。まぁ、マルチスレッドにする意味のあるようなコードを書くと複雑になるので、簡単な例題にしておきます。

実行結果は、以下のようになります(一部省略)。

0
1
2
3
4
:
:
95
96
97
98
99
 --- Calc  12  * data[n] +  3 
3
15
27
39
51
:
:
1143
1155
1167
1179
1191

QtConcurrent::mapped

  • QFuture mapped(const Sequence & sequence, MapFunction function)
  • QFuture mapped(ConstIterator begin, ConstIterator end, MapFunction function)

続いて、mappedですが、こちらは、mapと良く似ているものの、渡すコンテナ内のデータに直接手を加えることはできません。QFutureクラスを経由して、演算後の結果を取得する形になります。

#include <QCoreApplication>
#include <QtConcurrent>
#include <QVector>
#include <QDebug>

struct Increment
{
    Increment() : i_(0) {}
    int operator()() { return i_++; }

    int i_;
};

struct Calc
{
    typedef int result_type;

    Calc(const int& a, const int&  b) :a_(a), b_(b) {}
    result_type operator()(const int& x) { return a_*x+b_;}

    int a_;
    int b_;
};

int main(int argc, char *argv[])
{
    QCoreApplication app(argc, argv);
    QVector<int> data(100);

    // vectorを0-99で設定する
    std::generate(data.begin(), data.end(), Increment());
    foreach (int i, data) {
        qDebug() << i;
    }

    int a = 12;
    int b = 3;
    qDebug() << " --- Calc " << a << " * data[n] + " << b;
    QFuture<int> ret = QtConcurrent::mapped(data,Calc(a,b));
    ret.waitForFinished();

    foreach (int i, ret.results()) {
        qDebug() << i;
    }

    return 0;
}

微妙に違う箇所を探せ的ないぢわる問題に見えてしまうかもしれませんが、Calc::operator()の実装と、実行後の結果の取り出し方が異なります。

実行結果自体は、map時と同じ結果になります。

map関数が、コンテナの値を更新するのに対し、mappedは元になるシーケンスデータに手を加えないため dataは元データを保持しています。

QtConcurrent::mappedReduced

  • QFuture mappedReduced(const Sequence & sequence, MapFunction mapFunction, ReduceFunction reduceFunction, QtConcurrent::ReduceOptions reduceOptions)
  • QFuture mappedReduced(ConstIterator begin, ConstIterator end, MapFunction mapFunction, ReduceFunction reduceFunction, QtConcurrent::ReduceOptions reduceOptions)

mappedReducedは、mappedした結果をreduceFunctionを呼び出して一つに集計するクラスです。

#include <QCoreApplication>
#include <QtConcurrent>
#include <QVector>
#include <QDebug>

struct Increment
{
    Increment() : i_(0) {}
    int operator()() { return i_++; }

    int i_;
};

struct Calc
{
    typedef int result_type;

    Calc(const int& a, const int&  b) :a_(a), b_(b) {}
    result_type operator()(const int& x) { return a_*x+b_;}

    int a_;
    int b_;
};

void sum(int& total, const int& val)
{
    total += val;
}

int main(int argc, char *argv[])
{
    QCoreApplication app(argc, argv);
    QVector<int> data(100);

    // vectorを0-99で設定する
    std::generate(data.begin(), data.end(), Increment());
    foreach (int i, data) {
        qDebug() << i;
    }

    int a = 12;
    int b = 3;
    qDebug() << " --- sum( " << a << " * data[n] + " << b << ")";
    QFuture<int> ret = QtConcurrent::mappedReduced(data,Calc(a,b), sum, QtConcurrent::SequentialReduce);
    ret.waitForFinished();

    foreach (int i, ret.results()) {
        qDebug() << i;
    }

    return 0;
}

実行結果は、以下のようになります(一部省略)。

0
1
2
3
4
:
:
95
96
97
98
99
 --- sum(  12  * data[n] +  3 )
59700

QtConcurrent::filter

filter関数は、コンテナに対してテストを行い、判定Functionにitemを渡して結果がfalseの場合、コンテナからitemを削除するという判定を並列に実行します。

#include <QCoreApplication>
#include <QtConcurrent>
#include <QVector>
#include <QDebug>

struct Increment
{
    Increment() : i_(0) {}
    int operator()() { return i_++; }

    int i_;
};

bool Filter(const int& x)
{
    return x < 10;
}

int main(int argc, char *argv[])
{
    QCoreApplication app(argc, argv);
    QVector<int> data(100);

    // vectorを0-99で設定する
    std::generate(data.begin(), data.end(), Increment());
    foreach (int i, data) {
        qDebug() << i;
    }

    qDebug() << " --- remove if data[n] < 10";
    QFuture<void> ret = QtConcurrent::filter(data,Filter);
    ret.waitForFinished();

    foreach (int i, data) {
        qDebug() << i;
    }

    return 0;
}

実行結果は、以下のようになります(一部省略)。

0
1
2
3
4
:
:
95
96
97
98
99
 --- remove if data[n] < 10
0
1
2
3
4
5
6
7
8
9

QtConcurrent::filtered

mapの時の命名規則からおよそ想像がつくかもしれませんが、元のコンテナに手を加えずにfilter後のデータをQFutureを通じて返します。

#include <QCoreApplication>
#include <QtConcurrent>
#include <QVector>
#include <QDebug>

struct Increment
{
    Increment() : i_(0) {}
    int operator()() { return i_++; }

    int i_;
};

bool Filter(const int& x)
{
    return x < 10;
}

void sum(int& total, const int& val)
{
    total += val;
}

int main(int argc, char *argv[])
{
    QCoreApplication app(argc, argv);
    QVector<int> data(100);

    // vectorを0-99で設定する
    std::generate(data.begin(), data.end(), Increment());
    foreach (int i, data) {
        qDebug() << i;
    }

    qDebug() << " --- sum (remove if data[n] < 10)";
    QFuture<int> ret = QtConcurrent::filtered(data,Filter);
    ret.waitForFinished();

    foreach (int i, ret.results()) {
        qDebug() << i;
    }

    return 0;
}

この実行結果は、filter関数と同じになります。なおdata領域は元のままとなります。

QtConcurrent::filteredReduced

となれば、このクラスは当然、filtered結果に集計を行う事になりますね。

#include <QCoreApplication>
#include <QtConcurrent>
#include <QVector>
#include <QDebug>

struct Increment
{
    Increment() : i_(0) {}
    int operator()() { return i_++; }

    int i_;
};

bool Filter(const int& x)
{
    return x < 10;
}

void sum(int& total, const int& val)
{
    total += val;
}

int main(int argc, char *argv[])
{
    QCoreApplication app(argc, argv);
    QVector<int> data(100);

    // vectorを0-99で設定する
    std::generate(data.begin(), data.end(), Increment());
    foreach (int i, data) {
        qDebug() << i;
    }

    qDebug() << " --- sum (remove if data[n] < 10)";
    QFuture<int> ret = QtConcurrent::filteredReduced(data,Filter, sum, QtConcurrent::UnorderedReduce);
    ret.waitForFinished();

    foreach (int i, ret.results()) {
        qDebug() << i;
    }

    return 0;
}

実行結果は、以下のようになります(一部省略)。

0
1
2
3
4
:
:
95
96
97
98
99
 --- sum (remove if data[n] < 10)
45

QtConcurrent::run

さて、ここまではコンテナに対してスレッドを起こして定型処理を行う関数群でしたが、さらに特定の引数を渡す関数をスレッドを起こして実行するのがQtConcurrent::runでの実行となります。

#include <QCoreApplication>
#include <QtConcurrent>
#include <QThread>
#include <QDebug>

void hello(const QString& name)
{
    qDebug() << "Hello" << name << "from" << QThread::currentThread();
    QThread::sleep(1);
}

int main(int argc, char **argv)
{
    QCoreApplication app(argc, argv);
    QFuture<void> f1 = QtConcurrent::run(hello, QString("Azusa"));
    QFuture<void> f2 = QtConcurrent::run(hello, QString("Bibi"));
    QFuture<void> f3 = QtConcurrent::run(hello, QString("Clarisse"));
    f1.waitForFinished();
    f2.waitForFinished();
    f3.waitForFinished();
    return 0;
}

今回は、処理に時間のかからない呼び出しの羅列ですが、実行すると以下のような結果になります。

Hello "Azusa" from QThread(0x102201140, name = "Thread (pooled)")
Hello "Bibi" from QThread(0x1022019f0, name = "Thread (pooled)")
Hello "Clarisse" from QThread(0x102201ef0, name = "Thread (pooled)")

異なるスレッドで動作しているのが見て取れます。

QFuture

このクラスは、QtConcurrentの関数から起動されるスレッドの実行の制御及び結果の取得へアクセスするためのクラスです。ここまでの例で、結果へのアクセスや待ち合わせが例示されていますが、他にもcancel()ですとかpause()ですとかresume()といった制御が用意されています。

QFutureIterator

QFutureのイテレーションのために、resultsを使わず、Java-Styleのイテレータを作成できます。

#include <QCoreApplication>
#include <QtConcurrent>
#include <QVector>
#include <QDebug>

struct Increment
{
    Increment() : i_(0) {}
    int operator()() { return i_++; }

    int i_;
};

bool Filter(const int& x)
{
    return x < 10;
}

void sum(int& total, const int& val)
{
    total += val;
}

int main(int argc, char *argv[])
{
    QCoreApplication app(argc, argv);
    QVector<int> data(100);

    // vectorを0-99で設定する
    std::generate(data.begin(), data.end(), Increment());
    foreach (int i, data) {
        qDebug() << i;
    }

    qDebug() << " --- sum (remove if data[n] < 10)";
    QFuture<int> ret = QtConcurrent::filtered(data,Filter);
    ret.waitForFinished();

    QFutureIterator<int> itr(ret);
    while (itr.hasNext()) {
        qDebug() << itr.next();
    }
    return 0;
}

QFutureWatcher

このクラスは、QFutureをQtのシグナルとスロットの機構を使ったメッセージで制御するためのクラスです。Qtに興味のある人はシグナルとスロットを使いこなしているものと思うので割愛します。

QFutureSynchronizer

これは、複数のQFutureをまとめて処理するクラスです。

#include <QCoreApplication>
#include <QtConcurrent>
#include <QThread>
#include <QDebug>

void hello(const QString& name)
{
    qDebug() << "Hello" << name << "from" << QThread::currentThread();
    QThread::sleep(1);
}

int main(int argc, char **argv)
{
    QCoreApplication app(argc, argv);
    QFuture<void> f1 = QtConcurrent::run(hello, QString("Azusa"));
    QFuture<void> f2 = QtConcurrent::run(hello, QString("Bibi"));
    QFuture<void> f3 = QtConcurrent::run(hello, QString("Clarisse"));
    QFutureSynchronizer<void> synchronizer;
    synchronizer.setFuture(f1);
    synchronizer.setFuture(f2);
    synchronizer.setFuture(f3);
    synchronizer.waitForFinished();
    return 0;
}

Blockingクラス

ここまでに上げた他に、以下のようなメソッドが用意されています。
- blockingFilter
- blockingFiltered
- blockingFilteredReduced
- blockingMap
- blockingMapped
- blockingMappedReduced

これらの関数は、例題に上げたQFutured.waitForFinished()まで内部で実施してくれる関数です。使い方や機能はこれまでの説明と殆ど差がないので割愛します。

まとめ

今回は、ざっくりとQtConcurrentを使ってスレッドを起動して、何らかの処理をさせる簡素な例を取り上げてみました。実際は、マルチスレッドを使うのはもっと負荷の高い処理等の場合が多いかと思います。

QtConcurrentを使えば、そういう負荷の高い処理について、一定の条件に当てはまるケースについてではありますが、難しい事を考えずに、マルチスレッドで並列処理を簡単に実行する事ができます。

もっと複雑な例としては、Qtのexampleには、ファイルのリストを渡して、それらファイル内のword数をカウントアップする処理をマルチスレッド化するものなどが用意されていますので、そちらを参照してみて下さい。

より細かな制御が必要なマルチスレッドが必要な方向けに、明日はQThreadについて少し記事を書きたいと思います。