読み込み側が圧倒的に多い状況化でのNumaCPUに最適化したlock free table
はじめに
ロックレス:特定のインデクスにあるポインタについて、A/Bの二面を保持し、インデクス毎に現在の表面が[A or B]のどちらかを管理する。
読み込み側は、表面をreadし、書き込み側は、裏面に書き込み、書き込み完了後に、現在の面を「裏面」->「表面」にアトミックに変更する。
この時、読み込み側は「裏面」になった以前の「表面」をreadしている可能性があるため、書き込みが連続する環境化では、連続して同じデータを書き換えることはできない
それは、以下の制約を設けることで解決できる
- 読み込み処理は、CPUコア[n] に割り当て、書き込み処理は、CPUコア[n+1]に割り当てる
- シングルプロデューサー、シングルコンシューマ
- 読み込み処理の完了時に、カウンタをインクリメントする
- 書き込み処理は、[3]のカウンタが前回と同値だった場合、読み込みポインタはアクセスされる可能性があるため、書き込み処理はスキップ
詳細
主処理で32bit値より64Byte程度の関連データを検索し、関連データを元に処理する機能を実装設計することを考えた。関連データは、Mysqlデータベースに保存し、テーブル定義は、整数をキー、64Byte構造体相当のカラムを定義する。
データ挿入、変更、削除をトリガーとして、複数スレッドに変更通知を発行する、データベース変更トリガーは、mysql bin log apiを利用して、専用のスレッド:書き込み専用スレッド(pthread_setaffinity_np で、主処理(読み込み専用)スレッドと異なるNumaNodeのCPUに割り当てる)から、主処理:読み込みスレッドへLockFree テーブルを経由してデータを通知する
この複数スレッドが参照するテーブルのアクセスをlock freeにすることで主処理を高速化することができる
lock free table
lock free table によるスレッド間データ同期では、boost::lockfree::spsc_queue 、boost::atomic_uint64_t によるロックフリー実装による実装設計を検討した
しかし、パフォーマンス評価で、スケールはするものの(1モジュールに必要なコア数=3)12スレッド、3モジュール程度でほぼすべてのCPUコアリソースを使い切ってしまった
boostロックフリーキューでは、主要処理ではないキュー処理のために多くのCPUリソースが必要になってしまい本末転倒なパフォーマンスとなってしまった
そこで、主要処理にCPUリソースを残せるような非ロック:スレッド間データ同期インタフェイスを次のように考えた(考えている時のメモ)
この設計の特徴
- データは、A面、B面のどちらかが最新で、もう片方は一つ古いデータ
- 最新データが、A,B面のいずれかは、Bitmapを参照するとわかる
- m x n のデータ群は一つ一つの領域をmalloc等でメモリマネージャから割り当てる
- 読み込みトラフィック > 書込みトラフィック となるようなシステム
- 読み込み側のスレッドで、A面、B面、Bitmap全てをallocateする(libnumactlを利用せずに当該CPU側Nodeに配置させるため)
- 書込みスレッドは、非アクティブ面にlock freeでデータを書き込む
- 書込みスレッドは、非アクティブ面への全データ書込みが完了した後に、bitmapを操作し、A面、B面をひっくり返す
- また、読み込みスレッドが、アクティブ面を確定し、データ領域を読み込んでいる瞬間にA,B面が切り替わらないよう、仮想クロックカウンタが前回より進んでいるかをチェックする
- 読み込みスレッドは、アクティブ面からlock freeでデータを読み込む
- データ読み込みが完了した後、仮想クロックカウンタを進め、書込みスレッドが非アクティブ面へ書き込むことが可能なことをマークする
[R]は読み込みスレッド、[W]は書き込みスレッド、[C]はPPカウンタ、そして下図はPPカウンタを含めた全体実装イメージ
まとめ
割り当てるスレッド数(CPU数)でスケールしており 8 コア割り当てで、150Mtps(1秒間に、読み取り、書込み処理を150000000回)を計測した
4コア割り当て時のリソース消費も、書き込み、読み込みCPUを最大限活用できる
一つ目のスレッドは、共有バッファへの構造体データを書込むスレッドであり、binlog apiスレッドに該当する、そして、二つ目のスレッドが主処理スレッドに相当する
以下のソースコードによるスレッド間メモリアクセスパフォーマンスを計測対象である
(LockFreeTableは200Step程度の薄いtemplate実装:ソースは最後に)
TEST(MultiThread, ReadWrite){
uint64_t counter = 0,rcounter = 0;
unsigned long long r_st = 0,r_et = 0;
int done = 0;
std::auto_ptr<LockFreeTable<struct link>> tbl(LockFreeTable <struct link>::Create());
auto s_tbl_ = tbl.get();
EXPECT_NE((void*)s_tbl_, (void*)NULL);
//
auto st = msec();
s_threads_.create_thread([&]{
for(int c = 0;c < 10;c++){
for(uint64_t n = 0;n<65536;n+=(c>0?17:1)){
struct link itm;
bzero(&itm, sizeof(itm));
itm.hoge_0 = n+2;
itm.hoge_1 = n+3;
itm.hoge_2 = n+4;
itm.hoge_3 = n+5;
//
EXPECT_EQ(s_tbl_->Add((__be32)(200+n),&itm, 0), RETOK);
counter++;
if (c > 0){ usleep(10); }
}
done = 1;
}
done=2;
});
s_threads_.create_thread([&]{
while(1){
if (done == 0){ usleep(100); continue; }
break;
}
r_st = msec();
// at main thread
for(int c = 0;c < 100;c++){
for(uint64_t n = 0;n<65536;n++){
auto it = s_tbl_->Find(200+n,0);
EXPECT_EQ(it == s_tbl_->End(), false);
if (it != s_tbl_->End()){
EXPECT_EQ(it->hoge_0, (n+2));
EXPECT_EQ(it->hoge_1, (n+3));
EXPECT_EQ(it->hoge_2, (n+4));
EXPECT_EQ(it->hoge_3, (n+5));
rcounter++;
}
if (done == 2){ break; }
}
}
r_et = msec();
});
s_threads_.join_all();
auto et = msec();
char bf[128] = {0};
//
fprintf(stderr, "### read/write ###\n %s pps\n#############\n", Norm(bf, (double)rcounter*1000000.0e0/(double)(r_et - r_st)));
}
対象アーキテクチャ
二面ロックフリーlookup table
ヘッダ
lockfree.hpp
template<typename T,int S,typename K>
class Lockfree{
public:
class Iterator{
public:
Iterator():ptr_(NULL){}
Iterator(T* ptr): ptr_(ptr){}
T& operator*() { return(*ptr_); }
T* operator->() { return(ptr_); }
bool operator==(const Iterator cp) { return(ptr_==cp.ptr_);}
bool operator!=(const Iterator cp) { return(ptr_!=cp.ptr_);}
private:
T* ptr_;
};
public:
static Lockfree<T,S,K>* Create(void);
static Lockfree<T,S,K>* Init(void);
virtual ~Lockfree();
private:
Lockfree();
public:
int Add(const K,T*, int);
int Del(const K,int);
void Clock(void);
void Clear();
void ClearNoticeBmp64(const K);
//
Iterator Find(const K key, const int flag = 0);
uint64_t FindNoticeBmp64(const K);
Iterator End();
T& operator[](const K key);
void SwapSide(const K key);
void NotifyChange(const K key);
void EnumerateForUpdate(std::function<void(T*,void*)>,void*);
private:
T* data_side_a_[S];
T* data_side_b_[S];
T* end_;
uint64_t data_side_bmp_[S>>6];
uint64_t data_notice_bmp_[S>>6];
uint64_t virtual_clock_counter_;
uint64_t virtual_clock_counter_prv_;
};
#include "lockfree.hh"
#ifndef STAT_ON
# define STAT_ON (0)
#endif
#ifndef STAT_OFF
# define STAT_OFF (1)
#endif
namespace LTEEPC{
static pthread_mutex_t init_mtx_;
//
template<typename T,int S,typename K> Lockfree<T,S,K>* Lockfree<T,S,K>::Create(void){
return(new Lockfree<T,S,K>());
}
template<typename T,int S,typename K> Lockfree<T,S,K>* Lockfree<T,S,K>::Init(void){
static Lockfree<T,S,K>* lookup = NULL;
pthread_mutex_lock(&init_mtx_);
if (!lookup){
lookup = Lockfree<T,S,K>::Create();
}
pthread_mutex_unlock(&init_mtx_);
return(lookup);
}
template<typename T,int S,typename K> Lockfree<T,S,K>::~Lockfree(){
for(int n = 0;n < S;n++){
free(data_side_a_[n]);
free(data_side_b_[n]);
}
free(end_);
}
template<typename T,int S,typename K> Lockfree<T,S,K>::Lockfree(){
// instanciate.
/*
* [important]
* initialization process must be called on the main thread.
*/
if ((end_ = (T*)malloc(sizeof(T))) == NULL){
throw "malloc for link_ptr(end)";
}
for(int n = 0;n < S;n++){
if ((data_side_a_[n] = (T*)malloc(sizeof(T))) == NULL ||
(data_side_b_[n] = (T*)malloc(sizeof(T))) == NULL){
throw "malloc for link_ptr(side_[a/b])";
}
}
Clear();
}
template<typename T,int S,typename K> void Lockfree<T,S,K>::Clock(void){
virtual_clock_counter_++;
// fprintf(stderr, "VCLOCK[%p]..vcc :" FMT_LLU "/vpc :" FMT_LLU "\n",(void*)this, virtual_clock_counter, virtual_clock_counter_prv);
}
template<typename T,int S,typename K> void Lockfree<T,S,K>::Clear(){
int n;
memset(data_side_bmp_,0,sizeof(data_side_bmp_));
memset(data_notice_bmp_,0,sizeof(data_notice_bmp_));
for(n = 0;n < S;n++){
memset(data_side_a_[n], 0, sizeof(*data_side_a_[n]));
memset(data_side_b_[n], 0, sizeof(*data_side_b_[n]));
}
memset(end_, 0, sizeof(*end_));
virtual_clock_counter_ = 0;
virtual_clock_counter_prv_ = 0;
}
template<typename T,int S,typename K> void Lockfree<T,S,K>::ClearNoticeBmp64(const K gcnt){
if (gcnt <= S){
data_notice_bmp_[gcnt>>6] = 0;
}
}
template<typename T,int S,typename K> typename Lockfree<T,S,K>::Iterator Lockfree<T,S,K>::Find(const K gcnt, const int flag){
/*
* find process
* 1. determine side of valid
* 2. return valid side data
*/
if (gcnt > (S-1)){
return(End());
}
uint64_t bits = (((uint64_t)1)<<(gcnt - ((gcnt>>6)<<6)));
if (flag){
// back side is always return.
if (!(data_side_bmp_[(gcnt>>6)]&bits)){
return(Iterator(data_side_a_[gcnt]));
}else{
return(Iterator(data_side_b_[gcnt]));
}
}else{
if ((data_side_bmp_[(gcnt>>6)]&bits)){
if (data_side_a_[gcnt]->stat.valid==STAT_ON){
return(Iterator(data_side_a_[gcnt]));
}
}else{
if (data_side_b_[gcnt]->stat.valid==STAT_ON){
return(Iterator(data_side_b_[gcnt]));
}
}
}
// fprintf(stderr, "failed..stat.valid!=ON .find(%u)\n", gcnt);
return(End());
}
template<typename T,int S,typename K> uint64_t Lockfree<T,S,K>::FindNoticeBmp64(const K gcnt){
if (gcnt > (S-1)){
return(0);
}
return(data_notice_bmp_[(gcnt>>6)]);
}
template<typename T,int S,typename K> typename Lockfree<T,S,K>::Iterator Lockfree<T,S,K>::End(){ return(Lockfree<T,S,K>::Iterator(end_)); }
template<typename T,int S,typename K> T& Lockfree<T,S,K>::operator[](const K gcnt){
if (gcnt > (S-1)){
// fprintf(stderr, "capability over ,link operator[] %u - %u\n", gcnt, S);
throw "invalid index";
}
Iterator itr = Find(gcnt);
if (itr != End()){
itr->stat.valid = STAT_ON;
return((*itr));
}
itr = Find(gcnt, 1);
if (itr == End()){
throw "exception operator[]";
}
itr->stat.valid = STAT_ON;
SwapSide(gcnt);
return((*itr));
}
template<typename T,int S,typename K> void Lockfree<T,S,K>::SwapSide(const K gcnt){
uint64_t bits = (((uint64_t)1)<<(gcnt - ((gcnt>>6)<<6)));
data_side_bmp_[(gcnt>>6)]^=bits;
}
template<typename T,int S,typename K> void Lockfree<T,S,K>::NotifyChange(const K gcnt){
uint64_t bits = (((uint64_t)1)<<(gcnt - ((gcnt>>6)<<6)));
data_notice_bmp_[(gcnt>>6)]^=bits;
}
template <typename T,int S,typename K> void Lockfree<T,S,K>::EnumerateForUpdate(std::function<void(T*,void*)> func, void* udata){
K gcnt;
//
for(gcnt = 1; gcnt < S;gcnt++){
uint64_t bits = (((uint64_t)1)<<(gcnt - ((gcnt>>6)<<6)));
// return at back side.
if (data_side_bmp_[(gcnt>>6)]&bits){
*(data_side_b_[gcnt]) = *(data_side_a_[gcnt]);
func(data_side_b_[gcnt],udata);
data_side_b_[gcnt]->stat.valid = STAT_ON;
}else{
*(data_side_a_[gcnt]) = *(data_side_b_[gcnt]);
func(data_side_a_[gcnt],udata);
data_side_a_[gcnt]->stat.valid = STAT_ON;
}
}
// 最後にまとめて、ビット変更、ひっくり返す
for(gcnt = 0; gcnt < (S>>6);gcnt++){
data_side_bmp_[gcnt]^=((uint64_t)-1);
}
}
template<typename T,int S,typename K>int Lockfree<T,S,K>::Add(K gcnt, T* itm,int nochk_counter){
Lockfree<T,S,K>::Iterator itr = NULL;
// read phase
if (nochk_counter){
if (virtual_clock_counter_ <= virtual_clock_counter_prv_){
// fprintf(stderr, "failed[%p]..add(%u) vcc :%llu/vpc :%llu\n",(void*)this, gcnt, virtual_clock_counter_, virtual_clock_counter_prv_);
return(RETERR);
}
virtual_clock_counter_prv_ = virtual_clock_counter_;
}
itm->stat.valid = STAT_ON;
// access back side.
if ((itr = Find(gcnt, 1)) == End()){
// fprintf(stderr, "failed[%p]..add(%u) vcc :%llu/vpc :%llu\n",(void*)this, gcnt, virtual_clock_counter_, virtual_clock_counter_prv_);
throw "exception not found(add_link).";
}
// update and swap.
memcpy(&(*itr), itm, sizeof(*itm));
SwapSide(gcnt);
NotifyChange(gcnt);
//
return(RETOK);
}
template<typename T,int S,typename K>int Lockfree<T,S,K>::Del(K gcnt, int nochk_counter){
Lockfree<T,S,K>::Iterator itr = NULL;
if (nochk_counter){
// read phase
if (virtual_clock_counter_ <= virtual_clock_counter_prv_){
// fprintf(stderr, "failed[%p]..delete(%u) vcc :%llu/vpc :%llu\n",(void*)this, gcnt, virtual_clock_counter_, virtual_clock_counter_prv_);
return(RETERR);
}
virtual_clock_counter_prv_ = virtual_clock_counter_;
}
// access back side.
if ((itr = Find(gcnt, 1)) == End()){
throw "exception not found(add_link).";
}
// update and swap.
itr->stat.valid = STAT_OFF;
SwapSide(gcnt);
NotifyChange(gcnt);
//
return(RETOK);
}