概要
前回 は Entity のリストの中から複数の Entity を同時にミュータブル参照する方法について書きました。今回は ECS でもよく使われる世代型Id (Generational Id) アリーナの利用と最適化について書こうと思います。
ゲームにおいては Entity が他の Entity への参照を保持しておきたいことがあります。例えばシューティングゲームの誘導ミサイルはターゲットの Entity への参照を持っておくことで、ターゲットの位置が移動しても追従することができます。 C や C++ ではポインタで参照することはできますが、対象の Entity が削除されたら、 dangling pointer が発生してしまいます。これを防ぐために std::shared_ptr を使うことはできますが、参照カウントの管理コストが生じます。もっと重大な問題として、循環参照がリークしてしまいますが、これは std::weak_ptr を使うことで解決できます。しかし weak_ptr もお世辞にも軽量なデータ構造とは言えません。
世代型 Id
このような問題に対処するために人気のある方法として、世代型 Id があります。これは次のようなデータ構造で、 id と gen のセットになります。
struct EntityId {
id: u32,
gen: u32,
}
この Id を使う上での前提として、 Entity は一連の Vector の要素として確保され、移動することはないとします。
struct Entity {
// ...
}
struct EntityEntry {
gen: u32,
entity: Option<Entity>,
}
type EntityList = Vec<EntityEntry>;
id は Vec へのインデックスなので、 Vec が伸長してリアロケーションが走っても有効なままだということに注目してください。
リスト更新メソッド
Entity が削除されるときは要素をシフトするのではなく、削除したスロットに無効な値をセットします。
impl EntityList {
fn remove(&mut self, id: EntityId) -> Option<Entity> {
self.0[id.id as usize].entity.take()
}
}
EntityId
の id
はこの Vector のインデックスであり、 gen
はそのインデックスが何度再利用されたかを表す数値(世代)です。
新しい Entity を作成するときは、空きスロットを見つけ、そこに新しいオブジェクトを挿入し、世代をインクリメントします。空きスロットがなければ内部の Vec を拡張します。これは若干込み入ったコードになりますが、次のようになります。
impl EntityList {
fn add(&mut self, entity: Entity) -> EntityId {
for (i, entry) in self.0.iter_mut().enumerate() {
if entry.entity.is_none() {
entry.entity = Some(entity);
entry.gen += 1;
return EntityId {
id: i as u32,
gen: self.0[i].gen,
};
}
}
self.0.push(EntityEntry {
gen: 0,
entity: Some(entity),
});
EntityId {
id: self.0.len() as u32 - 1,
gen: 0,
}
}
}
実際の世代型Idアリーナのライブラリ実装では、空のエントリはリンクリストにして検索を高速にする実装にすることが多いと思いますが、ここでは単純化のために省略しています。
アクセサメソッド
ここまでのメソッドで Entity の世代がユニークであることを保証すれば、次のようなメソッドで実際の Id を使って Entity を参照することができます。同じインデックスを再利用した場合でも世代が異なれば見つからなかったものと見なします。 self.0.get
は Vec のインデックスアクセスなので、ただの O(1) のアドレスオフセット計算であることに注目してください。
impl EntityList {
fn get(&self, id: EntityId) -> Option<&Entity> {
self.0.get(id.id as usize).and_then(|e| {
if e.gen == id.gen {
e.entity.as_ref()
} else {
None
}
})
}
}
ミュータブル版は次の通りです。
impl EntityList {
fn get_mut(&mut self, id: EntityId) -> Option<&mut Entity> {
self.0.get_mut(id.id as usize).and_then(|e| {
if e.gen == id.gen {
e.entity.as_mut()
} else {
None
}
})
}
}
スライストリック
HashMap などのコンテナに比べて、この方法の優れた特徴は、前回触れたスライストリックとの相性が非常に良いということです。例えば、任意の2つの Entity へのミュータブル参照を同時に得るメソッドを次のように書くことができます。2つの EntityId が偶然同じインデックスに確保された場合は、有効な世代のほうのみを返すようにするため、多少込み入ったロジックになっています。まあ、ペアを取得したいときに呼ぶ関数ですから、どちらか片方が無効だった場合は、両方まとめて None を返す実装でもいいかもしれません。その場合はもう少しシンプルな実装になるはずです。
impl EntityEntry {
fn get_mut(&mut self, id: EntityId) -> Option<&mut Entity> {
if self.gen == id.gen {
self.entity.as_mut()
} else {
None
}
}
}
impl EntityList {
fn get_pair_mut(
&mut self,
a: EntityId,
b: EntityId,
) -> (Option<&mut Entity>, Option<&mut Entity>) {
if a.id < b.id {
let (left, right) = self.0.split_at_mut(b.id as usize);
(
left.get_mut(a.id as usize)
.and_then(|entry| entry.get_mut(a)),
right.first_mut().and_then(|s| s.get_mut(b)),
)
} else if b.id < a.id {
let (left, right) = self.0.split_at_mut(a.id as usize);
(
right.first_mut().and_then(|s| s.get_mut(a)),
left.get_mut(b.id as usize)
.and_then(|entry| entry.get_mut(b)),
)
} else if self
.0
.get(a.id as usize)
.map(|a_obj| a.gen != a_obj.gen)
.unwrap_or(true)
{
(
None,
self.0
.get_mut(b.id as usize)
.and_then(|entry| entry.get_mut(b)),
)
} else {
(
self.0
.get_mut(a.id as usize)
.and_then(|entry| entry.get_mut(a)),
None,
)
}
}
}
スライスイテレータ
前回は自分自身を除いたイテレータを返す方法を紹介しました。世代型Idはもちろんこのイテレータとも相性が良いです。
ですが、今回はもう一段踏み込んで、複数の任意の Entity を除外したイテレータを作ることを考えてみます。
複数の任意の Entity を除外したイテレータを作りたいことなんて、そんなにあるか?と思われるかもしれませんが、私の場合は相互作用する3つ以上の Entity をトレイトオブジェクトを使った動的ポリモーフィズムで実装する場合に必要でした。
この場合、 Entity A の仮想関数を呼び、それが Entity B の仮想関数を呼び、それが他の Entity のリストを検索するといったことが生じますが、仮想関数の呼び出しがネストするたびにそのトレイトオブジェクトを除外したリストを使って残りのオブジェクトを表現する必要があります。
もちろん、任意の Entity を除外したリストを作るくらいなら、最初から含めたい Entity だけのリストを作ればいいじゃないかという主張も成り立ちます。ただし、仮想関数呼び出しのネストはせいぜい3か4ぐらいが上限なのに対し、 Entity の数は数百や数千といった数になりうる(上記の図では単純化されているのでスケール感がありませんが)という点が重要なのです。
まず最初に必要なことは、前回のように left と right の2つのスライス固定で分けるのではなく、スライスの Vec を使ってイテレータに必要なデータを表現することです。
struct EntitySlice<'a> {
start: usize,
slice: &'a mut [EntityEntry],
}
struct EntityDynIter<'a>(Vec<EntitySlice<'a>>);
最もシンプルなケースとして、何も除外しない場合は一つのスライスを使って次のように構築できます。
impl<'a> EntityDynIter<'a> {
fn new_all(source: &'a mut EntityList) -> Self {
Self(vec![EntitySlice {
start: 0,
slice: &mut source.0,
}])
}
}
また、インデックスを指定してリストを2つに分割する場合(これがメインの使い方になると思いますが)は次のようになります。
これは除外した Entity のエントリも一緒に返します。
impl<'a> EntityDynIter<'a> {
fn new_split(
source: &'a mut EntityList,
split_idx: usize,
) -> Option<(&'a mut EntityEntry, Self)> {
let (left, right) = source.0.split_at_mut(split_idx);
let (center, right) = right.split_first_mut()?;
Some((
center,
Self(vec![
EntitySlice {
start: 0,
slice: left,
},
EntitySlice {
start: split_idx + 1,
slice: right,
},
]),
))
}
}
さらに、既存の EntityDynIter
をもう一度分割したい場合は、次のメソッドを使います。
impl<'a> EntityDynIter<'a> {
fn exclude(&mut self, id: EntityId) -> Option<&mut Entity> {
let idx = id.id as usize;
if let Some((slice_idx, _)) = self
.0
.iter_mut()
.enumerate()
.find(|(_, slice)| slice.start <= idx && idx < slice.start + slice.slice.len())
{
let slice_borrow = &self.0[slice_idx];
let entry = &slice_borrow.slice[idx - slice_borrow.start];
if entry.gen != id.gen || entry.entity.is_none() {
return None;
}
let slice = std::mem::take(&mut self.0[slice_idx]);
let (left, right) = slice.slice.split_at_mut(idx - slice.start);
let (center, right) = right.split_first_mut()?;
self.0[slice_idx] = EntitySlice {
start: slice.start,
slice: left,
};
self.0.push(EntitySlice {
start: idx + 1,
slice: right,
});
center.entity.as_mut()
} else {
None
}
}
}
しかし、このメソッドには少々問題があります。それは、元の EntityDynIter 変数を変更してしまうということです。使いたいのは仮想関数呼び出しをネストしたときなので、内側の関数を呼び出したときに新しくエントリを除外したコピーを作り、それが返ってきたときに元のデータに戻りたいところです。
これは、残念ながら多少込み入ったコードになります。
まず、登場するライフタイムが2つに増えます。 'a
は元の EntityList のライフタイム、 'b
は EntityDynIter のライフタイムになります。この2つの間にはライフタイムの包含関係があるので、 where 'a: 'b
としてライフタイム境界を指定しています。
このライフタイムを分ける理由は、新しい EntityDynIter 変数と呼び出し元の EntityDynIter のライフタイムを一致させることができないからです。どちらの EntityDynIter も元のスライスへのミュータブル参照を持っているので、一つのスコープ内に共存することができません。新しい EntityDynIter は、 reborrow によって存在できるわけですが、同じライフタイムで両者を表現してしまうと、すべてのコードで両者が同時に存在してしまうので reborrow できません。
impl<'a> EntityDynIter<'a> {
fn exclude_copy<'b>(
&'b mut self,
id: EntityId,
) -> Option<(Option<&'b mut Entity>, EntityDynIter<'b>)>
where
'a: 'b,
{
let idx = id.id as usize;
if let Some((slice_idx, _)) = self
.0
.iter()
.enumerate()
.find(|(_, slice)| slice.start <= idx && idx < slice.start + slice.slice.len())
{
let slice_borrow = &self.0[slice_idx];
let entry = &slice_borrow.slice[idx - slice_borrow.start];
if entry.gen != id.gen || entry.entity.is_none() {
return Some((
None,
EntityDynIter(self.0.iter_mut().map(|i| i.clone()).collect()),
));
}
// [slice_0] [slice_1] .. [left..center..right] .. [slice_i+1] .. [slice_n]
// to
// [slice_0] [slice_1] .. [left] [right] .. [slice_i+1] .. [slice_n]
// and center
let (left_slices, right_slices) = self.0.split_at_mut(slice_idx);
let (slice, right_slices) = right_slices.split_first_mut()?;
let (left, right) = slice.slice.split_at_mut(idx - slice.start);
let (center, right) = right.split_first_mut()?;
let left_slices = left_slices
.iter_mut()
.map(|i| i.clone())
.collect::<Vec<_>>();
let mut slices = left_slices;
slices.push(EntitySlice {
start: slice.start,
slice: left,
});
slices.push(EntitySlice {
start: idx + 1,
slice: right,
});
slices.extend(right_slices.iter_mut().map(|i| i.clone()));
Some((center.entity.as_mut(), EntityDynIter(slices)))
} else {
None
}
}
さて、ここまででスライスのデータ構造を構築する準備ができたので、実際にイテレートするときに使うメソッドを実装します。
このメソッドは Entity に加えて EntityId もセットでイテレートするものです。
注目すべき点は返り値の型が impl Iterator<...>
となっているところでしょうか。実際、スライスと Vec がネストされたデータ構造を順番にイテレートするロジックを書くのは面倒ですが、ここでは flat_map
や filter_map
といったコンビネータが必要な型を自動生成してくれます。しかも impl Trait
は monomorphize されているので、動的ディスパッチのオーバーヘッドがなく、インライン化による最適化が期待できます。このようなメソッドを書く時には Rust のイテレータプロトコルの威力を感じます。
impl<'a> EntityDynIter<'a> {
fn dyn_iter_id(&self) -> impl Iterator<Item = (EntityId, &Entity)> + '_ {
self.0
.iter()
.flat_map(move |slice| {
let start = slice.start;
slice
.slice
.iter()
.enumerate()
.map(move |(i, val)| (i + start, val))
})
.filter_map(|(id, val)| {
Some((
EntityId {
id: id as u32,
gen: val.gen,
},
val.entity.as_ref()?,
))
})
}
}
しかし、これをミュータブル版にしようとすると問題が生じます。
素直な実装は次のようになるでしょうが、このコードはコンパイルできません。
impl<'a> EntityDynIter<'a> {
fn dyn_iter_mut_id(
&mut self,
) -> impl Iterator<Item = (EntityId, &mut Entity)> + '_ {
self.0
.iter_mut()
.flat_map(move |slice| {
let start = slice.start;
slice
.slice
.iter_mut()
.enumerate()
.map(move |(i, val)| (i + start, val))
})
.filter_map(|(id, val)| {
Some((
EntityId {
id: id as u32,
gen: val.gen,
},
val.entity.as_mut()?,
))
})
}
}
おそらく Subtyping and Variance が関わっていると思うのですが、正直なところ分かりません。 &'a T
は T
に対して covariant なのに対して、 &'a mut T
は T
に対して invariant ですので、それぐらいしか考えられる違いはないような気がします。
この場合、 Rust のコンパイラのメッセージはあまり助けにはなりません。ライフタイムを追加で設定してみたり、ライフタイム境界を入れたりしてみましたが、どうにもコンパイルできるようにはなりませんでした。
とりあえずの回避策として、前回使った DynIterMut
トレイトを使ってみます。 EntityDynIter
に対してこのトレイトを実装します。
impl<'a> DynIter for EntityDynIter<'a> {
type Item = Entity;
fn dyn_iter(&self) -> Box<dyn Iterator<Item = &Self::Item> + '_> {
Box::new(
self.0
.iter()
.flat_map(|slice| slice.slice.iter().filter_map(|s| s.entity.as_ref())),
)
}
fn as_dyn_iter(&self) -> &dyn DynIter<Item = Self::Item> {
self
}
}
impl<'a> DynIterMut for EntityDynIter<'a> {
fn dyn_iter_mut(&mut self) -> Box<dyn Iterator<Item = &mut Self::Item> + '_> {
Box::new(
self.0
.iter_mut()
.flat_map(|slice| slice.slice.iter_mut().filter_map(|s| s.entity.as_mut())),
)
}
}
これで、 EntityDynIter
変数があったときに dyn_iter()
もしくは dyn_iter_mut()
メソッドを呼ぶことで再始動可能なイテレータを生成することができます。ちょっと不便な点としては、 EntityId
も併せてイテレートすることができないことですが、必要とあらば DynIter
トレイトを修正することもできるでしょう。
ただし、 DynIter
トレイトの欠点は、常にヒープに確保されたイテレータのトレイトオブジェクトを必要とすることで、動的メモリのオーバーヘッドが存在します。そのため、ミュータブルである必要がなければ、 impl Trait
を返す dyn_iter_id()
を使った方がよいでしょう。
さらなる最適化
さて、ここまでの実装で任意の数のスライスをイテレートできる型ができましたが、まだ少し望ましくない点があります。それは EntityIter
がスライスの Vec として実装され、必ずヒープメモリを使うということです。
struct EntityDynIter<'a>(Vec<EntitySlice<'a>>);
ここでは、 smallvec クレートを使って最適化してみます。
smallvec クレートは、 SmallVec
という型を提供します。これは Vec のように振舞うが、一定の数の要素まではヒープではなく、インラインで確保するというものです。 C++ の Small string optimization のようなものです。 Rust の Vec は要素数がたとえ1であってもヒープに確保される動的配列として規定されているので、このような最適化は自動的には行えません。しかし、我々の使い道では、ほとんどの場合で2つのスライスを含む Vec で十分なので、2つ分のバッファをインラインで確保できれば十分な効果が期待できます。
効果の代償としては、 EntityDynIter
型の変数そのもののサイズが大きくなることがありますが、スライス2つ分なのでそれほど大きなものではありません。私の環境では Vec
を使うと 24 バイト、 SmallVec
で 64 バイトでした。もう一つの代償としては、サイズによって処理が分岐するので、分岐予測に負荷がかかりますが、これは動的メモリを使わなくて済むことに比べたら大したものではないでしょう。
これを実行するには、 SmallVec
型と smallvec
マクロを use し、
use smallvec::{SmallVec, smallvec};
EntityDynIter
を次のように書き換えます。
struct EntityDynIter<'a>(SmallVec<[EntitySlice<'a>; 2]>);
あとは、Vec
型を使っているところを SmallVec
で、 vec!
マクロを使っているところを smallvec!
で置き換えるだけです。
SmallVec
は Vec
に比べてちょっとだけ不便なところがあります。それはミュータブルな借用をスコープの終わりまで保持するということです。このため、次のようなコードでは、4行目の drop(dyn_itr2)
がないとコンパイルエラーになります。
fn split_test(el: &mut EntityList, exclude_id: EntityId) {
let (_, mut dyn_iter) = EntityDynIter::new_split(el, 1).unwrap();
let (_, dyn_iter2) = dyn_iter.exclude_copy(exclude_id).unwrap();
drop(dyn_iter2);
for entity in dyn_iter.dyn_iter() {
println!("{}", entity.name);
}
}
Vec
でこれが起きないのは Non-lexical lifetimes のおかげだと思うのですが、 SmallVec
だとライフタイムを縮小してくれない理由はよくわかりません。 Drop check がらみだと思うのですが、 Vec
も Drop
トレイトを実装しています。 Vec
がコンパイラに特別扱いされているのでしょうか?
おわりに
さて、今回はだいぶ長くなってしまいましたが、最終的なコードはこちらに置いてあります。
世代型Idアリーナのアイデアはゲーム開発では割とありふれたもので、 Rust では specs、 generational-arena クレートなどで提供されています。 ECS と合わせたデータ指向デザインの一般論(および、オブジェクト指向がなぜうまくいかないか)としてはこの動画がよくまとまっています。
以上のようにこのトピックはそれなりに複雑です(実際、まだ上記の実装は完璧ではないです)。それでも自前で実装することの意義は、どのライブラリも任意の要素を除外したミュータブル参照をイテレートするメソッドを備えていないからです1。正直、それがなくてどうしてゲーム開発ができるのか疑問です。
もちろん、メッセージキューなどを使って複数の Entity を同時に更新するのと同様の効果を得ることはできます。しかし、それは同等の C++ が単純にエイリアシングを無視してアクセスできるのと比較すると不利です。