C++でパイプライン処理を実装する:基本概念から並列処理まで
データ処理を「独立したステップの連鎖」として組み立てるパイプラインパターンは、コードの読みやすさと再利用性を大きく高めてくれます。この記事では、C++でパイプライン処理を実装する方法を、基本概念から現代的なC++20 Ranges、さらには並列パイプラインまで段階的に解説します。
パイプライン処理とは?
「一連の処理を、独立したステップの連鎖として組み立てる」設計パターンです。
入力データ → [ステップ1] → [ステップ2] → [ステップ3] → 出力
各ステップは「受け取って・処理して・渡す」だけで、互いの実装を知らなくてよいのがポイントです。
① 素朴な実装:関数の連鎖
まずはシンプルに、関数を順番に呼び出す形で実装します。
#include <iostream>
#include <string>
#include <vector>
#include <algorithm>
std::vector<int> parse(const std::string& csv) {
// "1,2,3,4,5" → {1, 2, 3, 4, 5}(簡略化)
return {1, 2, 3, 4, 5};
}
std::vector<int> filter_even(std::vector<int> data) {
std::vector<int> result;
std::copy_if(data.begin(), data.end(),
std::back_inserter(result),
[](int x){ return x % 2 == 0; });
return result;
}
std::vector<int> double_values(std::vector<int> data) {
std::transform(data.begin(), data.end(), data.begin(),
[](int x){ return x * 2; });
return data;
}
int main() {
// パイプライン:順番に呼ぶだけ
auto result = double_values(filter_even(parse("1,2,3,4,5")));
// → {4, 8}
}
これがパイプラインの本質ですが、ネストが深くなると右から左に読まなければならず、可読性が下がります。
② operator| で読みやすくする
Linuxの | のように、左から右に読めるようにしましょう。
operator| のオーバーロード
template<typename T, typename F>
auto operator|(T value, F func) -> decltype(func(value)) {
return func(value);
}
a | f を f(a) として動作させる、それだけです。
使い方
// 各ステップをラムダとして定義
auto filter_even = [](std::vector<int> data) {
std::vector<int> result;
for (int x : data)
if (x % 2 == 0) result.push_back(x);
return result;
};
auto double_values = [](std::vector<int> data) {
for (int& x : data) x *= 2;
return data;
};
int main() {
std::vector<int> data = {1, 2, 3, 4, 5};
// 左から右に読める!
auto result = data | filter_even | double_values;
// → {4, 8}
}
引数付きステップ:カリー化
カリー化とは?
カリー化(Currying) とは、「複数の引数を取る関数」を「引数を1つずつ受け取る関数の連鎖」に変換するテクニックです。
// 通常の関数:引数を2つ同時に受け取る
bool greater_than(int x, int threshold) {
return x > threshold;
}
// カリー化した関数:引数を1つずつ受け取る
auto greater_than_curried = [](int threshold) {
return [threshold](int x) { // threshold を覚えた関数を返す
return x > threshold;
};
};
// 使い方
auto greater_than_3 = greater_than_curried(3); // 「3より大きいか判定する関数」が得られる
greater_than_3(5); // → true
greater_than_3(2); // → false
パイプラインとの相性が抜群な理由は、operator| は「データを1つ受け取る関数」しか繋げられないからです。カリー化によって引数を先に渡し、「データだけを待つ関数」を作ることで、パイプラインに組み込めます。
「しきい値を指定してフィルタしたい」など、引数を持つステップはカリー化で実現します。
auto filter_greater_than = [](int threshold) {
// threshold をキャプチャした関数を返す
return [threshold](std::vector<int> data) {
std::vector<int> result;
for (int x : data)
if (x > threshold) result.push_back(x);
return result;
};
};
int main() {
std::vector<int> data = {1, 2, 3, 4, 5};
auto result = data
| filter_greater_than(2) // → {3, 4, 5}
| double_values; // → {6, 8, 10}
}
filter_greater_than(2) は「データを受け取る関数」を返すため、operator| にそのまま渡せます。
ラムダのキャプチャについて
上の例で使った [threshold] はキャプチャです。ラムダは外側の変数に直接アクセスできないため、キャプチャで「持ち込む」必要があります。
| 書き方 | 意味 | 注意点 |
|---|---|---|
[=] |
全部コピー | 安全。作成時の値が固定される |
[&] |
全部参照 | 関数内で完結するときのみ安全 |
[x] |
x だけコピー | 明示的で最も安全 |
[&x] |
x だけ参照 | 関数をまたぐ場合は危険 |
関数をまたいでラムダを返す場合は、必ず値でキャプチャしてください。参照キャプチャ [&] はダングリング参照の原因になります。
// ❌ 危険:n が消えたあとも参照しようとする
auto make_adder(int n) {
return [&n](int x){ return x + n; }; // n は関数終了で消える
}
// ✅ 安全:n をコピーして持つ
auto make_adder(int n) {
return [n](int x){ return x + n; };
}
③ C++20 Ranges:標準パイプライン
②で自作した operator| の仕組みを、標準ライブラリが正式に提供したものが Ranges です。
#include <iostream>
#include <vector>
#include <ranges>
int main() {
std::vector<int> data = {1, 2, 3, 4, 5};
auto result = data
| std::views::filter([](int x){ return x % 2 == 0; })
| std::views::transform([](int x){ return x * 2; });
for (int v : result)
std::cout << v << " "; // → 4 8
}
最大の特徴:遅延評価(Lazy Evaluation)
// ❌ 自作版:各ステップで中間 vector が作られる
auto a = data | filter_even; // {2, 4} が生成される
auto b = a | double_values; // {4, 8} が生成される
// ✅ Ranges版:for で取り出すまで処理が走らない
// ※ ラムダは前出のものと同じため省略(C++の構文ではありません)
auto result = data
| std::views::filter([](int x){ return x % 2 == 0; })
| std::views::transform([](int x){ return x * 2; }); // この時点では何も起きていない
for (int v : result) { /* ここで初めて処理される */ }
100万件のデータでも、result を作るだけなら一瞬です。
よく使う views
// 条件でフィルタ
| std::views::filter([](int x){ return x > 2; })
// 各要素を変換
| std::views::transform([](int x){ return x * 2; })
// 先頭 N 件だけ
| std::views::take(3)
// 先頭 N 件を捨てる
| std::views::drop(2)
// 逆順
| std::views::reverse
結果を vector に保存する
// C++20 の書き方
std::vector<int> result;
std::ranges::copy(
data
| std::views::filter([](int x){ return x % 2 == 0; })
| std::views::transform([](int x){ return x * 2; }),
std::back_inserter(result)
);
// C++23 なら一行で
auto result = data
| std::views::filter([](int x){ return x % 2 == 0; })
| std::ranges::to<std::vector>();
④ 並列パイプライン:Producer/Consumer
これまでは1スレッドの処理でした。各ステップを別スレッドで同時に動かすパイプラインを作ります。
発想:工場の流れ作業
[生産者スレッド] → キュー → [処理スレッド] → キュー → [消費者スレッド]
データを作る 変換する 結果を使う
各ステップは独立して動き続け、キューを通じてデータを受け渡します。
スレッドセーフなキューの実装
#include <queue>
#include <mutex>
#include <condition_variable>
#include <optional>
template<typename T>
class SafeQueue {
std::queue<T> q;
std::mutex mtx;
std::condition_variable cv;
bool done = false;
public:
// 生産者側:データを追加する
void push(T value) {
std::lock_guard lock(mtx);
q.push(std::move(value));
cv.notify_one(); // 「データ来たよ」と通知
}
// 消費者側:データを取り出す(なければ待つ)
std::optional<T> pop() {
std::unique_lock lock(mtx);
cv.wait(lock, [&]{ return !q.empty() || done; });
if (q.empty()) return std::nullopt; // 終了シグナル
T value = std::move(q.front());
q.pop();
return value;
}
// 「もうデータは来ない」と宣言する
void finish() {
std::lock_guard lock(mtx);
done = true;
cv.notify_all();
}
};
3ステップのパイプライン
#include <iostream>
#include <thread>
int main() {
SafeQueue<int> queue1; // 生産者 → 処理者
SafeQueue<int> queue2; // 処理者 → 消費者
// ① 生産者スレッド
std::thread producer([&]{
for (int i = 1; i <= 5; i++) {
std::cout << "生産: " << i << "\n";
queue1.push(i);
}
queue1.finish();
});
// ② 処理スレッド
std::thread processor([&]{
while (auto value = queue1.pop()) {
int result = *value * 2;
std::cout << "処理: " << *value << " → " << result << "\n";
queue2.push(result);
}
queue2.finish();
});
// ③ 消費者スレッド
std::thread consumer([&]{
while (auto value = queue2.pop()) {
std::cout << "消費: " << *value << "\n";
}
});
producer.join();
processor.join();
consumer.join();
}
実行結果(順番は実行のたびに変わります):
生産: 1
生産: 2
処理: 1 → 2
消費: 2
処理: 2 → 4
消費: 4
...
mutex を忘れると何が起きるか
// ❌ mutex なしで複数スレッドが同時に触ると…
queue.push(1); // スレッドA
queue.push(2); // スレッドB ← データ破損の危険!
SafeQueue の中で mutex を使って「一度に1スレッドだけ触れる」ようにしているのはそのためです。
まとめ:4段階の全体像
| 段階 | 特徴 | 向いているケース |
|---|---|---|
| ① 関数の連鎖 | シンプル・すぐ書ける | 処理が少ない・学習用 |
② operator| 自作 |
読みやすい・概念を理解できる | 依存ライブラリなし・カスタマイズ |
| ③ C++20 Ranges | 標準・遅延評価で効率的 | 変換・フィルタ中心の処理 |
| ④ 並列パイプライン | 複数スレッドで同時処理 | I/O待ち・CPU負荷の高い処理 |
②で operator| を自分で実装したことで、③のRangesが「魔法」ではなく「同じ発想の洗練された形」として理解できるようになります。まずは②から入り、③に移行するのがおすすめの学習順序です。