Help us understand the problem. What is going on with this article?

普通のLockと、効率の良いLockのパフォーマンス測定。

More than 3 years have passed since last update.

はじめに

マルチスレッドからデータを扱う時、どのようなlockをしていますか?

lockする時間が少しでも長いだけで
プログラム全体の速度の低下を招くため、lockは非常に難しいですね。

http://qiita.com/episteme/items/9e6e17512ecadb2ccb40
面白い記事を拝見し、効率の良いLockでデータを扱うと
どれぐらいパフォーマンスに違いがあるのかなと気になり

それを検証してみました。

若干コードが多めです(。。

この記事での普通のLockとは、以下のコードの事を指しています。
純粋にlockが取れるまでblockするコードですね。
std::lock_guard<decltype(mtx)> l(mtx);

結果だけ先に知りたい方は↓に行ってしまって下さい!

測定の概要

測定には以下のルールで行います。

  • 1.concurrent_queueを用意し、測定前にthread-safeなのか検証。
  • 2.concurrent_queueに対して、複数スレッドからデータのpush/popを繰り返し、その回数をスコアとして扱う。
  • 3.測定時間は5秒として、3回行う。

concurrent_queueの実装

今回は普通のlockと、効率の良いlockを比較するので
lockに関して抽象クラスを用意します。

ILock
struct ILock {
    virtual bool lock() = 0;
    virtual void unlock() = 0;
};

スコープを抜けたら勝手にunlockされて欲しいですね。
なので以下のhelperも追加しておきます。

releaser
struct releaser {
    using release_func = std::function <void()>;
    releaser(release_func f) : f_(std::move(f)) {}
    ~releaser() {
        if (f_) {
            f_();
        }
    }

    operator bool() const {
        return (f_) ? true : false;
    }
    release_func f_;
};

次にconcurrent_queueの実装です。

concurrent_queue
template <typename T, typename L>
class concurrent_queue {
    static_assert(std::is_base_of <flowTumn::ILock, L>::value, "L must be a descendant of ILock.");

public:
    enum struct Result {
        Success,
        Failed,
        Empty,
        Timeout,
    };

    // lockをRAIIに。
    struct locker {
        releaser lock() {
            if (l_.lock()) {
                return{ [this]{ l_.unlock(); } };
            }
            else {
                return{ nullptr };
            }
        }
        L l_;
    } lock_controller;

    concurrent_queue() = default;

    //queueにデータを詰め込み。
    Result push(const T& v) {
        for (;;) {
            if (auto l = lock_controller.lock()) {
                queue_.push(v);
                return Result::Success;
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
        return Result::Failed;
    }

    //fから取り出したデータをqueueに詰め込み。
    Result push(std::function <T()> f) {
        for (;;) {
            if (auto l = lock_controller.lock()) {
                queue_.push(f());
                return Result::Success;
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
        return Result::Failed;
    }

    //queueからデータを取得。
    std::pair <Result, T> pop() {
        for (;;) {
            if (auto l = lock_controller.lock()) {
                if (0 < queue_.size()) {
                    auto r = queue_.front();
                    queue_.pop();
                    return{ Result::Success, r };
                } else {
                    return{ Result::Empty, {} };
                }
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
        return{ Result::Failed, {} };
    }

private:
    std::queue <T> queue_;
};

以上で今回に使用するconcurrent_queueの完成です。

concurrent_queueは型パラメーターでILockを渡してもらい
それに対してlockを行い、成功が返されたら
データをpush/popするといった作りになっています。

このconcurrent_queueを用いてパフォーマンスを測定してみます。

Lockの方式

さて今回のメインですね。
ILock#lockの中で純粋にmutexをlockしてしまっても構いませんし
別のやり方でlockをしても良いです。

とにかくlock出来た事だけを返せば良いので
以下の3パターンのILockを実装します。

LockImplement
//普通のlock。mutexをlockできるまでblockします。
struct lockNormal : public ILock {
    bool lock() override {
        mutex_.lock();
        return true;
    }
    void unlock() override {
        mutex_.unlock();
    }
    std::mutex mutex_;
};

//blockせずにlockを獲得。
struct lockTry : public ILock {
    bool lock() override {
        return mtx_.try_lock();
    }
    void unlock() override{
        mtx_.unlock();
    }
    std::mutex mtx_;
};

//atomic<bool>を用いてtrueに変更できたら成功。
struct lockWeak : public ILock {
    bool lock() override {
        bool expected = false;
        bool desired = true;
        return flg_.compare_exchange_weak(expected, desired);
    }
    void unlock() override {
        flg_ = false;
    }
    std::atomic <bool> flg_{ false };
};

以上を持って、concurrent_queueは以下の用にして使う事が出来る様になりました。

auto q = concurrent_queue <int64_t, lockNormal>{};

後は3パターンのlockを使って測定するだけです。

測定の実装

測定前に、測定の実装になります。
コードばかりで申し訳ありません(..

queueがきちんとthread-safeなのかを検証した後にスコアを測ります。

perfomance
template <typename T, typename Lock, typename F>
uint64_t queue_tester(
    F gen,
    uint32_t createGenCount,
    std::function <void(T)> notify,
    uint32_t createNotifyCount,
    std::chrono::milliseconds timeout) {
    using queue_type = concurrent_queue <int64_t, Lock>;

    auto queue = queue_type{};
    auto counter = std::atomic <uint64_t> {0};
    std::atomic <bool> g{ false };
    std::vector <std::thread> thr;

    auto push = [&gen, &queue, &counter, &g]() {
        while (!g) {}
        while (g) {
            queue.push(gen());
            ++counter;
        }
    };

    auto pop = [&notify, &g, &counter, &queue]() {
        while (!g) {}
        while (g) {
            auto r = queue.pop();
            if (r.first == queue_type::Result::Success) {
                notify(r.second);
            }
            ++counter;
        }
    };

    std::generate_n(std::back_inserter(thr), createGenCount, [push] { return std::thread{ push }; });
    std::generate_n(std::back_inserter(thr), createNotifyCount, [pop] { return std::thread{ pop }; });

    g = !g;
    std::this_thread::sleep_for(timeout);
    g = !g;

    for (auto& elem : thr) {
        elem.join();
    }

    return counter.load();
}

//queueがthread safeなのかを検証します。
template <typename L>
bool checkQueue() {
    auto v1 = std::atomic <int64_t> {0};
    auto v2 = std::atomic <int64_t> {0};
    auto result = std::atomic <bool> {true};

    //cpuのコア数分詰める処理を追加し、popしたデータを検証するのは一つが担う。
    queue_tester <int64_t, L>(
        [&v1] {
#if 1
            //検証するデータを生成するfunctionを返す。(この処理はlockされた後に呼ばれる。)
            return std::function <int64_t()> {
                [&v1] {
                    ++v1;
                    return v1.load();
                }
            };
#else
            // 詰めるスレッドが複数なら、詰められる値はバラバラになってしまう。
            ++v1;
            return v1.load();
#endif
        },
        std::thread::hardware_concurrency(),
        [&v2, &result](int64_t n) {
            //queueから取得したデータの通知を受け検証する。
            ++v2;
            if (v2.load() != n) {
                result = false;
                assert(false && "queue is bug.");
            }
        },
        1,
        std::chrono::seconds(5)
    );

    return result.load();
}

//スコアを計測。
template <typename Lock>
uint64_t score() {
    const auto core = std::max <uint32_t> (std::thread::hardware_concurrency() >> 1, 1);
    return queue_tester <uint64_t, Lock>(
        [] {
            //好きな値で詰める。重要なのは呼ばれた回数。
            return 1234;
        },
        core,
        [](uint64_t) {
            //popしたデータを受けても何もしない。重要なのは呼ばれた回数。
        },
        core,
        std::chrono::seconds(5)
    );
}

//平均を算出
template <typename F, typename FN>
auto avg(F f, uint32_t count, FN notify) -> decltype(f()) {
    auto r = decltype(f()){};
    for (auto i = UINT32_C(0); i < count; ++i) {
        auto rr = f();
        notify(rr);
        r += rr;
    }
    return r / count;
}

ふう……完成です。

最後の一振りは
検証と結果の出力をマクロにしておきます。

score
//キューを検証した後にスコアを計測。
#define SCORE(lock, count) \
    std::cout << "TargetLock => " << ""#lock""  << std::endl; \
    if (checkQueue<lock>()) { \
        std::cout << "\t" << "checkQueue: Success." << std::endl; \
        std::cout << "\t" << "Score(Avg): " << avg(score<lock>, count, [](decltype(score<lock>()) score) { std::cout << "\t\t" << "Score: " << score << std::endl; }) << std::endl; \
    } else {  \
        std::cout << "\t" << "checkQueue: Failed." << std::endl; \
    } \
    std::cout << std::endl;

測定 & 結果

さてさて、実際にスコアを計測してみましょう。

test
int main() {
    const auto TEST_COUNT = 3;

    //スコアを計測。lockの実装が不十分だと検証で弾かれる。
    SCORE(lockNormal, TEST_COUNT);
    SCORE(lockTry, TEST_COUNT);
    SCORE(lockWeak, TEST_COUNT);
    return 0;
}

私の開発マシンでは以下のようになりました
(CPU: i7-2600k、Win7、VC2013 Release Build)

1: Test timeout computed to be: 9.99988e+006
1: TargetLock => lockNormal
1:  checkQueue: Success.
1:      Score: 11758106
1:      Score: 9432942
1:      Score: 13260581
1:  Score(Avg): 11483876
1: 
1: TargetLock => lockTry
1:  checkQueue: Success.
1:      Score: 48652518
1:      Score: 49120617
1:      Score: 48946418
1:  Score(Avg): 48906517
1: 
1: TargetLock => lockWeak
1:  checkQueue: Success.
1:      Score: 80397898
1:      Score: 80919271
1:      Score: 81028361
1:  Score(Avg): 80781843
1: 
1/1 Test #1: test .............................   Passed   64.01 sec

おお、凄い結果になりました。。

折角なのでグラフにもしてみます。
score.PNG

atomic <bool>恐るべし…

調べるとCPU命令で比較と値の交換を行っているんですね。(CAS)
http://qiita.com/kmikmy/items/0a7324cc50fd40470407
http://cpprefjp.github.io/reference/atomic/atomic/compare_exchange_weak.html

ソースコード

今回の記事に作成したソース一式です。
https://github.com/flowtumn/lock_performance

travis-ciでの測定結果(linux/osx)

2015/12/18 travis-ciで計測してみました。

linux.PNG

osx.PNG

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away