はじめに
この記事では画像を時系列データとしてとらえ、convolutionalLSTMを用いて将来の画像を予測したいと思います。convLSTMは(精度が出ないからなのかもしれませんが)あまり記事や実装例が少ないと思ったので拙速なコードではありますが公開しておきたいと思います。
実装メインなので、convLSTMの構造については 畳み込みLstm が詳しいかと思います。
ライブラリの読み込み
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
from sklearn.model_selection import train_test_split
import glob
from PIL import Image
from tqdm import tqdm
import zipfile
import io
使用画像
画像は以前の記事(天気図をクラスター分析してみた)で使用した衛星画像を用いました。ただ、それっぽいコンペ 【SOTA】Weather Challenge:雲画像予測 があったので規約に注意してこのデータセットを用いるのが簡便かと思います。
この記事では24時間ごとの画像5枚から次の日の画像を予測するモデルを考えます。
画像の読み込み
このコードはgoogle colabで実行していたため、画像はzipファイルで与えられます。そのため解凍する必要があります。また、元画像はサイズが非常に大きいので簡便化のために画像サイズを小さくしています。
# 縮小後の画像サイズ
height = 100
width = 180
# 読み込んだ画像を入れる配列
imgs=np.empty((0, height, width, 3))
# zipファイルを読み込んでnumpy配列にする
zip_f = zipfile.ZipFile('drive/My Drive/Colab Notebooks/convLSTM/wide.zip')
for name in tqdm(zip_f.namelist()):
with zip_f.open(name) as file:
path = io.BytesIO(file.read()) #解凍
img = Image.open(path)
img = img.resize((width, height))
img_np = np.array(img).reshape(1, height, width, 3)
imgs = np.append(imgs, img_np, axis=0)
データの整形
このままではデータがそのまま並んでいるだけなので、時系列データとして処理できる形にします。
サイズはxが(サンプル数,時系列の長さ,高さ,幅,チャンネル数)、yが(サンプル数,高さ,幅,チャンネル数)になります。
# 時系列で学習できる形式に整える
n_seq = 5
n_sample = imgs.shape[0] - n_seq
x = np.zeros((n_sample, n_seq, height, width, 3))
y = np.zeros((n_sample, height, width, 3))
for i in range(n_sample):
x[i] = imgs[i:i+n_seq]
y[i] = imgs[i+n_seq]
x, y = (x-128)/128, (y-128)/128
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size = 0.1, shuffle = False)
モデルの構築
モデルを作成します。convolution層と同じような感じですがパラメーターとしてreturn_sequences
が追加されています。これは時系列データをしてデータを返すかどうかであり、最後のconvLSTM層のみFalseにします。
(モデルを調整する過程でShortcut ConnectionやSkip Conectionを試すなど迷走していたためfunctional APIを使う形になっていますが、Sequentialで十分ですね)
from keras import layers
from keras.layers.core import Activation
from tensorflow.keras.models import Model
inputs = layers.Input(shape=(5, height, width, 3))
x0 = layers.ConvLSTM2D(filters=16, kernel_size=(3,3), padding="same", return_sequences=True, data_format="channels_last")(inputs)
x0 = layers.BatchNormalization(momentum=0.6)(x0)
x0 = layers.ConvLSTM2D(filters=16, kernel_size=(3,3), padding="same", return_sequences=True, data_format="channels_last")(x0)
x0 = layers.BatchNormalization(momentum=0.8)(x0)
x0 = layers.ConvLSTM2D(filters=3, kernel_size=(3,3), padding="same", return_sequences=False, data_format="channels_last")(x0)
out = Activation('tanh')(x0)
model = Model(inputs=inputs, outputs=out)
model.summary()
モデルの詳細はこんな感じです
Model: "functional_1"
_________________________________________________________________
Layer (type) Output Shape Param #
=================================================================
input_1 (InputLayer) [(None, 5, 100, 180, 3)] 0
_________________________________________________________________
conv_lst_m2d (ConvLSTM2D) (None, 5, 100, 180, 16) 11008
_________________________________________________________________
batch_normalization (BatchNo (None, 5, 100, 180, 16) 64
_________________________________________________________________
conv_lst_m2d_1 (ConvLSTM2D) (None, 5, 100, 180, 16) 18496
_________________________________________________________________
batch_normalization_1 (Batch (None, 5, 100, 180, 16) 64
_________________________________________________________________
conv_lst_m2d_2 (ConvLSTM2D) (None, 100, 180, 3) 2064
_________________________________________________________________
activation (Activation) (None, 100, 180, 3) 0
=================================================================
Total params: 31,696
Trainable params: 31,632
Non-trainable params: 64
それでは学習してみましょう。なお、colabだとバッチサイズを大きくするとメモリの使用量オーバーが発生するので小さくしてあります。(めっちゃ高性能マシン買ってローカルで実行できるようになりたい…)
model.compile(optimizer='rmsprop',
loss='mae', metrics=['mse'])
call_backs=[EarlyStopping(monitor="val_loss",patience=5)]
model.fit(x_train, y_train, batch_size=16, epochs=100, verbose=2, validation_split=0.2, shuffle=True, callbacks=call_backs)
実行中のlossはこのような感じになりました。
あまりいい感じではないですね…
実行結果を図に表示してみます。
# 描画
%matplotlib inline
i=15
fig, axes = plt.subplots(1, 2, figsize=(12,6))
axes[0].imshow((y_test[i]+1)/2)
axes[1].imshow((model.predict(x_test[[i]]).reshape(100,180,3)+1)/2)
この結果を見ると非常にあいまいな結果となってしまいました。これは明瞭な結果を出すよりもあいまいにしたほうが平均的に高いスコアになってしまうことが原因と考えられます。
損失関数を別のものに変えるか、もっと予測の精度が出そうな数時間後の画像の予測にすれば改善する可能性はあると思います。
発展
本記事と同じような雲画像を予測するコンペが行われたことがあり、精度向上についての多くの取り組みが参考になります。chainerでの実装ではありますが、フォーラムにサンプルコードもあるので参考になるかと思います。