はじめに
マルチスレッドからデータを扱う時、どのようなlockをしていますか?
lockする時間が少しでも長いだけで
プログラム全体の速度の低下を招くため、lockは非常に難しいですね。
http://qiita.com/episteme/items/9e6e17512ecadb2ccb40
面白い記事を拝見し、効率の良いLockでデータを扱うと
どれぐらいパフォーマンスに違いがあるのかなと気になり
それを検証してみました。
若干コードが多めです(。。
この記事での普通のLockとは、以下のコードの事を指しています。
純粋にlockが取れるまでblockするコードですね。
std::lock_guard<decltype(mtx)> l(mtx);
結果だけ先に知りたい方は↓に行ってしまって下さい!
測定の概要
測定には以下のルールで行います。
- 1.concurrent_queueを用意し、測定前にthread-safeなのか検証。
- 2.concurrent_queueに対して、複数スレッドからデータのpush/popを繰り返し、その回数をスコアとして扱う。
- 3.測定時間は5秒として、3回行う。
concurrent_queueの実装
今回は普通のlockと、効率の良いlockを比較するので
lockに関して抽象クラスを用意します。
struct ILock {
virtual bool lock() = 0;
virtual void unlock() = 0;
};
スコープを抜けたら勝手にunlockされて欲しいですね。
なので以下のhelperも追加しておきます。
struct releaser {
using release_func = std::function <void()>;
releaser(release_func f) : f_(std::move(f)) {}
~releaser() {
if (f_) {
f_();
}
}
operator bool() const {
return (f_) ? true : false;
}
release_func f_;
};
次にconcurrent_queueの実装です。
template <typename T, typename L>
class concurrent_queue {
static_assert(std::is_base_of <flowTumn::ILock, L>::value, "L must be a descendant of ILock.");
public:
enum struct Result {
Success,
Failed,
Empty,
Timeout,
};
// lockをRAIIに。
struct locker {
releaser lock() {
if (l_.lock()) {
return{ [this]{ l_.unlock(); } };
}
else {
return{ nullptr };
}
}
L l_;
} lock_controller;
concurrent_queue() = default;
//queueにデータを詰め込み。
Result push(const T& v) {
for (;;) {
if (auto l = lock_controller.lock()) {
queue_.push(v);
return Result::Success;
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
return Result::Failed;
}
//fから取り出したデータをqueueに詰め込み。
Result push(std::function <T()> f) {
for (;;) {
if (auto l = lock_controller.lock()) {
queue_.push(f());
return Result::Success;
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
return Result::Failed;
}
//queueからデータを取得。
std::pair <Result, T> pop() {
for (;;) {
if (auto l = lock_controller.lock()) {
if (0 < queue_.size()) {
auto r = queue_.front();
queue_.pop();
return{ Result::Success, r };
} else {
return{ Result::Empty, {} };
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
return{ Result::Failed, {} };
}
private:
std::queue <T> queue_;
};
以上で今回に使用するconcurrent_queueの完成です。
concurrent_queueは型パラメーターでILockを渡してもらい
それに対してlockを行い、成功が返されたら
データをpush/popするといった作りになっています。
このconcurrent_queueを用いてパフォーマンスを測定してみます。
Lockの方式
さて今回のメインですね。
ILock#lockの中で純粋にmutexをlockしてしまっても構いませんし
別のやり方でlockをしても良いです。
とにかくlock出来た事だけを返せば良いので
以下の3パターンのILockを実装します。
//普通のlock。mutexをlockできるまでblockします。
struct lockNormal : public ILock {
bool lock() override {
mutex_.lock();
return true;
}
void unlock() override {
mutex_.unlock();
}
std::mutex mutex_;
};
//blockせずにlockを獲得。
struct lockTry : public ILock {
bool lock() override {
return mtx_.try_lock();
}
void unlock() override{
mtx_.unlock();
}
std::mutex mtx_;
};
//atomic<bool>を用いてtrueに変更できたら成功。
struct lockWeak : public ILock {
bool lock() override {
bool expected = false;
bool desired = true;
return flg_.compare_exchange_weak(expected, desired);
}
void unlock() override {
flg_ = false;
}
std::atomic <bool> flg_{ false };
};
以上を持って、concurrent_queueは以下の用にして使う事が出来る様になりました。
auto q = concurrent_queue <int64_t, lockNormal>{};
後は3パターンのlockを使って測定するだけです。
測定の実装
測定前に、測定の実装になります。
コードばかりで申し訳ありません(..
queueがきちんとthread-safeなのかを検証した後にスコアを測ります。
template <typename T, typename Lock, typename F>
uint64_t queue_tester(
F gen,
uint32_t createGenCount,
std::function <void(T)> notify,
uint32_t createNotifyCount,
std::chrono::milliseconds timeout) {
using queue_type = concurrent_queue <int64_t, Lock>;
auto queue = queue_type{};
auto counter = std::atomic <uint64_t> {0};
std::atomic <bool> g{ false };
std::vector <std::thread> thr;
auto push = [&gen, &queue, &counter, &g]() {
while (!g) {}
while (g) {
queue.push(gen());
++counter;
}
};
auto pop = [¬ify, &g, &counter, &queue]() {
while (!g) {}
while (g) {
auto r = queue.pop();
if (r.first == queue_type::Result::Success) {
notify(r.second);
}
++counter;
}
};
std::generate_n(std::back_inserter(thr), createGenCount, [push] { return std::thread{ push }; });
std::generate_n(std::back_inserter(thr), createNotifyCount, [pop] { return std::thread{ pop }; });
g = !g;
std::this_thread::sleep_for(timeout);
g = !g;
for (auto& elem : thr) {
elem.join();
}
return counter.load();
}
//queueがthread safeなのかを検証します。
template <typename L>
bool checkQueue() {
auto v1 = std::atomic <int64_t> {0};
auto v2 = std::atomic <int64_t> {0};
auto result = std::atomic <bool> {true};
//cpuのコア数分詰める処理を追加し、popしたデータを検証するのは一つが担う。
queue_tester <int64_t, L>(
[&v1] {
#if 1
//検証するデータを生成するfunctionを返す。(この処理はlockされた後に呼ばれる。)
return std::function <int64_t()> {
[&v1] {
++v1;
return v1.load();
}
};
#else
// 詰めるスレッドが複数なら、詰められる値はバラバラになってしまう。
++v1;
return v1.load();
#endif
},
std::thread::hardware_concurrency(),
[&v2, &result](int64_t n) {
//queueから取得したデータの通知を受け検証する。
++v2;
if (v2.load() != n) {
result = false;
assert(false && "queue is bug.");
}
},
1,
std::chrono::seconds(5)
);
return result.load();
}
//スコアを計測。
template <typename Lock>
uint64_t score() {
const auto core = std::max <uint32_t> (std::thread::hardware_concurrency() >> 1, 1);
return queue_tester <uint64_t, Lock>(
[] {
//好きな値で詰める。重要なのは呼ばれた回数。
return 1234;
},
core,
[](uint64_t) {
//popしたデータを受けても何もしない。重要なのは呼ばれた回数。
},
core,
std::chrono::seconds(5)
);
}
//平均を算出
template <typename F, typename FN>
auto avg(F f, uint32_t count, FN notify) -> decltype(f()) {
auto r = decltype(f()){};
for (auto i = UINT32_C(0); i < count; ++i) {
auto rr = f();
notify(rr);
r += rr;
}
return r / count;
}
ふう……完成です。
最後の一振りは
検証と結果の出力をマクロにしておきます。
//キューを検証した後にスコアを計測。
#define SCORE(lock, count) \
std::cout << "TargetLock => " << ""#lock"" << std::endl; \
if (checkQueue<lock>()) { \
std::cout << "\t" << "checkQueue: Success." << std::endl; \
std::cout << "\t" << "Score(Avg): " << avg(score<lock>, count, [](decltype(score<lock>()) score) { std::cout << "\t\t" << "Score: " << score << std::endl; }) << std::endl; \
} else { \
std::cout << "\t" << "checkQueue: Failed." << std::endl; \
} \
std::cout << std::endl;
測定 & 結果
さてさて、実際にスコアを計測してみましょう。
int main() {
const auto TEST_COUNT = 3;
//スコアを計測。lockの実装が不十分だと検証で弾かれる。
SCORE(lockNormal, TEST_COUNT);
SCORE(lockTry, TEST_COUNT);
SCORE(lockWeak, TEST_COUNT);
return 0;
}
私の開発マシンでは以下のようになりました
(CPU: i7-2600k、Win7、VC2013 Release Build)
1: Test timeout computed to be: 9.99988e+006
1: TargetLock => lockNormal
1: checkQueue: Success.
1: Score: 11758106
1: Score: 9432942
1: Score: 13260581
1: Score(Avg): 11483876
1:
1: TargetLock => lockTry
1: checkQueue: Success.
1: Score: 48652518
1: Score: 49120617
1: Score: 48946418
1: Score(Avg): 48906517
1:
1: TargetLock => lockWeak
1: checkQueue: Success.
1: Score: 80397898
1: Score: 80919271
1: Score: 81028361
1: Score(Avg): 80781843
1:
1/1 Test #1: test ............................. Passed 64.01 sec
おお、凄い結果になりました。。
atomic <bool>恐るべし…
調べるとCPU命令で比較と値の交換を行っているんですね。(CAS)
http://qiita.com/kmikmy/items/0a7324cc50fd40470407
http://cpprefjp.github.io/reference/atomic/atomic/compare_exchange_weak.html
ソースコード
今回の記事に作成したソース一式です。
https://github.com/flowtumn/lock_performance
travis-ciでの測定結果(linux/osx)
2015/12/18 travis-ciで計測してみました。