はじめに
移動中央値を求めなければならない事は人生に一度くらいありますが、処理窓をスライドさせる度に愚直にソートして中央値を求めていると非常に時間がかかります。
二分ヒープ木とキューを組み合わせる事で、移動最大値や移動最小値を高速に求める方法は広く知られていると思います。それらと同じアイディアに基づき、本記事では二分ヒープ木を二つ使って移動中央値を高速に求めるアイディアとC言語による実装を提示します。
おそらくアイディア自体はみんな思いつくでしょうけど、C言語で実装するとなると面倒くさくなって「まあいっか、遅くても結果が求まれば」ってなるやつです。
結論
- C言語で二分ヒープ木とキューを組み合わせて移動中央値を高速($O(log N)$)に求めてみた
- コード量はそこそこ多い
- 固定サイズのメモリで実装できる
- 実装は面倒なので今後はコピペで済ませたい
- 使う機会はあまり無いと思う
- 別言語だったらメモリサイズはともかく標準のライブラリで同じような事ができると思う
アイディアの骨子
二分ヒープ木
二分ヒープ木は、上から下に向かって値が大きい順になる、もしくは小さい順になるヒープ条件を満たす「平衡(へいこう)二分木」です。その特性上、配列で実現できます。値が大きい順になるか小さい順になるかは条件判定を差し替えれば容易に切り替えられます。
二分ヒープ木と中央値
中央値を求める際は、ヒープ木を二つ用意して、片方を大きい値がルートにくるように、もう片方を小さい値がルートにくるように管理します。それぞれのヒープのルートが中央値になります。
大きい値がルートにくる方を本記事では左手ヒープ木、小さい値がルートにくる方を右手ヒープ木と名付けます。左手ヒープ木に所属する全ての要素は、右手ヒープ木に所属する全ての要素以下である事を保証できます。
以下の記事で説明されている考え方と基本的なアイディアは全く同じです。
移動中央値
データ列に対して、長さNの処理窓をスライドしながら、処理窓内で都度求めた中央値の事をここでは移動中央値と呼びます。移動平均値の中央値版です。
処理窓に対して値の出入りがあるので、これをリングバッファによるキューで実現しつつ、内部ではさらに二分ヒープ木を使って中央値をいつでも計算量$O(1)$で取り出せるようにしておきます。
値のキュー領域と、二分ヒープ木を構築する領域、さらに、二分ヒープ木にいれた順序を覚えておくためのキュー領域が必要です。二分ヒープ木の要素とその順序を覚えておくキューの間では、二分ヒープ木の各要素とキュー要素の相互参照を維持し、Dequeue時に二分ヒープ木中のどの要素が出ていくのかを$O(1)$で特定するために使います。
相互参照を成立させるために、二分ヒープ木に入れる値は正味の値ではなく、キュー内のインデックス値です。また、二分ヒープ木に入れた要素の入力順をキューイングして覚えておくため、二分ヒープ木の要素を指し示すインデックスをキューイングしておきます。
大原則として、Enqueue処理およびDequeue処理では、右手ヒープ木の要素数が左手ヒープ木の要素数と同じか1だけ多い状態を維持するように、左右ヒープ木間で要素の受け渡しをして操作します。この左右ヒープ木間の要素数の条件を、本記事ではバランス制約と勝手に呼びます。
二分ヒープ木の要素と、それをキューにいれた順序との相互参照を維持するところが非常に面倒で、思った以上に複雑な実装になってしまいました。
C言語ソースコード
公開API
/* Include guard */
# ifndef MEDIAN_QUEUE_H_INCLUDED
# define MEDIAN_QUEUE_H_INCLUDED
/*****************************************************************************
* Header Include
*****************************************************************************/
# include <stdint.h>
/*****************************************************************************
* Type Definition
*****************************************************************************/
typedef int32_t MedianQueueElement;
typedef struct MedianQueue_ {
uint32_t max_elements_count_;
MedianQueueElement* buffer_for_elements_;
uint32_t* buffer_for_heap_indices_;
uint32_t* lh_heap_;
uint32_t* rh_heap_;
uint32_t wp_;
uint32_t rp_;
} MedianQueue;
/*****************************************************************************
* Variable Declaration
*****************************************************************************/
/*****************************************************************************
* Function Declaration
*****************************************************************************/
# ifdef __cplusplus
extern "C" {
# endif
void MedianQueue_Construct(
MedianQueue* mq_object_memory,
MedianQueueElement* buffer_for_elements,
uint32_t* buffer_for_heap_indices,
uint32_t* buffer_for_heap,
uint32_t max_elements_count);
void MedianQueue_Destruct(MedianQueue* mq);
uint32_t MedianQueue_GetNumElements(const MedianQueue* mq);
uint32_t MedianQueue_GetNumBlanks(const MedianQueue* mq);
MedianQueueElement MedianQueue_GetLowerMedian(
const MedianQueue* mq);
MedianQueueElement MedianQueue_GetUpperMedian(
const MedianQueue* mq);
int MedianQueue_IsEnqueueable(
const MedianQueue* mq);
int MedianQueue_IsDequeueable(
const MedianQueue* mq);
void MedianQueue_Enqueue(
MedianQueue* mq, MedianQueueElement element);
MedianQueueElement MedianQueue_Dequeue(
MedianQueue* mq);
# ifdef __cplusplus
}
# endif
# endif /* MEDIAN_QUEUE_H_INCLUDED */
固定長リングバッファを利用したキューのAPIとして、固定長リングバッファでキューとスタックを実現するの設計を踏襲しています。
モジュール名はMedianQueue
としました。初めは二分ヒープ木で実装するのでBinaryHeapQueue
のような名前をつけていましたが、移動最大値や移動最小値を求めたい時に同じように二分ヒープ木とキューを組み合わせた時に困りますし、この実装を、移動中央値を逐次算出するキューの決定版としたいという思いもあって、MedianQueue
としています。
単純なキューとスタックの実装と比べて、二分ヒープ木と、その要素の入力順を保持するための管理領域が増えており、インスタンス構築の方法がやや複雑です。以下はインスタンス構築の例です。
# define MAX_WINDOW_SIZE (5)
MedianQueue mq[1];
MedianQueueElement buffer_for_elements[MAX_WINDOW_SIZE];
uint32_t heap_indices[MAX_WINDOW_SIZE];
uint32_t heap[MAX_WINDOW_SIZE];
MedianQueue_Construct(
mq, buffer_for_elements, heap_indices, heap, MAX_WINDOW_SIZE);
中央値を取得する関数は二つあります。
MedianQueueElement MedianQueue_GetLowerMedian(
const MedianQueue* mq);
MedianQueueElement MedianQueue_GetUpperMedian(
const MedianQueue* mq);
キュー内の要素数が偶数の場合、丁度中央に位置する要素は存在しないので、左手ヒープ木のルートをLowerMedian(下側中央値)、右手ヒープ木のルートをUpperMedian(上側中央値)として取得出来るようにしています。キュー内の要素数が奇数の場合、バランス制約により右手ヒープ木の要素数が左手ヒープ木の要素数より必ず1つ多いので、右手ヒープ木のルートが真の中央値です。そのため、LowerMedianとUpperMedianはどちらも右手ヒープ木のルートを返します。
それぞれの中央値が具体的に左右ヒープ木のどの値を返すかは以下の表の通りです。
キュー内の要素数 | LowerMedian | UpperMedian |
---|---|---|
偶数 | 左手ヒープ木のルート | 右手ヒープ木のルート |
奇数 | 右手ヒープ木のルート | 右手ヒープ木のルート |
キュー内の要素数が偶数の時はLowerMedianとUpperMedianの平均値を中央値とする定義があるようなので、ユーザーコード側でどのように扱うか選択の余地を残したAPIとなっています。常に$\frac{LowerMedian+UpperMedian}{2}$で中央値を計算すれば、ユーザーコードで偶数奇数に合わせて実装を変える必要はなくなります。もちろん、得られた中央値を浮動小数点数として扱うのかどうかと言った事はユーザーコード側で考えるべきです。
実装
ソースコードはここにあります。
考え方として自明でない点は、左手ヒープ木の最大要素数をキューの$最大要素数//2$、右手ヒープ木の最大要素数をキューの$(最大要素数+1)//2$で構築している点です。左右ヒープ木の最大要素数を合わせると、ちゃんとキュー全体の最大要素数と一致します。よって、キューの最大要素数$N$に対して、二分ヒープ木の管理領域の要素数も$N$で足ります。(//
はpythonにおける切り捨て除算と同じ意味で用いています。)
また、Dequeue時に二分ヒープ木のインデックスを内部のインデックスキューから取り出しますが、左右どちらのヒープ木から取り出したのかを判別できるように、右手ヒープ木の要素インデックスにオフセットを足して記録しています。ここで用いるオフセットは、左手ヒープ木の要素インデックスが超える事のない$N//2$($N$はキューの最大要素数)です。
Enqueue
void MedianQueue_Enqueue(
MedianQueue* mq, MedianQueueElement element)
{
uint32_t const num_elements = MedianQueuePrivate_GetNumElements(mq);
uint32_t const new_leaf_index_in_lh_heap = num_elements / 2U;
uint32_t const new_leaf_index_in_rh_heap = (num_elements + 1U) / 2U;
uint32_t const heap_index_offset = MedianQueuePrivate_GetRightHandHeapIndexOffset(mq);
uint32_t const rounded_wp = mq->wp_ % mq->max_elements_count_;
/* 値をキューに入れる */
mq->buffer_for_elements_[rounded_wp] = element;
/* 内部ヒープ木の操作 */
if (num_elements == 0U) {
/* 最初の一つ目の要素追加は右手ヒープ木に追加して終了 */
mq->buffer_for_heap_indices_[rounded_wp] = new_leaf_index_in_rh_heap + heap_index_offset;
mq->rh_heap_[new_leaf_index_in_rh_heap] = rounded_wp;
MedianQueuePriavte_RHUpHeap(mq, new_leaf_index_in_rh_heap);
} else if ((num_elements % 2U) != 0U) {
/* 要素数が奇数=右手ヒープ木が大きい */
MedianQueueElement upper_median = MedianQueue_GetUpperMedian(mq);
if (upper_median < element) {
uint32_t index_in_queue = mq->rh_heap_[0U];
/* RHのルートを取り出してLHの末尾へ追加してUpHeap */
mq->lh_heap_[new_leaf_index_in_lh_heap] = index_in_queue;
mq->buffer_for_heap_indices_[index_in_queue] = new_leaf_index_in_lh_heap;
MedianQueuePriavte_LHUpHeap(mq, new_leaf_index_in_lh_heap);
/* 新しい要素をRHのルートに入れてDownHeap */
mq->buffer_for_heap_indices_[rounded_wp] = 0U + heap_index_offset;
mq->rh_heap_[0U] = rounded_wp;
MedianQueuePriavte_RHDownHeap(mq, 0U);
} else {
/* 新しい要素をLHの末尾に追加してUpHeap */
mq->lh_heap_[new_leaf_index_in_lh_heap] = rounded_wp;
mq->buffer_for_heap_indices_[rounded_wp] = new_leaf_index_in_lh_heap;
MedianQueuePriavte_LHUpHeap(mq, new_leaf_index_in_lh_heap);
}
} else {
/* 要素数が偶数=左右ヒープ木の要素数が同じ */
MedianQueueElement lower_median = MedianQueue_GetLowerMedian(mq);
if (element < lower_median) {
uint32_t index_in_queue = mq->lh_heap_[0U];
/* LHのルートを取り出してRHの末尾へ追加してUpHeap */
mq->rh_heap_[new_leaf_index_in_rh_heap] = index_in_queue;
mq->buffer_for_heap_indices_[index_in_queue] = new_leaf_index_in_rh_heap + heap_index_offset;
MedianQueuePriavte_RHUpHeap(mq, new_leaf_index_in_rh_heap);
/* 新しい要素をLHのルートに入れてDownHeap */
mq->lh_heap_[0U] = rounded_wp;
mq->buffer_for_heap_indices_[rounded_wp] = 0U;
MedianQueuePriavte_LHDownHeap(mq, 0U);
} else {
/* 新しい要素をRHの末尾に追加してUpHeap */
mq->rh_heap_[new_leaf_index_in_rh_heap] = rounded_wp;
mq->buffer_for_heap_indices_[rounded_wp] = new_leaf_index_in_rh_heap + heap_index_offset;
MedianQueuePriavte_RHUpHeap(mq, new_leaf_index_in_rh_heap);
}
}
/* 処理の途中で要素数が変化しないように最後に要素数を更新する */
mq->wp_++;
return;
}
追加前の要素数が偶数か奇数か、左手ヒープ木に追加するか右手ヒープ木に追加するかで分岐します。
偶数かつ左手ヒープ木へ追加する場合
バランス制約を守るために、左手ヒープ木のルートから右手ヒープ木に移動した後、左手ヒープ木に新しい要素を追加します。
偶数かつ右手ヒープ木へ追加する場合
右手ヒープ木にそのまま新しい要素を追加します。右手ヒープ木の要素数が左手ヒープ木の要素数より1つ多いだけなので、バランス制約は守られたままです。
奇数かつ左手ヒープ木へ追加する場合
左手ヒープ木にそのまま新しい要素を追加します。左右でヒープ木の要素数が同じになり、バランス制約は守られたままです。
奇数かつ右手ヒープ木へ追加する場合
バランス制約を守るために、右手ヒープ木のルートから左手ヒープ木に移動した後、右手ヒープ木に新しい要素を追加します。
Dequeue
MedianQueueElement MedianQueue_Dequeue(
MedianQueue* mq)
{
MedianQueueElement element;
uint32_t num_elements = MedianQueuePrivate_GetNumElements(mq);
uint32_t heap_index_offset = MedianQueuePrivate_GetRightHandHeapIndexOffset(mq);
uint32_t index_in_heap = mq->buffer_for_heap_indices_[mq->rp_];
/* キューから値を取り出す */
element = mq->buffer_for_elements_[mq->rp_];
/* 内部ヒープ木の操作 */
if ((num_elements % 2U) != 0U) {
/* 要素数が奇数=右手ヒープ木が大きい */
uint32_t tail_index_in_rh_heap = MedianQueuePrivate_GetTailIndexInRightHandHeap(mq);
if (heap_index_offset <= index_in_heap) {
/* RHから要素を取り出したので取り出した箇所にRHの末尾を移動する */
index_in_heap -= heap_index_offset;
if (index_in_heap != tail_index_in_rh_heap) {
uint32_t index_in_queue;
index_in_queue = mq->rh_heap_[tail_index_in_rh_heap];
mq->rh_heap_[index_in_heap] = index_in_queue;
mq->buffer_for_heap_indices_[index_in_queue] = index_in_heap + heap_index_offset;
/* 埋めた箇所を起点にRHをDownHeap */
if (!MedianQueuePriavte_RHDownHeap(mq, index_in_heap)) {
/* DownHeapで変化がなかったら埋めた箇所を起点にRHをUpHeap */
MedianQueuePriavte_RHUpHeap(mq, index_in_heap);
}
}
} else {
/* LHから要素を取り出した */
/* 左右バランスのためにLHの取り出した箇所にRHのルートを移動する */
uint32_t index_in_queue;
index_in_queue = mq->rh_heap_[0];
mq->lh_heap_[index_in_heap] = index_in_queue;
mq->buffer_for_heap_indices_[index_in_queue] = index_in_heap;
/* RHの要素はLHの全ての要素より大きいので、DownHeapで木が変化しない事は自明 */
/* 埋めた箇所を起点にLHをUpHeap */
MedianQueuePriavte_LHUpHeap(mq, index_in_heap);
/* RHの末尾をRHのルートに移動してDownHeap */
if (tail_index_in_rh_heap != 0U) {
index_in_queue = mq->rh_heap_[tail_index_in_rh_heap];
mq->rh_heap_[0] = index_in_queue;
mq->buffer_for_heap_indices_[index_in_queue] = 0U + heap_index_offset;
MedianQueuePriavte_RHDownHeap(mq, 0U);
}
}
} else {
/* 要素数が偶数=左右ヒープ木の要素数が同じ */
uint32_t tail_index_in_lh_heap = MedianQueuePrivate_GetTailIndexInLeftHandHeap(mq);
if (heap_index_offset <= index_in_heap) {
uint32_t index_in_queue;
/* RHから要素を取り出した */
/* 取り出した箇所にLHのルートを移動する */
/* LHの要素はRHの全ての要素以下なので、DownHeapで木が変化しない事は自明 */
index_in_heap -= heap_index_offset;
index_in_queue = mq->lh_heap_[0];
mq->rh_heap_[index_in_heap] = index_in_queue;
mq->buffer_for_heap_indices_[index_in_queue] = index_in_heap + heap_index_offset;
/* 埋めた箇所を起点にRHをUpHeap */
MedianQueuePriavte_RHUpHeap(mq, index_in_heap);
if (tail_index_in_lh_heap != 0U) {
/* LHの末尾をLHのルートに移動してDownHeap */
index_in_queue = mq->lh_heap_[tail_index_in_lh_heap];
mq->lh_heap_[0] = index_in_queue;
mq->buffer_for_heap_indices_[index_in_queue] = 0U;
MedianQueuePriavte_LHDownHeap(mq, 0U);
}
} else {
/* LHから要素を取り出した */
/* 取り出した箇所にLHの末尾を移動する */
uint32_t index_in_queue;
if (index_in_heap != tail_index_in_lh_heap) {
index_in_queue = mq->lh_heap_[tail_index_in_lh_heap];
mq->lh_heap_[index_in_heap] = index_in_queue;
mq->buffer_for_heap_indices_[index_in_queue] = index_in_heap;
/* 埋めた箇所を起点にLHをDownHeap */
if (!MedianQueuePriavte_LHDownHeap(mq, index_in_heap)) {
/* DownHeapで変化がなかったら埋めた箇所を起点にRHをUpHeap */
MedianQueuePriavte_LHUpHeap(mq, index_in_heap);
}
}
}
}
/* 処理の途中で要素数が変化しないように最後に要素数を更新する */
mq->rp_++;
if (mq->max_elements_count_ <= mq->rp_) {
mq->wp_ -= mq->max_elements_count_;
mq->rp_ -= mq->max_elements_count_;
}
return element;
}
Enqueueと同様、要素を取り出す前のキュー内要素数が偶数か奇数か、取り出す要素が左手ヒープ木からなのか右手ヒープ木からなのかで分岐しつつ、バランス制約を守るように左右ヒープ木の要素を移し替えます。
性能
二分ヒープ木の挿入計算量は$O(log N)$なので、十分高速だと言えそうです。比較用の素朴な移動中央値の実装として、処理窓内を毎回クイックソートして中央値を求める実装と性能を比較してみます。
クイックソート版の実装はここにあります。
比較時の注意点として、クイックソートの方は実装が面倒だったのでC標準ライブラリのqsort
を使っています。キューへの入力順序を維持しておらず、Dequeue時にソート後のどの値が出ていくかわからないため、Enqueue時、Dequeue時共にソート用バッファにキュー内要素をコピーして、qsort
でインプレースソートしています。毎回要素数分のコピーが発生するのと、ソート状態を維持していないため全体をソートし直しになる点が、二分ヒープ木によるキューの実装と比べるとアンフェアです。
なお、アンフェアな条件がなかったとしてもクイックソートの計算量は$O(N log N)$なので、二分ヒープ木の挿入$O(log N)$よりも定性的な計算量が$N$倍大きいです。
性能測定に使ったマシンは以下の通りのスペックです。
Hardware Overview:
Model Name: MacBook Pro
Model Identifier: MacBookPro13,3
Processor Name: Quad-Core Intel Core i7
Processor Speed: 2.7 GHz
Number of Processors: 1
Total Number of Cores: 4
L2 Cache (per Core): 256 KB
L3 Cache: 8 MB
Hyper-Threading Technology: Enabled
Memory: 16 GB
Boot ROM Version: 428.0.0.0.0
SMC Version (system): 2.38f11
性能測定では、$N=1,11,21,\ldots,191$のケースについて、$-100$から$100$の間でランダムに生成した$100000$点のデータに対して移動中央値を逐次求め、その時間を計測しました。具体的な計測コードはここです。(面倒だったのでC++標準ライブラリを使って処理時間を測定しています。)
void execute_median_queue_sequence(uint32_t n, int32_t (*output_median_buffer)[NUMBER_OF_random_numbers_ELEMENTS])
{
MedianQueue mq[1];
MedianQueueElement buffer_for_elements[MAX_WINDOW_SIZE];
uint32_t heap_indices[MAX_WINDOW_SIZE];
uint32_t heap[MAX_WINDOW_SIZE];
uint32_t i;
MedianQueue_Construct(
mq, buffer_for_elements, heap_indices, heap, n);
for (i = 0; i < NUMBER_OF_random_numbers_ELEMENTS; i++) {
if (!MedianQueue_IsEnqueueable(mq)) {
(void)MedianQueue_Dequeue(mq);
}
MedianQueue_Enqueue(mq, random_numbers[i]);
(*output_median_buffer)[i] = MedianQueue_GetUpperMedian(mq);
}
MedianQueue_Destruct(mq);
return;
}
typedef void (*ProcessFunction)(uint32_t n, int32_t (*output_median_buffer)[NUMBER_OF_random_numbers_ELEMENTS]);
uint64_t record_time_millisecond(
ProcessFunction process,
uint32_t n, int32_t (*output_median_buffer)[NUMBER_OF_random_numbers_ELEMENTS])
{
std::chrono::system_clock::time_point start;
std::chrono::system_clock::time_point finish;
start = std::chrono::system_clock::now();
process(n, output_median_buffer);
finish = std::chrono::system_clock::now();
return std::chrono::duration_cast<std::chrono::microseconds>(finish - start).count();
}
ビルドスクリプトはここです。
計測結果は以下の通りです。単位はマイクロ秒です。二分ヒープ木で実装した移動中央値算出キューの方が速いですが、クイックソート実装の方はアンフェアな条件があるためまだ高速化の余地があります。しかし、やはりクイックソート実装は定性的な計算量の性質通り$O(N log N)$に従っているようなので、二分ヒープ木の実装の方が移動中央値の算出という点ではおそらく高速なようです。
N | Median Queue | Quicksort Queue |
---|---|---|
1 | 1793 | 3291 |
11 | 5494 | 57923 |
21 | 5813 | 149989 |
31 | 6255 | 232708 |
41 | 6092 | 324084 |
51 | 6332 | 424474 |
61 | 6142 | 523216 |
71 | 6698 | 657075 |
81 | 7027 | 739221 |
91 | 7050 | 856525 |
101 | 6911 | 972397 |
111 | 7096 | 1099802 |
121 | 6737 | 1187515 |
131 | 7320 | 1304022 |
141 | 7455 | 1433426 |
151 | 7565 | 1550530 |
161 | 7597 | 1672221 |
171 | 7932 | 1798300 |
181 | 7550 | 1908294 |
191 | 7586 | 2035384 |
二分ヒープ木実装とクイックソート実装を比較すると以下の表の通りと言えそうです。
評価項目 | Median Queue | Quicksort Queue |
---|---|---|
時間計算量 | $O(log N)$で速い | $O(N log N)$で比較的遅い |
空間計算量 | $N\times 3$の領域が必要 | $N \times 2$の領域で済む(ただし順序を記録するなら$N \times 3$となる) |
副産物 | 無し | 同時に最大値と最初値も求まる |
クイックソート実装の方は二分ヒープ木実装と異なり、最大値と最小値が同時に求まるというメリットがあります。一方、二分ヒープ木の実装の方で最大値と最小値を同時に求めようとすると、最大値/最小値用の二分ヒープ木と、入力した順序を保存するキューが必要になるので、ヒープ関連領域がさらに増えて空間計算量が$N + N \times 6$になります。
$N$が大きい場合にこの差は無視できませんし、いくら速くてもメモリに乗らなければ実現不可能なプログラムになってしまいます。逆に、メモリを節約してクイックソート実装にしても、要求されるリアルタイム性によっては遅すぎて使い物にならないケースもあるので、二分ヒープ木実装とクイックソート実装の間には空間計算量と時間計算量のトレードオフが存在します。
応用先
移動中央値の計算はコア技術の開発や実装をしているとたまに必要になります。身近な例での応用先は画像処理におけるメディアンフィルタです。2次元配列に対しても、2次元処理窓のスライド方向と直交する方向で要素をEnqueueすれば応用可能です。
おわりに
理屈ではうまくできそうだと思っていても、実際に実装して理論通りの性能かを検証するのは骨が折れます。また、性能測定については再現可能な環境も用意する必要がありますし、どうせ作るなら一回作っておわりにしたいと考えだすとどうも腰が重くなりがちです。
想像以上に実装が複雑になってしまい、そらで実装できる規模ではなかったので、より効率的なアルゴリズムや、実は簡単に実装できると言った情報をお待ちしています。