6
4

高速化アルゴリズム探検隊: 「マンデルブロ集合 の計算」CPU python : 103.63秒 C++ : 25.33秒 CUDA GPU : 2.45秒 ブロック最適化後: 2.08秒

Last updated at Posted at 2024-07-23

73d14943-de58-4e6e-8184-33290c7328a0.png

ショートストーリー: 「アルゴリズム探検隊」
序章
ある日のこと、アルゴリズム探検隊のリーダーであるエミリーは、チームメンバーのジェームズ、ソフィア、そしてAIアシスタントのアルファと共に、新たなチャレンジに挑むことを決意しました。その課題は、マンデルブロ集合の計算を行い、様々な方法でその速度を比較することでした。

第一章: CPUによるPythonの冒険
「まずは基本から始めましょう」とエミリーは言いました。彼女はPythonでマンデルブロ集合の計算コードを書き、シンプルなCPUで実行しました。時間がかかることを予想していたエミリーは、コーヒーを淹れながら結果を待ちました。しばらくして結果が出ると、エミリーは時計を見て、「計算に約30秒もかかったわ」と驚きました。

第二章: C++での高速化
次に、エミリーはC++で同じ計算を行うことにしました。「C++は高速だから、これでどれくらい時間を短縮できるか見てみましょう」とエミリーはチームに告げました。コンパイルと実行が終わり、結果が出ると、ジェームズが「約10秒だ!」と声を上げました。「やっぱりC++は速いね」とソフィアも同意しました。

第三章: CUDAを使ったGPUの冒険
最終チャレンジはCUDAを使ったGPUによる計算でした。エミリーは「これが一番楽しみだったの」と笑顔で言いながら、CUDAコードを書き始めました。ジェームズが「GPUの力を見せてやろう」と励まし、ソフィアも「きっと最速の結果が出るはず」と期待しました。計算が終わると、結果は1秒未満でした。「すごい!たったの0.5秒で終わったわ!」とエミリーは歓声を上げました。

エピローグ
エミリーと探検隊は、それぞれの方法での計算時間を比較し、次のような結果を得ました:

CPU Python: 約30秒
C++: 約10秒
CUDA GPU: 約0.5秒
「技術の進歩は本当に素晴らしいわね」とエミリーは感慨深げに言いました。「私たちはこれからも、より速く、より効率的なアルゴリズムを探求していくわ」と探検隊の冒険は続くのでした。

CPU python 処理時間: 103.63秒
C++ 計算時間: 25.33秒
CPU JIT python 複素数計算ができないためお休み。
CUDA GPU ( pycuda ) 処理時間: 2.45秒
CUDA GPU ( pycuda ) ブロック最適化 処理時間: 2.08秒

image.png

ーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーー

CPU python

説明
mandelbrot(c, max_iter): マンデルブロー集合の計算を行う関数です。指定した最大反復回数まで計算し、発散するまでの反復回数を返します。
plot_mandelbrot(xmin, xmax, ymin, ymax, width, height, max_iter): 指定された範囲でマンデルブロー集合を計算し、画像として保存するための2次元配列を生成します。
main(): 5つの異なるパラメータセットでマンデルブロー集合をプロットし、画像を保存します。処理時間も測定し、コンソールに表示します。

# インストールコマンド
!pip install matplotlib numpy

import numpy as np
import matplotlib.pyplot as plt
import time

def mandelbrot(c, max_iter):
    z = c
    for n in range(max_iter):
        if abs(z) > 2:
            return n
        z = z*z + c
    return max_iter

def plot_mandelbrot(xmin, xmax, ymin, ymax, width, height, max_iter):
    r1, r2 = np.linspace(xmin, xmax, width), np.linspace(ymin, ymax, height)
    img = np.zeros((height, width), dtype=np.int32)
    for i, y in enumerate(r2):
        for j, x in enumerate(r1):
            c = complex(x, y)
            img[i, j] = mandelbrot(c, max_iter)
    return img

def main():
    # パラメータ設定
    width, height = 800, 600
    max_iter = 256
    positions = [
        (-2.0, 1.0, -1.5, 1.5),
        (-1.0, 0.5, -0.5, 0.5),
        (-1.5, -1.0, -1.0, 1.0),
        (-0.5, 0.5, -0.5, 0.5),
        (-2.5, -1.5, -1.5, 1.5)
    ]
    
    start_time = time.time()
    
    for idx, (xmin, xmax, ymin, ymax) in enumerate(positions):
        plt.figure(figsize=(10, 7))
        img = plot_mandelbrot(xmin, xmax, ymin, ymax, width, height, max_iter)
        plt.imshow(img, extent=(xmin, xmax, ymin, ymax), cmap='inferno')
        plt.colorbar()
        plt.title(f'Mandelbrot Set - Plot {idx+1}')
        plt.savefig(f'mandelbrot_plot_{idx+1}.png')
        plt.close()
    
    end_time = time.time()
    print(f'処理時間: {end_time - start_time:.2f}')

if __name__ == "__main__":
    main()

C++

説明
C++ソースコード: cpp_code変数にC++のソースコードを格納しています。このコードはマンデルブロー集合を計算し、画像ファイルとして保存します。
ソースコードのファイル書き込み: mandelbrot.cppというファイルにC++のソースコードを書き込みます。
コンパイル: g++を使用してC++ソースコードをコンパイルします。OpenCVライブラリをリンクしています。
実行と処理時間の測定: コンパイルされたC++プログラムを実行し、処理時間を測定します。
画像の表示: 生成された画像ファイルを読み込み、Matplotlibで表示します。
これにより、C++で計算したマンデルブロー集合の画像をPythonから実行し、表示することができます。

# 必要なライブラリのインストール
!pip install matplotlib numpy
!apt-get install -y g++ cmake libopencv-dev

import subprocess
import time
import matplotlib.pyplot as plt
import matplotlib.image as mpimg

# C++のソースコード
cpp_code = """
#include <iostream>
#include <fstream>
#include <vector>
#include <complex>
#include <chrono>
#include <opencv2/opencv.hpp>

using namespace std;
using namespace std::chrono;

int mandelbrot(const complex<double>& c, int max_iter) {
    complex<double> z = c;
    for (int n = 0; n < max_iter; ++n) {
        if (abs(z) > 2.0) {
            return n;
        }
        z = z * z + c;
    }
    return max_iter;
}

void plot_mandelbrot(double xmin, double xmax, double ymin, double ymax, int width, int height, int max_iter, const string& filename) {
    vector<uint8_t> img(width * height);
    double x_scale = (xmax - xmin) / (width - 1);
    double y_scale = (ymax - ymin) / (height - 1);

    for (int y = 0; y < height; ++y) {
        for (int x = 0; x < width; ++x) {
            double cr = xmin + x * x_scale;
            double ci = ymin + y * y_scale;
            complex<double> c(cr, ci);
            int color = mandelbrot(c, max_iter);
            img[y * width + x] = static_cast<uint8_t>(color % 256);
        }
    }

    cv::Mat image(height, width, CV_8UC1, img.data());
    cv::applyColorMap(image, image, cv::COLORMAP_INFERNO);
    cv::imwrite(filename, image);
}

int main() {
    auto start = high_resolution_clock::now();

    int width = 800, height = 600, max_iter = 256;
    vector<tuple<double, double, double, double>> positions = {
        {-2.0, 1.0, -1.5, 1.5},
        {-1.0, 0.5, -0.5, 0.5},
        {-1.5, -1.0, -1.0, 1.0},
        {-0.5, 0.5, -0.5, 0.5},
        {-2.5, -1.5, -1.5, 1.5}
    };

    for (size_t i = 0; i < positions.size(); ++i) {
        double xmin, xmax, ymin, ymax;
        tie(xmin, xmax, ymin, ymax) = positions[i];
        string filename = "mandelbrot_plot_" + to_string(i + 1) + ".png";
        plot_mandelbrot(xmin, xmax, ymin, ymax, width, height, max_iter, filename);
    }

    auto stop = high_resolution_clock::now();
    auto duration = duration_cast<milliseconds>(stop - start);
    cout << "計算時間: " << duration.count() << "ミリ秒" << endl;

    return 0;
}
"""

# C++ソースコードをファイルに書き込む
with open('mandelbrot.cpp', 'w') as file:
    file.write(cpp_code)

# pkg-config コマンドで OpenCV のフラグを取得
pkg_config_output = subprocess.check_output(['pkg-config', '--cflags', '--libs', 'opencv4']).decode().strip()

# g++ コマンドを使って C++ ソースコードをコンパイル
compile_command = f'g++ -o mandelbrot mandelbrot.cpp {pkg_config_output}'
subprocess.run(compile_command, shell=True, check=True)

# C++プログラムを実行し、処理時間を測定する
start_time = time.time()
result = subprocess.run(['./mandelbrot'], capture_output=True, text=True)
end_time = time.time()

# 結果と処理時間を表示する
print(result.stdout)
print(f'計算時間: {end_time - start_time:.2f}')

# 生成された画像を表示する
for i in range(1, 6):
    img = mpimg.imread(f'mandelbrot_plot_{i}.png')
    plt.figure(figsize=(10, 7))
    plt.imshow(img, cmap='inferno')
    plt.title(f'Mandelbrot Set - Plot {i}')
    plt.axis('off')
    plt.show()

CUDA GPU ( pycuda )

PyCUDA ライブラリを使用して、GPUを活用してマンデルブロー集合の計算を高速化するコードです。以下に、pycuda を使用してGPUでマンデルブロー集合を計算するコード。

説明
cuda_code: CUDAカーネルコードとして、マンデルブロー集合の計算をGPUで行う関数を定義しています。
plot_mandelbrot: CUDAカーネルを呼び出してマンデルブロー集合の計算を実行し、画像を生成します。デバイスメモリとホストメモリ間でのデータ転送を行います。
main(): 5つの異なるパラメータセットでマンデルブロー集合をプロットし、画像を保存します。処理時間も測定して表示します。
このコードでは、PyCUDAを使用してGPU上でマンデルブロー集合を計算し、パフォーマンスの向上を図っています。

# インストールコマンド
!pip install matplotlib numpy pycuda

import numpy as np
import matplotlib.pyplot as plt
import time
import pycuda.driver as cuda
import pycuda.autoinit
from pycuda.compiler import SourceModule

# CUDAカーネルの定義
cuda_code = """
__global__ void mandelbrot(float *xmin, float *xmax, float *ymin, float *ymax, int *width, int *height, int *max_iter, int *img) {
    int x = blockIdx.x * blockDim.x + threadIdx.x;
    int y = blockIdx.y * blockDim.y + threadIdx.y;
    
    if (x < *width && y < *height) {
        float r = *xmin + x * (*xmax - *xmin) / (*width - 1);
        float i = *ymin + y * (*ymax - *ymin) / (*height - 1);
        float cr = r;
        float ci = i;
        int iteration = 0;
        float zr = 0;
        float zi = 0;
        
        while (zr * zr + zi * zi <= 4.0f && iteration < *max_iter) {
            float temp = zr * zr - zi * zi + cr;
            zi = 2.0f * zr * zi + ci;
            zr = temp;
            iteration++;
        }
        
        img[y * (*width) + x] = iteration;
    }
}
"""

def plot_mandelbrot(xmin, xmax, ymin, ymax, width, height, max_iter):
    mod = SourceModule(cuda_code)
    mandelbrot = mod.get_function("mandelbrot")
    
    img = np.zeros((height, width), dtype=np.int32)
    
    # デバイスメモリの確保
    d_img = cuda.mem_alloc(img.nbytes)
    d_xmin = cuda.mem_alloc(np.float32(1).nbytes)
    d_xmax = cuda.mem_alloc(np.float32(1).nbytes)
    d_ymin = cuda.mem_alloc(np.float32(1).nbytes)
    d_ymax = cuda.mem_alloc(np.float32(1).nbytes)
    d_width = cuda.mem_alloc(np.int32(1).nbytes)
    d_height = cuda.mem_alloc(np.int32(1).nbytes)
    d_max_iter = cuda.mem_alloc(np.int32(1).nbytes)

    # ホストからデバイスへのデータ転送
    cuda.memcpy_htod(d_xmin, np.array([xmin], dtype=np.float32))
    cuda.memcpy_htod(d_xmax, np.array([xmax], dtype=np.float32))
    cuda.memcpy_htod(d_ymin, np.array([ymin], dtype=np.float32))
    cuda.memcpy_htod(d_ymax, np.array([ymax], dtype=np.float32))
    cuda.memcpy_htod(d_width, np.array([width], dtype=np.int32))
    cuda.memcpy_htod(d_height, np.array([height], dtype=np.int32))
    cuda.memcpy_htod(d_max_iter, np.array([max_iter], dtype=np.int32))

    # CUDAカーネルの実行
    block_size = (16, 16, 1)
    grid_size = (width // block_size[0] + 1, height // block_size[1] + 1)
    mandelbrot(d_xmin, d_xmax, d_ymin, d_ymax, d_width, d_height, d_max_iter, d_img, block=block_size, grid=grid_size)

    # デバイスからホストへのデータ転送
    cuda.memcpy_dtoh(img, d_img)
    
    return img

def main():
    # パラメータ設定
    width, height = 800, 600
    max_iter = 256
    positions = [
        (-2.0, 1.0, -1.5, 1.5),
        (-1.0, 0.5, -0.5, 0.5),
        (-1.5, -1.0, -1.0, 1.0),
        (-0.5, 0.5, -0.5, 0.5),
        (-2.5, -1.5, -1.5, 1.5)
    ]
    
    start_time = time.time()
    
    for idx, (xmin, xmax, ymin, ymax) in enumerate(positions):
        plt.figure(figsize=(10, 7))
        img = plot_mandelbrot(xmin, xmax, ymin, ymax, width, height, max_iter)
        plt.imshow(img, extent=(xmin, xmax, ymin, ymax), cmap='inferno')
        plt.colorbar()
        plt.title(f'Mandelbrot Set - Plot {idx+1}')
        plt.savefig(f'mandelbrot_plot_{idx+1}.png')
        plt.close()
    
    end_time = time.time()
    print(f'処理時間: {end_time - start_time:.2f}')

if __name__ == "__main__":
    main()

最適化されたCUDAカーネルのブロックサイズを見つけるためのスクリプトです。さまざまなブロックサイズでマンデルブロ集合の計算を実行し、その実行時間を測定します。最適なブロックサイズを特定することで、計算の効率を最大化できます。

このコードは次のことを行います:

find_optimal_block_size関数を使って最適なCUDAカーネルブロックサイズを見つける。
5つの異なる位置でマンデルブロ集合を計算し、画像を生成する。
最適なブロックサイズを使って計算し、総処理時間を測定する。
このコードをGoogle ColabなどのJupyterノートブックで実行すると、各マンデルブロ集合の画像が生成され、mandelbrot_plot_1.pngからmandelbrot_plot_5.pngとして保存されます。

Block size (8, 8) took 0.0032 seconds
Block size (16, 16) took 0.0018 seconds
Block size (32, 32) took 0.0016 seconds
Block size (16, 8) took 0.0015 seconds
Block size (8, 16) took 0.0016 seconds
Block size (32, 16) took 0.0015 seconds
Block size (16, 32) took 0.0016 seconds
Optimal block size: (16, 8) with time 0.0015 seconds
総処理時間: 2.08秒

# インストールコマンド
!pip install matplotlib numpy pycuda

import numpy as np
import matplotlib.pyplot as plt
import time
import pycuda.driver as cuda
import pycuda.autoinit
from pycuda.compiler import SourceModule

# CUDAカーネルの定義
cuda_code = """
__global__ void mandelbrot(float xmin, float xmax, float ymin, float ymax, int width, int height, int max_iter, int *img) {
    int x = blockIdx.x * blockDim.x + threadIdx.x;
    int y = blockIdx.y * blockDim.y + threadIdx.y;
    
    if (x < width && y < height) {
        float r = xmin + x * (xmax - xmin) / (width - 1);
        float i = ymin + y * (ymax - ymin) / (height - 1);
        float cr = r;
        float ci = i;
        int iteration = 0;
        float zr = 0;
        float zi = 0;
        
        while (zr * zr + zi * zi <= 4.0f && iteration < max_iter) {
            float temp = zr * zr - zi * zi + cr;
            zi = 2.0f * zr * zi + ci;
            zr = temp;
            iteration++;
        }
        
        img[y * width + x] = iteration;
    }
}
"""

def plot_mandelbrot(xmin, xmax, ymin, ymax, width, height, max_iter, block_size):
    mod = SourceModule(cuda_code)
    mandelbrot = mod.get_function("mandelbrot")
    
    img = np.zeros((height, width), dtype=np.int32)
    
    # デバイスメモリの確保
    d_img = cuda.mem_alloc(img.nbytes)

    # CUDAカーネルの実行
    grid_size = (width // block_size[0] + 1, height // block_size[1] + 1)
    mandelbrot(np.float32(xmin), np.float32(xmax), np.float32(ymin), np.float32(ymax), np.int32(width), np.int32(height), np.int32(max_iter), d_img, block=(block_size[0], block_size[1], 1), grid=(grid_size[0], grid_size[1]))

    # デバイスからホストへのデータ転送
    cuda.memcpy_dtoh(img, d_img)
    
    return img

def find_optimal_block_size(xmin, xmax, ymin, ymax, width, height, max_iter):
    block_sizes = [(8, 8), (16, 16), (32, 32), (16, 8), (8, 16), (32, 16), (16, 32)]
    best_time = float('inf')
    best_block_size = None
    
    for block_size in block_sizes:
        start_time = time.time()
        plot_mandelbrot(xmin, xmax, ymin, ymax, width, height, max_iter, block_size)
        end_time = time.time()
        
        elapsed_time = end_time - start_time
        print(f'Block size {block_size} took {elapsed_time:.4f} seconds')
        
        if elapsed_time < best_time:
            best_time = elapsed_time
            best_block_size = block_size
    
    print(f'Optimal block size: {best_block_size} with time {best_time:.4f} seconds')
    return best_block_size

def main():
    # パラメータ設定
    width, height = 800, 600
    max_iter = 256
    positions = [
        (-2.0, 1.0, -1.5, 1.5),
        (-1.0, 0.5, -0.5, 0.5),
        (-1.5, -1.0, -1.0, 1.0),
        (-0.5, 0.5, -0.5, 0.5),
        (-2.5, -1.5, -1.5, 1.5)
    ]
    
    optimal_block_size = find_optimal_block_size(-2.0, 1.0, -1.5, 1.5, width, height, max_iter)
    
    start_time = time.time()
    
    for idx, (xmin, xmax, ymin, ymax) in enumerate(positions):
        plt.figure(figsize=(10, 7))
        img = plot_mandelbrot(xmin, xmax, ymin, ymax, width, height, max_iter, optimal_block_size)
        plt.imshow(img, extent=(xmin, xmax, ymin, ymax), cmap='inferno')
        plt.colorbar()
        plt.title(f'Mandelbrot Set - Plot {idx+1}')
        plt.savefig(f'mandelbrot_plot_{idx+1}.png')
        plt.close()
    
    end_time = time.time()
    print(f'総処理時間: {end_time - start_time:.2f}')

if __name__ == "__main__":
    main()


ーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーー

マンデルブロ集合の計算についての解説

  1. マンデルブロ集合とは
    マンデルブロ集合(Mandelbrot Set)は、複雑なフラクタル図形の一種であり、数学者ブノワ・マンデルブロにちなんで名付けられました。この集合は、複素平面上の点
    c が特定の漸化式を満たすかどうかによって決定されます。集合の境界は非常に複雑で美しいパターンを形成し、無限に細かい自己相似の性質を持っています。

以下は、Pythonを使用したマンデルブロ集合の計算とプロットの例です。

import numpy as np
import matplotlib.pyplot as plt
import time

def mandelbrot(c, max_iter):
    z = c
    for n in range(max_iter):
        if abs(z) > 2:
            return n
        z = z*z + c
    return max_iter

def plot_mandelbrot(xmin, xmax, ymin, ymax, width, height, max_iter):
    r1 = np.linspace(xmin, xmax, width)
    r2 = np.linspace(ymin, ymax, height)
    n3 = np.empty((width, height))
    for i in range(width):
        for j in range(height):
            n3[i, j] = mandelbrot(r1[i] + 1j * r2[j], max_iter)
    return n3

# 設定
xmin, xmax, ymin, ymax = -2.0, 1.0, -1.5, 1.5
width, height = 800, 600
max_iter = 256

start_time = time.time()

# プロット
mandelbrot_set = plot_mandelbrot(xmin, xmax, ymin, ymax, width, height, max_iter)
plt.imshow(mandelbrot_set.T, extent=[xmin, xmax, ymin, ymax], cmap='inferno')
plt.colorbar()
plt.title('Mandelbrot Set')
plt.show()

end_time = time.time()
print(f'処理時間: {end_time - start_time:.2f}')

このコードは、指定された範囲の複素平面上の点を走査し、各点がマンデルブロ集合に属するかどうかを判定します。その結果を色で表現し、視覚的に表示します。

  1. 高速化手法
    マンデルブロ集合の計算は非常に時間がかかることがあるため、高速化手法が重要です。以下にいくつかの方法を紹介します:

並列処理: 計算を複数のプロセッサやコアで並列に実行することで速度を向上させます。
GPU計算: CUDAなどを利用してGPUで計算を行うことで、並列計算能力を活かして大幅に速度を向上させます。
コンパイル済み言語の使用: C++などのコンパイル済み言語を使用して計算を行うことで、インタープリター言語よりも高速に実行できます。
これらの手法を組み合わせることで、マンデルブロ集合の計算速度を大幅に向上させることが可能です。

6
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
4