LoginSignup
8
7

More than 5 years have passed since last update.

OpenCV(C++)チュートリアル:parallel_for_でコードを並列化する(マンデルブロ集合)

Last updated at Posted at 2018-12-24

公式チュートリアルのHow to use the OpenCV parallel_for_ to parallelize your codeの要約。
筆者の環境はdebian9

やること

コードを簡単に並列化できるOpenCVのparallel_for_フレームワークの使い方を示す。説明のために、ほぼ全部のCPUリソースを使ってマンデルブロ集合を描画するプログラムを書く。

前提

並列フレームワーク有効でビルドされたOpenCVが入っていること。OpenCV 3.2では次のフレームワークがこの順で利用できる。

  1. Intel Threading Building Blocks (サードパーティ製。明示的に有効にすること)
  2. C= Parallel C/C++ Programming Language Extension (サードパーティ製。明示的に有効にすること)
  3. OpenMP (コンパイラに組み込み。明示的に有効にすること)
  4. APPLE GCD (システムワイドで、 自動的に使用される (APPLEのみ))
  5. Windows RT concurrency (システムワイドで、 自動的に使用される(Windows RT のみ))
  6. Windows concurrency (ランタイムの一部, 自動的に使用される (Windows のみ - MSVC++ >= 10))
  7. Pthreads (利用可能なら)

また(弱い)前提として、並列化されたい処理が並列計算に適していること。依存関係のない複数の基本的な操作に分割できるタスクは並列化しやすい。コンピュータビジョンでの処理は多くの場合1ピクセルの処理が他のピクセルの処理に依存しないので並列化しやすい。

マンデルブロ集合を描く

理論

マンデルブロ集合は数学者ブノワ・マンデルブロの名前を取って数学者エイドリアン・ドゥアディが命名した。その画像表現が、すべてのスケールで同じ繰り返しのパターンを示す数学的集合フラクタル(しかも、マンデルブロ集合は拡大しても全体の形が繰り返しあらわれる自己相似)であることから数学以外の分野でも有名。

マンデルブロ集合を描画するための式をwikipediaの記事から引用すると

次の漸化式

\left\{
\begin{array}{ll}
Z_{n+1} = Z_n^2 + c \\
Z_0 = 0
\end{array}
\right.

で定義される複素数列{$Z_n$} $n\in N \cup$ {$0$}が $n\rightarrow \infty$の極限で無限大に発散しないという条件を満たす複素数 $c$ 全体が作る集合がマンデルブロ集合である。

つまり、はじめに複素数cを選び、$z_0=0$から開始して、上記の漸化式を繰り返し、どんなにnが大きくなっても$z_n$の絶対値が発散しなければ、その複素数cはマンデルブロ集合に含まれる。
このことは次のように表現できる。

\limsup_{n\to\infty} |z_{n+1}| \leq 2

疑似コード

マンデルブロ集合のシンプルな描画アルゴリズムにescape time algorithmがある。画像の各ピクセルに対して、最大回数までのイテレーションの間に、そのピクセルに対応している複素数値が発散するかどうかテストする。マンデルブロ集合に属していないピクセルはすぐにescapeされ、最大回数を超えてもエスケープしないピクセルは属するものと判断する。最大イテレーション回数を増やすとより詳細な画像が得られるが、その分計算時間がかかる。escapeするまでにかかったイテレーション回数はそのピクセルの色として使う。

各ピクセル (Px, Py) について, do:
{
  x0 = ピクセルのx座標 (マンデルブロ集合の X scale (-2, 1)におさまるようにスケールされている)
  y0 = ピクセルのy座標 (マンデルブロ集合の Y scale (-1, 1)におさまるようにスケールされている)
  x = 0.0
  y = 0.0
  iteration = 0
  max_iteration = 1000
  while (x*x + y*y < 2*2  AND  iteration < max_iteration) {
    xtemp = x*x - y*y + x0 
    y = 2*x*y + y0  
    x = xtemp
    iteration = iteration + 1
  }
  color = palette[iteration]
  plot(Px, Py, color)
}

上記疑似コードで、

while (x*x + y*y < 2*2  AND  iteration < max_iteration)

は、limsup|Zn+1| <= 2の条件のチェック、

    xtemp = x*x - y*y + x0 
    y = 2*x*y + y0  
    x = xtemp

は、$Z_n^2+c$を実部と虚部に分けて計算している(x0はcの実部、y0はcの虚部)
次の式を参照のこと。

\begin{align}
z&=x+iy\\
z^2&=x^2+i2xy−y^2\\
c&=x_0+iy_0\\
\end{align}

スケール

実装

Escape time algorithm

集合に含まれるかチェックを行い、escapeしたときのイテレーション回数を返す関数。複素数にはstd::complexを使う。

int mandelbrot(const complex<float> &z0, const int max)
{
    complex<float> z = z0;
    for (int t = 0; t < max; t++)
    {
        if (z.real()*z.real() + z.imag()*z.imag() > 4.0f) return t;
        z = z*z + z0;
    }
    return max;
}

シーケンシャルな実装

void sequentialMandelbrot(Mat &img, const float x1, const float y1, const float scaleX, const float scaleY)
{
    for (int i = 0; i < img.rows; i++)
    {
        for (int j = 0; j < img.cols; j++)
        {
            float x0 = j / scaleX + x1;
            float y0 = i / scaleY + y1;
            complex<float> z0(x0, y0);
            uchar value = (uchar) mandelbrotFormula(z0);
            img.ptr<uchar>(i)[j] = value;
        }
    }
}

ピクセルの座標がマンデルブロ集合のスケールに収まるようにする部分

    Mat mandelbrotImg(4800, 5400, CV_8U);
    float x1 = -2.1f, x2 = 0.6f;
    float y1 = -1.2f, y2 = 1.2f;
    float scaleX = mandelbrotImg.cols / (x2 - x1);
    float scaleY = mandelbrotImg.rows / (y2 - y1);

ピクセルにグレースケールを設定するルールとして:

  • ピクセルがマンデルブロ集合に含まれるなら黒
  • 含まれないなら、イテレーション回数に応じたグレースケール
int mandelbrotFormula(const complex<float> &z0, const int maxIter=500) {
    int value = mandelbrot(z0, maxIter);
    if(maxIter - value == 0)
    {
        return 0;
    }
    return cvRound(sqrt(value / (float) maxIter) * 255);
}

イテレーション回数を線形変換しただけではグレースケールの変化がわからない。square root scale transformationを使う(Jeremy D. Frensのブログポストから借りた)。

f(x)=\sqrt{\frac{x}{maxIter}}×255

変換
緑が線形変換で青がsquare root scale transformation。低い値がboostされている様子がわかる。

並列実装

シーケンシャルな実装で、ピクセルが独立に計算されているのを見た。
並列化にはOpenCVのcv::parallel_for_ フレームワークを使う。

class ParallelMandelbrot : public ParallelLoopBody
{
public:
    ParallelMandelbrot (Mat &img, const float x1, const float y1, const float scaleX, const float scaleY)
        : m_img(img), m_x1(x1), m_y1(y1), m_scaleX(scaleX), m_scaleY(scaleY)
    {
    }
    virtual void operator ()(const Range& range) const CV_OVERRIDE
    {
        for (int r = range.start; r < range.end; r++)
        {
            int i = r / m_img.cols;
            int j = r % m_img.cols;
            float x0 = j / m_scaleX + m_x1;
            float y0 = i / m_scaleY + m_y1;
            complex<float> z0(x0, y0);
            uchar value = (uchar) mandelbrotFormula(z0);
            m_img.ptr<uchar>(i)[j] = value;
        }
    }
    ParallelMandelbrot& operator=(const ParallelMandelbrot &) {
        return *this;
    };
private:
    Mat &m_img;
    float m_x1;
    float m_y1;
    float m_scaleX;
    float m_scaleY;
};

cv::ParallelLoopBodyを継承したカスタムクラスを宣言し、virtual void operator ()(const cv::Range& range) constをオーバーライドする。operator ()の中のrangeは個別のスレッドが扱うピクセルのサブセット。この分割は計算負荷を均等にするように自動的に行われる。ピクセルのインデックス座標を二次元の[row, col]座標に変換する必要がある(int i = r / m_img.cols;あたりのことを指しているのだと思う)。
Mat &m_img;で画像の参照を保持しておくこと。

呼び方:

 ParallelMandelbrot parallelMandelbrot(mandelbrotImg, x1, y1, scaleX, scaleY);
 parallel_for_(Range(0, mandelbrotImg.rows*mandelbrotImg.cols), parallelMandelbrot);

rangeは実行する操作の合計数。つまり全ピクセル数になる。
スレッド数はcv::setNumThreadsで設定できる。分割数はcv::parallel_for_nstripesパラメータで設定できる。
プロセッサが4スレッドならcv::setNumThreads(2)またはsetting nstripes=2にすることで、すべてのスレッドを使うが、ワークロードは2スレッドだけに分割する。
(この辺自信ない。)

※ 標準C++ 11が使えるなら、ParallelMandelbrotクラスをやめてラムダ式にすることもできる。(コード全体を参照)

コード全体

parallel.cpp

#include <iostream>
#include <opencv2/core.hpp>
#include <opencv2/imgcodecs.hpp>

using namespace std;
using namespace cv;

namespace
{
//! [mandelbrot-escape-time-algorithm]
int mandelbrot(const complex<float> &z0, const int max)
{
    complex<float> z = z0;
    for (int t = 0; t < max; t++)
    {
        if (z.real()*z.real() + z.imag()*z.imag() > 4.0f) return t;
        z = z*z + z0;
    }

    return max;
}
//! [mandelbrot-escape-time-algorithm]

//! [mandelbrot-grayscale-value]
int mandelbrotFormula(const complex<float> &z0, const int maxIter=500) {
    int value = mandelbrot(z0, maxIter);
    if(maxIter - value == 0)
    {
        return 0;
    }

    return cvRound(sqrt(value / (float) maxIter) * 255);
}
//! [mandelbrot-grayscale-value]

//! [mandelbrot-parallel]
class ParallelMandelbrot : public ParallelLoopBody
{
public:
    ParallelMandelbrot (Mat &img, const float x1, const float y1, const float scaleX, const float scaleY)
        : m_img(img), m_x1(x1), m_y1(y1), m_scaleX(scaleX), m_scaleY(scaleY)
    {
    }

    virtual void operator ()(const Range& range) const CV_OVERRIDE
    {
        for (int r = range.start; r < range.end; r++)
        {
            int i = r / m_img.cols;
            int j = r % m_img.cols;

            float x0 = j / m_scaleX + m_x1;
            float y0 = i / m_scaleY + m_y1;

            complex<float> z0(x0, y0);
            uchar value = (uchar) mandelbrotFormula(z0);
            m_img.ptr<uchar>(i)[j] = value;
        }
    }

    ParallelMandelbrot& operator=(const ParallelMandelbrot &) {
        return *this;
    };

private:
    Mat &m_img;
    float m_x1;
    float m_y1;
    float m_scaleX;
    float m_scaleY;
};
//! [mandelbrot-parallel]

//! [mandelbrot-sequential]
void sequentialMandelbrot(Mat &img, const float x1, const float y1, const float scaleX, const float scaleY)
{
    for (int i = 0; i < img.rows; i++)
    {
        for (int j = 0; j < img.cols; j++)
        {
            float x0 = j / scaleX + x1;
            float y0 = i / scaleY + y1;

            complex<float> z0(x0, y0);
            uchar value = (uchar) mandelbrotFormula(z0);
            img.ptr<uchar>(i)[j] = value;
        }
    }
}
//! [mandelbrot-sequential]
}

int main()
{
    //! [mandelbrot-transformation]
    Mat mandelbrotImg(4800, 5400, CV_8U);
    float x1 = -2.1f, x2 = 0.6f;
    float y1 = -1.2f, y2 = 1.2f;
    float scaleX = mandelbrotImg.cols / (x2 - x1);
    float scaleY = mandelbrotImg.rows / (y2 - y1);
    //! [mandelbrot-transformation]

    double t1 = (double) getTickCount();

    #ifdef CV_CXX11
    //ラムダ式が使える
    //! [mandelbrot-parallel-call-cxx11]
    parallel_for_(Range(0, mandelbrotImg.rows*mandelbrotImg.cols), [&](const Range& range){
        for (int r = range.start; r < range.end; r++)
        {
            int i = r / mandelbrotImg.cols;
            int j = r % mandelbrotImg.cols;

            float x0 = j / scaleX + x1;
            float y0 = i / scaleY + y1;

            complex<float> z0(x0, y0);
            uchar value = (uchar) mandelbrotFormula(z0);
            mandelbrotImg.ptr<uchar>(i)[j] = value;
        }
    });
    //! [mandelbrot-parallel-call-cxx11]

    #else

    //! [mandelbrot-parallel-call]
    ParallelMandelbrot parallelMandelbrot(mandelbrotImg, x1, y1, scaleX, scaleY);
    parallel_for_(Range(0, mandelbrotImg.rows*mandelbrotImg.cols), parallelMandelbrot);
    //! [mandelbrot-parallel-call]

    #endif

    t1 = ((double) getTickCount() - t1) / getTickFrequency();
    cout << "Parallel Mandelbrot: " << t1 << " s" << endl;

    Mat mandelbrotImgSequential(4800, 5400, CV_8U);
    double t2 = (double) getTickCount();
    sequentialMandelbrot(mandelbrotImgSequential, x1, y1, scaleX, scaleY);
    t2 = ((double) getTickCount() - t2) / getTickFrequency();
    cout << "Sequential Mandelbrot: " << t2 << " s" << endl;
    cout << "Speed-up: " << t2/t1 << " X" << endl;

    imwrite("Mandelbrot_parallel.png", mandelbrotImg);
    imwrite("Mandelbrot_sequential.png", mandelbrotImgSequential);

    return EXIT_SUCCESS;
}

コンパイル

 g++ parallel.cpp  -I/usr/local/include/opencv2 -I/usr/local/include/opencv -L/usr/local/lib -lopencv_core -lopencv_imgcodecs -lopencv_highgui -lopencv_imgproc

結果

Parallel Mandelbrot: 68.8412 s
Sequential Mandelbrot: 163.733 s
Speed-up: 2.37842 X

結果
こんな感じになる。

疑問

マンデルブロ集合に属していないピクセルはすぐにescapeされ、最大回数を超えてもエスケープしないピクセルは属するものと判断する。

上記の原文は

Pixels that do not belong to the Mandelbrot set will escape quickly whereas we assume that the pixel is in the set after a fixed maximum number of iterations.

また、

変換

この図をみると、最大イテレーションを超えたピクセルは白で描かれることになるが、結果の図の白い部分がマンデルブロ集合、ということになるんだろうか?今まで黒がマンデルブロ集合だと思っていた・・・。

2019/3/8更新:
最大イテレーションに近づくほど白に近づくが、最大イテレーションそのものに達したピクセルは黒で塗られているため、マンデルブロ集合はやはり黒の部分だった。下記のコードを参照。


int mandelbrotFormula(const complex<float> &z0, const int maxIter=500) {
    int value = mandelbrot(z0, maxIter);
    if(maxIter - value == 0)
    {
        return 0;
    }

    return cvRound(sqrt(value / (float) maxIter) * 255);
}
8
7
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
7