こちらのプログラムで積分の計算をどう早くするかについての考察が書かれていました。
100万倍速いプログラムを書く - 愚直に精度をあげてみる(11桁53秒) のコードを元に話を進めていきます。
該当のコードを手元で実行したところ以下の結果になりました。22秒ほど
$ python3 -V
Python 3.5.1
$ time python3 wallis.py
36722.3621743
python3 wallis.py 21.89s user 0.07s system 99% cpu 22.014 total
まずC言語のコードに落として考えます。
wallis_v1.cc
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
double f(double x)
{
return sin(M_PI * x);
}
int main(int argc, char** argv)
{
int n = 0;
if (argc > 1) {
n = atoi(argv[1]);
}
double product = 1;
for (int m = 1;m < 11;m++) {
double sum = 0;
for (int k = 0;k < n;k++) {
sum += (1.0 / (double)n) * pow(f((double)k / (double)n), m);
}
product *= sum;
}
printf("result=%f\n", (1.0 / product));
return 0;
}
$ time ./wallis_v1 1000000
result=36722.362174
./wallis_v1 1000000 0.75s user 0.01s system 97% cpu 0.777 total
これだけで大分高速化です。
次はマルチスレッドにすることを考えます。
wallis_v2.cc
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
inline double f(double x)
{
return sin(M_PI * x);
}
int main(int argc, char** argv)
{
int n = 0;
if (argc > 1) {
n = atoi(argv[1]);
}
double product_array[10];
double _n = (double)n;
double inv_n = 1.0 / _n;
#pragma omp parallel for
for (int m = 1;m < 11;m++) {
double sum = 0;
for (int k = 0;k < n;k++) {
double x = (double)k * inv_n;
sum += inv_n * pow(f(x), m);
}
product_array[m - 1] = sum;
}
double product = 1;
for (int i = 0;i < 10;i++) {
product *= product_array[i];
}
printf("result=%f\n", (1.0 / product));
return 0;
}
$ g++-6 -O3 -fopenmp -o wallis_v2 wallis_v2.cc
$ time ./wallis_v2 1000000
result=36722.362174
./wallis_v2 1000000 0.86s user 0.01s system 292% cpu 0.296 total
一応マルチスレッドの効果は出ています。
次はpowの実装ですが、今回は整数の場合に処理を限定して別関数にします。
(抜粋)wallis_v3.cc
double _pow(double x, int n)
{
double y = x;
for (int i = 1;i < n;i++) {
y *= x;
}
return y;
}
// 中略
for (int m = 1;m < 11;m++) {
double sum = 0;
for (int k = 0;k < n;k++) {
double x = (double)k * inv_n;
sum += inv_n * _pow(f(x), m);
}
product_array[m - 1] = sum;
}
$ g++-6 -O3 -fopenmp -o wallis_v3 wallis_v3.cc
$ time ./wallis_v3 1000000
result=36722.362174
./wallis_v3 1000000 0.34s user 0.00s system 255% cpu 0.133 total
FMA命令の使用を試しましたが、特に早くなりませんでした。
wallis_v4.cc
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <immintrin.h>
inline double f(double x)
{
return sin(M_PI * x);
}
double _pow(double x, int n)
{
double y = x;
for (int i = 1;i < n;i++) {
y *= x;
}
return y;
}
int main(int argc, char** argv)
{
int n = 0;
if (argc > 1) {
n = atoi(argv[1]);
}
double product_array[10];
double _n = (double)n;
double inv_n = 1.0 / _n;
double inv_n2 = 2.0 / _n;
double inv_n3 = 3.0 / _n;
#pragma omp parallel for
for (int m = 1;m < 11;m++) {
double __attribute__((aligned(32))) vec1[4] = {0, 0, 0, 0}, vec2[4], vec3[4];
__m256d v1 = _mm256_load_pd(vec1);
__m256d v2;
__m256d v3 = _mm256_broadcast_sd(&inv_n);
for (int k = 0;k < n;k+=4) {
double x = (double)k * inv_n;
vec2[0] = _pow(f(x), m);
vec2[1] = _pow(f(x + inv_n), m);
vec2[2] = _pow(f(x + inv_n2), m);
vec2[3] = _pow(f(x + inv_n3), m);
v2 = _mm256_load_pd(vec2);
v1 = _mm256_fmadd_pd(v2, v3, v1);
}
_mm256_store_pd(vec3, v1);
product_array[m - 1] = vec3[0] + vec3[1] + vec3[2] + vec3[3];
}
double product = 1;
for (int i = 0;i < 10;i++) {
product *= product_array[i];
}
printf("result=%f\n", (1.0 / product));
return 0;
}
$ g++-6 -O3 -fopenmp -mfma -o wallis_v4 wallis_v4.cc
$ time ./wallis_v4 1000000
result=36722.362174
./wallis_v4 1000000 0.33s user 0.00s system 256% cpu 0.128 total
今度はループの最適化を行います。
mのループ内でkについて同じような処理を行なっていたので、入れ替えました。
wallis_v5.cc
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
static inline double f(double x)
{
return sin(M_PI * x);
}
static inline double _pow(double x, int n)
{
double y = x;
for (int i = 1;i < n;i++) {
y *= x;
}
return y;
}
int main(int argc, char** argv)
{
int n = 0;
if (argc > 1) {
n = atoi(argv[1]);
}
double product_array[10];
for (int i = 0;i < 10;i++) {
product_array[i] = 0;
}
double _n = (double)n;
double inv_n = 1.0 / _n;
for (int k = 0;k < n;k++) {
double x = (double)k * inv_n;
for (int m = 1;m < 11;m++) {
product_array[m - 1] += _pow(f(x), m);
}
}
double product = 1;
for (int i = 0;i < 10;i++) {
product *= product_array[i] * inv_n;
}
printf("result=%f\n", (1.0 / product));
return 0;
}
$ time ./wallis_v5 1000000
result=36722.362174
./wallis_v5 1000000 0.08s user 0.00s system 95% cpu 0.089 total
これより先OpenMPは使用していませんが、早くできました。
最後は_pow
の処理を無くしました。これにより無駄な計算が減ります。
wallis_v6.cc
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
static inline double f(double x)
{
return sin(M_PI * x);
}
int main(int argc, char** argv)
{
int n = 0;
if (argc > 1) {
n = atoi(argv[1]);
}
double product_array[10];
for (int i = 0;i < 10;i++) {
product_array[i] = 0;
}
double _n = (double)n;
double inv_n = 1.0 / _n;
for (int k = 0;k < n;k++) {
double x = (double)k * inv_n;
double y = f(x);
double z = y;
for (int m = 0;m < 10;m++) {
product_array[m] += z;
z *= y;
}
}
double product = 1;
for (int i = 0;i < 10;i++) {
product *= product_array[i] * inv_n;
}
printf("result=%f\n", (1.0 / product));
return 0;
}
$ g++-6 -O3 -o wallis_v6 wallis_v6.cc
$ time ./wallis_v6 1000000
result=36722.362174
./wallis_v6 1000000 0.04s user 0.00s system 88% cpu 0.049 total
C言語の段階で 0.777sから0.049sへと高速化できました。
これらを踏まえてPythonのコードを作成します。
wallis_v2.py
from numpy import *
def f(x):
return sin(pi*x)
product_array = [0 for i in range(10)]
n = 1000000
inv_n = 1.0 / n
for k in range(n):
x = k * inv_n
y = f(x)
z = y
for m in range(10):
product_array[m] += z
z *= y
product = prod(list(map(lambda x: x * inv_n, product_array)))
result = 1.0/product
print(result)
$ time python3 wallis_v2.py
36722.3621743
python3 wallis_v2.py 8.87s user 0.09s system 99% cpu 9.040 total
22sから9sへ高速化できました。
重複する計算を極力無くすのが重要のようです。