スレッドの同期について学ぼう(その1)

  • 4
    いいね
  • 0
    コメント

はじめに

この日記は、Qt Advent Calendar 2016の9日目の日記です。

昨日は@nekomatuさんによる「QtCreatorの翻訳をレビューした話」でした。Qt Creatorは便利なIDEでメニューなども日本語化されていますが、これらは朝木さんをはじめ有志のみなさんのおかげです。ありがたいことですね。近々 Qt Creator 4.2もリリースされることでしょうし、翻訳プロジェクトが募集されれば是非お手伝いしたいところです。

さて、Qtのカレンダーも3年目ですが、1年目、2年目にスレッド関連を2件ほど書かせていただきました。

せっかくですので、今年も1件スレッド関連をということで。

Single Threaded Executionパターン

結城浩先生の『Java言語で学ぶデザインパターン入門マルチスレッド編』で「この橋をわたれるのは一人だけ」として紹介されているのが、Single Thread Executionです。

マルチスレッドは、並列に動作を行うための機構ですが、並列動作する複数のスレッドが、同時に同じリソースに変更を加えるとその結果は未定義です。そこで、同時に一つのスレッドだけが実行を行うようにするというパターンが、Single Thread Executionです。

スレッドセーフではない例

まず、門(Gate)を作り、3人の人物の通行記録をとります。

・井出氏(石川県)
・三木氏(三重県)
・福田氏(福岡県)

偶然、名前のイニシャルと住所のイニシャルが一緒です。

プログラム

gate.h
#ifndef GATE_H
#define GATE_H

#include <QString>

class Gate
{
public:
    Gate();
    void pass(const QString& name, const QString& address);

private:
    void check();

    int cnt_;
    QString name_;
    QString address_;
};

#endif // GATE_H
gate.cpp
#include "gate.h"
#include <QDebug>

Gate::Gate()
    : cnt_(0)
{
}

void Gate::pass(const QString &name, const QString &address)
{
    ++cnt_;
    name_ = name;
    address_ = address;
    check();
}

void Gate::check()
{
    if (name_.at(0) != address_.at(0)) {
        qWarning() << "****** BROKEN ****** No." << cnt_ << " : " << name_ << "," << address_;
    }
}

ゲートを渡った後、一応名前とアドレスのイニシャルが一致しているか検査しておきましょう。

userthread.h
#ifndef USERTHREAD_H
#define USERTHREAD_H

#include <QThread>
class Gate;

class UserThread : public QThread
{
    Q_OBJECT
public:
    explicit UserThread(Gate* gate, const QString& name, const QString& addr, QObject *parent = 0);

protected:
    void run();

private:
    Gate*   gate_;
    QString name_;
    QString address_;
};

#endif // USERTHREAD_H

userthread.cpp
#include "userthread.h"
#include "gate.h"

UserThread::UserThread(Gate *gate, const QString &name, const QString &addr, QObject *parent)
    : QThread(parent), gate_(gate), name_(name), address_(addr)
{
}

void UserThread::run()
{
    forever {
        gate_->pass(name_, address_);
    }
}

main.cpp
#include <QCoreApplication>
#include "gate.h"
#include "userthread.h"

int main(int argc, char *argv[])
{
    QCoreApplication a(argc, argv);
    Gate* gate = new Gate;
    UserThread alice(gate, "Ide", "Ishikawa");
    UserThread bobby(gate, "Miki", "Mie");
    UserThread chris(gate, "Fukuda", "Fukuoka");
    alice.start();
    bobby.start();
    chris.start();
    alice.wait();
    bobby.wait();
    chris.wait();
    delete gate;
    return 0;
}

実行結果

上記プログラムの実行結果は、散々です。

****** BROKEN ****** No. 792  :  "Fukuda" , "Fukuoka"
****** BROKEN ****** No. 792  :  "Fukuda" , "Fukuoka"
gate_nothreadsafe(7706,0x7000006b5000) malloc: *** error for object 0x7f9e9fa06f60: pointer being freed was not allocated
*** set a breakpoint in malloc_error_break to debug
****** BROKEN ****** No. 4269  :  "Miki" , "Mie"
****** BROKEN ****** No. 4522  :  "Miki" , "Mie"
****** BROKEN ****** No. 4658  :  "Miki" , "Mie"
gate_nothreadsafe(7706,0x700000632000) malloc: *** error for object 0x7f9e9fa06e70: pointer being freed was not allocated
*** set a breakpoint in malloc_error_break to debug
gate_nothreadsafe(7706,0x7000005af000) malloc: *** error for object 0x7f9e9fa06e70: pointer being freed was not allocated
*** set a breakpoint in malloc_error_break to debug
<リターン>キーを押してウィンドウを閉じてください...

792番目で壊れていると出ているのに、イニシャルが一致しています。その後、mallocでエラーが出たり、挙句にプログラムがクラッシュしたりしました。

なぜこんなことになるのでしょうか。

解説

このプログラムは、3つのスレッドが動作しています。この3つのスレッドは並列に動作しながら、同じ1つのGateクラスのインスタンスにアクセスしています。
処理的には、カウンタをカウントアップしてnameとaddressを記録しているだけですが、

1)cnt_を加算する
2)name_をnameに置き換える
3)address_をaddressに置き換える
4)name_の一文字目を取り出し
5)address_の一文字目を取り出し
6)比較
7)不一致なら画面に表示

と大雑把にわけても上記のような作業が3人分並列作業されます。一連の処理が並列に割り込まれながら動作するとして

1-1) cnt_(0) + 1
1-2) name_ = "Ide"
1-3) address_ = "Ishikawa"
2-1) cnt_(1) + 1
2-2) name_ = "Miki"
3-1) cnt_(2) + 1
1-4) name_[0] == 'M'
1-5) address_[0] = 'I'
1-6) 比較 (不一致)
2-3) address_ = "Mie"
1-7) warning "Miki","Mie"

ということになると、Warningとして正しく見える情報が表示されたりもするわけです。

おまけに、QString型は内部で文字列用のヒープを確保、コピー、解放などをしています。何の対策もせず並列に割り込んでしまったらメモリ周りで深刻な破綻を起こすこともあるでしょう。

このように、共有のリソースに別々に書き込むため、並列に実行すると破綻する部分をクリティカルセッションと呼びます。

クリティカルセッションではこのような破壊を防ぐため、スレッド間で排他制御を行って、一つのレッドだけが実行されるようにする必要があります。このような時に利用するのが、QMutexです。

gate.h
#ifndef GATE_H
#define GATE_H

#include <QString>
#include <QMutex>

class Gate
{
public:
    Gate();
    void pass(const QString& name, const QString& address);

private:
    void check();

    int cnt_;
    QString name_;
    QString address_;
    QMutex mutex_;
};

#endif // GATE_H

QMutex型のメンバを一つ用意しました。

gate.cpp
void Gate::pass(const QString &name, const QString &address)
{
    mutex_.lock();
    ++cnt_;
    name_ = name;
    address_ = address;
    check();
    mutex_.unlock();
}

あとは、Gate通過記録前にlockをかけ、通過記録・検証後にunlockするだけです。

こうすることで、誰か一人がlockして値を読み書きしている最中、他の人はlock()時点で待ち合わせを行い、unlockされるのを待ちます。このような排他制御のために使うのが、QMutexです。

QMutexLockerを使おう

ところで、十分に安定してきたのでcheckは10回に1回行われればいいとしましょう。

gate.cpp
void Gate::pass(const QString &name, const QString &address)
{
    mutex_.lock();
    ++cnt_;
    name_ = name;
    address_ = address;
    if (cnt_ % 10) 
        return;
    check();
    mutex_.unlock();
}

何が起きるかおわかりでしょうか。上記はunlock前にreturnしてしまっています。このまま抜けてしまうと、lockしたままとなるためこの後のlockはすべて待ち状態になります。
このため、プログラムは永遠に止まったままとなります。このようなバグを防ぐにはQMutexLockerが便利です。

gate.cpp
#include "gate.h"
#include <QMutexLocker>
#include <QDebug>

Gate::Gate()
    : cnt_(0)
{
}

void Gate::pass(const QString &name, const QString &address)
{
    QMutexLocker lock(&mutex_);
    ++cnt_;
    name_ = name;
    address_ = address;
    if (cnt_ % 10)
        return;
    check();
}

void Gate::check()
{
    if (name_.at(0) != address_.at(0)) {
        qWarning() << "****** BROKEN ****** No." << cnt_ << " : " << name_ << "," << address_;
    }
}

こうすると、QMutexLockerはコンストラクト時にロックを行い、デストラクト時にunlockしてくれるのです。途中で間違ってreturnや例外のthrowで抜けてしまっても、mutexをロックしたままになることが防げます。

Guarded Suspensionパターン

続いては、結城先生のデザインパターン本で「用意ができるまで、待っててね」と紹介されているGuarded Suspensionパターンです。

ファミレスで、注文を決めたらボタンを押してお知らせ下さいと言われますよね。ボタンを押すと、手の空いた店員さんが注文を伺いに来てくれますが、手が空かない場合に「待たせる」という挙動をスレッドに実現させるのが、このパターンです。

サンプルプログラム

RequestQueueクラス

requestqueue.h
#ifndef REQUESTQUEUE_H
#define REQUESTQUEUE_H

#include <QString>
#include <QQueue>
#include <QMutex>
#include <QWaitCondition>

class RequestQueue
{
public:
    struct Request {
        Request(const QString& name) : name_(name) {}
        QString name_;
    };

    explicit RequestQueue();
    Request getRequest();
    void putRequest(const Request& request);

private:
    QMutex mutex_;
    QWaitCondition cond_;
    QQueue<Request> queue_;
};

#endif // REQUESTQUEUE_H
requestqueue.cpp
#include "requestqueue.h"
#include <QMutexLocker>

RequestQueue::RequestQueue()
{
}

RequestQueue::Request RequestQueue::getRequest()
{
    QMutexLocker lock(&mutex_);
    while (queue_.isEmpty()) {
        cond_.wait(&mutex_);
    }
    return queue_.dequeue();
}

void RequestQueue::putRequest(const RequestQueue::Request &request)
{
    QMutexLocker lock(&mutex_);
    queue_.enqueue(request);
    cond_.wakeAll();
}

このクラスは、リクエストの処理を仲介するクラスです。先ほどの例でいうとGateクラスと同じ立場にいますが、今回はputされた情報を保持して、getされたら返すためキューを有しています。

putするよりも多くgetされると、当然キューがカラになるケースがあるため、待ち状態が発生します。

このような場合に使うのが、QWaitConditionです。putRequestは、最初にMutexをlockして排他制御し、queueの空き状態を確認します。もしカラだった場合、QWaitCondition::waitにmutex_を渡しています。

QWaitConditionにQMutexを渡すと、mutexを一時的にunlockしてconditionの成立を待ち合わせ、conditionの成立を通知されるとmutexを取得します。

putRequestの側は、mutexをロックしてqueueにRequestを入れた後、QWaitConditionのwakeAllを実施しています、これは、QWaitConditionでwaitしている全てのスレッドに対して、条件成立を通知して起こすための処理で、この時点で全てのスレッドがMutexのlockをしようとして、ロックできたスレッドから動作を再開します。

clienthread.h
#ifndef CLIENTTHREAD_H
#define CLIENTTHREAD_H

#include <QThread>
class RequestQueue;

class ClientThread : public QThread
{
    Q_OBJECT
public:
    explicit ClientThread(RequestQueue* queue, const QString& name,
                          QObject *parent = 0);

protected:
    void run();

private:
    RequestQueue* queue_;
    QString name_;
};

#endif // CLIENTTHREAD_H

clientthread.cpp
#include "clientthread.h"
#include "requestqueue.h"
#include <QDebug>

ClientThread::ClientThread(RequestQueue *queue, const QString &name, QObject *parent)
    : QThread(parent), queue_(queue), name_(name)
{
}

void ClientThread::run()
{
    qsrand(3141592);
    for (int i=0; i<100000; ++i) {
        RequestQueue::Request request(tr("No.%1").arg(i));
        qDebug() << name_ << " request " << request.name_;
        queue_->putRequest(request);
        msleep(qrand() % 1000);
    }
}
serverthread.h
#ifndef SERVERTHREAD_H
#define SERVERTHREAD_H

#include <QThread>
class RequestQueue;

class ServerThread : public QThread
{
    Q_OBJECT
public:
    ServerThread(RequestQueue* queue, const QString& name, QObject* parent=0);

protected:
    void run();

private:
    RequestQueue* queue_;
    QString name_;
};

#endif // SERVERTHREAD_H
serverthread.cpp
#include "serverthread.h"
#include "requestqueue.h"
#include <QDebug>

ServerThread::ServerThread(RequestQueue *queue, const QString &name, QObject *parent)
    : QThread(parent), queue_(queue), name_(name)
{
}

void ServerThread::run()
{
    qsrand(6535897);
    for (int i=0; i<10000; ++i) {
        RequestQueue::Request request = queue_->getRequest();
        qDebug() << name_ << " handles " << request.name_;
        msleep(qrand()%1000);
    }
}
main.cpp
#include <QCoreApplication>
#include "clientthread.h"
#include "serverthread.h"
#include "requestqueue.h"

int main(int argc, char *argv[])
{
    QCoreApplication a(argc, argv);
    RequestQueue* queue = new RequestQueue;
    ClientThread client(queue, "Alice");
    ServerThread server(queue, "Bobby");
    client.start();
    server.start();
    client.wait();
    server.wait();
    delete queue;
    return 0;
}

あとは、このRequestQueueにputするclientとgetするserverを用意して両者でやりとりさせます。この両者は、適度に動作がばらけるようにランダムにsleepするように実装しています。

実行結果

"Alice"  request  "No.0"
"Bobby"  handles  "No.0"
"Alice"  request  "No.1"
"Alice"  request  "No.2" // Clientが先にrequest2まで入れる
"Bobby"  handles  "No.1"
"Alice"  request  "No.3"
"Bobby"  handles  "No.2"
"Alice"  request  "No.4"
"Bobby"  handles  "No.3"
"Alice"  request  "No.5"
"Bobby"  handles  "No.4"
"Alice"  request  "No.6"
"Bobby"  handles  "No.5"
"Alice"  request  "No.7"
"Bobby"  handles  "No.6"
"Alice"  request  "No.8"
"Bobby"  handles  "No.7"
"Alice"  request  "No.9"
"Alice"  request  "No.10"
"Alice"  request  "No.11"
"Bobby"  handles  "No.8"
"Bobby"  handles  "No.9"
"Alice"  request  "No.12"
"Bobby"  handles  "No.10"
"Bobby"  handles  "No.11"
"Alice"  request  "No.13"
"Bobby"  handles  "No.12"
"Alice"  request  "No.14"
"Alice"  request  "No.15"
"Alice"  request  "No.16" // Clientが長めのsleepに突入
"Bobby"  handles  "No.13"
"Bobby"  handles  "No.14"
"Bobby"  handles  "No.15"
"Bobby"  handles  "No.16" // Serverが追いついて空になってwait
"Alice"  request  "No.17" // Clientが追加してwakeAll()
"Bobby"  handles  "No.17" // Serverが処理再開

Producer-Consumerパターン

さて、Guarded Suspensionの例では、Clientは永遠とputし続けて、Serverはemptyなら待つという実装でした。もし、Server側がミリ秒単位ではなく秒単位で待ち合わせており、Clientが回数制限なく無制限にputしてたらどうなるでしょうか。

RequestQueueが保持できる限界に達して破綻するか、メモリサイズをとりすぎて破綻するか、条件にもよりますが、put側にも相応のガードを入れる必要があります。

そういうことを考慮し、生産者(Producer)、消費者(Consumer)が、それぞれGuarded Suspensionを行うパターンをProducer-Consumerパターンと言います。

結城先生の本にあやかって、書籍のサンプルをC++化するのも良いのですが、ちょっと執筆活動が滞ってますし、Qtには豊富なサンプルが含まれています。そこで、Qtの「ようこそ」からサンプルを開きましょう。

WaitConditionを使う例

スクリーンショット 2016-12-09 12.17.56.png

Wait Condition Exampleです。

waitconditions.cpp
/*
**
** Copyright (C) 2016 The Qt Company Ltd.
** Contact: https://www.qt.io/licensing/
**
** This file is part of the examples of the Qt Toolkit.
**
** イカ略)BSDライセンスで提供されています
*/

#include <QtCore>

#include <stdio.h>
#include <stdlib.h>

const int DataSize = 100000;

const int BufferSize = 8192;
char buffer[BufferSize];

QWaitCondition bufferNotEmpty;
QWaitCondition bufferNotFull;
QMutex mutex;
int numUsedBytes = 0;

class Producer : public QThread
{
public:
    Producer(QObject *parent = NULL) : QThread(parent)
    {
    }

    void run() Q_DECL_OVERRIDE
    {
        qsrand(QTime(0,0,0).secsTo(QTime::currentTime()));

        for (int i = 0; i < DataSize; ++i) {
            mutex.lock();
            if (numUsedBytes == BufferSize)
                bufferNotFull.wait(&mutex);
            mutex.unlock();

            buffer[i % BufferSize] = "ACGT"[(int)qrand() % 4];

            mutex.lock();
            ++numUsedBytes;
            bufferNotEmpty.wakeAll();
            mutex.unlock();
        }
    }
};

class Consumer : public QThread
{
    Q_OBJECT
public:
    Consumer(QObject *parent = NULL) : QThread(parent)
    {
    }

    void run() Q_DECL_OVERRIDE
    {
        for (int i = 0; i < DataSize; ++i) {
            mutex.lock();
            if (numUsedBytes == 0)
                bufferNotEmpty.wait(&mutex);
            mutex.unlock();

            fprintf(stderr, "%c", buffer[i % BufferSize]);

            mutex.lock();
            --numUsedBytes;
            bufferNotFull.wakeAll();
            mutex.unlock();
        }
        fprintf(stderr, "\n");
    }

signals:
    void stringConsumed(const QString &text);
};

int main(int argc, char *argv[])
{
    QCoreApplication app(argc, argv);
    Producer producer;
    Consumer consumer;
    producer.start();
    consumer.start();
    producer.wait();
    consumer.wait();
    return 0;
}

#include "waitconditions.moc"

最後に#include しているのは、wait conditions.cppにQObjectのコードを入れているためで、mocの生成するコードを取り込む必要があるからです。

このサンプルは、char型の8192byteのbufferをリングバッファとして使い、ProducerとConsumer間で100000byteのA/C/G/Tのうち任意の一文字を受けわたすというサンプルです。

2つのQWaitConditionと1つのQMutexを使い、バッファが空ならConsumerが待ち、バッファが一杯ならProducerが待つという実装となっています。

ところで、QMutexは排他制御ですから、バッファの使用状況を管理するタイミングでは片方のスレッドが必ず停止してしまいます。バッファが半分使用されているタイミングでは、ProducerもConsumerも動ける余地があるのに、待ち合わせを頻繁に行うことになります。そういう場合に有用なのがQSemaphoreです。

セマフォは手旗信号の意味で、スレッド間の同期機構の一つです。今度は、サンプルでSemaphoreを開いてみましょう。

スクリーンショット 2016-12-09 12.35.42.png

semaphores.cpp
/*
**
** Copyright (C) 2016 The Qt Company Ltd.
** Contact: https://www.qt.io/licensing/
**
** This file is part of the examples of the Qt Toolkit.
**
** イカ略)BSDライセンスで提供されています
*/
#include <QtCore>

#include <stdio.h>
#include <stdlib.h>

const int DataSize = 100000;

const int BufferSize = 8192;
char buffer[BufferSize];

QSemaphore freeBytes(BufferSize);
QSemaphore usedBytes;

class Producer : public QThread
{
public:
    void run() Q_DECL_OVERRIDE
    {
        qsrand(QTime(0,0,0).secsTo(QTime::currentTime()));
        for (int i = 0; i < DataSize; ++i) {
            freeBytes.acquire();
            buffer[i % BufferSize] = "ACGT"[(int)qrand() % 4];
            usedBytes.release();
        }
    }
};

class Consumer : public QThread
{
    Q_OBJECT
public:
    void run() Q_DECL_OVERRIDE
    {
        for (int i = 0; i < DataSize; ++i) {
            usedBytes.acquire();
            fprintf(stderr, "%c", buffer[i % BufferSize]);
            freeBytes.release();
        }
        fprintf(stderr, "\n");
    }

signals:
    void stringConsumed(const QString &text);

protected:
    bool finish;
};

int main(int argc, char *argv[])
{
    QCoreApplication app(argc, argv);
    Producer producer;
    Consumer consumer;
    producer.start();
    consumer.start();
    producer.wait();
    consumer.wait();
    return 0;
}

#include "semaphores.moc"

QWaitConditionを使っていた例では、QMutexを使いロックをしながら値を検査していました。そしてデータの追加後再度ロックをかけて、使用量を操作していましたが、そのあたりの操作をQSemaphoreが一回の操作で行ってくれています。

セマフォは、初期カウント値をもち、スレッド間で資源の利用可能なカウントを共有する同期機構です。

void QSemaphore::acquire(int n = 1) // 加算(V操作)
void QSemaphore::release(int n = 1) // 減算(P操作)

という処理により、値を加算・減算していき、カウントが0となると待ち合わせが発生します。プラットフォーム依存の実装にもよりますが、基本的にユーザーがQMutexを操作するよりは軽量に利用なバッファサイズを同期できるのです。

まとめ

本日の記事では
- QMutex
- QWaitCondition
- QSemaphore

について簡単に何に使うものなのかを説明しました。

結城 浩先生の『Java言語で学ぶ デザインパターン入門 [マルチスレッド編]』は、マルチスレッドプログラミングで利用する定石を記した名著です。

マルチスレッドが、いかに簡単に破綻するかはサンプルで示した通りですが、QMutexの排他機構等は、一つ間違えば全てのスレッドがロックの解放待ちを起こす「デッドロック」という恐ろしい不具合を引き起こします。

並列処理は、マルチコアCPUのご時世には処理速度を稼ぐ頼もしい味方ですが、一度不具合を起こせばデバッグの難しい恐ろしい敵にかわります。クラス分割以上に定石集の持つ意味は重要です。Java言語の機能を使った書籍で、単純にそのままC++に適応できるわけではありませんが、この手の定石というものについては知っておいて損はないかと思います。

マルチスレッドに関しては、Qtの提供するクラスと機能の他に、それをどう組み合わせてどのように使うのか、そういう学習をすることがとても重要な分野です。Qtのサンプルとヘルプを拾い読みながらぜひ色々実験してみて下さい。

さて、本当はこのほかに

  • Read-Write Lock (QReadWriteLock)
  • Thread-Specific Storage (QThreadStorage)

といった説明をしなくてはなのですが、師走で忘年会などに引っ張り出されてしまって、頼みのお昼休み時間も尽きてしまいそうです。というわけで、残りのクラスの説明と、書いたサンプルをもっと今風のQThreadの使い方に直すあたりを次の日記にかければいいなぁと思いつつ今日のAdvent Calendarはここまでとします。

ちょっと駆け足で書き付けたので、誤字、脱字、間違いなどを見つけたらご一報ください。

明日は、@sharkppさんによる「QMacCocoaViewContainerを使ってみる」です。お楽しみに。

この投稿は Qt Advent Calendar 20169日目の記事です。