Linux
ARM
マルチスレッド

ARMプロセッサにおけるロックフリーなデータ更新

所用で、ARMプロセッサ上でのロックフリーなデータ更新について調べたのでメモ変わりに書いておく。

ロックフリー

基本的な背景知識について一応説明。
マルチスレッドなシステムにおいて、複数のスレッドから同時に共有データへアクセスを行う際は注意が必要。
例えば複数のスレッドからデータの更新を行う場合、スレッドAがまずデータを更新し終え、次にスレッドBがデータを更新する、といった場合特に問題は発生しない。
ところがデータ更新処理がアトミックに行われていない場合、例えばスレッドAのデータ更新中にスレッドBが先に同じアドレスにあるデータを更新してしまった場合などに不整合が発生する。

よって一般的にはセマフォやミューテックスといったロックを利用する事で、各スレッドがデータを更新している間は他のスレッドからのアクセスを禁止するような設計にする。
ただし、ロックを取っている間他のスレッドは完全にブロックされ停止してしまうため複数のマルチスレッド/コアのシステム等では、これがパフォーマンスの低下に繋がる事もある。
そこで、ロックを使用せずに複数スレッドでデータの読み書きを行うようなアルゴリズムをロックフリーアルゴリズムと言う。

CAS(Compare And Swap)

上記のロックフリーなデータ更新を設計するために利用されるCPU命令には主にCAS(Compare And Swap)LL/SC(Load-Link/Store-Conditional)の2種類がある。
x86が主に前者、Alpha, PowerPC, MIPS, ARMは後者の命令を持っている。
CASとはあるメモリ位置の内容と指定された値を比較し、等しければそのメモリ位置に別の指定された値を格納するという物。
疑似コードっぽく書くと、

int CAS(void *ptr, void *old, void *new) {
  if (memcmp(ptr, old) == 0) {
    memcpy(ptr, new); // Set new value to ptr
    return 1;
  }
  memcpy(old, ptr);
  return 0;
}

データ更新を行おうとしているアドレスptrが以前の値oldと変化していない事を確認し、変化がなければ新しい値newを書き込んでデータを更新するという物。
oldはデータ更新前にアドレスptrにある古い値を設定しておく。データ更新時にこのoldの値が既に以前の物から変わっている場合、他のスレッドが既にデータ更新を行ったという事であるため不整合が起こりうるからである。
この場合CASは0を返し、データ更新をもう一度やり直す必要がある。
実際のCASを使ったデータ更新は以下のようになる。

void AtomicOp(int *ptr)
{
    while(1)
    {
        const int OldValue = *ptr;
        const int NewValue = UpdateData(OldValue); //Update old value to new value

        // If the result is the original value, the new value was stored.
        if(CAS(ptr, &OldValue, &NewValue))
        {
            return;
        }
    }
}

CASが0を返している間、つまり他のスレッドがデータ更新を行う間はデータ更新を行わず再度やり直す。

LL/SC(Load-Link/Store-Conditional)

前節のCASではABA問題が発生する。
CASでは他スレッドによるデータ更新が発生したかを一定の間隔でメモリの値を確認し、値が書き換わっているかを確認する事で調べていた。ところがこの値の確認には一定のインターバルが存在するため、前回の確認から次の確認を行うまでの間にメモリの値が書き換わってまた元に戻るという操作が行われた場合、確認の段階では値は書き換わっていないかのように見えてしまう問題がある。
これはつまり、あるアドレスの値が書き換わったというイベントを一定時間ごとにpollingのように確認するため発生する問題と言えるだろう。

では、pollingではなくイベント駆動的にアドレスの値の変化を検知すれば良いのではないか?というのがLL/SCの基本的な発想である。(と私は考えている...)
LL/SCは特殊なLoad/Store命令の対で、まずデータ更新を行いたいアドレスから値をLL(Load)するが、この時読み込まれたアドレスには"Reserve"というフラグを付加する。このフラグは同じアドレスに書き込みが発生した場合消える。
次にSC(Store)によって新しい値をアドレスに書き込むが、この時SCは"Reserve"フラグが消えていれば書きこみを行わない。
なぜなら"Reserve"フラグが消えているという事は、既に他のスレッドが同じメモリ領域に何らかのデータ更新を行った事を意味するためである。
このように"Reserve"フラグを利用して、データ書き込みを検出するのがLL/SCで基本的な仕組みである。
実際のLL/SCを使ったデータ更新は以下のようになる。

void AtomicOp(int* Value)
{
    while(1)
    {
        const int NewValue = UpdateData(__lwarx(Value));

        // If the reservation was still valid, new value was stored.
        if(__stwcx(Value, NewValue))
        {
            return;
        }
    }
}

(2018-4-21 更新)
LL/SCの"Reserve"フラグはコンテキストスイッチでL1キャッシュから揮発してしまうため、SCが失敗してしまう事があるらしい。
このため厳密にはLock-FreeではなくObstruction-freeになるそうです。
@kumagi さん有難うございます。

ARMプロセッサにおけるLL/SC

ARMプロセッサはCASではなくLL/SC命令を実装しているため、ロックフリーなデータ更新にはLL/SCを利用する必要がある。
32bit ARM(AArch32)ではこれはLDREX と STREXが該当する。

LDREX{cond} Rt, [Rn {, #offset}]
STREX{cond} Rd, Rt, [Rn {, #offset}]
LDREXB{cond} Rt, [Rn]
STREXB{cond} Rd, Rt, [Rn]
LDREXH{cond} Rt, [Rn]
STREXH{cond} Rd, Rt, [Rn]
LDREXD{cond} Rt, Rt2, [Rn]
STREXD{cond} Rd, Rt, Rt2, [Rn]

http://infocenter.arm.com/help/index.jsp?topic=/com.arm.doc.dui0204ij/Cihbghef.html
また64bit ARM(AArch64)ではLDXR,STXRが該当する。

LDXR  Wt, [Xn|SP{,#0}]    ; 32 ビット汎用レジスタ
LDXR  Xt, [Xn|SP{,#0}]    ; 64 ビット汎用レジスタ
STXR  Ws, Wt, [Xn|SP{,#0}]    ; 32 ビット汎用レジスタ
STXR  Ws, Xt, [Xn|SP{,#0}]    ; 64 ビット汎用レジスタ

http://infocenter.arm.com/help/index.jsp?topic=/com.arm.doc.dui0802aj/LDXR.html
http://infocenter.arm.com/help/index.jsp?topic=/com.arm.doc.dui0802aj/STXR.html

Linuxカーネル内での実装例

Linuxカーネルではcmpxchgという関数をアーキテクチャごとに実装する事で、ロックフリーなデータ更新などに使用している。
cmpxchgの実装は64bit ARMの場合arch/arm64/include/asm/atomic_ll_sc.hにある。

#define __CMPXCHG_CASE(w, sz, name, mb, acq, rel, cl)           \
__LL_SC_INLINE unsigned long                        \
__LL_SC_PREFIX(__cmpxchg_case_##name(volatile void *ptr,        \
                     unsigned long old,         \
                     unsigned long new))        \
{                                   \
    unsigned long tmp, oldval;                  \
                                    \
    asm volatile(                           \
    "   prfm    pstl1strm, %[v]\n"              \
    "1: ld" #acq "xr" #sz "\t%" #w "[oldval], %[v]\n"       \
    "   eor %" #w "[tmp], %" #w "[oldval], %" #w "[old]\n"  \
    "   cbnz    %" #w "[tmp], 2f\n"             \
    "   st" #rel "xr" #sz "\t%w[tmp], %" #w "[new], %[v]\n" \
    "   cbnz    %w[tmp], 1b\n"                  \
    "   " #mb "\n"                      \
    "2:"                                \
    : [tmp] "=&r" (tmp), [oldval] "=&r" (oldval),           \
      [v] "+Q" (*(unsigned long *)ptr)              \
    : [old] "Lr" (old), [new] "r" (new)             \
    : cl);                              \
                                    \
    return oldval;                          \
}                                   \
__LL_SC_EXPORT(__cmpxchg_case_##name);

__CMPXCHG_CASE(w, b,     1,        ,  ,  ,         )
__CMPXCHG_CASE(w, h,     2,        ,  ,  ,         )
__CMPXCHG_CASE(w,  ,     4,        ,  ,  ,         )
__CMPXCHG_CASE( ,  ,     8,        ,  ,  ,         )
__CMPXCHG_CASE(w, b, acq_1,        , a,  , "memory")
__CMPXCHG_CASE(w, h, acq_2,        , a,  , "memory")
__CMPXCHG_CASE(w,  , acq_4,        , a,  , "memory")
__CMPXCHG_CASE( ,  , acq_8,        , a,  , "memory")
__CMPXCHG_CASE(w, b, rel_1,        ,  , l, "memory")
__CMPXCHG_CASE(w, h, rel_2,        ,  , l, "memory")
__CMPXCHG_CASE(w,  , rel_4,        ,  , l, "memory")
__CMPXCHG_CASE( ,  , rel_8,        ,  , l, "memory")
__CMPXCHG_CASE(w, b,  mb_1, dmb ish,  , l, "memory")
__CMPXCHG_CASE(w, h,  mb_2, dmb ish,  , l, "memory")
__CMPXCHG_CASE(w,  ,  mb_4, dmb ish,  , l, "memory")
__CMPXCHG_CASE( ,  ,  mb_8, dmb ish,  , l, "memory")

マクロで記述されているが、LDXRでptrから値を読み出し(oldval)、それを以前の値(old)と比較している。
比較にはeor(xor)とcbnzを利用し、変化がなければ新しい値(new)を設定するだけである。

このようにLinuxではCASをLL/SC命令でエミュレーションするような実装になっている。
ちなみにARMv8.1からはLSE(Large System Extension)という命令が増え、これにはCASを1命令で行えるCAS命令という物が追加されている。LinuxではLSEが有効な場合こちらの命令を使った実装を用いる事もできる。
arch/arm64/include/asm/atomic_lse.h

#define __CMPXCHG_CASE(w, sz, name, mb, cl...)              \
static inline unsigned long __cmpxchg_case_##name(volatile void *ptr,   \
                          unsigned long old,    \
                          unsigned long new)    \
{                                   \
    register unsigned long x0 asm ("x0") = (unsigned long)ptr;  \
    register unsigned long x1 asm ("x1") = old;         \
    register unsigned long x2 asm ("x2") = new;         \
                                    \
    asm volatile(ARM64_LSE_ATOMIC_INSN(             \
    /* LL/SC */                         \
    __LL_SC_CMPXCHG(name)                       \
    __nops(2),                          \
    /* LSE atomics */                       \
    "   mov " #w "30, %" #w "[old]\n"           \
    "   cas" #mb #sz "\t" #w "30, %" #w "[new], %[v]\n"     \
    "   mov %" #w "[ret], " #w "30")            \
    : [ret] "+r" (x0), [v] "+Q" (*(unsigned long *)ptr)     \
    : [old] "r" (x1), [new] "r" (x2)                \
    : __LL_SC_CLOBBERS, ##cl);                  \
                                    \
    return x0;                          \
}

__CMPXCHG_CASE(w, b,     1,   )
__CMPXCHG_CASE(w, h,     2,   )
__CMPXCHG_CASE(w,  ,     4,   )
__CMPXCHG_CASE(x,  ,     8,   )
__CMPXCHG_CASE(w, b, acq_1,  a, "memory")
__CMPXCHG_CASE(w, h, acq_2,  a, "memory")
__CMPXCHG_CASE(w,  , acq_4,  a, "memory")
__CMPXCHG_CASE(x,  , acq_8,  a, "memory")
__CMPXCHG_CASE(w, b, rel_1,  l, "memory")
__CMPXCHG_CASE(w, h, rel_2,  l, "memory")
__CMPXCHG_CASE(w,  , rel_4,  l, "memory")
__CMPXCHG_CASE(x,  , rel_8,  l, "memory")
__CMPXCHG_CASE(w, b,  mb_1, al, "memory")
__CMPXCHG_CASE(w, h,  mb_2, al, "memory")
__CMPXCHG_CASE(w,  ,  mb_4, al, "memory")
__CMPXCHG_CASE(x,  ,  mb_8, al, "memory")

こちらは随分シンプルな実装になっており、CAS命令を使うだけである。

Reference

Understanding Atomic Operationsが分かりやすい