1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

C++でパイプライン処理を実装する:基本概念から並列処理まで

1
Posted at

C++でパイプライン処理を実装する:基本概念から並列処理まで

データ処理を「独立したステップの連鎖」として組み立てるパイプラインパターンは、コードの読みやすさと再利用性を大きく高めてくれます。この記事では、C++でパイプライン処理を実装する方法を、基本概念から現代的なC++20 Ranges、さらには並列パイプラインまで段階的に解説します。


パイプライン処理とは?

「一連の処理を、独立したステップの連鎖として組み立てる」設計パターンです。

入力データ → [ステップ1] → [ステップ2] → [ステップ3] → 出力

各ステップは「受け取って・処理して・渡す」だけで、互いの実装を知らなくてよいのがポイントです。


① 素朴な実装:関数の連鎖

まずはシンプルに、関数を順番に呼び出す形で実装します。

#include <iostream>
#include <string>
#include <vector>
#include <algorithm>

std::vector<int> parse(const std::string& csv) {
    // "1,2,3,4,5" → {1, 2, 3, 4, 5}(簡略化)
    return {1, 2, 3, 4, 5};
}

std::vector<int> filter_even(std::vector<int> data) {
    std::vector<int> result;
    std::copy_if(data.begin(), data.end(),
                 std::back_inserter(result),
                 [](int x){ return x % 2 == 0; });
    return result;
}

std::vector<int> double_values(std::vector<int> data) {
    std::transform(data.begin(), data.end(), data.begin(),
                   [](int x){ return x * 2; });
    return data;
}

int main() {
    // パイプライン:順番に呼ぶだけ
    auto result = double_values(filter_even(parse("1,2,3,4,5")));
    // → {4, 8}
}

これがパイプラインの本質ですが、ネストが深くなると右から左に読まなければならず、可読性が下がります。


operator| で読みやすくする

Linuxの | のように、左から右に読めるようにしましょう。

operator| のオーバーロード

template<typename T, typename F>
auto operator|(T value, F func) -> decltype(func(value)) {
    return func(value);
}

a | ff(a) として動作させる、それだけです。

使い方

// 各ステップをラムダとして定義
auto filter_even = [](std::vector<int> data) {
    std::vector<int> result;
    for (int x : data)
        if (x % 2 == 0) result.push_back(x);
    return result;
};

auto double_values = [](std::vector<int> data) {
    for (int& x : data) x *= 2;
    return data;
};

int main() {
    std::vector<int> data = {1, 2, 3, 4, 5};

    // 左から右に読める!
    auto result = data | filter_even | double_values;
    // → {4, 8}
}

引数付きステップ:カリー化

カリー化とは?

カリー化(Currying) とは、「複数の引数を取る関数」を「引数を1つずつ受け取る関数の連鎖」に変換するテクニックです。

// 通常の関数:引数を2つ同時に受け取る
bool greater_than(int x, int threshold) {
    return x > threshold;
}

// カリー化した関数:引数を1つずつ受け取る
auto greater_than_curried = [](int threshold) {
    return [threshold](int x) {  // threshold を覚えた関数を返す
        return x > threshold;
    };
};

// 使い方
auto greater_than_3 = greater_than_curried(3);  // 「3より大きいか判定する関数」が得られる
greater_than_3(5);  // → true
greater_than_3(2);  // → false

パイプラインとの相性が抜群な理由は、operator| は「データを1つ受け取る関数」しか繋げられないからです。カリー化によって引数を先に渡し、「データだけを待つ関数」を作ることで、パイプラインに組み込めます。

「しきい値を指定してフィルタしたい」など、引数を持つステップはカリー化で実現します。

auto filter_greater_than = [](int threshold) {
    // threshold をキャプチャした関数を返す
    return [threshold](std::vector<int> data) {
        std::vector<int> result;
        for (int x : data)
            if (x > threshold) result.push_back(x);
        return result;
    };
};

int main() {
    std::vector<int> data = {1, 2, 3, 4, 5};

    auto result = data
        | filter_greater_than(2)   // → {3, 4, 5}
        | double_values;           // → {6, 8, 10}
}

filter_greater_than(2) は「データを受け取る関数」を返すため、operator| にそのまま渡せます。

ラムダのキャプチャについて

上の例で使った [threshold] はキャプチャです。ラムダは外側の変数に直接アクセスできないため、キャプチャで「持ち込む」必要があります。

書き方 意味 注意点
[=] 全部コピー 安全。作成時の値が固定される
[&] 全部参照 関数内で完結するときのみ安全
[x] x だけコピー 明示的で最も安全
[&x] x だけ参照 関数をまたぐ場合は危険

関数をまたいでラムダを返す場合は、必ず値でキャプチャしてください。参照キャプチャ [&] はダングリング参照の原因になります。

// ❌ 危険:n が消えたあとも参照しようとする
auto make_adder(int n) {
    return [&n](int x){ return x + n; };  // n は関数終了で消える
}

// ✅ 安全:n をコピーして持つ
auto make_adder(int n) {
    return [n](int x){ return x + n; };
}

③ C++20 Ranges:標準パイプライン

②で自作した operator| の仕組みを、標準ライブラリが正式に提供したものが Ranges です。

#include <iostream>
#include <vector>
#include <ranges>

int main() {
    std::vector<int> data = {1, 2, 3, 4, 5};

    auto result = data
        | std::views::filter([](int x){ return x % 2 == 0; })
        | std::views::transform([](int x){ return x * 2; });

    for (int v : result)
        std::cout << v << " ";  // → 4 8
}

最大の特徴:遅延評価(Lazy Evaluation)

// ❌ 自作版:各ステップで中間 vector が作られる
auto a = data | filter_even;    // {2, 4} が生成される
auto b = a    | double_values;  // {4, 8} が生成される

// ✅ Ranges版:for で取り出すまで処理が走らない
// ※ ラムダは前出のものと同じため省略(C++の構文ではありません)
auto result = data
    | std::views::filter([](int x){ return x % 2 == 0; })
    | std::views::transform([](int x){ return x * 2; });  // この時点では何も起きていない

for (int v : result) { /* ここで初めて処理される */ }

100万件のデータでも、result を作るだけなら一瞬です。

よく使う views

// 条件でフィルタ
| std::views::filter([](int x){ return x > 2; })

// 各要素を変換
| std::views::transform([](int x){ return x * 2; })

// 先頭 N 件だけ
| std::views::take(3)

// 先頭 N 件を捨てる
| std::views::drop(2)

// 逆順
| std::views::reverse

結果を vector に保存する

// C++20 の書き方
std::vector<int> result;
std::ranges::copy(
    data
        | std::views::filter([](int x){ return x % 2 == 0; })
        | std::views::transform([](int x){ return x * 2; }),
    std::back_inserter(result)
);

// C++23 なら一行で
auto result = data
    | std::views::filter([](int x){ return x % 2 == 0; })
    | std::ranges::to<std::vector>();

④ 並列パイプライン:Producer/Consumer

これまでは1スレッドの処理でした。各ステップを別スレッドで同時に動かすパイプラインを作ります。

発想:工場の流れ作業

[生産者スレッド] → キュー → [処理スレッド] → キュー → [消費者スレッド]
     データを作る                 変換する                  結果を使う

各ステップは独立して動き続け、キューを通じてデータを受け渡します。

スレッドセーフなキューの実装

#include <queue>
#include <mutex>
#include <condition_variable>
#include <optional>

template<typename T>
class SafeQueue {
    std::queue<T> q;
    std::mutex mtx;
    std::condition_variable cv;
    bool done = false;

public:
    // 生産者側:データを追加する
    void push(T value) {
        std::lock_guard lock(mtx);
        q.push(std::move(value));
        cv.notify_one();  // 「データ来たよ」と通知
    }

    // 消費者側:データを取り出す(なければ待つ)
    std::optional<T> pop() {
        std::unique_lock lock(mtx);
        cv.wait(lock, [&]{ return !q.empty() || done; });

        if (q.empty()) return std::nullopt;  // 終了シグナル

        T value = std::move(q.front());
        q.pop();
        return value;
    }

    // 「もうデータは来ない」と宣言する
    void finish() {
        std::lock_guard lock(mtx);
        done = true;
        cv.notify_all();
    }
};

3ステップのパイプライン

#include <iostream>
#include <thread>

int main() {
    SafeQueue<int> queue1;  // 生産者 → 処理者
    SafeQueue<int> queue2;  // 処理者 → 消費者

    // ① 生産者スレッド
    std::thread producer([&]{
        for (int i = 1; i <= 5; i++) {
            std::cout << "生産: " << i << "\n";
            queue1.push(i);
        }
        queue1.finish();
    });

    // ② 処理スレッド
    std::thread processor([&]{
        while (auto value = queue1.pop()) {
            int result = *value * 2;
            std::cout << "処理: " << *value << " → " << result << "\n";
            queue2.push(result);
        }
        queue2.finish();
    });

    // ③ 消費者スレッド
    std::thread consumer([&]{
        while (auto value = queue2.pop()) {
            std::cout << "消費: " << *value << "\n";
        }
    });

    producer.join();
    processor.join();
    consumer.join();
}

実行結果(順番は実行のたびに変わります):

生産: 1
生産: 2
処理: 1 → 2
消費: 2
処理: 2 → 4
消費: 4
...

mutex を忘れると何が起きるか

// ❌ mutex なしで複数スレッドが同時に触ると…
queue.push(1);  // スレッドA
queue.push(2);  // スレッドB ← データ破損の危険!

SafeQueue の中で mutex を使って「一度に1スレッドだけ触れる」ようにしているのはそのためです。


まとめ:4段階の全体像

段階 特徴 向いているケース
① 関数の連鎖 シンプル・すぐ書ける 処理が少ない・学習用
operator| 自作 読みやすい・概念を理解できる 依存ライブラリなし・カスタマイズ
③ C++20 Ranges 標準・遅延評価で効率的 変換・フィルタ中心の処理
④ 並列パイプライン 複数スレッドで同時処理 I/O待ち・CPU負荷の高い処理

②で operator| を自分で実装したことで、③のRangesが「魔法」ではなく「同じ発想の洗練された形」として理解できるようになります。まずは②から入り、③に移行するのがおすすめの学習順序です。


参考

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?