はじめに
データ同化を研究に使いたくて色々勉強してます。
データ同化のベンチマークでよく使われるLorenz96モデル https://journals.ametsoc.org/jas/article/55/3/399/24564/Optimal-Sites-for-Supplementary-Weather
を解くコードをJuliaで書いてました。これが遅いのか早いのかはっきりしないので,とりあえずPythonで書き直して比較しました。
Lorenz96モデル
方程式
$$
\frac{dX_j}{dt} = (X_{j+1} - X_{j-2})X_{j-1} - X_j +F
$$
ただし,$j= 1\dots N$。今回はN=40。
初期に$X_j=F$とすると,定常解が得られます。今回は$X_{20}=1.001F, ; F=8$として解きます。F=8だとカオス的な挙動が見られます。
これを4段4次精度のルンゲクッタで解きます。
Julia版
Juliaのバージョンは1.5.1です。Jupyter labで実行しました。
using LinearAlgebra
## パラメータ
## Fの大きさで挙動が変わる
F = 8
N = 40
dt = 0.01
tend = 146
nstep = Int32(tend/dt)
# dx/dt=f(x)
# Lorentz96の方程式
function f(x)
f = fill(0.0, N)
for i=3:N-1
f[i] = (x[i+1]-x[i-2])x[i-1] - x[i] + F
end
# 周期境界
f[1] = (x[2]-x[N-1])x[N] - x[1] + F
f[2] = (x[3]-x[N])x[1] - x[2] + F
f[N] = (x[1]-x[N-2])x[N-1] - x[N] + F
return f
end
# L96をRK4で解く
function RK4(xold, dt)
k1 = f(xold)
k2 = f(xold + k1*dt/2.)
k3 = f(xold + k2*dt/2.)
k4 = f(xold + k3*dt)
xnew = xold + dt/6.0*(k1 + 2.0k2 + 2.0k3 + k4)
end
# 真値を全ステップ計算
function progress(deltat)
xn = zeros(Float64, N, nstep)
t = zeros(Float64, nstep)
#X = fill(F,N)+randn(N)
X = fill(Float64(F), N)
# 初期擾乱はj=20の点にFの値の0.1%の値を与える。
X[20] = 1.001*F
for j=1:nstep
X = RK4(X, deltat)
for i=1:N
xn[i, j] = X[i]
end
t[j] = dt*j
end
return xn, t
end
# x_trueの計算
@time xn, t = progress(dt)
Python版
Pythonは3.7.3です。
Pythonはpipでインストールしたnumpyで計算します。Anacondaでいれたnumpyのほうが早いですが,とりあえずテストなので,気にしないことにします。
import numpy as np
from copy import copy
## パラメータ
## Fの大きさで挙動が変わる
F = 8
# 解析データ点の個数(次元)
N = 40
dt = 0.01
# 1年分計算
tend=146
nstep = int(tend/dt)
def f(x):
f = np.zeros((N))
for i in range(2, N-1):
f[i] = (x[i+1]-x[i-2])*x[i-1] - x[i] + F
f[0] = (x[1]-x[N-2])*x[N-1] - x[0] + F
f[1] = (x[2]-x[N-1])*x[0] - x[1] + F
f[N-1] = (x[0]-x[N-3])*x[N-2] - x[N-1] + F
return f
# L96をRK4で解く
def Model(xold, dt):
k1 = f(xold)
k2 = f(xold + k1*dt/2.)
k3 = f(xold + k2*dt/2.)
k4 = f(xold + k3*dt)
xnew = xold + dt/6.0*(k1 + 2.0*k2 + 2.0*k3 + k4)
return xnew
# 真値を全ステップ計算
def progress(deltat):
xn = np.zeros((N, nstep))
t = np.zeros((nstep))
X = float(F)*np.ones(N)
#X = fill(Float64(F), N)
# 初期擾乱はj=19(0始まりを考慮)の点にFの値の0.1%の値を与える。
X[19] = 1.001*F
for i in range(N):
xn[i, 0] = copy(X[i])
for j in range(1, nstep):
X = Model(X, deltat)
xn[:, j] = X[:]
#print(X[20], X[1], deltat)
t[j] = dt*j
return xn, t
# 実行
start = time.time()
xn, t = progress(dt)
elapsed_time = time.time() - start
print ("elapsed_time:{0}".format(elapsed_time) + "[sec]")
計算時間
Julia 0.73 s
Python 2.94 s
でJuliaのほうが早かったです。2.3 GHz Quad-Core Intel Core i5積んだMacBook Proでやりました。
Pythonの最適化はほとんど考えてないので,もっと早くなると思います。arrayのインデックスの順番変えたりはしましたが,早くなりませんでした。PythonでもJITコンパイルできたりしますが,やったことないので,今回は保留しました。
とりあえず,Juliaの計算速度は遅くはなさそうでした。ちなみにExtended Kalman Filterによるデータ同化のコードも書いてみましたが,Juliaのほうが早かったです。
おまけ:作図
Juliaで計算結果をプロットするコード
ホフメラー図
using PyPlot
fig = plt.figure(figsize=(6, 5.))
ax1 = fig.add_subplot(111)
ax1.set_title("true")
ax1.set_xlabel("X_i")
ax1.set_ylabel("time step")
image1 = ax1.contourf(xn'[1:5000,:], )
cb1 = fig.colorbar(image1, ax=ax1, label="X")
時間変化
using PyPlot
fig = plt.figure(figsize=(5, 8.))
ax1 = fig.add_subplot(211)
nhalf = Int(nstep/2)
nstart = nhalf
nend = nstep
ax1.plot(t[nstart:nend], xn[1, nstart:nend], label="x1")
ax1.plot(t[nstart:nend], xn[2, nstart:nend], label="x2")
ax1.plot(t[nstart:nend], xn[3, nstart:nend], label="x3")
おまけ:データ保存
HDF5で保存。計算結果の後半,5ステップおきのデータを保存。
using HDF5
h5open("L96_true_obs.h5", "w") do file
write(file, "Xn_true", xn[:, nhalf:5:nstep])
write(file, "t", t[nhalf:5:nstep])
end
リードはこんな感じ
file = h5open("L96_true_obs.h5", "r")
Xn_true = read(file, "Xn_true")
t = read(file, "t")
close(file)
おまけ:Fortran
Fortranでも書いてみました。久しぶりに書いたけど,Fotranめんどう。
program main
implicit none
double precision :: F, dt, tend
integer :: N, nstep, i
real(8) :: xn(40, 14600), t(14600)
F = 8.0
N = 40
dt = 0.01
tend = 146
nstep = tend/dt
write(*, *) "F=", F
call progress(dt, N, nstep, F, xn, t)
write(*,*) "xn(:, 1)="
write(*,*) xn(1:N,1)
write(*,*) "xn(:, 2)="
write(*,*) xn(1:N,2)
write(*,*) "xn(:, nstep)=", xn(1:N,nstep)
!do i=1, N
! write(1,*) xn(i,1)
! write(10,*) xn(i,10)
! !write(5,*) xn(i,5)
! write(100,*) xn(i,nstep)
!enddo
!do i=1, nstep
! write(200,*) xn(1:N, i)
!enddo
end program main
!! Lorenz96の式
subroutine func(x, N, F, g)
implicit none
integer :: i, N
real(8) :: g(N), x(N), F
do i=3, N-1
g(i) = (x(i+1)-x(i-2))*x(i-1) - x(i) + F
enddo
g(1) = (x(2)-x(N-1))*x(N) - x(1) + F
g(2) = (x(3)-x(N))*x(1) - x(2) + F
g(N) = (x(1)-x(N-2))*x(N-1) - x(N) + F
return
end subroutine func
subroutine RK4(xold, N, dt, F, xnew)
implicit none
integer :: i, N
real(8) :: k1(N), k2(N), k3(N), k4(N), xold(N), xnew(N), dt, F
real(8) :: temp(N)
do i=1, N
temp(i) = xold(i)
enddo
call func(temp, N, F, k1)
do i=1, N
temp(i) = xold(i) + k1(i)*dt/2.
enddo
call func(temp, N, F, k2)
do i=1, N
temp(i) = xold(i) + k2(i)*dt/2.
enddo
call func(temp, N, F, k3)
do i=1, N
temp(i) = xold(i) + k3(i)*dt
enddo
call func(temp, N, F, k4)
do i=1, N
xnew(i)=xold(i) + dt/6.0*(k1(i)+2.0*k2(i)+2.0*k3(i)+k4(i))
enddo
return
end subroutine RK4
subroutine progress(dt, N, nstep, F, xn, t)
implicit none
integer :: N, nstep, i, j
real(8) :: dt, F, xn(N, nstep), t(nstep), x(N), temp(N)
do i=1, N
x(i) = F
enddo
x(20) = 1.001*F
do i=1,N
temp(i) = x(i)
enddo
do j=1, nstep
call RK4(temp, N, dt, F, temp)
do i=1, N
xn(i, j) = temp(i)
enddo
t(j) = dt*j
enddo
return
end subroutine progress
Homebrewでいれたgfortranでやったところ,
0.045s でした。
上のJuliaのコードはこれに比べるとだいぶ遅いですが,黒木さんによって最適化されたコード https://gist.github.com/genkuroki/f0426a7f3772c804ace0e6875445ea49
ではJuliaで5msで計算できたとのことです。
上記のコードはグローバル変数使ってるので,そこを直すと早くなるはず。
(これからやります)