0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Fractional Cascading

0
Last updated at Posted at 2026-03-15

Fractional Cascading

FD-Tree

FD-Tree (Flash Disk Tree)と呼ばれる2010年ごろに提案された B-Tree Variantsが存在します。
このTreeの目的としてはSSDへのアクセスをシークエンシャルにすることで、従来のランダムなアクセスで発生したWrite AmplificationやGarbage Collectionの回数を下げるというものです。
このTreeではキーを効率よく検索するため、Fractional Cascadingと呼ばれる複数のレベルにわたる配列の値を効率よく探すための方法が採用されています。この記事ではその実際の動き方についてまとめています。(FD-Treeを1度に理解するのが自分では難しかったので、要素分解したい意図が大きいです)

Fractional Cascading

概要

Fractional Cascadingとはざっくり言うと、複数の配列が存在する場合に各要素を同じ値を持つ1つの配列から別の配列の同じ値を持つ要素に向けてBridgeと呼ばれる形で繋ぐことで、効率的な値の検索を実現するアルゴリズムのことです。
Bridgeとなる要素は、一定の間隔(N)で決められます。例えばN=2の場合、array[1], array[3], array[5]... がBridgeとなる要素になります。 (array[i*N-1] for i in range(1, size_of_array / N ))
例を示して動きを見てみます。

Example

A1, A2, A3の3つの配列があり、それぞれLevel 0, Level 1, Leve 2とします。
前提:各配列はソート済み

A1 = [8, 18, 27, 40] - Level 0 
A2 = [10, 20, 25, 35, 50] - Level 1
A3 = [5, 12, 22, 28, 29, 45] - Level 2

Fractional Cascading構造の作成

  1. Bridges(配列 at level[i] -> 配列 at level[i+1]へのポインター)を作成する。ここではN=2とします
  • Step 1 — Pull from A3 → A2
    A3の配列から2個ごとに要素をA2にコピーをし、A3→A2の要素に向けてポインターを作成します。

    • コピーする要素: [12, 28, 45]
    • コピー後の配列: A2 = [10, 12, 20, 25, 28, 35, 45, 50]
    • ポインター
      • A2[1] -> A3[1]
      • A2[4] -> A3[3]
      • A2[6] -> A3[5]
  • Step 2 — Pull from A2 → A1
    A2の配列から2個ごとに要素をA1にコピーします。

    • コピーする要素: [12, 25, 35, 50]
    • コピー後の配列: A1 = [8, 12, 18, 25, 27, 35, 40, 50]
    • ポインター
      • A1[1] -> A2[1]
      • A1[3] -> A2[3]
      • A1[5] -> A2[5]
      • A1[7] -> A2[7]

作成された構造

上記の2つのステップの後の全体構造(配列 at levels[i] -> 配列 at levels[i+1]に向けてポインタ(Bridge)が作成されている)

A1 = [8, 12, 18, 25, 27, 35, 40, 50]
         ↓       ↓        ↓       ↓
         |       |        |       |
A2 = [10,12, 20, 25, 28, 35, 45, 50]
          ↓          ↓       ↓
          |          |       |
A3 = [5, 12, 22,    28, 29, 45]

fractional_cascading_structure.png

キーの検索

x = 29が3つの配列に含まれているかを上記の構造を利用して検索します。
検索は配列のレベルが低いほうから高いほうに向けて行います。(配列 at levels[i] -> 配列 at levels[i+1]、この例ではA1 -> A2 -> A3の順番]

Step 1 — A1内の検索

一番レベルが低い(Level0)のA1の配列に29のpredecessor(x <= 29となる最大の値)が存在するかを検索します。ここでは配列がソートされているためBinary Searchを実行し値を検索します。この配列では27がpredecessorとなります。
Binary SearchのためTime Complexityはlog(M)となります。(配列の要素数をMとする)

Step 2 — A1内でBridgeの検索

predecessorである27がBridgeであるか(Level1の配列A2に向けてポインターを作成しているか)を確認します。この例の場合Bridgeではないため、27から右の方向に(インデックスの大きい方向に)順にBridge要素を検索していきます。(27 < 29のため)
この例では27の右隣の35がBridgeであることを確認します。
Time Complexityはpredecessorと右隣の要素からbridgeが存在するかどうかの2回の確認で、log(1)となります。ここでの検索する要素の数は最大でNとなります。

Step3 - Bridgeから次の配列への移動し、A2内で検索

A1[5] -> A2[5]のポインターに従って、A2[5]に到着します。A2[5]を起点として、A2内のpredecessorを検索します。
まず起点であるA2[5](=35)29を比較します。35 > 29のため、これより左の要素を順に検索していきます。左隣の要素A2[4](28)29を比較します。28 < 29 のため28がpredecessorとなります。次にpredecessorとなったA2[4]を起点としてBridgeを検索します。ここではA2[4](28)がBridgeとなります。Time ComplexityはA2[4](28)がBridgeであるかどうかの確認で、log(1)となります。

Step4 - Bridgeから次の配列への移動し、A3内で検索

A2[4] -> A3[3]のポインターに従って、A3[3]に到着します。A3[3]を起点にして、predecessorを検索します。
まず起点であるA3[3](=28)29を比較します。28 < 29のため、これより右の要素を順に検索していきます。右隣の要素A3[4](=29)29を比較します。29 = 29 のため配列に29が存在していることがわかりました。Time Complexityはpredecessorと右隣の要素との比較でlog(1)となります。

fractional_cascading_search.png

コスト

  • A1でのBinary Searchによる検索 - O(logM)
  • A1でのBridge検索 - O(1)
  • A2でのBridge検索 - O(1)
  • A3でのBridge検索 - O(1)

k = 配列の数とすると合計 O(logM + k)になります。

Bridgeを使用しない場合は、A1->A2->A3と順番にBinary Searchを実行してキーを検索するためTime ComplexityはO(logM * k)となります。

Code

サンプルコード
from dataclasses import dataclass
from typing import Optional


@dataclass
class Item:
    level: int
    value: int
    bridged_index: Optional[int] = None


def transform_items(level: int, run: list[int]) -> list[Item]:
    return [Item(level, value) for value in run]


def binary_search_predecessor(run: list[Item], value: int) -> tuple[Optional[int], int]:
    """
    Return:
      (exact_value_or_predecessor_value, predecessor_index)

    If value exists, predecessor_index is the exact-match index.
    If value does not exist, predecessor_index is the index of the
    largest element <= value.
    If value is smaller than all elements, returns (None, -1).
    """
    if not run:
        return None, -1

    left = 0
    right = len(run) - 1
    pred_index = -1

    while left <= right:
        mid = (left + right) // 2
        mid_value = run[mid].value

        if mid_value == value:
            return mid_value, mid
        elif mid_value < value:
            pred_index = mid
            left = mid + 1
        else:
            right = mid - 1

    if pred_index == -1:
        return None, -1

    return run[pred_index].value, pred_index


def sample_lower_run(lower_run: list[Item], n: int, start: int = 1) -> list[Item]:
    """
    Pull every n-th element from the lower run.
    Each pulled-up item keeps a bridge to its original index in lower_run.
    """
    sampled: list[Item] = []

    for lower_index in range(start, len(lower_run), n):
        sampled.append(
            Item(
                level=lower_run[lower_index].level - 1,
                value=lower_run[lower_index].value,
                bridged_index=lower_index,
            )
        )

    return sampled


def merge_upper_with_sampled(upper_run: list[Item], sampled: list[Item]) -> list[Item]:
    """
    Linear merge of two sorted lists:
      - upper_run: original items already in the upper level
      - sampled: pulled-up items from the lower level, each with a bridged_index

    If the same value exists in both, keep the original upper item and skip
    the pulled-up duplicate.
    """
    merged: list[Item] = []
    i = 0
    j = 0

    while i < len(upper_run) and j < len(sampled):
        upper_value = upper_run[i].value
        sampled_value = sampled[j].value

        if upper_value < sampled_value:
            merged.append(upper_run[i])
            i += 1
        elif upper_value > sampled_value:
            merged.append(sampled[j])
            j += 1
        else:
            merged.append(upper_run[i])
            i += 1
            j += 1

    while i < len(upper_run):
        merged.append(upper_run[i])
        i += 1

    while j < len(sampled):
        merged.append(sampled[j])
        j += 1

    return merged


def build_data_structure(runs: dict[int, list[int]], max_level: int, n: int) -> dict[int, list[Item]]:
    """
    Build fractional cascading structure bottom-up.

    Steps:
      1. Convert raw ints to Item objects.
      2. For each level from bottom to top:
         - sample every n-th item from lower run
         - merge sampled items into upper run in linear time
    """
    item_runs: dict[int, list[Item]] = {}

    for level in range(max_level + 1):
        item_runs[level] = transform_items(level, runs[level])

    for level in range(max_level, 0, -1):
        lower_run = item_runs[level]
        upper_run = item_runs[level - 1]

        sampled = sample_lower_run(lower_run, n=n, start=1)
        item_runs[level - 1] = merge_upper_with_sampled(upper_run, sampled)

    return item_runs


def print_runs(runs: dict[int, list[Item]]) -> None:
    for level in sorted(runs.keys()):
        print(f"Level {level}:")
        for idx, item in enumerate(runs[level]):
            print(
                f"  idx={idx:2d}, value={item.value:2d}, "
                f"bridged_index={item.bridged_index}"
            )
        print()


def find_bridge_index(built: dict[int, list[Item]], idx: int, level: int) -> int:
    """
    Find the nearest bridge to the next level starting from position idx.

    Search forward first (for elements >= current position),
    then backward if no forward bridge found.

    Returns the bridged_index pointing to the next level, or -1 if no bridge found.
    """
    run = built[level]
    run_len = len(run)

    # Search forward from idx
    for i in range(idx, run_len):
        if run[i].bridged_index is not None:
            return run[i].bridged_index

    # Search backward from idx-1 if no forward bridge found
    for i in range(idx - 1, -1, -1):
        if run[i].bridged_index is not None:
            return run[i].bridged_index

    return -1


def local_search_predecessor(run: list[Item], start_idx: int, search_key: int, window: int) -> int:
    """
    Search for the predecessor of search_key in a local window around start_idx.

    The window size is determined by the sampling rate n.
    Returns the index of the largest element <= search_key within the window,
    or -1 if no such element exists.
    """
    run_len = len(run)

    # Calculate search bounds
    left_bound = max(0, start_idx - window)
    right_bound = min(run_len - 1, start_idx + window)

    # Find the largest element <= search_key in the window
    best_idx = -1

    for i in range(left_bound, right_bound + 1):
        if run[i].value <= search_key:
            if best_idx == -1 or run[i].value > run[best_idx].value:
                best_idx = i

    return best_idx


def search(
    built: dict[int, list[Item]],
    num_levels: int,
    search_key: int,
    n: int
) -> list[tuple[int, Optional[int]]]:
    """
    Search for search_key across all levels using fractional cascading.

    Returns a list of (level, index) tuples where:
      - index is the position of the exact match if found
      - index is the position of the largest element <= search_key (predecessor)
      - index is None if search_key is smaller than all elements in that level

    Time complexity: O(log m + k) where m is the size of level 0 and k is number of levels.
    """
    results: list[tuple[int, Optional[int]]] = []

    if not built or 0 not in built or not built[0]:
        return results

    # Binary search on level 0
    _, idx = binary_search_predecessor(built[0], search_key)

    if idx == -1:
        results.append((0, None))
    else:
        results.append((0, idx))

    current_idx = idx if idx >= 0 else 0

    # Cascade through remaining levels
    for level in range(num_levels):
        # Find bridge to next level
        bridge_idx = find_bridge_index(built, current_idx, level)

        if bridge_idx == -1:
            # No bridge found, need to do binary search on next level
            _, next_idx = binary_search_predecessor(built[level + 1], search_key)
            results.append((level + 1, next_idx if next_idx >= 0 else None))
            current_idx = next_idx if next_idx >= 0 else 0
        else:
            # Use bridge and do local search
            next_idx = local_search_predecessor(built[level + 1], bridge_idx, search_key, n)
            results.append((level + 1, next_idx if next_idx >= 0 else None))
            current_idx = next_idx if next_idx >= 0 else bridge_idx

    return results


def find_exact_match(
    built: dict[int, list[Item]],
    num_levels: int,
    search_key: int,
    n: int
) -> Optional[tuple[int, int]]:
    """
    Search for an exact match of search_key across all levels.

    Returns (level, index) of the first exact match found, or None if not found.
    """
    results = search(built, num_levels, search_key, n)

    for level, idx in results:
        if idx is not None and built[level][idx].value == search_key:
            return level, idx

    return None


def main(runs: dict[int, list[int]], num_levels: int, search_key: int, n: int) -> Optional[tuple[int, int]]:
    """
    Build the fractional cascading structure and search for search_key.

    Returns (level, index) if exact match found, otherwise (None, None).
    """
    built = build_data_structure(runs, max_level=num_levels, n=n)
    print_runs(built)

    # Search across all levels
    results = search(built, num_levels, search_key, n)

    print(f"\nSearch results for key {search_key}:")
    for level, idx in results:
        if idx is not None:
            value = built[level][idx].value
            match_type = "EXACT" if value == search_key else "predecessor"
            print(f"  Level {level}: idx={idx}, value={value} ({match_type})")
        else:
            print(f"  Level {level}: no predecessor found")

    # Check for exact match
    exact_match = find_exact_match(built, num_levels, search_key, n)

    if exact_match:
        level, idx = exact_match
        print(f"\nSearch key {search_key} found at level {level}, index {idx}")
        return level, idx
    else:
        print(f"\nSearch key {search_key} not found (no exact match)")
        return None, None


if __name__ == "__main__":
    runs = {
        0: [8, 18, 27, 40],
        1: [10, 20, 25, 35, 50],
        2: [5, 12, 22, 28, 29, 45],
    }

    n = 2
    num_levels = 2
    search_key = 29
    main(runs, num_levels, search_key, n)

まとめ

Bridgeという形で連結された配列を使用することで、効率的にキーの検索ができることが分かりました。

参考文献

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?