はじめに
前回までの記事では、Embassy-rsの基本的な使い方として、Lチカ、並行処理、PWM制御を扱いました。
今回は、複数のタスク間で変数を安全に共有する方法について解説します。組み込み開発では、センサーの値を複数のタスクで参照したり、制御値を共有したりする場面が頻繁に発生します。しかし、複数のタスクが同じメモリに同時アクセスすると、データ競合が発生し、予期せぬバグの原因となります。
Rustの型システムとEmbassy-rsは、この問題をコンパイル時にチェックし、安全なタスク間通信を実現します。
具体的には、以下の3つの方法を解説します。
- アトミック変数: ロックフリーで高速、単純な値の共有に最適
- Channel: 非同期なデータの送受信
- Mutex: 複雑なデータ構造の保護に適した排他制御
今回は、ロータリーエンコーダーの値をタスク間で共有する例を通して、これらの方法を理解していきます。
使用したマイコンはNucleo STM32 F446REです。
今回使用したコードはembassy-introductionリポジトリにまとめておきました。
また、Embassyに関する情報がまだ少ないためか、AIに聞いても間違った解答が返ってくることが多いです。そこは注意してください。
参考資料
The Rust Programming Language 日本語版
Rustの日本語翻訳版ドキュメントです。一連の記事では3, 4, 5, 6, 9, 10, 18章の内容を扱います。
記事内でも少しは解説しますが、ドキュメントの完成度が非常に高いので読むことをおすすめします。
Embassy Book
全部英語で読むのが辛いですが、From bare metal to async Rust、System description
は読んでみることをおすすめします。
embassy
EmbassyのGitHubリポジトリ。examplesが非常に役に立ちました。
NUCLEO-F446RE
MbedによるNucleo F446REのページです。
STM32F446xC/E DataSheet
STMicroelectronicsによるNucleo F446xC/Eのデータシートです。
ロータリーエンコーダーとは
ロータリーエンコーダーは、軸の回転角度や回転数を電気信号に変換するセンサーです。モーターの位置制御やロボットのオドメトリ(自己位置推定)などに広く使われています。
一般的なインクリメンタル型エンコーダーは、A相とB相の2つの信号を出力します。これらの信号の位相差を読み取ることで、回転方向と回転量を検出できます。
A相: _| ̄|_| ̄|_| ̄|_
B相: __| ̄|_| ̄|_| ̄
→(正回転)
A相: __| ̄|_| ̄|_| ̄
B相: _| ̄|_| ̄|_| ̄|_
→(逆回転)
STM32のQEI(Quadrature Encoder Interface)機能を使うと、ハードウェアが自動的にこれらの信号をカウントしてくれます。A相とB相の立ち上がり/立ち下がりを検出してカウントすることで、エンコーダーの分解能を4逓倍にできます。
エンコーダー構造体の実装
エンコーダーの実装全体は以下のようになります。エンコーダーを使ったテストはしていないため、実際に使えるのかどうかは不明です。
#[allow(unused)]
enum RotateDirection {
Forward,
Reverse,
}
struct Encoder<'d, T: GeneralInstance4Channel> {
qei: Qei<'d, T>,
qei_resolution: u32,
direction: RotateDirection,
last_hw_count: u16,
software_count: i32,
}
#[allow(dead_code)]
impl<'d, T: GeneralInstance4Channel> Encoder<'d, T> {
fn new(
tim: Peri<'d, T>,
phase_a_pin: Peri<'d, impl TimerPin<T, qei::Ch1>>,
phase_b_pin: Peri<'d, impl TimerPin<T, qei::Ch2>>,
ppr: u32,
direction: RotateDirection,
) -> Self {
let phase_a_pin = QeiPin::new(phase_a_pin);
let phase_b_pin = QeiPin::new(phase_b_pin);
// QEIではA相とB相の立ち上がり/立ち下がりを用いて4逓倍でカウントされる
let qei_resolution = ppr * 4;
let qei = Qei::new(tim, phase_a_pin, phase_b_pin);
// 現在のハードウェアカウントを読み取り、初期値とする
let last_hw_count = qei.count();
Self {
qei,
qei_resolution,
direction,
last_hw_count,
software_count: 0,
}
}
fn get_count(&self) -> i32 {
self.software_count
}
/// 現在の回転数を取得
pub fn get_rotations(&self) -> f32 {
self.get_count() as f32 / self.qei_resolution as f32
}
/// ハードウェアカウントを読み取り、ソフトウェアカウントを更新します。
///
/// ## [重要]
///
/// このメソッドは、エンコーダーがハードウェアカウント上限の半分
/// (32,767カウント ≒ 分解能2048で4回転)
/// 回転するよりも短い周期で、外部の制御ループから定期的に呼び出す必要があります。
fn update(&mut self) {
let current_hw_count = self.qei.count();
let delta = self.calculate_delta(current_hw_count, self.last_hw_count);
match self.direction {
RotateDirection::Forward => self.software_count += delta,
RotateDirection::Reverse => self.software_count -= delta,
}
self.last_hw_count = current_hw_count;
}
/// オーバーフローを考慮してエンコーダーのカウント変化を計算
fn calculate_delta(&self, current_count: u16, last_count: u16) -> i32 {
if current_count > last_count {
let delta = current_count - last_count;
if delta <= 32767 {
// 100 -> 200 : +100
// 増加としてカウント
delta as i32
} else {
// 50 -> 65486 : -100
// 減少としてカウント
// -(65536 - delta)と等しい
-(delta.wrapping_neg() as i32)
}
} else {
let delta = last_count - current_count;
if delta <= 32767 {
// 200 -> 100 : -100
// 減少としてカウント
-(delta as i32)
} else {
// 65486 -> 50 : +100
// 増加としてカウント
// 65536 - deltaと等しい
delta.wrapping_neg() as i32
}
}
}
}
まず、エンコーダーを扱うための構造体を実装します。この構造体は、ハードウェアのカウンタのオーバーフローを考慮しながら、累積回転数を正確に追跡します。
#[allow(unused)]
enum RotateDirection {
Forward, // 正回転
Reverse, // 逆回転
}
struct Encoder<'d, T: GeneralInstance4Channel> {
qei: Qei<'d, T>, // QEIハードウェア
qei_resolution: u32, // エンコーダーの分解能(4逓倍後)
direction: RotateDirection, // 回転方向
last_hw_count: u16, // 前回のハードウェアカウント
software_count: i32, // ソフトウェアで管理する累積カウント
}
Rust初学者向け解説
トレイト境界とGeneralInstance4Channel
struct Encoder<'d, T: GeneralInstance4Channel>
T: GeneralInstance4Channelは、トレイト境界と呼ばれる制約です。これは「型TはGeneralInstance4Channelトレイトを実装していなければならない」という意味です。
トレイトは、C++の抽象クラスに似た概念で、「この型はこういうメソッドを持っている」という制約を定義します。
// TIM1, TIM2などがGeneralInstance4Channelトレイトを実装している
let encoder1: Encoder<TIM1> = ...; // ✅ OK
let encoder2: Encoder<TIM2> = ...; // ✅ OK
// let encoder3: Encoder<i32> = ...; // ❌ コンパイルエラー: i32はGeneralInstance4Channelを実装していない
これにより、QEI機能を持つタイマーのみをEncoderに渡せることが保証されます。
エンコーダーの初期化
impl<'d, T: GeneralInstance4Channel> Encoder<'d, T> {
fn new(
tim: Peri<'d, T>,
phase_a_pin: Peri<'d, impl TimerPin<T, qei::Ch1>>,
phase_b_pin: Peri<'d, impl TimerPin<T, qei::Ch2>>,
ppr: u32, // Pulse Per Revolution(1回転あたりのパルス数)
direction: RotateDirection,
) -> Self {
let phase_a_pin = QeiPin::new(phase_a_pin);
let phase_b_pin = QeiPin::new(phase_b_pin);
// QEIでは4逓倍でカウントされる
let qei_resolution = ppr * 4;
let qei = Qei::new(tim, phase_a_pin, phase_b_pin);
let last_hw_count = qei.count();
Self {
qei,
qei_resolution,
direction,
last_hw_count,
software_count: 0,
}
}
}
ppr(Pulse Per Revolution)は、エンコーダーが1回転で出力するパルス数です。4逓倍モードでは、A相とB相の立ち上がり/立ち下がりを全て検出するため、実際の分解能はppr * 4になります。
カウントの更新
/// ハードウェアカウントを読み取り、ソフトウェアカウントを更新します。
///
/// ## [重要]
///
/// このメソッドは、エンコーダーがハードウェアカウント上限の半分
/// (32,767カウント ≒ 分解能2048で4回転)
/// 回転するよりも短い周期で、外部の制御ループから定期的に呼び出す必要があります。
fn update(&mut self) {
let current_hw_count = self.qei.count();
let delta = self.calculate_delta(current_hw_count, self.last_hw_count);
match self.direction {
RotateDirection::Forward => self.software_count += delta,
RotateDirection::Reverse => self.software_count -= delta,
}
self.last_hw_count = current_hw_count;
}
STM32のハードウェアカウンタは16ビット(0〜65535)しかないため、回転が進むとオーバーフローします。calculate_delta()メソッドは、このオーバーフローを考慮して正しいカウント変化量を計算します。
オーバーフローを考慮した差分計算
/// オーバーフローを考慮してエンコーダーのカウント変化を計算
fn calculate_delta(&self, current_count: u16, last_count: u16) -> i32 {
if current_count > last_count {
let delta = current_count - last_count;
if delta <= 32767 {
// 100 -> 200 : +100
// 増加としてカウント
delta as i32
} else {
// 50 -> 65486 : -100
// 減少としてカウント
-(delta.wrapping_neg() as i32)
}
} else {
let delta = last_count - current_count;
if delta <= 32767 {
// 200 -> 100 : -100
// 減少としてカウント
-(delta as i32)
} else {
// 65486 -> 50 : +100
// 増加としてカウント
delta.wrapping_neg() as i32
}
}
}
16ビットカウンタの半分(32767)を基準に、オーバーフローを検出します。そのため、update()は32767カウント未満の周期で呼び出す必要があります。
回転数の取得
/// 現在の回転数を取得
pub fn get_rotations(&self) -> f32 {
self.get_count() as f32 / self.qei_resolution as f32
}
fn get_count(&self) -> i32 {
self.software_count
}
これで、エンコーダーの累積回転数を取得できます。
タスク間共有の必要性
エンコーダーの値を使うタスクを考えてみましょう。
- タスク1: 5msごとにエンコーダーの値を更新
- タスク2: 500msごとにエンコーダーの値を表示
この2つのタスクでsoftware_countを共有する必要があります。しかし、単純に変数を共有すると、以下の問題が発生します。
let mut count = 0;
spawner.spawn(update_task(&mut count)); // countの可変参照を借用
spawner.spawn(print_task(&count)); // ❌ コンパイルエラー: 既に可変参照が存在
Rustの借用規則により、可変参照と不変参照は同時に存在できません。これは、データ競合を防ぐための重要な制約です。
そこで、タスク間で安全にデータを共有するための特別な仕組みが必要になります。
方法1: アトミック変数
アトミック変数は、ロックを使わずに複数のタスクから安全にアクセスできる変数です。読み書きが不可分(アトミック)に実行されるため、データ競合が発生しません。
コード全体
#![no_std]
#![no_main]
use core::sync::atomic::{AtomicI32, Ordering};
use defmt::*;
use embassy_executor::Spawner;
use embassy_stm32::{
Peri,
peripherals::{TIM1, TIM2},
timer::{
GeneralInstance4Channel, TimerPin,
qei::{self, Qei, QeiPin},
},
};
use embassy_time::Timer;
// Panic handler. Don't remove.
use {defmt_rtt as _, panic_probe as _};
#[embassy_executor::main]
async fn main(spawner: Spawner) {
let p = embassy_stm32::init(Default::default());
let encoder1 = Encoder::new(p.TIM1, p.PA8, p.PA9, 2048, RotateDirection::Forward);
let encoder2 = Encoder::new(p.TIM2, p.PA0, p.PA1, 2048, RotateDirection::Forward);
static ENCODER1_COUNT: AtomicI32 = AtomicI32::new(0);
static ENCODER2_COUNT: AtomicI32 = AtomicI32::new(0);
spawner
.spawn(update_encoder_tim1(encoder1, &ENCODER1_COUNT))
.unwrap();
spawner.spawn(print_encoder_count(&ENCODER1_COUNT)).unwrap();
spawner
.spawn(update_encoder_tim2(encoder2, &ENCODER2_COUNT))
.unwrap();
spawner.spawn(print_encoder_count(&ENCODER2_COUNT)).unwrap();
}
#[embassy_executor::task]
async fn update_encoder_tim1(encoder: Encoder<'static, TIM1>, count: &'static AtomicI32) {
update_encoder(encoder, count).await;
}
#[embassy_executor::task]
async fn update_encoder_tim2(encoder: Encoder<'static, TIM2>, count: &'static AtomicI32) {
update_encoder(encoder, count).await;
}
async fn update_encoder(
mut encoder: Encoder<'static, impl GeneralInstance4Channel>,
count: &'static AtomicI32,
) {
loop {
encoder.update();
count.store(encoder.get_count(), Ordering::Relaxed);
Timer::after_millis(5).await;
}
}
#[embassy_executor::task(pool_size = 2)]
async fn print_encoder_count(count: &'static AtomicI32) {
loop {
info!("{}", count.load(Ordering::Relaxed));
Timer::after_millis(500).await;
}
}
// Encoder構造体の定義(省略)
ビルドと実行
前回と同様に、以下のコマンドまたはrust-analyzerのRunボタンで実行してください。
cargo run
ピンを指で触れば静電気によって値が動きます(もちろん非推奨です)。encoder1は右中央辺り、encoder2は左下辺りのピンです。
注意: これは動作確認のための簡易的な方法です。ピンに直接触れると静電気による破損のリスクがあるため、推奨しません。
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.04s
Running `probe-rs run --chip STM32F446RE --connect-under-reset target/thumbv7em-none-eabihf/debug/encoder_atomic`
Erasing ✔ 100% [####################] 128.00 KiB @ 52.07 KiB/s (took 2s)
Programming ✔ 100% [####################] 92.00 KiB @ 38.65 KiB/s (took 2s) Finished in 4.84s
0.000000 [TRACE] BDCR ok: 00008200 (embassy_stm32 src/rcc/bd.rs:221)
0.000000 [DEBUG] flash: latency=0 (embassy_stm32 src/rcc/f247.rs:264)
0.000000 [DEBUG] rcc: Clocks { hclk1: MaybeHertz(16000000), hclk2: MaybeHertz(16000000), hclk3: MaybeHertz(16000000), hse: MaybeHertz(0), hsi: MaybeHertz(16000000), lse: MaybeHertz(0), lsi: MaybeHertz(0), pclk1: MaybeHertz(16000000), pclk1_tim: MaybeHertz(16000000), pclk2: MaybeHertz(16000000), pclk2_tim: MaybeHertz(16000000), pll1_q: MaybeHertz(0), pll1_r: MaybeHertz(0), plli2s1_p: MaybeHertz(0), plli2s1_q: MaybeHertz(0), plli2s1_r: MaybeHertz(0), pllsai1_q: MaybeHertz(0), rtc: MaybeHertz(32000), sys: MaybeHertz(16000000) } (embassy_stm32 src/rcc/mod.rs:71)
0.001922 [INFO ] 0 (encoder_atomic src/bin/encoder_atomic.rs:65)
0.003570 [INFO ] 0 (encoder_atomic src/bin/encoder_atomic.rs:65)
0.503326 [INFO ] 0 (encoder_atomic src/bin/encoder_atomic.rs:65)
0.505187 [INFO ] 0 (encoder_atomic src/bin/encoder_atomic.rs:65)
1.005035 [INFO ] 0 (encoder_atomic src/bin/encoder_atomic.rs:65)
1.007202 [INFO ] 0 (encoder_atomic src/bin/encoder_atomic.rs:65)
1.507141 [INFO ] -42 (encoder_atomic src/bin/encoder_atomic.rs:65)
1.509765 [INFO ] 0 (encoder_atomic src/bin/encoder_atomic.rs:65)
2.009490 [INFO ] -42 (encoder_atomic src/bin/encoder_atomic.rs:65)
2.012634 [INFO ] 0 (encoder_atomic src/bin/encoder_atomic.rs:65)
2.511535 [INFO ] -37 (encoder_atomic src/bin/encoder_atomic.rs:65)
2.514038 [INFO ] 0 (encoder_atomic src/bin/encoder_atomic.rs:65)
3.013580 [INFO ] -78 (encoder_atomic src/bin/encoder_atomic.rs:65)
3.015747 [INFO ] -8 (encoder_atomic src/bin/encoder_atomic.rs:65)
3.515319 [INFO ] -78 (encoder_atomic src/bin/encoder_atomic.rs:65)
3.517883 [INFO ] -20 (encoder_atomic src/bin/encoder_atomic.rs:65)
4.017333 [INFO ] -78 (encoder_atomic src/bin/encoder_atomic.rs:65)
4.020141 [INFO ] -40 (encoder_atomic src/bin/encoder_atomic.rs:65)
4.519531 [INFO ] -109 (encoder_atomic src/bin/encoder_atomic.rs:65)
4.522186 [INFO ] -40 (encoder_atomic src/bin/encoder_atomic.rs:65)
コードの解説
Rust初学者向け解説
static変数
static ENCODER1_COUNT: AtomicI32 = AtomicI32::new(0);
staticは、プログラムの実行中ずっと存在する変数を定義します。C++の静的変数に似ていますが、Rustでは以下の制約があります。
-
static変数は必ず初期化が必要 -
static mut(可変な静的変数)はunsafeブロック内でしか使えない
タスクに渡す変数は'staticライフタイムが必要なため、staticで定義します。
アトミック変数とOrdering
count.store(encoder.get_count(), Ordering::Relaxed);
let count = count.load(Ordering::Relaxed);
アトミック変数は、store()で書き込み、load()で読み取ります(イミュータブルであっても値を変更することができます)。Orderingは、メモリアクセスの順序をどの程度厳密に制御するかを指定します。
-
Ordering::Relaxed: 最も緩い制約。他の操作との順序は保証されないが、その操作自体はアトミック -
Ordering::Acquire/Release: より厳密な順序保証 -
Ordering::SeqCst: 最も厳密(最も遅い)
今回の例では、単純にカウント値を読み書きするだけなので、Relaxedで十分です。
pool_size
#[embassy_executor::task(pool_size = 2)]
async fn print_encoder_count(count: &'static AtomicI32) {
pool_sizeは、同じタスクを複数起動する際の、タスクの最大数を指定します。通常、Embassyのタスクは1つしか起動できませんが、pool_sizeを指定することで複数起動が可能になります。
spawner.spawn(print_encoder_count(&ENCODER1_COUNT)).unwrap();
spawner.spawn(print_encoder_count(&ENCODER2_COUNT)).unwrap();
これにより、同じロジックを異なるデータに対して並行実行できます。
タスクの定義
#[embassy_executor::task]
async fn update_encoder_tim1(encoder: Encoder<'static, TIM1>, count: &'static AtomicI32) {
update_encoder(encoder, count).await;
}
#[embassy_executor::task]
async fn update_encoder_tim2(encoder: Encoder<'static, TIM2>, count: &'static AtomicI32) {
update_encoder(encoder, count).await;
}
update_encoder_tim1とupdate_encoder_tim2が別々に定義されているのは、Embassyのタスクが具体的な型を要求するためです。
Encoder<TIM1>とEncoder<TIM2>は異なる型なので、pool_sizeを使って1つのタスクで両方を起動することができません。そのため、タイマーごとに別々のタスク関数を定義し、共通のロジックをupdate_encoder()に抽出しています。
アトミック変数の利点と欠点
利点
- ロックフリーで高速
- シンプルで理解しやすい
- オーバーヘッドが最小
欠点
- 単純な型(整数と真偽値)しか扱えない
- 複数の値をまとめて更新できない(例: x座標とy座標)
方法2: Channel
Channelは、タスク間でデータを非同期に送受信する仕組みです。送信側と受信側が独立しており、バッファを持つことができます。
コード全体
#![no_std]
#![no_main]
use defmt::*;
use embassy_executor::Spawner;
use embassy_stm32::{
Peri,
peripherals::{TIM1, TIM2},
timer::{
GeneralInstance4Channel, TimerPin,
qei::{self, Qei, QeiPin},
},
};
use embassy_sync::{
blocking_mutex::raw::CriticalSectionRawMutex,
channel::{Channel, Receiver, Sender},
};
use embassy_time::Timer;
// Panic handler. Don't remove.
use {defmt_rtt as _, panic_probe as _};
#[embassy_executor::main]
async fn main(spawner: Spawner) {
let p = embassy_stm32::init(Default::default());
let encoder1 = Encoder::new(p.TIM1, p.PA8, p.PA9, 2048, RotateDirection::Forward);
let encoder2 = Encoder::new(p.TIM2, p.PA0, p.PA1, 2048, RotateDirection::Forward);
static ENCODER1_CHANNEL: Channel<CriticalSectionRawMutex, i64, 1> = Channel::new();
let sender1 = ENCODER1_CHANNEL.sender();
let receiver1 = ENCODER1_CHANNEL.receiver();
static ENCODER2_CHANNEL: Channel<CriticalSectionRawMutex, i64, 1> = Channel::new();
let sender2 = ENCODER2_CHANNEL.sender();
let receiver2 = ENCODER2_CHANNEL.receiver();
spawner
.spawn(update_encoder_tim1(encoder1, sender1))
.unwrap();
spawner.spawn(print_encoder(receiver1)).unwrap();
spawner
.spawn(update_encoder_tim2(encoder2, sender2))
.unwrap();
spawner.spawn(print_encoder(receiver2)).unwrap();
}
#[embassy_executor::task]
async fn update_encoder_tim1(
encoder: Encoder<'static, TIM1>,
sender: Sender<'static, CriticalSectionRawMutex, i64, 1>,
) {
update_and_send_loop(encoder, sender).await;
}
#[embassy_executor::task]
async fn update_encoder_tim2(
encoder: Encoder<'static, TIM2>,
sender: Sender<'static, CriticalSectionRawMutex, i64, 1>,
) {
update_and_send_loop(encoder, sender).await;
}
async fn update_and_send_loop(
mut encoder: Encoder<'static, impl GeneralInstance4Channel>,
sender: Sender<'static, CriticalSectionRawMutex, i64, 1>,
) {
loop {
encoder.update();
sender.clear();
sender.send(encoder.get_count()).await;
Timer::after_millis(5).await;
}
}
#[embassy_executor::task(pool_size = 2)]
async fn print_encoder(receiver: Receiver<'static, CriticalSectionRawMutex, i64, 1>) {
loop {
info!("{}", receiver.receive().await);
Timer::after_millis(500).await;
}
}
// Encoder構造体の定義(省略)
コードの解説
Channelの型パラメータ
static ENCODER1_CHANNEL: Channel<CriticalSectionRawMutex, i64, 1> = Channel::new();
Channelは3つの型パラメータを取ります。
-
CriticalSectionRawMutex: 内部で使用するMutexの型(後述) -
i64: Channelで送受信するデータの型 -
1: バッファサイズ(保留できるメッセージ数)
SenderとReceiver
let sender = ENCODER1_CHANNEL.sender();
let receiver = ENCODER1_CHANNEL.receiver();
ChannelからSenderとReceiverを取得します。これらは独立したオブジェクトなので、異なるタスクに渡すことができます。
また、いずれもコピー可能(Copyトレイト付き)で、どこからでも受信・送信ができます。
sender.send(encoder.get_count()).await; // 送信(非同期)
let count = receiver.receive().await; // 受信(非同期)
send()とreceive()は両方とも非同期メソッドです。
バッファが満杯の場合、send()はバッファが空くまで待機します。逆に、データが無い場合、receive()はデータが届くまで待機します。
表示する際に最新の値が取得できるよう、今回はsend()する前にclear()してバッファを空にしてから送信しています。
注意点として、受信側が複数ある場合、どれか1つがreceive()するとそのデータはmoveされるため、他の受信側がそのデータを受信することはできません。
Channelの利点と欠点
利点
- 送信側と受信側が完全に独立
- バッファリングによるデータの一時保存が可能
- 複雑な型でも送受信できる
欠点
- バッファ分のメモリオーバーヘッドがある
- 最新の値だけが欲しい場合は不向き(古いデータが残る可能性)
追記: SignalとWatchという、それぞれバッファの大きさが1のとき、受信側が複数ある時に適している構造体もありました。
SignalはChannelとは違い、送信(Signal::signal())すると前の値が上書きされます。1つのインスタンスへの参照を他のタスクで使用します。
WatchはChannelとは違い、送信(Sender::send())すると複数の受信側の値が上書きされます。また、Channelと同じようにSenderとReceiver(数に上限あり)を作り、他のタスクで使用します。
方法3: Mutex
Mutex(Mutual Exclusion: 相互排他)は、複数のタスクが同じデータに同時アクセスするのを防ぐ仕組みです。データへアクセスする前に「ロック」を取得し、アクセス後に「アンロック」します。
コード全体
#![no_std]
#![no_main]
use core::cell::RefCell;
use defmt::*;
use embassy_executor::Spawner;
use embassy_stm32::{
Peri,
peripherals::{TIM1, TIM2},
timer::{
GeneralInstance4Channel, TimerPin,
qei::{self, Qei, QeiPin},
},
};
use embassy_sync::blocking_mutex::CriticalSectionMutex;
use embassy_time::Timer;
// Panic handler. Don't remove.
use {defmt_rtt as _, panic_probe as _};
#[embassy_executor::main]
async fn main(spawner: Spawner) {
let p = embassy_stm32::init(Default::default());
let encoder1 = Encoder::new(p.TIM1, p.PA8, p.PA9, 2048, RotateDirection::Forward);
let encoder2 = Encoder::new(p.TIM2, p.PA0, p.PA1, 2048, RotateDirection::Forward);
static ENCODER1_COUNT: CriticalSectionMutex<RefCell<i32>> =
CriticalSectionMutex::new(RefCell::new(0));
static ENCODER2_COUNT: CriticalSectionMutex<RefCell<i32>> =
CriticalSectionMutex::new(RefCell::new(0));
spawner
.spawn(update_encoder_tim1(encoder1, &ENCODER1_COUNT))
.unwrap();
spawner.spawn(print_encoder_count(&ENCODER1_COUNT)).unwrap();
spawner
.spawn(update_encoder_tim2(encoder2, &ENCODER2_COUNT))
.unwrap();
spawner.spawn(print_encoder_count(&ENCODER2_COUNT)).unwrap();
}
#[embassy_executor::task]
async fn update_encoder_tim1(
encoder: Encoder<'static, TIM1>,
count: &'static CriticalSectionMutex<RefCell<i32>>,
) {
update_encoder(encoder, count).await;
}
#[embassy_executor::task]
async fn update_encoder_tim2(
encoder: Encoder<'static, TIM2>,
count: &'static CriticalSectionMutex<RefCell<i32>>,
) {
update_encoder(encoder, count).await;
}
async fn update_encoder(
mut encoder: Encoder<'static, impl GeneralInstance4Channel>,
count: &'static CriticalSectionMutex<RefCell<i32>>,
) {
loop {
encoder.update();
count.lock(|count| *count.borrow_mut() = encoder.get_count());
Timer::after_millis(5).await;
}
}
#[embassy_executor::task(pool_size = 2)]
async fn print_encoder_count(count: &'static CriticalSectionMutex<RefCell<i32>>) {
loop {
info!("{}", count.lock(|ref_cell| *ref_cell.borrow()));
Timer::after_millis(500).await;
}
}
// Encoder構造体の定義(省略)
コードの解説
RefCellと内部可変性
CriticalSectionMutex<RefCell<i32>>
この型は複雑に見えますが、2つの層に分かれています。
-
RefCell<i32>: 内部可変性(interior mutability)を提供 -
CriticalSectionMutex: 排他制御をし、複数のタスクから安全にアクセスできるようにする
まずRefCellについて説明します。
Rustの通常の借用規則では、不変参照&Tから可変参照&mut Tを得ることはできません。しかし、RefCellを使うと、実行時に借用規則をチェックしながら、不変参照経由で可変アクセスができます。
let cell = RefCell::new(5);
// 不変参照&RefCell<i32>から、内部の値を可変借用できる
*cell.borrow_mut() = 10;
// 読み取りも可能
let value = *cell.borrow();
borrow()は不変借用、borrow_mut()は可変借用を行います。同時に複数の可変借用が存在する場合、パニックが発生します(コンパイル時ではなく、実行時にチェック)。
もちろん借用チェックを実行時に行う分、オーバーヘッドがあります。
CriticalSectionMutex
RefCell単体では、複数のスレッドやタスクから安全にアクセスできません。そこで、CriticalSectionMutexでラップします。
static COUNT: CriticalSectionMutex<RefCell<i32>> =
CriticalSectionMutex::new(RefCell::new(0));
CriticalSectionMutexは、クリティカルセクション(割り込みを一時的に無効化する区間)を使って排他制御を行います。
count.lock(|count| {
// この中は排他制御されている
*count.borrow_mut() = encoder.get_count();
});
lock()はクロージャ(無名関数)を受け取り、その中で排他的にデータにアクセスできます。
なぜCriticalSectionMutex<RefCell<i32>>が必要なのか?
-
CriticalSectionMutexだけでは不十分: Mutexは排他制御を提供するだけで、内部可変性は提供しない -
RefCellだけでは不十分: マルチスレッド環境で安全に使えない
両方を組み合わせることで、マルチスレッド環境で安全に、不変参照から可変アクセスができるようになります。
Mutexの利点と欠点
利点
- 複雑なデータ構造も保護できる
- 複数の値をまとめて更新できる(例: 座標(x, y)を同時に更新)
- 最新の値に直接アクセスできる
欠点
- クリティカルセクションの実行中は割り込みが無効化されるため、長時間ロックすると他のタスクや割り込みの応答性が悪化する
- デッドロックの可能性(複数のMutexを使う場合)
3つの方法の比較
| 特徴 | アトミック変数 | Channel | Mutex |
|---|---|---|---|
| 速度 | 最速 | 中速 | やや遅い |
| メモリ | 最小 | バッファ分必要 | 小さい |
| 用途 | 整数値・真偽値 | データストリーム | 複雑なデータ構造 |
| 最新値へのアクセス |
|
||
| 複数値の同時更新 |
|
||
| 複数ヶ所からの変更 | |||
| 割り込みへの影響 | なし | なし | あり(ロック中) |
追記: SignalとWatchについても記載します。
| 特徴 | Signal | Watch |
|---|---|---|
| 速度 | 高速 | 高速 |
| メモリ | 小さい | 小さい |
| 用途 | データストリーム | 1対多の状態共有 |
| 最新値へのアクセス | ||
| 複数値の同時更新 | ||
| 複数ヶ所からの変更 | ||
| 割り込みへの影響 | なし | なし |
使い分けの指針
- アトミック変数: 単純な整数カウンタ、フラグなど、最も頻繁に更新される値に最適
- Channel: センサーデータのストリーム、イベント通知など、データの流れを扱う場合に最適
- Mutex: 複数のフィールドを持つ構造体、状態管理など、複雑なデータを扱う場合に最適
扱うデータ型から考えると、
- 整数・真偽値だったらアトミック変数
- それ以外の型で
- 一箇所からの更新だったらChannel、Mutex
- 複数ヶ所からの更新だったらMutex
今回のエンコーダーのような単純な整数値の共有には、アトミック変数が最も適しています。
ケースバイケースで適切な方針をとってください。
まとめ
この記事では、Embassyでのタスク間の変数共有について、3つの方法を解説しました。