Raspberry Pi Picoの二つのCoreを使ってUARTから一つのXBeeを制御することになったので、Picoのmutexで排他制御をしてみました。その時に調べたことをまとめてみました。
PicoのLock機能は以下のような構成になっています。
Raspberry Pi PicoのSDKは2つのレイヤーから構成されていて、RP2040ハードウェアーにアクセスするためのhardware_から始まるライブラリー群とpico_から始まるライブラリー群で構成されています。下から順番に、ハードウェアーのRP2040で、ここでは32個のSpinlockが提供されています。次にhardware_syncでRP2040が提供しているSpinlockを使うためのSDKのCライブラリーになります。最上層のlock_coreとmutexなどを提供しているpico_syncライブラリーになります。lock_coreはmutexやsemから呼び出される共通の機能が集められています。
以下が今回使ったmutex部分のコードの抜粋になります。
static mutex_t xbee_send_mutex;
// mutex の取得を試みる。成功するとtrueを返し、失敗するとfalseを返します。
// owner パラメータには、失敗したときに現在のxbee_send_mutexに保持されているオーナーが入ります。
if(!mutex_try_enter(&xbee_send_mutex, &owner)){
// 現在のCore IDを調べて、ownerが同じであれば再入を避けるためにfalseを返して関数を終了します。
if(owner == get_core_num()) return false;
// 他のCoreがmutexを保持している場合にはブロックして取得できるまで待ちます。
mutex_enter_blocking(&xbee_send_mutex);
}
/*** この部分にCritical Sectionを挿入します ***/
// mutex を開放します。
mutex_exit(&xbee_send_mutex);
最初、mutex_try_enterでブロックをしないでmutexの取得を試みます。そこで失敗したときにmutexを取得しているCore IDがowner変数にコピーされます。mutexを取得しようとしているCore IDが同じであった場合、再入を避けるためにfalseを返して処理を終了します。ownerが現在のCore IDでなければ、次にmutex_enter_blockingを使って他のCoreがmutexを開放するのをまって、mutexを取得します。mutex_enter_blockingがmutexの取得に成功したら、/*** critical section ***/のところで排他制御をしなければならない、出来る限り短いコードを挿入して、排他制御部分の処理が終わったところで、mutex_exitを使ってmutexを開放します。
次のダイヤグラムは、Raspberry Pi PicoのDatasheetから抜粋した、SIO(Single-cycle IO)の説明で使用しているダイアグラムです。
RP2040のLock機能では32個のハードウェアースピンロックをSIO(single-cycle IO)が提供しています。それぞれのSpinlockは1ビットのフラグで、SPINLOCK0からSPINLOCK31のレジスターに割り当てられています。
ロックを得るためには、Spinlockを読み出して、その値がゼロ以外であればロックが成功して、もしゼロならばSpinlockはもうすでにほかのプロセスによってロックを取得されていることになります。ロックの解除は何らかの値をSpinlockに書き込むことで、ロックは解放されます。
もし両方のコアが同時にSpinlockを取得しようとすると、コア0が取得することになります。
一般的にSpinlockはロックが取得できるまでループで繰り返されるので、出来る限り短い間隔で取得と解放を行います。(Pico SDKでは、上記のFIFO 0 to 1、FIFO 1 to 0を使ってEventを使うことでループで繰り返すことを防いでいます。詳細はこのあとで説明します。)
Debug目的でSpinlockの状態を見たいときにはSPINLOCK_STレジスターを使います。
また、Hardware Spinlockに関係する機能として、それぞれのCoreが互いにイベント・メッセージを送るためにFIFOが用意されていて、8個までのイベントやメッセージを保持することができるようになっています。mutexでは以下の命令をFIFOで使っています。
- __sev : Set Event 命令をコードパスに挿入します。SEVは互いのcoreにメッセージを送ります。
- __wfe : Wait For Event 命令をコードパスに挿入します。WFEはEventが発生するのをまち、SEVで互いのcoreに送られたEventも含まれます。WFEが実行された場合、そのcoreは省電力モードに入りEventを受け取ることを待ちます。
ここまでがRP2040の基本的なLock機能の説明になります。そして、それらがどのようにSDKを通して使うことができるのか、コードをみながらmutexの機能を詳細に見てみます。Mutex APIは非IRQでのマルチコア間で使うための排他処理のためのモジュールになります。以下がmutex構造体の宣言部分になります。
typedef struct __packed_aligned mutex {
lock_core_t core;
lock_owner_id_t owner; //! owner id LOCK_INVALID_OWNER_ID for unowned
} mutex_t;
From pico/mutex_t.h
メンバーはlock_core_tとlock_owner_id_tになります。lock_core_t構造体はlock_core.hで宣言されていて、以下のようにhardware_syncのspin_lock_tをラップしています。lock_owner_id_tは、pico/lock_core.hでint8_tとして定義されて、ロックを取得しているCore IDを保持するためのメンバーになります。ルールとしては、ownerを変更するためには必ずそのCoreでSpinlockを取得しなければならないというところです。ownerには必ず取得しているCore IDか、無効なIDの-1が入っています。
以下のlock_coreの宣言部分にはNoteとして、構造体に含まれているあらゆるlockメンバーにはvolatileは必要なく、Spinlockの取得、解放時にはメモリーフェンス(メモリーバリア)によって守られていると書いてありますが、詳しくはこの後で見ていきます。
/** \file lock_core.h
* \ingroup lock_core
*
* Base implementation for locking primitives protected by a spin lock. The spin lock is only used to protect
* access to the remaining lock state (in primitives using lock_core); it is never left locked outside
* of the function implementations
*/
struct lock_core {
// spin lock protecting this lock's state
spin_lock_t *spin_lock;
// note any lock members in containing structures need not be volatile;
// they are protected by memory/compiler barriers when gaining and release spin locks
};
typedef struct lock_core lock_core_t;
From pico/lock_core.h
次にmutex_try_enterを見ていくと、今回はリエントラント (再入可能)は有効にはしていないのでPICO_MUTEX_ENABLE_SDK120_COMPATIBILITYのブロックは無視します。
spin_lock_blockingは、Spinlockをかける前に割り込み処理を無効にしてからSpinlockをかけます。戻り値は割り込み処理を無効にしたときの直前のステータスを返します。この戻り値は、Spinlockをアンロックしたあとに割り込みをもとの状況に戻すために使います。lock_is_owner_id_validはマクロでownerが0以上かどうかをテストをすることでownerが有効な数値であるかどうかを確認するためのマクロです。ここではCore IDが入っているので、通常は0もしくは1が入っています。無効なIDとして-1がLOCK_INVALID_OWNER_IDとして定義されていて、だれもmutexを取得していない時はLOCK_INVALID_OWNER_IDが入っています。
ここでは、ownerが有効な値でなければ(mutexがどちらのCoreからも取得されていない状態)現在のCore IDがownerに代入されて、mutexの取得に成功したことになります。lock_get_caller_owner_idはマクロで、内部的にはget_core_numの戻り値が返されます。
現在のmuetxのownerが有効であれば、mutexはそのownerに取得されているのでそのownerをowner_outに代入します。
最後にspin_unlockでspin_lockを開放して割り込みをもとの状態に戻します。
bool __time_critical_func(mutex_try_enter)(mutex_t *mtx, uint32_t *owner_out) {
#if PICO_MUTEX_ENABLE_SDK120_COMPATIBILITY
if (mtx->recursive) {
return recursive_mutex_try_enter(mtx, owner_out);
}
#endif
bool entered;
uint32_t save = spin_lock_blocking(mtx->core.spin_lock);
if (!lock_is_owner_id_valid(mtx->owner)) {
mtx->owner = lock_get_caller_owner_id();
entered = true;
} else {
if (owner_out) *owner_out = (uint32_t) mtx->owner;
entered = false;
}
spin_unlock(mtx->core.spin_lock, save);
return entered;
}
From pico_sync/mutex.c
mutex_enter_blockingは少し複雑で内部的にはCore間にあるFIFOを使ってCore間通信を行っています。まずは、lock_get_caller_owner_idで現在のCore IDを取得しcaller変数に保持します。次に無限ループに入り、spin_lock_blockingを使ってspin_lockを取得します。取得後にownerが無効(取得されていない状態)であれば、現在のCore IDをownerに代入してspinlockを開放して無限ループ処理を抜けます。
もうすでにmutexが他のCoreによって取得済みであれば、lock_internal_spin_unlock_with_waitが呼び出され、これはマクロでspin_unlockが呼び出されspin_lockが解放されて、もう一つのコアにWait For Eventを送信して、このコアはスリープモードに入ります。また、マクロの処理はアトミックに実行されます。スリープモードは、他のCoreがmutex_exitを呼び出し、内部的にSev(Set Event)が実行されて、FIFOにEventが流れてWait For Eventから復帰することになります。
void __time_critical_func(mutex_exit)(mutex_t *mtx) {
#if PICO_MUTEX_ENABLE_SDK120_COMPATIBILITY
if (mtx->recursive) {
recursive_mutex_exit(mtx);
return;
}
#endif
uint32_t save = spin_lock_blocking(mtx->core.spin_lock);
assert(lock_is_owner_id_valid(mtx->owner));
mtx->owner = LOCK_INVALID_OWNER_ID;
lock_internal_spin_unlock_with_notify(&mtx->core, save);
}
From pico_sync/mutex.c
mutex_exitでは、spin_lock_blockingでspin_lockを取得して、saveには以前にも説明したように割り込み処理を無効にしてあるので、あとで割り込み処理をもとの状態に戻すために必要な情報が入っています。次にmutexのownerに有効なIDが入っていることを確認したあとに無効なIDを代入し、mutexが未取得状態にします。最後にlock_internal_spin_unlock_with_notifyを呼びます。これはマクロでspin_unlockが呼び出されspin_lockが解放されて、他のCoreにSet EventをFIFOを通して送信して、このスリープモードに入っているCoreを復帰させてmutexを取得させます。また、このマクロの処理はアトミックに実行されます。
以下が、lock_internal_spin_unlock_with_waitとlock_internal_spin_unlock_with_notifyの宣言になります。
#ifndef lock_internal_spin_unlock_with_wait
/*! \brief Atomically unlock the lock's spin lock, and wait for a notification.
* \ingroup lock_core
*
* _Atomic_ here refers to the fact that it should not be possible for a concurrent lock_internal_spin_unlock_with_notify
* to insert itself between the spin unlock and this wait in a way that the wait does not see the notification (i.e. causing
* a missed notification). In other words this method should always wake up in response to a lock_internal_spin_unlock_with_notify
* for the same lock, which completes after this call starts.
*
* In an ideal implementation, this method would return exactly after the corresponding lock_internal_spin_unlock_with_notify
* has subsequently been called on the same lock instance, however this method is free to return at _any_ point before that;
* this macro is _always_ used in a loop which locks the spin lock, checks the internal locking primitive state and then
* waits again if the calling thread should not proceed.
*
* By default this macro simply unlocks the spin lock, and then performs a WFE, but may be overridden
* (e.g. to actually block the RTOS task).
*
* \param lock the lock_core for the primitive which needs to block
* \param save the uint32_t value that should be passed to spin_unlock when the spin lock is unlocked. (i.e. the `PRIMASK`
* state when the spin lock was acquire
*/
#define lock_internal_spin_unlock_with_wait(lock, save) spin_unlock((lock)->spin_lock, save), __wfe()
#endif
#ifndef lock_internal_spin_unlock_with_notify
/*! \brief Atomically unlock the lock's spin lock, and send a notification
* \ingroup lock_core
*
* _Atomic_ here refers to the fact that it should not be possible for this notification to happen during a
* lock_internal_spin_unlock_with_wait in a way that that wait does not see the notification (i.e. causing
* a missed notification). In other words this method should always wake up any lock_internal_spin_unlock_with_wait
* which started before this call completes.
*
* In an ideal implementation, this method would wake up only the corresponding lock_internal_spin_unlock_with_wait
* that has been called on the same lock instance, however it is free to wake up any of them, as they will check
* their condition and then re-wait if necessary/
*
* By default this macro simply unlocks the spin lock, and then performs a SEV, but may be overridden
* (e.g. to actually un-block RTOS task(s)).
*
* \param lock the lock_core for the primitive which needs to block
* \param save the uint32_t value that should be passed to spin_unlock when the spin lock is unlocked. (i.e. the PRIMASK
* state when the spin lock was acquire)
*/
#define lock_internal_spin_unlock_with_notify(lock, save) spin_unlock((lock)->spin_lock, save), __sev()
#endif
From pico/lock_core.h
lock_internal_spin_unlock_with_waitとlock_internal_spin_unlock_with_notifyは対のマクロで、lock_internal_spin_unlock_with_waitはSpinlockをunlockしてwfe(Wait For Event)で他のCoreがSEV(Set EVent)でEventを送るのを待ちます。lock_internal_spin_unlock_with_notifyはSpinlockをunlockして他のCoreに対してSEVでEventを送ります。
そのため、lock_internal_spin_unlock_with_waitにあるspin_unlockと__wfeはアトミックに実行され、間にlock_internal_spin_unlock_with_notifyが実行されるべきではなく、__wfeが呼ばれるとそのCoreは省電力モードになって停止状態にはいります。他のCoreがSEVを実行してそこからEventを受け取ることでそのCoreが省電力モードから復帰することができるようになります。
/*! \brief Acquire a spin lock safely
* \ingroup hardware_sync
*
* This function will disable interrupts prior to acquiring the spinlock
*
* \param lock Spinlock instance
* \return interrupt status to be used when unlocking, to restore to original state
*/
__force_inline static uint32_t spin_lock_blocking(spin_lock_t *lock) {
uint32_t save = save_and_disable_interrupts();
spin_lock_unsafe_blocking(lock);
return save;
}
/*! \brief Release a spin lock safely
* \ingroup hardware_sync
*
* This function will re-enable interrupts according to the parameters.
*
* \param lock Spinlock instance
* \param saved_irq Return value from the \ref spin_lock_blocking() function.
* \return interrupt status to be used when unlocking, to restore to original state
*
* \sa spin_lock_blocking()
*/
__force_inline static void spin_unlock(spin_lock_t *lock, uint32_t saved_irq) {
spin_unlock_unsafe(lock);
restore_interrupts(saved_irq);
}
From hardware/sync.h
spin_lock_blockingはsave_and_disable_interruptsを使って割り込み処理を無効にして、unlockしたときに割り込み処理をもとの状態にするために現在の状態をsave変数に保存して、spin_lock_unsafe_blockingでspinlockを取得します。最後にsave変数を戻り値として返して、呼び出し元がunlockをするときに割り込み処理をもとの状態にできるようにします。
spin_unlockは割り込み処理をspinlock取得前の状態にするためにspin_lock_blockingの戻り値を引数にします。spin_unlock_unsafeでspinlockを開放して、restore_interruptsを使って割り込み処理をspinlock取得前の状態にします。
ここでspin_lock_unsafe_blockingとspin_unlock_unsafeでunsafeが意味するところは、この関数が割り込み処理を無効にしないということで、割り込み処理についてはこの関数の前後で有効無効を制御しなければならないということです。
/*! \brief Acquire a spin lock without disabling interrupts (hence unsafe)
* \ingroup hardware_sync
*
* \param lock Spinlock instance
*/
__force_inline static void spin_lock_unsafe_blocking(spin_lock_t *lock) {
// Note we don't do a wfe or anything, because by convention these spin_locks are VERY SHORT LIVED and NEVER BLOCK and run
// with INTERRUPTS disabled (to ensure that)... therefore nothing on our core could be blocking us, so we just need to wait on another core
// anyway which should be finished soon
while (__builtin_expect(!*lock, 0));
__mem_fence_acquire();
}
/*! \brief Release a spin lock without re-enabling interrupts
* \ingroup hardware_sync
*
* \param lock Spinlock instance
*/
__force_inline static void spin_unlock_unsafe(spin_lock_t *lock) {
__mem_fence_release();
*lock = 0;
}
From hardware/sync.h
spin_lock_unsafe_blockingはwhileループでブロックをして、spinlockを取得します。__builtin_expectは、コンパイラーに対して分岐情報を提供するもので、GNUコンパイラーの拡張機能でARMコンパイラーでもサポートされている機能です。ここでは"!*lock"が"0"であることを期待していることになります。ここで、もう一度RP2040のDatasheetの原文を確認しながらコードを見てみます。RP2040のSpinlockの取得方法は、Spinlockを読むことでSpinlockを取得することができます。読んだ結果が0以外であれば取得に成功したことになり、0だともうすでに他に取得されていることになります。"*lock"を読み、結果が"!"で反転され0以外の結果が0になりwhileループは取得に成功するとループを終了することになります。結果が0の場合は1になり、Spinlockを読み続けることになります。ただ、ここでは取得できることがほとんどなので、__builtin_expectで0を期待することになっています。
__mem_fence_acquireと__mem_fence_releaseは、spin_lock_unsafe_blockingではSpinlockを取得するまで、spin_unlock_unsafeでは解放するまで、メモリー操作の並び替えを抑制しているように見えますが、Cortex-Mの"ARM Cortex-M Programming Guide to Memory Barrier Instructions"をみると、"The use of DMB is rarely needed in Cortex-M processors because they do not reorder memory transactions."とあるので、メモリー操作の並び替えを抑制のためではなく、ここは"struct lock_core"にあったコメントにもあるように、アトミック性のための__mem_fence_acquireと__mem_fence_releaseになるようです。
// note any lock members in containing structures need not be volatile;
// they are protected by memory/compiler barriers when gaining and release spin locks
__mem_fence_acquireと__mem_fence_releaseでは、両方とも__dmbが呼び出されていますが、Cortex-Mではフルフェンスと呼ばれるメモリバリア命令だけなので、関数内では同じ__dmbが呼ばれています。aquire/releaseについては、リード・アフター・ライト操作の可視性に関するもので、読み込む側(acquire)と書き込む側(release)がそれぞれの観点で命令「acquire」命令と「release」命令でメモリバリアを構成するものもあるので、Cortex-Mの将来の拡張のために二つの関数に分けたと思われます。
/*! \brief Acquire a memory fence
* \ingroup hardware_sync
*/
__force_inline static void __mem_fence_acquire(void) {
// the original code below makes it hard for us to be included from C++ via a header
// which itself is in an extern "C", so just use __dmb instead, which is what
// is required on Cortex M0+
__dmb();
//#ifndef __cplusplus
// atomic_thread_fence(memory_order_acquire);
//#else
// std::atomic_thread_fence(std::memory_order_acquire);
//#endif
}
/*! \brief Release a memory fence
* \ingroup hardware_sync
*
*/
__force_inline static void __mem_fence_release(void) {
// the original code below makes it hard for us to be included from C++ via a header
// which itself is in an extern "C", so just use __dmb instead, which is what
// is required on Cortex M0+
__dmb();
//#ifndef __cplusplus
// atomic_thread_fence(memory_order_release);
//#else
// std::atomic_thread_fence(std::memory_order_release);
//#endif
}
hardware/sync.h
/*! \brief Insert a DMB instruction in to the code path.
* \ingroup hardware_sync
*
* The DMB (data memory barrier) acts as a memory barrier, all memory accesses prior to this
* instruction will be observed before any explicit access after the instruction.
*/
__force_inline static void __dmb(void) {
__asm volatile ("dmb" : : : "memory");
}
from hardware/sync.h
__force_inlineはpico/platform.hで定義されているマクロで、最適化のレベルに関係なく関数をインライン関数にします。インライン関数とは、コンパイラーが、個別の命令セットをメモリー内に作成するのではなく、関数定義からのコードを呼び出し元関数のコードに直接コピーすることでソース内で処理が展開、実行されるので高速化が見込めます。__asmの後のvolatileは、コンパイラーに最適化で意図していない動作を防ぐために置かれています。
save_and_disable_interruptsは、現在の割り込み処理のステータスをstatus変数に戻り値として確保し、割り込み処理を無効にします。sutatus変数の値は、割り込み処理を復帰(有効)させた後に、割り込みが無効にされる前の状態に戻すために使います。
"cpsid i"で、PRIMASKレジスタの値を1にして、ハードフォルト、NMI、リセット以外の割り込みを禁止にします。PRIMASKは例外処理マスクレジスターで、設定可能な優先順位によって、すべての例外をマスクします。
/*! \brief Save and disable interrupts
* \ingroup hardware_sync
*
* \return The prior interrupt enable status for restoration later via restore_interrupts()
*/
__force_inline static uint32_t save_and_disable_interrupts(void) {
uint32_t status;
__asm volatile ("mrs %0, PRIMASK" : "=r" (status)::);
__asm volatile ("cpsid i");
return status;
}
from hardware/sync.h
lock_owner_id_tはint8_tとして定義されています。基本的にはCore IDが入るか、無効な値として-1が入ります。
#ifndef lock_owner_id_t
/*! \brief type to use to store the 'owner' of a lock.
* \ingroup lock_core
* By default this is int8_t as it only needs to store the core number or -1, however it may be
* overridden if a larger type is required (e.g. for an RTOS task id)
*/
#define lock_owner_id_t int8_t
#endif
ここでは無効なCore IDとして-1が定義されています。
#ifndef LOCK_INVALID_OWNER_ID
/*! \brief marker value to use for a lock_owner_id_t which does not refer to any valid owner
* \ingroup lock_core
*/
#define LOCK_INVALID_OWNER_ID ((lock_owner_id_t)-1)
#endif
lock_get_caller_owner_idはマクロでowner IDとして、get_core_num()の戻り値を返します。lock_is_owner_id_validはcore IDとして有効な値かどうかをテストします。
#ifndef lock_get_caller_owner_id
/*! \brief return the owner id for the caller
* \ingroup lock_core
* By default this returns the calling core number, but may be overridden (e.g. to return an RTOS task id)
*/
#define lock_get_caller_owner_id() ((lock_owner_id_t)get_core_num())
#ifndef lock_is_owner_id_valid
#define lock_is_owner_id_valid(id) ((id)>=0)
#endif
#endif
以上になります。