Basemapは既にEOLがアナウンスされているため、図化の項はもはや機能しない可能性があることにご注意ください。
解析雨量をPythonで読みたい
気象庁/気象業務支援センターの1kmメッシュ解析雨量(2006年以降)はGRIB2形式の*.binファイルで提供されています。しかしながら独自の圧縮方式をとっているため、一般的なGRIB2形式に対応するGDALやpygrib2では扱うことができません。解析雨量に対応しているwgrib2を使用してもよいですが、一度バイナリファイルやNetCDFファイルにダンプする必要が生じます。
以下ではシンプルなPythonのコードで解析雨量を直接Numpy配列として読み込み、ついでにMatplotlibとBasemapで描画も行う手順を解説します。
解析雨量GRIB2ファイルの構成
バイナリデータの仕様は気象庁の配信資料に関する技術情報(気象編)第238号および第193号の改訂版に記載されています。また各節で使用されるテンプレート等については国際気象通報式・別冊で規定されています。
データは全部で8節からなり、第2節を除いた第0節から第8節までで構成されます。大雑把に言えば、データの開始・終了を表す第0・8節、データの解像度・範囲や時刻等の仕様を表す第1~4・6節、レベル値(生データ)をデータ代表値(1km格子を代表する降水量の10倍値)に変換するテーブルを表す第5節、圧縮されたレベル値が含まれる第7節に分けられています。第7節では降水量に対応する整数のレベル値がランレングス圧縮で西から東向きに2560要素ずつ、北から格納されています1 。したがって降水量を得るには、ランレングス展開した第7節のレベル値を、第5節の変換テーブルに基づいて変換し、1/10倍すればよいです。
各データ節の長さ
各節は固定長の節と可変長の節があり、節の先頭に節の長さ(バイト数)と節番号が格納されています。
解析雨量のデータフォーマットでは第0, 1, 3, 4, 6節は順に16, 21, 72, 82, 6バイトの固定長と定義されており、第5, 7節はレベル数・降水量に応じて可変です。ただし第5節は現行の解析雨量が最大98レベルで変化していないことから、実質213バイト(=17+2*98)になります。
特定の節を読むには、それより前の節の長さを足した位置から読めばよいです。
データ代表値
第5節で定義されるレベル値とデータ代表値の対応は次の表のようになります。
レベル | データ代表値 |
---|---|
1 | 0 |
2 | 4 |
3 | 10 |
(...) | (10刻み) |
79 | 770 |
80 | 800 |
(...) | (50刻み) |
89 | 1250 |
90 | 1300 |
(...) | (100刻み) |
97 | 2000 |
98 | 2550 |
これに加えて第7節では"欠測値"がレベル0として表現されています。整数値としてデコードする場合は欠測値に適当な値を割り当てたほうが都合が良いため、以下のコードでは"-10"としています。
ランレングス符号化
第7節で用いられているランレングス符号化の方式は繰り返し数を冪の和で表現するもので、ランレングス符号化法の解説の式に基づいて展開します。
展開前は符号なし整数(0~255)のバイト列です。0~出現したレベル値の最高値(<=98)以下はレベル値をそのまま表し、それより大きい値~255は直前のレベル値の繰り返し数に対応します。後者が連続する場合は順に桁を繰り上げた冪数として足した和が繰り返し数となります。
デコード
(財)気象業務支援センターからダウンロードした解析雨量ファイルのサンプルを使用し、このファイルに含まれる降水量をNumpyのndarray(南北3360x東西2560)として読み込んでみます。
from itertools import repeat
import struct
import numpy as np
def set_table(section5):
max_level = struct.unpack_from('>H', section5, 15)[0]
table = (
-10, # define representative of level 0 (Missing Value)
*struct.unpack_from('>'+str(max_level)+'H', section5, 18)
)
return np.array(table, dtype=np.int16)
def decode_runlength(code, hi_level):
for raw in code:
if raw <= hi_level:
level = raw
pwr = 0
yield level
else:
length = (0xFF - hi_level)**pwr * (raw - (hi_level + 1))
pwr += 1
yield from repeat(level, length)
def load_jmara_grib2(file):
with open(file, 'rb') as f:
binary = f.read()
len_ = {'sec0':16, 'sec1':21, 'sec3':72, 'sec4':82, 'sec6':6}
end4 = len_['sec0'] + len_['sec1'] + len_['sec3'] + len_['sec4'] - 1
len_['sec5'] = struct.unpack_from('>I', binary, end4+1)[0]
section5 = binary[end4:(end4+len_['sec5']+1)]
end6 = end4 + len_['sec5'] + len_['sec6']
len_['sec7'] = struct.unpack_from('>I', binary, end6+1)[0]
section7 = binary[end6:(end6+len_['sec7']+1)]
highest_level = struct.unpack_from('>H', section5, 13)[0]
level_table = set_table(section5)
decoded = np.fromiter(
decode_runlength(section7[6:], highest_level), dtype=np.int16
).reshape((3360, 2560))
# convert level to representative
return level_table[decoded]
file = rb'./SRF_GPV_Ggis1km_Prr60lv_ANAL_20180707/Z__C_RJTD_20180707000000_SRF_GPV_Ggis1km_Prr60lv_ANAL_grib2.bin'
rain = load_jmara_grib2(file) / 10
最後にデータ代表値を降水量に変換するため1/10倍しています。
Fortranで書く場合
上記コードをFortranで書いてライブラリにすると40倍くらい高速化します。
subroutine load_jmara_grib2(filepath, decoded_2d)
implicit none
character(*), intent(in) :: filepath
integer, parameter :: len_sec0 = 16, len_sec1 = 21, len_sec2 = 0
integer, parameter :: len_sec3 = 72, len_sec4 = 82, len_sec6 = 6
integer :: len_sec5, len_sec7, pos, filesize
integer, parameter :: x_n = 2560, y_n = 3360
integer(1), allocatable :: binary(:)
integer(2), allocatable :: table(:)
integer(2) :: highest_level, decoded(x_n*y_n)
integer(2), intent(out) :: decoded_2d(x_n, y_n)
inquire(file=filepath, size=filesize)
allocate(binary(filesize))
open(100, file=trim(filepath), status='old', access='direct', form='unformatted', recl=filesize)
read(100, rec=1) binary
close(100)
pos = len_sec0 + len_sec1 + len_sec2 + len_sec3 + len_sec4
len_sec5 = pack_int32(binary(pos+1), binary(pos+2), binary(pos+3), binary(pos+4))
highest_level = pack_int16(binary(pos+13), binary(pos+14))
call set_table(binary(pos+1 : pos+len_sec5), table)
pos = pos + len_sec5 + len_sec6
len_sec7 = pack_int32(binary(pos+1), binary(pos+2), binary(pos+3), binary(pos+4))
call decode_runlength(binary(pos+6 : pos+len_sec7), highest_level, decoded)
where (decoded == 0)
decoded = -10 ! define representative of level 0 (Missing Value)
elsewhere
decoded = table(decoded)
endwhere
decoded_2d = reshape(decoded, (/ x_n, y_n /))
contains
subroutine set_table(section5, table)
implicit none
integer(1), intent(in) :: section5(:)
integer(2), allocatable, intent(out) :: table(:)
integer(2) :: max_level, level
max_level = pack_int16(section5(15), section5(16))
allocate(table(max_level))
do level = 1, max_level
table(level) = pack_int16(section5(16 + 2*level), section5(17 + 2*level))
end do
end subroutine
subroutine decode_runlength(code, hi_level, decoded)
implicit none
integer(1), intent(in) :: code(:)
integer(2), intent(in) :: hi_level
integer(2), intent(out) :: decoded(0:)
integer :: i, pwr, offset, length
integer(2) :: raw, level
offset = 0
do i = 1, size(code)
raw = get_int16(code(i))
if(raw <= hi_level) then
level = raw
pwr = 0
decoded(offset) = level
offset = offset + 1
else
length = (255 - hi_level)**pwr * (raw - (hi_level + 1))
pwr = pwr + 1
decoded(offset : offset+length-1) = level
offset = offset + length
end if
end do
end subroutine
integer(2) function pack_int16(arg1, arg2)
implicit none
integer(1) :: arg1, arg2
pack_int16 = ishft( iand(int(B'11111111',2), int(arg1, 2)), 8 ) &
+ iand(int(B'11111111',2), int(arg2, 2))
end function
integer(4) function pack_int32(arg1, arg2, arg3, arg4)
implicit none
integer(1) :: arg1, arg2, arg3, arg4
pack_int32 = ishft( iand(int(B'11111111',4), int(arg1, 4)), 8*3) &
+ ishft( iand(int(B'11111111',4), int(arg2, 4)), 8*2) &
+ ishft( iand(int(B'11111111',4), int(arg3, 4)), 8 ) &
+ iand(int(B'11111111',4), int(arg4, 4))
end function
integer(2) function get_int16(arg1)
implicit none
integer(1) :: arg1
get_int16 = iand(int(B'11111111',2), int(arg1, 2))
end function
end subroutine
gfortran -shared jmara.f90 -Ofast -march=native -fopt-info -o ./jmara.dll
import ctypes
import numpy as np
def load_jmara_grib2(file):
lib = np.ctypeslib.load_library('jmara.dll', '.')
lib.load_jmara_grib2_.argtypes = [
ctypes.c_char_p,
np.ctypeslib.ndpointer(dtype=np.int16),
ctypes.c_size_t # hidden argument for character length
]
lib.load_jmara_grib2_.restype = ctypes.c_void_p
representative = np.zeros((3360, 2560), dtype=np.int16, order='C')
lib.load_jmara_grib2_(
ctypes.c_char_p(file),
representative,
ctypes.c_size_t(len(file))
)
return representative
file = rb'./SRF_GPV_Ggis1km_Prr60lv_ANAL_20180707/Z__C_RJTD_20180707000000_SRF_GPV_Ggis1km_Prr60lv_ANAL_grib2.bin'
rain = load_jmara_grib2(file) / 10
簡易描画
MatplotlibとBasemapで描画
得られた降水量の2次元配列rain
について、対応する緯度と経度を生成してMatplotlibで描画します。ここではデータが格子内の代表値であることを考慮して、座標が3次メッシュ格子の中心となるよう、経度1度の80等分、緯度40分(=1/1.5度)の80等分をそれぞれ1/2ずらして生成します。またBasemapを使用して海岸線、緯度経度とカラーバーを表示しています。
Basemapのインストール(Anaconda環境)
conda install -c conda-forge basemap basemap-data-hires
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.basemap import Basemap
lon = np.linspace(118, 150, 2560, endpoint=False) + 1/80 / 2
lat = np.linspace(48, 20, 3360, endpoint=False) - 1/80/1.5 / 2
X, Y = np.meshgrid(lon, lat)
fig, ax = plt.subplots(figsize=(10,8))
ax.set_title('JMA Radar/Raingauge-Analyzed Precipitation\n2018-07-07T00:00:00+00:00')
m = Basemap(llcrnrlat=20, urcrnrlat=48,
llcrnrlon=118, urcrnrlon=150,
resolution='l', ax=ax)
m.drawmeridians(np.arange(0, 360, 5), labels=[1, 0, 0, 1], linewidth=0)
m.drawparallels(np.arange(-90, 90, 5), labels=[1, 0, 1, 0], linewidth=0)
m.drawcoastlines(color='black')
levels = [0, 1, 5, 10, 20, 30, 40, 50, 60, 70, 80]
colors = ['whitesmoke', 'gainsboro', 'darkgray', 'cyan', 'deepskyblue',
'blue', 'lime', 'yellow', 'orange', 'red']
im = m.contourf(X, Y, rain, levels, colors=colors, extend='both')
im.cmap.set_under('white')
im.cmap.set_over('magenta')
cb = m.colorbar(im, 'right', ticks=levels, size='2.5%')
cb.set_label('mm/hr')
plt.show()
1ファイル分を展開するとnp.float64では65.6MBになります。データ代表値(10倍値)のまま返せばnp.int16の16.4MB程度で済みます。
ファイルパス生成(期間指定)
気象業務支援センターから購入したデータメディアは年ごとのディスクにDATA/yyyy/mm/dd
のような階層で入っています。いまこれを年ごとにparent_dir/yyyy/DATA/yyyy/mm/dd
のように保存した場合、指定期間内の個々のbinファイルのパスは以下のように生成できます。
import datetime
def half_hour_range(start, end):
for n in range(int((end - start).total_seconds() / 3600)):
yield start + datetime.timedelta(hours=n)
yield start + datetime.timedelta(hours=n, minutes=30)
def jmara_path(time, parent_dir='.'):
path = parent_dir.rstrip('/').rstrip('\\') + (
time.strftime('/%Y/DATA/%Y/%m/%d/')
+ 'Z__C_RJTD_'
+ time.strftime('%Y%m%d%H%M%S')
+ '_SRF_GPV_Ggis1km_Prr60lv_ANAL_grib2.bin'
)
return path
start = datetime.datetime(2018, 7, 1, 0, 0, 0)
end = datetime.datetime(2018, 9, 1, 0, 0, 0)
parent_dir = 'G:'
filepaths = [jmara_path(t, parent_dir) for t in half_hour_range(start, end)]
['G:/2018/DATA/2018/07/01/Z__C_RJTD_20180701000000_SRF_GPV_Ggis1km_Prr60lv_ANAL_grib2.bin',
'G:/2018/DATA/2018/07/01/Z__C_RJTD_20180701003000_SRF_GPV_Ggis1km_Prr60lv_ANAL_grib2.bin',
'G:/2018/DATA/2018/07/01/Z__C_RJTD_20180701010000_SRF_GPV_Ggis1km_Prr60lv_ANAL_grib2.bin',
'G:/2018/DATA/2018/07/01/Z__C_RJTD_20180701013000_SRF_GPV_Ggis1km_Prr60lv_ANAL_grib2.bin',
'G:/2018/DATA/2018/07/01/Z__C_RJTD_20180701020000_SRF_GPV_Ggis1km_Prr60lv_ANAL_grib2.bin',
(...)
'G:/2018/DATA/2018/08/31/Z__C_RJTD_20180831213000_SRF_GPV_Ggis1km_Prr60lv_ANAL_grib2.bin',
'G:/2018/DATA/2018/08/31/Z__C_RJTD_20180831220000_SRF_GPV_Ggis1km_Prr60lv_ANAL_grib2.bin',
'G:/2018/DATA/2018/08/31/Z__C_RJTD_20180831223000_SRF_GPV_Ggis1km_Prr60lv_ANAL_grib2.bin',
'G:/2018/DATA/2018/08/31/Z__C_RJTD_20180831230000_SRF_GPV_Ggis1km_Prr60lv_ANAL_grib2.bin',
'G:/2018/DATA/2018/08/31/Z__C_RJTD_20180831233000_SRF_GPV_Ggis1km_Prr60lv_ANAL_grib2.bin']
-
第4節の72オクテット目に走査モードとして
0x00
が規定されており、これは解析雨量に付属するformat.txtの表現を借りると最初のデータが範囲の北西端であり、同じ緯度で西から東に並び、緯線に沿った格子点数分進むと、1格子南の西端のデータに移って、再び同じ緯度のデータが緯線に沿った格子点数分並ぶ
となります。 ↩