導入
機械学習と深層学習を勉強していて思っていたことがあります。
それは、分類タスクについてあるクラスに特化している方がいいのか、または対象クラス全てを分類できた方がいいのかです。
仮に前者のようなモデルをスペシャリスト、後者のようなモデルをゼネラリストとします。
二つのモデルについて、どちらの方が精度が高いのか気になったため、本記事で衛星画像を用いた土地被覆分類をテーマに比較実験をします。
環境
Windows 11
python 3.11.9
QGIS 3.40
excel
データとモデル
モデルは前回の記事で精度が高かったXGBoostを採用します。
衛星画像はGoogle Earth Engienを用いて2020年のLandsat 5, 7, 8の合成画像を入手しました。
バンドも前回の記事の結果からNDVI, Tasseled Cap Wetness (TCW), Greenness (TCG)を採用しました。
検証地は山梨県一部地域、学習データは近くの静岡、滋賀、岐阜から取得しました。
各土地被覆ごとに200個を作成しました。
土地被覆は以下の通り設定しました。前回記事と異なり、学習データの確保が難しかったため、裸地を除きました。
0:針葉樹林
1:広葉樹林
2:農地・草原
3:都市
4:水域
実装
0. 必要ライブラリのimport
以下のライブラリを用います。バージョンは気にしていません。
import pandas as pd
import numpy as np
import rasterio
import geopandas as gpd
import matplotlib.pyplot as plt
import optuna
import statistics
from sklearn.feature_selection import SelectKBest, f_classif
from rasterstats import point_query
from sklearn.model_selection import StratifiedKFold, cross_val_score
from xgboost import XGBClassifier
from scipy import ndimage
from tqdm import tqdm
1.学習データ作成
機械学習モデルを構築するために使用する学習/検証データを作成します。
QGISで作成した点地物を用いて、特徴量を取得します。
point_path = "path/Training_Dataset.shp"
dat = gpd.read_file(point_path)
landsat_path = "path/Image_Gifu_2020.tif"
dat["NDVI"] = point_query(point_path, landsat_path, band = 1, interpolate = "nearest")
dat["TCW"] = point_query(point_path, landsat_path, band = 2, interpolate = "nearest")
dat["TCG"] = point_query(point_path, landsat_path, band = 3, interpolate = "nearest")
gifu = dat.dropna()
point_path = "path/Training_Dataset.shp"
dat = gpd.read_file(point_path)
landsat_path = "path/Image_Sizuoka_2020.tif"
dat["NDVI"] = point_query(point_path, landsat_path, band = 1, interpolate = "nearest")
dat["TCW"] = point_query(point_path, landsat_path, band = 2, interpolate = "nearest")
dat["TCG"] = point_query(point_path, landsat_path, band = 3, interpolate = "nearest")
sizuoka = dat.dropna()
point_path = "path/Training_Dataset.shp"
dat = gpd.read_file(point_path)
landsat_path = "path/Image_Siga_2020.tif"
dat["NDVI"] = point_query(point_path, landsat_path, band = 1, interpolate = "nearest")
dat["TCW"] = point_query(point_path, landsat_path, band = 2, interpolate = "nearest")
dat["TCG"] = point_query(point_path, landsat_path, band = 3, interpolate = "nearest")
siga = dat.dropna()
dat = pd.concat([gifu, sizuoka, siga])
output_path = "path/Train_Dataset.csv"
dat.to_csv(output_path)
学習データセットは、スペシャリスト用とゼネラリスト用に作り直します。
以下はスペシャリスト用のデータセットです。
概要としては、ある土地被覆のデータセットを200個、その他の土地被覆4種類×50個の計400個の学習データです。
##########################
# スペシャリスト用学習データ
##########################
label = 0 # 土地被覆のラベル
train_dat = pd.read_csv("path/Train_Dataset.csv")
targets = train_dat[train_dat["id"]==label]
non_targets = train_dat[train_dat["id"]!=label]
# スペシャリスト以外の土地被覆
for u in non_targets["id"].unique():
us = non_targets[non_targets['id']==u]
ds = us.sample(50)
targets = pd.concat([targets, ds])
dummy = pd.get_dummies(targets, columns=["id"]) # One Hot Encoding
dummy["id_"+str(label)] = dummy["id_"+str(label)] * 1 # bool値で返ってくる、数値にする
dummy = dummy[["NDVI", "TCG", "TCW", "id_"+str(label)]]
dummy.to_csv("path/Train_Dataset_Specialist_"+str(label)+".csv")
次にゼネラリスト用のデータセットを作成します。
ゼネラリストは各土地被覆5種類×80個の計400個としました。
以下にゼネラリスト用のデータセットを作成します。
##########################
# ゼネラリスト用学習データ
##########################
train_dat = pd.read_csv("patht/Train_Dataset.csv")
vess = pd.DataFrame()
for u in train_dat["id"].unique():
us = train_dat[train_dat['id']==u]
ds = us.sample(80)
vess = pd.concat([vess, ds])
vess.to_csv("path/Train_Dataset_Generalist.csv")
2.ハイパーパラメータのチューニング
XGBoostのハイパーパラメータのチューニングを行います。
内容は前回記事と同様です。
入力するデータを変更することで各モデルをチューニングできます。
train_dat = pd.read_csv("path/Train_Dataset.csv")
x, y = train_dat[["NDVI", "TCG", "TCW"]].values, train_dat["id"].values
kf = StratifiedKFold(n_splits=5, shuffle=True, random_state=1)
def xgb_bayse_objective(trial):
# ゼネラリスト用のモデル
xgb = XGBClassifier(booster = "gbtree",
objective = "multi:softmax", # スペシャリストならobjective = "binary:logistic",
random_state = 42,
n_estimators = 10000)
params = {
'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.1, log=True),
'min_child_weight': trial.suggest_int('min_child_weight', 1, 8),
'max_depth': trial.suggest_int('max_depth', 1, 10),
'colsample_bytree': trial.suggest_float('colsample_bytree', 0.1, 1.0),
'subsample': trial.suggest_float('subsample', 0.2, 0.8),
'reg_alpha': trial.suggest_float('reg_alpha', 0.001, 0.01, log=True),
'reg_lambda': trial.suggest_float('reg_lambda', 0.001, 0.1, log=True),
'gamma': trial.suggest_float('gamma', 0.0001, 0.01, log=True),
}
xgb.set_params(**params)
scores = cross_val_score(xgb, x, y, cv=kf, scoring="neg_log_loss", n_jobs=1)
val = scores.mean()
return val
xgb_study = optuna.create_study(direction="maximize", sampler = optuna.samplers.TPESampler(seed = 42))
xgb_study.optimize(xgb_bayse_objective, n_trials=100)
best_params = xgb_study.best_trial.params
best_score = xgb_study.best_trial.value
print("########################################################")
print(f'最適パラメータ {best_params}\nスコア {best_score}')
print("########################################################")
3.モデルの保存
この後、スペシャリストの予測では一工夫が必要なため、モデルを先に保存しておきます。
vess = pd.read_csv("/Train_Dataset.csv")
x, y = vess[["NDVI", "TCG", "TCW"]].values, vess["id"].values
# 例えば、ゼネラリストのハイパーパラメータ
params = {'learning_rate': 0.010459020071111883,
'min_child_weight': 5,
'max_depth': 1,
'colsample_bytree': 0.47488572324460826,
'subsample': 0.7853864268489944,
'reg_alpha': 0.004971495858721851,
'reg_lambda': 0.0012856325404634848,
'gamma': 0.00013311915703611744}
xgb = XGBClassifier(booster = "gbtree",
objective = "multi:softmax",
random_state = 42,
n_estimators = 10000)
xgb.set_params(**params)
xgb.fit(x, y)
xgb.save_model("/Model.json")
4.予測
ゼネラリストの予測は素直に入力すればいいですが、スペシャリストは工夫が必要です。
先にゼネラリストの予測を行います
#################
# ゼネラリスト予測
#################
data_path = "data_path"
model_path = "model_path"
tif = rasterio.open(data_path+"/Landsat.tif")
metadata = tif.meta # 土地被覆分類図出力用
ndvi = tif.read(1)
tcg = tif.read(3)
tcw = tif.read(2)
xgb = XGBClassifier(booster = "gbtree", objective = "binary:logistic", random_state = 42, n_estimators = 10000)
xgb.load_model(model_path+"/Generalist.json")
# 各モデルの特徴量を含むテーブルデータを作成
vess = []
print(f"Make Data")
for i in range(ndvi.shape[0]):
for j in range(ndvi.shape[1]):
n, tg, tw = ndvi[i, j], tcg[i, j], tcw[i, j]
vess.append([i, j, n, tg, tw])
vess = pd.DataFrame(vess, columns=["i", "j", "NDVI", "TCG", "TCW"])
vess = vess.dropna()
x = vess[["NDVI", "TCG", "TCW"]].values
vess["Pred_Generalist"] = xgb.predict(x)
# Save Result
print(f"Write")
map = np.full((ndvi.shape[0], ndvi.shape[1]), -1)
for i, j, p in zip(vess['i'], vess["j"], vess["Pred_Generalist"]):
map[i, j] = p
def save_raster(array, meta, output):
meta.update({"count":1})
new_raster = rasterio.open(output, "w", **meta)
new_raster.write(array, 1)
new_raster.close()
return
print(f"Save")
save_raster(map, metadata, data_path+"/Generalist_Pred.tif")
次にスペシャリストの予測を行います。
複数モデルによる予測であるため、各モデルの予測確率から最も高い確率の土地被覆を選択する必要があります。
以下のコードで予測します。
import pandas as pd
import numpy as np
import geopandas as gpd
import rasterio
from xgboost import XGBClassifier
from tqdm import tqdm
data_path = "path"
model_path = "path"
tif = rasterio.open(data_path+"/Landsat.tif")
metadata = tif.meta # 土地被覆分類図出力用
ndvi = tif.read(1)
tcg = tif.read(3)
tcw = tif.read(2)
model0 = XGBClassifier(booster = "gbtree", objective = "binary:logistic", random_state = 42, n_estimators = 10000)
model1 = XGBClassifier(booster = "gbtree", objective = "binary:logistic", random_state = 42, n_estimators = 10000)
model2 = XGBClassifier(booster = "gbtree", objective = "binary:logistic", random_state = 42, n_estimators = 10000)
model3 = XGBClassifier(booster = "gbtree", objective = "binary:logistic", random_state = 42, n_estimators = 10000)
model4 = XGBClassifier(booster = "gbtree", objective = "binary:logistic", random_state = 42, n_estimators = 10000)
model0.load_model(model_path+"/Specialist_0.json")
model1.load_model(model_path+"/Specialist_1.json")
model2.load_model(model_path+"/Specialist_2.json")
model3.load_model(model_path+"/Specialist_3.json")
model4.load_model(model_path+"/Specialist_4.json")
# 各モデルの特徴量を含むテーブルデータを作成
vess = []
print(f"Make Data")
for i in range(ndvi.shape[0]):
for j in range(ndvi.shape[1]):
# array.shape -> [bands, height, width]
n = ndvi[i, j]
tg = tcg[i, j]
tw = tcw[i, j]
vess.append([i, j, n, tg, tw])
vess = pd.DataFrame(vess, columns=["i", "j", "NDVI", "TCG", "TCW"])
vess = vess.dropna()
x = vess[["NDVI", "TCG", "TCW"]].values
print("Prediction")
pred0= model0.predict_proba(x)
pred1 = model1.predict_proba(x)
pred2 = model2.predict_proba(x)
pred3 = model3.predict_proba(x)
pred4= model4.predict_proba(x)
# Save Result
print(f"Write")
map = np.full((ndvi.shape[0], ndvi.shape[1]), -1)
for n, i, j in tqdm(zip(range(len(vess)), vess['i'], vess["j"]), total = len(vess)):
p0, p1, p2, p3, p4 = pred0[n, 1], pred1[n, 1], pred2[n, 1], pred3[n, 1], pred4[n, 1]
p = np.argmax([p0, p1, p2, p3, p4])
map[i, j] = p
def save_raster(array, meta, output):
meta.update({"count":1})
new_raster = rasterio.open(output, "w", **meta)
new_raster.write(array, 1)
new_raster.close()
return
print(f"Save")
save_raster(map, metadata, data_path+"/Specialist_Pred.tif")
5.後処理
内容は前回記事と同様です。
def postprocess(tif_path, output_path):
tif = rasterio.open(tif_path)
metadata = tif.meta
array = tif.read(1) # band 1の読み込み
labels = np.unique(array)
for l in labels:
mask = np.where(array == l, 1, 0) # 対象ラベルを1、それ以外を0とする
s = ndimage.generate_binary_structure(2, 2)
patch_mask, patch_num = ndimage.label(mask, s)
patch_unique, patch_size = np.unique(patch_mask, return_counts = True)
patch_arr = np.array([patch_unique, patch_size]).T
noises = patch_arr[patch_arr[:, 1] == 1]
with tqdm(total = mask.shape[0]) as pbar:
for i in range(mask.shape[0]):
pbar.set_description(f"label: {l}")
for j in range(mask.shape[1]):
# あるピクセルの場所はゴマ塩ノイズであるかを判定
# 判定がTrueなら、周辺ピクセルの最頻値に置き換え
if np.any(noises == patch_mask[i, j]):
# ラスタの四隅である場合
if (i == 0)&(j == 0):
p1, p2, p3 = array[i, j+1], array[i+1,j+1], array[i+1, j]
array[i, j] = statistics.mode([p1, p2, p3])
elif (i == array.shape[0]-1)&(j == array.shape[1]-1):
p1, p2, p3 = array[i, j-1], array[i-1,j-1], array[i-1, j]
array[i, j] = statistics.mode([p1, p2, p3])
elif (i == 0)&(j == array.shape[1]-1):
p1, p2, p3 = array[i-1, j], array[i+1,j-1], array[i+1, j]
array[i, j] = statistics.mode([p1, p2, p3])
elif (i == array.shape[0]-1)&(j == 0):
p1, p2, p3 = array[i-1, j], array[i-1,j+1], array[i, j+1]
array[i, j] = statistics.mode([p1, p2, p3])
# もし、ラスタの端のラインなら
elif (i == 0)&(j != 0)&(j != array.shape[1]-1):
p1, p2, p3, p4, p5 = array[i, j-1], array[i,j+1], array[i+1, j-1], array[i+1, j], array[i+1, j+1]
array[i, j] = statistics.mode([p1, p2, p3, p4, p5])
elif (i == array.shape[0]-1)&(j != 0)&(j != array.shape[1]-1):
p1, p2, p3, p4, p5 = array[i, j-1], array[i,j+1], array[i-1, j-1], array[i-1, j], array[i-1, j+1]
array[i, j] = statistics.mode([p1, p2, p3, p4, p5])
elif (i != 0)&(i != array.shape[0-1])&(j == 0):
p1, p2, p3, p4, p5 = array[i-1, j], array[i+1,j], array[i-1, j+1], array[i, j+1], array[i+1, j+1]
array[i, j] = statistics.mode([p1, p2, p3, p4, p5])
elif (i != 0)&(i != array.shape[0]-1)&(j == array.shape[1]-1):
p1, p2, p3, p4, p5 = array[i-1, j], array[i+1,j], array[i-1, j-1], array[i, j-1], array[i+1, j-1]
array[i, j] = statistics.mode([p1, p2, p3, p4, p5])
# それ以外の場所
else:
p1, p2, p3, p4, p5, p6, p7, p8 = array[i-1, j-1], array[i-1,j], array[i-1, j+1], array[i, j-1], array[i, j+1], array[i+1, j-1], array[i+1, j], array[i+1, j+1]
array[i, j] = statistics.mode([p1, p2, p3, p4, p5, p6, p7, p8])
pbar.update(1)
new_raster = rasterio.open(output_path, "w", **metadata)
new_raster.write(array, 1)
new_raster.close()
return
tif_path = "your_path/LULC_Map.tif"
output_path ="your_path/LULC_Map_postprocessed.tif"
postprocess(tif_path, output_path)
以下のような結果になりました。
調査地中央は大きく結果が異なります。
6.精度検証
学習データセットと同様に点地物を作成し、土地被覆のラベルを与えることで検証データセットを作成し、検証用データセットを作成します。
検証データの点数は100点×各土地被覆で計500個です。
point_path = "your_path/val_points.shp"
output_path = "your_path/Validation_Dataset_labels.xlsx"
point = gpd.read_file(point_path)
lulc_path = "your_path/Generalist_postprocessed.tif"
# interpolate='nearest'にすることで、点直下の値をそのまま取得できる
point["Generalist"] = point_query(point_path, lulc_path, band = 1, interpolate='nearest')
lulc_path = "your_path/Specialist_postprocessed.tif"
point = gpd.read_file(point_path)
point["Specialist"] = point_query(point_path, lulc_path, band = 1, interpolate='nearest')
point.to_excel(output_path)
出力したexcelファイルを用いてクロス集計表を作成します。
精度指標は前回記事と同じく User's Accuracy (UA) と Producer's Accuracy (PA) と Overall Accuracy (OA) の三つです。
それぞれの指標の概要は以下です。
UA: 土地被覆図の分類がどれだけの割合で実際を表現しているか (予測->実際)
PA: 地上の地物の再現率 (実際->予測)
OA: 全体の精度
以下、結果です。
草地・農地PAと都市部UAを除いて、スペシャリストのほうが僅かに高精度な結果となりました。
ただし、今回の実験では検証データセットを目視で作成しているため、判読に主観が含まれており、ノイズを生んでいる可能性があります。
これを解決するために、より厳密な検証データセットを作成する必要があります。
例えば、調査地内にランダムに点を落とし、その点直下の土地被覆を判定する方法。
今回は簡単な実験として行っているため、この方法でやっています。
製品の作成や論文投稿など精密さが求められる成果物の際は注意してください。
あくまでも参考までに今回の実験結果を読んでいただければと思います。
おわりに
今回は、機械学習を勉強していて疑問に思っていたことを実験しました。
ゼネラリストとスペシャリストの精度には、僅かな差がありましたが、工夫次第でさらに開くかもしれません。
また、今回の考え方は、深層学習でも扱えるものだと思います。
私は深層学習を構築する機材を持っていないため、そのうち取り組みたいと思います。
記事に不備やご質問、ご指摘がありばご連絡いただければ幸いです。