この記事の本文およびスクリプトは、ChatGPT5とGemini 2.5Proの生成結果に筆者が手直しを加えることで作成されました。
はじめに
気象・気候分野の研究では、巨大なデータセットを扱うことが日常茶飯事です。
このような巨大データを扱う際、しばしば次のような悩みにぶちあたります。
- 数十年分のデータを読み込もうとすると、メモリが足りなくてPCが固まる…
- 全球の各格子点で統計計算をしたら、一晩経っても終わらない…
大規模なデータを扱う宿命とも言えるこれらの課題。実は、PythonのライブラリDaskを使うことで、鮮やかに解決できるかもしれません。
この記事では、大規模データを効率的に処理するための考え方と実践的なテクニックを解説します。
Jupyter Notebookは対話的な分析に非常に便利ですが、Daskの計算、特にメモリを多く使う処理では、時々予期せぬ遅延が発生することがあります。 本記事のPythonスクリプトの内容をJupyter Notebook上で実行しても計算が速くならない可能性があります。 なお、Jupyter Labには https://github.com/dask/dask-labextension という拡張パッケージもあるようですが、筆者はまだ試したことがありません。
1. Daskとは? なぜメモリ不足が解消されるのか?
Daskは、PandasやNumPyといったお馴染みのライブラリと連携し、メモリに乗り切らないほど大きなデータを効率的に扱うための並列計算ライブラリです。
Daskを理解する上でのキーワードは2つあります。
1.1. ポイント①:チャンク (Chunks)
Daskは、巨大なデータ(例えば全球・長期間のNetCDFファイル)を、まるでお豆腐を切り分けるように、チャンクと呼ばれる小さな塊に分割して扱います。
それぞれのチャンクはメモリに収まるサイズなので、PCのメモリを使い果たすことなく処理を進めることができます。xarray
と組み合わせれば、open_dataset
時にchunks
引数を指定するだけで、この恩恵を受けられます。
1.2. ポイント②:遅延評価 (Lazy Evaluation)
Daskのもう一つの特徴は、遅延評価です。
これは、「計算の命令を受けてもすぐには実行せず、まず『計算の手順書(タスクグラフ)』を作成する」という仕組みです。
料理に例えると、レシピ(手順書)を最後まで書き上げるだけで、実際の調理(計算)はまだ始めません。
そして、.compute()
という命令が与えられた瞬間に、Daskは最適化されたタスクグラフに従って、分割したチャンクを複数のCPUコアに割り振り、一気に並列処理を実行します。これにより、計算時間の大幅な短縮が期待できます。
2. 実践!DaskでWelchのt検定をやってみよう
それでは、実際に手を動かしながら、Daskを使ったデータ処理を体験していきましょう。最終的には、2つの気候シミュレーション結果の差を検定するスクリプトを完成させます。
2.1. まずは元データのチャンクサイズを確認する。
Daskで効率的に処理を行うには、 読み込みたいデータがディスク上でどのように保存されているか(チャンクサイズ) を知ることが非常に重要です。
もし、Daskで指定するチャンクサイズが、ディスク上のチャンクサイズと全く異なると、Daskは内部でデータの並べ替え(Rechunking)という非常にコストの高い処理を行うことになります。これを避けるため、まずは元ファイルの情報を確認しましょう。
import xarray as xr
# ファイルパス
data_path = "exp1/prcp.nc"
# まずはチャンクを指定せずにファイルを開く
ds = xr.open_dataset(data_path)
# ファイル内の各変数のエンコーディング情報を確認
for name, da in ds.data_vars.items():
enc = da.encoding
# NetCDFでは'chunksizes', Zarrでは'chunks'に情報が入っていることが多い
on_disk_chunks = enc.get("chunksizes") or enc.get("chunks")
print(f"変数 '{name}':")
print(f" ディスク上のチャンク = {on_disk_chunks}")
print(f" 全体の形 = {da.shape}")
print(f" 次元 = {da.dims}")
$ python check_chunks.py
変数 'time_bnds':
ディスク上のチャンク = (1, 2)
全体の形 = (1200, 2)
次元 = ('time', 'bnds')
変数 'lon_bnds':
ディスク上のチャンク = None
全体の形 = (256, 2)
次元 = ('lon', 'bnds')
変数 'lat_bnds':
ディスク上のチャンク = None
全体の形 = (128, 2)
次元 = ('lat', 'bnds')
変数 'pr':
ディスク上のチャンク = (1, 128, 256)
全体の形 = (1200, 128, 256)
次元 = ('time', 'lat', 'lon')
この結果を見て、ディスク上のチャンクサイズを把握します。Daskで指定するチャンクサイズは、このディスク上のチャンクサイズの倍数に設定するのが理想的です。今回の例では、ディスク上のチャンクと全体の形を比較すると、緯度・経度方向が一致していますので、チャンクを切らない方が良さそうです。一方、時間方向には好きに刻んで良さそうです。
2.2. Daskでデータをチャンクで読み込む
元データの構造を把握したら、いよいよDaskでデータを読み込みます。xarray.open_dataset
のchunks
引数に、先ほど確認したディスク上のチャンクサイズを考慮してサイズを指定します。
import xarray as xr
# ファイルパス
data_path = "exp1/prcp.nc"
# 2.1で確認した情報を元にチャンクサイズを決める
ds_dask = xr.open_dataset(data_path, chunks={"time": 512, "lat": 128, "lon": 256})
da_dask = ds_dask["pr"]
print(da_dask)
$ python compute_with_dask.py
<xarray.DataArray 'pr' (time: 1200, lat: 128, lon: 256)> Size: 157MB
dask.array<open_dataset-pr, shape=(1200, 128, 256), dtype=float32, chunksize=(512, 128, 256), chunktype=numpy.ndarray>
Coordinates:
* time (time) object 10kB 1951-01-16 12:00:00 ... 2050-12-16 12:00:00
* lon (lon) float64 2kB 0.0 1.406 2.812 4.219 ... 354.4 355.8 357.2 358.6
* lat (lat) float64 1kB 88.93 87.54 86.14 84.74 ... -86.14 -87.54 -88.93
Attributes:
standard_name: precipitation_flux
units: kg/m**2/s
CDI_grid_type: gaussian
CDI_grid_num_LPE: 64
orignal_name: PRCP
これを実行すると、コンソールには見慣れたxarray.DataArray
の情報が表示されますが、よく見ると配列の中身がdask.array<chunksize=(1200, 128, 256), ...>
のようになっているはずです。これが、データの実体がまだメモリに読み込まれず、Daskによって管理されている証拠です。
2.3. 簡単な計算で「遅延評価」を体験する
次に、このデータを使って簡単な計算をしてみましょう。例えば、時間平均を計算します。
# 前からの続き
mean_da = da_dask.mean(dim="time")
print(mean_da)
$ compute_with_dask.py
<xarray.DataArray 'pr' (lat: 128, lon: 256)> Size: 131kB
dask.array<mean_agg-aggregate, shape=(128, 256), dtype=float32, chunksize=(128, 256), chunktype=numpy.ndarray>
Coordinates:
* lon (lon) float64 2kB 0.0 1.406 2.812 4.219 ... 354.4 355.8 357.2 358.6
* lat (lat) float64 1kB 88.93 87.54 86.14 84.74 ... -86.14 -87.54 -88.93
これを実行しても、まだ計算は実行されません。mean_da
の中身も、やはりdask.array
になっているはずです。Daskは「時間平均を取る」という命令をタスクグラフに追加しただけなのです。
では、どうすれば計算結果を得られるのでしょうか?ここで登場するのが.compute()
メソッドです。
# 前からの続き
from dask.diagnostics import ProgressBar
# .compute()で計算を実行する
with ProgressBar():
result = mean_da.compute()
print(result)
$ compute_with_dask.py
[########################################] | 100% Completed | 18.97 s
<xarray.DataArray 'pr' (lat: 128, lon: 256)> Size: 131kB
array([[4.5946908e-06, 4.5965180e-06, 4.5994357e-06, ..., 4.5887577e-06,
4.5907150e-06, 4.5936322e-06],
[6.0418320e-06, 6.0375569e-06, 6.0307220e-06, ..., 6.0537495e-06,
6.0487741e-06, 6.0450388e-06],
[6.5269346e-06, 6.5351310e-06, 6.5394520e-06, ..., 6.5046634e-06,
6.5113527e-06, 6.5187596e-06],
...,
[2.0851039e-06, 2.0797195e-06, 2.0753000e-06, ..., 2.1066608e-06,
2.0991031e-06, 2.0919338e-06],
[2.0624450e-06, 2.0653790e-06, 2.0686366e-06, ..., 2.0564150e-06,
2.0581058e-06, 2.0598022e-06],
[1.1929094e-06, 1.1877125e-06, 1.1826327e-06, ..., 1.2071769e-06,
1.2023437e-06, 1.1977698e-06]], dtype=float32)
Coordinates:
* lon (lon) float64 2kB 0.0 1.406 2.812 4.219 ... 354.4 355.8 357.2 358.6
* lat (lat) float64 1kB 88.93 87.54 86.14 84.74 ... -86.14 -87.54 -88.93
.compute()
が呼び出された瞬間に、Daskはそれまで組み立ててきたタスクグラフ(データ読み込み→平均計算)を一気に実行します。ProgressBar
を使うと、計算の進捗が可視化されるので、大規模な計算を行う際には非常に便利です。
2.4. 複数の計算を効率的に計算する
Welchのt検定を行うには、2つのデータセットそれぞれについて、サンプル数(n)、平均(mean)、分散(var)が必要です。これらを個別に.compute()
で計算すると、同じデータを何度も読み込むことになり非効率です。
Daskでは、複数の計算をdask.persist()
で一度にまとめて実行させることができます。
import os
import xarray as xr
import dask
from dask.diagnostics import ProgressBar
# ファイルパス
data_dir = "./data/processed/"
exp1_name = "cgcm_pi"
exp2_name = "cgcm_42"
file_name= "prcp.nc"
var_name = "pr"
data1_path = os.path.join(data_dir, exp1_name, "cat", file_name)
data2_path = os.path.join(data_dir, exp2_name, "cat", file_name)
# 気候モデルの日付対応
time_coder = xr.coders.CFDatetimeCoder(use_cftime=True)
# 2つのデータセットを読み込む
da1 = xr.open_dataset(data1_path, chunks={"time": 512}, decode_times=time_coder)[var_name]
da2 = xr.open_dataset(data2_path, chunks={"time": 512}, decode_times=time_coder)[var_name]
# 統計量を計算する関数を定義
def stats_over_time(da: xr.DataArray) -> tuple[int, float, float]:
n = da.count(dim="time")
mean = da.mean(dim="time", skipna=True)
var = da.var(dim="time", skipna=True, ddof=1)
return n, mean, var
# 2つのデータセットから、3つの統計量をそれぞれ計算
n1, m1, v1 = stats_over_time(da1)
n2, m2, v2 = stats_over_time(da2)
# dask.persist()で、6つの統計量をまとめて計算し、中間結果をメモリに維持
# これにより、後続のt検定の計算が高速になる
with ProgressBar():
n1, m1, v1, n2, m2, v2 = dask.persist(n1, m1, v1, n2, m2, v2)
# 計算結果のチェック
print(n1)
ここでは.compute()
の代わりにdask.persist()
を使いました。これは、計算を実行しつつ、結果をPythonの変数としてではなく、Daskが管理するメモリ空間内に保持する命令です。後続の計算でこれらの結果を再利用する場合に効率的です。
print(n1)
を実行すると、計算結果が格納されたxarray.DataArrayが表示されます。配列の中身がdask.arrayではなく、具体的な数値(この場合はサンプル数)に変わっていることが確認できます。
3. 実践!統計的に妥当なt検定の実装
本記事ではDaskを用いた高速化に焦点を当てますが、気象データでt検定を行う際には統計的な妥当性を確保することが非常に重要です。
特に、日々の気象データが持つ「自己相関」(今日の値が昨日の値と似ている性質)を無視すると、誤った結論を導く可能性があります。そのため、本記事で紹介するスクリプトでは、自己相関を補正するための実効サンプルサイズという考え方を導入しています。
t検定の理論や自己相関の詳しい解説については、以下の講義資料が非常に参考になりますので、まずはこちらをご一読ください。
- 参考資料: 気象情報解析特論 第10回 平均値の差のt検定
本記事では、この統計的な背景を踏まえた上で、Daskによる実装と高速化の側面に絞って解説を進めます。
3.1. Daskを使ったt検定の実装フロー
統計的妥当性を考慮したt検定の計算フローは以下のようになります。
-
データ読み込み:
xarray
とdask
を使い、データをチャンクに分割して読み込みます。 -
自己相関の計算: リチャンクする前の、時間軸が細かく分割されたデータを使って、各グリッドのラグ1自己相関係数(
rho
)を計算します。(xr.corr
は並列化しやすいため) -
リチャンク:
time
次元のチャンクを-1
に設定し、各グリッドの時系列データがメモリ上で連続になるようにします。 -
基本統計量の計算: リチャンク後のデータに対し、サンプル数(
n
)、平均(m
)、分散(v
)を計算します。ここでは数値的に安定なWelfordアルゴリズムをapply_ufunc
で適用します。 -
実効サンプルサイズの計算:
n
とrho
から、実効サンプルサイズ(n_eff
)を計算します。 -
p値の計算:
n_eff
,m
,v
を使って、Welchのt検定のp値を計算します。 - 結果の保存: 計算されたp値マップをNetCDFファイルとして書き出します。
この「計算ごとに最適なチャンク形式を使い分ける」のが、パフォーマンスを引き出す鍵です。
3.2. 全体をスクリプトにまとめる
これまでのステップを統合し、t検定の計算とp値の保存までを行う一連の処理を、再利用可能なPythonスクリプトとしてまとめます。
以下のスクリプトは、統計的な妥当性と計算の安定性・効率性を両立させた完成形です。コマンドラインからチャンクサイズを引数として渡せるようにして、後のチューニング作業を楽にする工夫も加えてあります。
サンプルコード(spatial_welch_p.pyの完全版)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Daskとxarrayを使用し、2つの大規模気象データセット間で空間的なWelchのt検定を実行するスクリプト。
時系列の自己相関を考慮した実効サンプルサイズを用いることで、統計的な妥当性を確保する。
また、数値的に安定なWelfordアルゴリズムと、計算内容に応じたチャンク戦略により、
計算の安定性とパフォーマンスを両立させる。
実行方法:
# ターミナルから以下のように実行
python spatial_welch_p.py --variable prcp --season JJA --exp1_name exp1 --exp2_name exp2 --overwrite
"""
import argparse
import csv
import os
import resource
import sys
import time
from pathlib import Path
import dask
import numpy as np
import xarray as xr
from dask.diagnostics import ProgressBar
from scipy import stats
# --- グローバル設定 ---
engine = "netcdf4"
seas_dict = {
"DJF": [1, 2, 12], "MAM": [3, 4, 5],
"JJA": [6, 7, 8], "SON": [9, 10, 11]
}
varname_dict = {
"precw": "prw", "z_p850": "zg", "T2": "tas",
"prcp": "pr", "uqvint": "hus", "vqvint": "hus",
}
# --- 関数定義 ---
def welford_reduce(da, axis=-1):
"""
Welford's online algorithmをNumPy配列に適用する内部関数。
apply_ufuncによって呼び出され、数値的に安定した形で統計量を計算する。
"""
count = np.zeros(da.shape[:-1], dtype=np.int64)
mean = np.zeros(da.shape[:-1], dtype=np.float64)
M2 = np.zeros(da.shape[:-1], dtype=np.float64)
for i in range(da.shape[axis]):
valid_slice = np.isfinite(da[..., i])
delta = da[..., i] - mean
count += valid_slice.astype(np.int64)
count_safe = np.where(count > 0, count, 1)
mean += (delta / count_safe) * valid_slice
delta2 = da[..., i] - mean
M2 += (delta * delta2) * valid_slice
variance = np.where(count > 1, M2 / (count - 1), np.nan)
return count.astype(np.float64), mean, variance
def stable_stats_over_time(da: xr.DataArray):
"""
Welfordアルゴリズムを使い、時間次元に沿ってサンプル数、平均、不偏分散を計算する。
この関数はtime次元が単一チャンクであることを要求する。
"""
n, mean, var = xr.apply_ufunc(
welford_reduce, da,
input_core_dims=[['time']],
output_core_dims=[[], [], []],
exclude_dims={'time'},
dask="parallelized",
output_dtypes=[np.float64, np.float64, np.float64]
)
n.name, mean.name, var.name = "n", "mean", "var"
return n, mean, var
def calculate_autocorr(da: xr.DataArray):
"""
各グリッドでラグ1自己相関係数を計算する。
xr.corrはtime次元が分割されている方がDaskでの並列化効率が良い。
"""
da_lagged = da.shift(time=1)
rho = xr.corr(da, da_lagged, dim="time")
return rho.astype(np.float32)
def calculate_effective_n(n, rho):
"""
自己相関係数を用いて実効サンプルサイズを計算する。
"""
rho = rho.clip(max=0.999)
n_eff = n * (1 - rho) / (1 + rho)
n_eff = n_eff.where(n_eff <= n, n)
n_eff = n_eff.clip(min=2)
return n_eff
def welch_p(n1, m1, v1, n2, m2, v2):
"""
2つのデータセットの統計量からWelchのt検定のp値を計算する。
自由度はSatterthwaiteの式を用いてより正確に計算する。
"""
s1_sq = xr.where(n1 > 0, v1 / n1, np.nan)
s2_sq = xr.where(n2 > 0, v2 / n2, np.nan)
t_stat = (m1 - m2) / np.sqrt(s1_sq + s2_sq)
dof_num = (s1_sq + s2_sq)**2
dof_den = (s1_sq**2 / xr.where(n1 > 1, n1 - 1, 1)) + \
(s2_sq**2 / xr.where(n2 > 1, n2 - 1, 1))
dof = dof_num / dof_den
p = 2.0 * stats.t.sf(np.abs(t_stat), dof)
mask = (n1 >= 2) & (n2 >= 2) & ((s1_sq + s2_sq) > 0) & (dof > 0)
return xr.where(mask, p, np.nan).astype("float32").rename("p_value")
def main(args):
"""スクリプトのメイン実行部分"""
start_time = time.perf_counter()
# --- パスとファイル名の設定 ---
# データはカレントディレクトリの 'data/exp1_name', 'data/exp2_name' にあると想定
base_data_dir = Path("./data")
data1_dir = base_data_dir / args.exp1_name
data2_dir = base_data_dir / args.exp2_name
out_dir = base_data_dir / "stats"
out_dir.mkdir(exist_ok=True)
var_name_in_file = varname_dict.get(args.variable)
if not var_name_in_file:
raise KeyError(f"Variable '{args.variable}' is not defined in varname_dict.")
# 季節ごとのファイルを作成(cdoコマンドが必要)
input_filename = f"{args.variable}_{args.season}.nc"
data1_path = data1_dir / input_filename
data2_path = data2_dir / input_filename
# --- 季節ファイルの存在チェックと作成 ---
for data_path, data_dir in [(data1_path, data1_dir), (data2_path, data2_dir)]:
if not data_path.exists():
print(f"Seasonal file not found: {data_path}. Creating...")
org_path = data_dir / f"{args.variable}.nc"
if not org_path.exists():
raise FileNotFoundError(f"Original data file not found: {org_path}")
# cdoを使って季節ファイルを切り出す
mons_str = ",".join(map(str, seas_dict[args.season]))
os.system(f"cdo -selmon,{mons_str} {org_path} {data_path}")
print(f"Created {data_path}")
# --- Daskチャンク設定 ---
chunks = {"time": args.time_chunk} if args.time_chunk > 0 else "auto"
# --- メイン計算フロー ---
print("Starting analysis...")
# 1. データの読み込み
da1 = xr.open_dataset(data1_path, chunks=chunks, engine=engine)[var_name_in_file]
da2 = xr.open_dataset(data2_path, chunks=chunks, engine=engine)[var_name_in_file]
# 2. 自己相関の計算 (リチャンク前)
print("Computing autocorrelation...")
rho1 = calculate_autocorr(da1)
rho2 = calculate_autocorr(da2)
# 3. リチャンク
print("Rechunking data...")
da1_rechunked = da1.chunk({"time": -1, "lat": "auto", "lon": "auto"})
da2_rechunked = da2.chunk({"time": -1, "lat": "auto", "lon": "auto"})
# 4. 基本統計量の計算 (リチャンク後)
print("Computing stable statistics (n, mean, var)...")
n1, m1, v1 = stable_stats_over_time(da1_rechunked)
n2, m2, v2 = stable_stats_over_time(da2_rechunked)
# 中間結果をまとめて計算実行
print("Persisting intermediate results...")
with ProgressBar():
rho1, rho2, n1, m1, v1, n2, m2, v2 = dask.persist(
rho1, rho2, n1, m1, v1, n2, m2, v2)
# 5. 実効サンプルサイズの計算
print("Computing effective sample size...")
n_eff1 = calculate_effective_n(n1, rho1)
n_eff2 = calculate_effective_n(n2, rho2)
# 6. p値の計算
print("Performing Welch's t-test...")
pmap = welch_p(n_eff1, m1, v1, n_eff2, m2, v2)
# 7. 結果の保存
out_path = out_dir / f"{args.variable}_{args.season}_pval.nc"
if out_path.exists() and args.overwrite:
out_path.unlink()
print(f"Writing results to {out_path}...")
delayed_write = pmap.to_netcdf(out_path, compute=False)
with ProgressBar():
delayed_write.compute()
print(f"Analysis finished in {time.perf_counter() - start_time:.2f} seconds.")
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Perform spatial Welch's t-test on large climate datasets using Dask.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
parser.add_argument('--variable', type=str, required=True, help='Variable name to analyze (e.g., prcp).')
parser.add_argument('--season', type=str, required=True, choices=seas_dict.keys(), help='Season to analyze (e.g., JJA).')
parser.add_argument('--exp1_name', type=str, required=True, help="Directory name for experiment 1 data (e.g., 'exp1').")
parser.add_argument('--exp2_name', type=str, required=True, help="Directory name for experiment 2 data (e.g., 'exp2').")
parser.add_argument('--time_chunk', type=int, default=-1, help='Chunk size for the time dimension. -1 for single chunk.')
parser.add_argument('--overwrite', action='store_true', help='Overwrite existing output files.')
parsed_args = parser.parse_args()
main(parsed_args)
4. 最速を目指せ!チャンク設計の最適化
Daskのパフォーマンスはチャンクサイズに大きく左右されます。ここが最も重要で、かつ奥が深い部分です。
- チャンクが小さすぎる: タスクの数が膨大になり、Daskがタスクを管理するコスト(オーバーヘッド)で逆に遅くなります。
- チャンクが大きすぎる: 各チャンクがワーカー(CPUコア)のメモリに収まらず、計算が失敗したり、極端に遅くなったりします。
では、最適なサイズはどうやって見つけるのか? 答えは 「データや計算内容、マシンスペックによるので、試すのが一番!」 です。
「いろいろ試すのって、結局大変じゃない…?」
そう思われるかもしれません。確かに、手作業で一つずつ設定を変えて実行し、時間を計るのは骨の折れる作業です。
しかし、3.2節で示したスクリプトの目的は、まさにその「大変な試行錯誤」を自動化することにあります。
一度このスクリプトをセットして実行すれば、あとはPCが勝手に全ての組み合わせを試し、結果を記録してくれます。夜に実行しておけば、朝には最適設定のヒントが得られている、という使い方ができます。
ここで一度最適なチャンクサイズを見つけておけば、今後の同じデータ形式での解析が何時間も、場合によっては何日も短縮される可能性があります。これは、将来の自分の時間を節約するための、非常に価値のある「先行投資」なのです。
4.1. 計算内容で変わる最適なチャンク
今回のスクリプトでは、2種類の計算が登場しました。
-
calculate_autocorr
:xr.corr
を使用。この関数は、時間軸に沿った相関を計算しますが、Daskは時間軸が細かくチャンク分割されている方が、各チャンクの相関計算を並列化できるため高速です。 -
stable_stats_over_time
:apply_ufunc
で自作関数を適用。各グリッドの時系列全体を一度に処理する必要があるため、時間軸が単一のチャンク ("time": -1
) である必要があります。
このように、Daskのパフォーマンスは「どの処理を」「どのようなデータ(チャンク)に対して行うか」に依存します。 今回のスクリプトでは、計算の順番を工夫し、それぞれの処理に最適なチャンク形式のデータを渡すことで、全体のパフォーマンスを向上させています。
4.2. チューニング用スクリプト
このrun_tuning.sh
は、先ほど作成したPythonスクリプトに対し、チャンクサイズの組み合わせをループで次々に試して、それぞれの計算時間とメモリ使用量を記録してくれます。
サンプルコード(run_tuning.sh)
#!/bin/bash
# ==============================================================================
# Description:
# Daskを使ったPythonスクリプト(welch_test_dask.py)に対し、
# チャンクサイズの様々な組み合わせを試行し、最適な設定を見つけるための
# ラッパースクリプト。
#
# How to use:
# 1. 下記の TIME_CHUNKS, LAT_CHUNKS, LON_CHUNKS の配列を編集し、
# 試したいチャンクサイズの候補を定義します。
# (ヒント: on-diskのチャンクサイズの倍数を指定するのが効率的です)
# 2. このスクリプトに実行権限を付与します: chmod +x run_tuning.sh
# 3. 実行します: ./run_tuning.sh
# 4. tuning_results.csv に記録される計算時間とメモリ使用量を確認します。
# ==============================================================================
# --- ★試行したいチャンクサイズの候補を定義 ---
TIME_CHUNKS=(256, 1024, 2048)
LAT_CHUNKS=(128)
LON_CHUNKS=(256)
# -----------------------------------------------
# 実行するPythonスクリプトのパス
PYTHON_SCRIPT="welch_test_dask.py"
# 結果を記録するCSVファイル名
RESULTS_CSV="tuning_results.csv"
# 以前の結果ファイルが存在すれば、タイムスタンプを付けてリネーム
if [ -f ${RESULTS_CSV} ]; then
echo "Renaming old ${RESULTS_CSV}"
timestamp=$(date '+%Y-%m-%dT%H%M%S')
mv ${RESULTS_CSV} ${RESULTS_CSV%.*}_${timestamp}.csv
fi
echo "Starting chunk tuning experiment..."
echo "Results will be saved to ${RESULTS_CSV}"
echo "========================================="
# 全てのチャンクサイズの組み合わせでループ
for lon_chunk in "${LON_CHUNKS[@]}"; do
for lat_chunk in "${LAT_CHUNKS[@]}"; do
for t_chunk in "${TIME_CHUNKS[@]}"; do
echo "Running with: time=${t_chunk}, lat=${lat_chunk}, lon=${lon_chunk}"
# Pythonスクリプトを引数付きで実行
python ${PYTHON_SCRIPT} \
--time_chunk ${t_chunk} \
--lat_chunk ${lat_chunk} \
--lon_chunk ${lon_chunk} \
--overwrite \
--get_cost
echo "-----------------------------------------"
sleep 1 # 念のため間隔をあける
done
done
done
echo "========================================="
echo "Tuning experiment finished!"
echo "Analyze the results in ${RESULTS_CSV}"
4.3. 結果の分析
run_tuning.sh
を実行すると、tuning_results.csv
に各設定での計算コストが記録されます。以下は、時間方向に約9000タイムステップあるデータについて、空間チャンクを(128, 256)
に固定し、時間チャンクだけを変更した場合の実行結果の例です。
共有サーバで計算する場合、elapsed_time_s
(経過時間)は他のユーザーの負荷によって変動します。より正確にチャンクサイズ変更の純粋な効果を評価するには、あなたのプログラムのためにCPUが実際に稼働した時間であるCPU時間 (user_time_s
+ system_time_s
) に着目するのがおすすめです。
time_chunk | elapsed_time_s | max_memory_gb | cpu_time_s (user+sys) |
---|---|---|---|
256 | 540.19 | 2.835 | 989.70 |
1024 | 281.72 | 3.572 | 795.49 |
2048 | 13.62 | 2.017 | 30.25 |
この表を見ると、time_chunk
を256
から2048
に増やすことで、CPU時間(cpu_time_s
)が約990秒から約30秒へと、30倍以上も短縮されています。
これは、チャンクが小さすぎると(time_chunk=256
)、Daskが処理するタスクの数が膨大になり、その管理コスト(オーバーヘッド)が実際の計算時間よりもはるかに大きくなってしまう典型的な例です。チャンクをある程度大きくすることで(time_chunk=2048
)、タスク管理のオーバーヘッドが劇的に減り、CPUが効率的に計算を実行できるようになったことを示しています。
このように、自分の計算環境とデータに合わせた「スイートスポット」を見つけることが、Daskを使いこなす上で非常に重要です。
4.4. 少し深掘り (user_time
とsystem_time
のバランス)
はじめに
-
user_time
(ユーザー時間): あなたが書いたPythonコード(t検定の計算など)を実行するために、CPUが純粋に計算していた時間です。 -
system_time
(システム時間): ファイルの読み書きや、DaskがOSに「このタスクを実行して」とお願いする処理(システムコール)など、OSの機能を呼び出すためにCPUが使われた時間です。いわば、「計算のための準備や後片付け」の時間です。
テーブルから読み解くバランスの変化
tuning_results.csv
を見ると、このバランスが劇的に変化しているのが分かります。
time_chunk | user_time_s | system_time_s | バランス |
---|---|---|---|
256 | 22.87 | 966.83 | system_timeが圧倒的に多い (約42倍) |
1024 | 20.88 | 774.61 | system_timeが多い (約37倍) |
2048 | 18.23 | 12.02 | ほぼ均等 |
なぜsystem_time
が異常に長かったのか?
time_chunk
が256や1024と小さい場合、system_time
が異常に長くなっていますね。これは、Daskのタスク管理のオーバーヘッドが原因です。
-
チャンクが小さい場合 (256, 1024):
- Daskは、非常にたくさんの小さなチャンク(豆腐)を処理するために、OSに対して膨大な回数の「このファイルの一部を読んで」「この小さな計算をして」というお願い(システムコール)をします。
- OSは、この無数のお願いをさばくだけで手一杯になります。CPUは、実際の計算(
user_time
)よりも、この準備や後片付け(system_time
)にほとんどの時間を費やしてしまいます。
-
チャンクが最適な場合 (2048):
- チャンクが適度な大きさになると、DaskからOSへのお願いの回数が劇的に減ります。
- OSの負担が軽くなり、
system_time
が大幅に減少します。 - その結果、CPUは本来の仕事である科学計算(
user_time
)に集中できるようになり、全体の処理が高速化します。
理想的なバランスは?
理想的なのは、system_time
がuser_time
に対して極端に大きくならない状態です。
今回の例のようにsystem_time
がuser_time
の何十倍にもなっている場合、それは「I/O(ファイルの読み書き)やタスク管理にボトルネックがある」という明確なサインです。チャンクサイズを調整することで、このボトルネックを解消し、CPUが効率よく計算に時間を使えるようになった、ということがこの結果から読み取れます。
5. まとめ
- Daskとxarrayを組み合わせることで、高速で大規模な気象・気候データを扱うことができます。
- 計算のパフォーマンスはチャンク設計に大きく依存し、処理内容によって最適なチャンクサイズは異なります。
- 計算の順番を工夫し、各処理に最適なデータ形式を与えることで、全体のパフォーマンスを向上させることができます。
今回紹介したアプローチは、t検定だけでなく、EOF解析や相関分析など、様々な空間データに対する統計計算に応用できます。ぜひ皆さんの研究にもDaskを活用して、データ解析を効率化・高速化してみてください!
6. 今後の課題
- Zarrの活用
- Daskクラスタによるさらなるスケールアップ
- https://github.com/dask/dask-labextension のテスト