前回記事の補足
前回記事で書いたモデルのうち2つ目のPoseヒートマップの時系列予測モデルについて、説明が雑になっていたので、自分の中での整理を含めて追加メモを行います。また一部変更した箇所もあるので合わせてメモ。
前回記事:https://qiita.com/ISakony/items/bce3a84efb3bcf7f0ec0
モデル説明
②時系列予測を行うモデルは、過去フレーム4枚から未来フレーム1枚を予測するモデルです。入力データはポーズ推定のヒートマップであり、出力はチャンネルごとに分かれているヒートマップを1チャンネルにまとめたものになります。
前回は出力が1チャンネルにまとまってしまうため、この出力をそのまま次のフレーム予測に使うことはできず、一度ヒートマップ未来フレームから画像の未来フレームを生成し、生成された画像をポーズ推定して次のヒートマップを得る方法を取っていましたが、これだと生成画像の綺麗さでポーズ推定の精度が変化してしまう問題がありました。そこで今回は1チャンネルにまとまった未来ヒートマップを身体パーツごとにチャンネル分けされたヒートマップに変換する処理を追記し、Poseヒートマップの時系列予測モデルだけで時系列処理が回せるように変更しました。
Relation処理部説明
Relationネットのような処理部に関しては、最初にマップごとに出力値をまとめた後、4次元ベクトルに変換し、途中の2個ずつ取り出してまとめる処理時は2次元ベクトルとした。これは処理が重いので、できるだけ次元数を下げたかったこと、座標情報を扱っていると考えれば、x軸、y軸を表せれば良いので、2次元あれば十分な表現力が得られるのではないかと予想したためです。結果としては十分学習データにフィッティングできる表現力はあったと考えています。
mlp1_1 = chainer.ChainList(
*[L.Linear(None,46*46)
for i in range(19)]),
mlp1_2 = chainer.ChainList(
*[L.Linear(None,4) #過去フレーム2枚分の座標として4次元
for i in range(19)]),
mlp2_1 = chainer.ChainList(
*[L.Linear(None,4)
for i in range(19*19)]),
mlp2_2 = chainer.ChainList(
*[L.Linear(None,2) #予測フレーム1枚分の座標として2次元
for i in range(19*19)]),
mlp3_1 = L.Linear(None,46*46),
mlp3_2 = L.Linear(None,46*46), #19パーツ分をまとめて46*46マップとする
入力データから出力データまでの行列のShape(先頭の1はバッチ数)
入力ヒートマップ:(1,19,46,46)
※チャンネル数19が各身体パーツの存在ヒートマップチャンネルに対応 マップサイズは46×46
出力ヒートマップ:(1,1,46,46)
※出力は1チャンネルに全ての身体パーツのヒートマップ分布が集まっている。
これは最後の変換で(1,19,46,46)を出力するための線形計算処理では使用GPUのメモリーをオーバーしてしまったため。
→改善出力ヒートマップ:(1,19,46,46)
※線形計算処理ではメモリーオーバーしてしまったので、畳み込み処理で1チャンネルを19チャンネルに変換するようにした。Network in Networkモデルで19チャンネルに拡大。
(Convolution以外での19チャンネル拡大方法はLinearで拡大した場合は、結合数が多すぎてメモリオーバーをし、最後のMLP処理時に19パーツをそれぞれ別々に処理し予測させた場合は、(1,19*2)を(19,2)として処理しましたが、ポーズ変化を上手く予測できない出力となりました。後述のAggregator部は19パーツのRelation処理結果をまとめてMLPで処理しなければ関連を推測することができないと予想されます。)
conv1 = L.Convolution2D(in_channels=1, out_channels=19, ksize=3, stride=1, pad=1),
conv1_a = L.Convolution2D(in_channels=19, out_channels=19, ksize=1, stride=1),
conv1_b = L.Convolution2D(in_channels=19, out_channels=19, ksize=1, stride=1),
conv2 = L.Convolution2D(in_channels=19, out_channels=19, ksize=3, stride=1, pad=1),
conv2_a = L.Convolution2D(in_channels=19, out_channels=19, ksize=1, stride=1),
conv2_b = L.Convolution2D(in_channels=19, out_channels=19, ksize=1, stride=1),
conv3 = L.Convolution2D(in_channels=19, out_channels=19, ksize=3, stride=1, pad=1),
conv3_a = L.Convolution2D(in_channels=19, out_channels=19, ksize=1, stride=1),
conv3_b = L.Convolution2D(in_channels=19, out_channels=19, ksize=1, stride=1),
#処理の解説
処理順序としては以下
def __call__(self, x1,x2,x3,x4):#1フレーム先の予測のために過去フレーム4枚を使用
#print(x1.shape)
h1 = self.Mix_pofe(x1,x2,x3) # 1~3枚目のフレームのPose推定結果を各チャンネルごとにconcat
h2 = self.Mix_pofe(x2,x3,x4) # 2~4枚目のフレームのPose推定結果を各チャンネルごとにconcat
#print(len(h1))
h1 = self.Interaction(h1) #h1の19パーツ(チャンネル)を2パーツずつ取り出してMLP処理:出力(1,2*19)
h2 = self.Interaction(h2) #h2の19パーツ(チャンネル)を2パーツずつ取り出してMLP処理:出力(1,2*19)
h = self.Aggregator(h1,h2) # (1,1,46,46),(1,19,46,46)
return h
各関数の処理は以下
def Mix_pofe(self,b_x,af_x,aff_x):
b_x_list = F.split_axis(b_x, 19, axis=0) #[(1,320,320),......]
af_x_list = F.split_axis(af_x, 19, axis=0) #[(1,320,320),......]
aff_x_list = F.split_axis(aff_x, 19, axis=0)
afb_x = []
for i in range(len(b_x_list)):
aff_af = F.concat((af_x_list[i],aff_x_list[i]),axis=1)
afb_x.append(F.expand_dims(F.concat((b_x_list[i],aff_af),axis=1),axis=0)) #(1,3,320,320)
return afb_x
def Interaction(self,afb_x):
F_mlp = []
for i in range(len(afb_x)):
img = F.resize_images(afb_x[i],(46,46))
pred = F.leaky_relu(self.mlp1_1[i](img))#(1,3,46,46) >> (1,46*46)
pred = F.leaky_relu(self.mlp1_2[i](pred))#(1,24*24) >> (1,4)
F_mlp.append(pred)
self_inter = []
inter = 0
for i in range(len(F_mlp)):
for ii in range(len(F_mlp)):
#print(i)
#print(ii)
if i == ii:
h = F.concat((F_mlp[i],F_mlp[ii]),axis=1)
h = F.leaky_relu(self.mlp2_1[i * 19 + ii](h)) #(1,4)
#print("chack",h)
self_inter.append(self.mlp2_2[i * 19 + ii](h)) #(1,2)
elif i != ii:
h = F.concat((F_mlp[i],F_mlp[ii]),axis=1)
h = F.leaky_relu(self.mlp2_1[i * 19 + ii](h)) #(1,4)
#print("test",h)
inter += F.leaky_relu(self.mlp2_2[i * 19 + ii](h))#(1,2)
for iii in range(len(self_inter)):
self_inter[iii] += inter
self_inter_add_inter = F.stack(self_inter, axis=1)
return self_inter_add_inter #(1,19*2)
def Aggregator(self,si_add1,si_add2):
h = F.concat((si_add1,si_add2),axis=1)#(19,2*2)
h = F.leaky_relu(self.mlp3_1(h)) #(1,46*46)
h = F.leaky_relu(self.mlp3_2(h)) #(1,46*46)
h = F.reshape(h,(1,1,46,46))
h2 = F.leaky_relu(self.conv1(h))
h2 = F.leaky_relu(self.conv1_a(h2))
h2 = F.leaky_relu(self.conv1_b(h2))
h3 = F.leaky_relu(self.conv2(h2))
h3 = F.leaky_relu(self.conv2_a(h3))
h3 = F.leaky_relu(self.conv2_b(h3))
h4 = F.leaky_relu(self.conv2(h3 + h2))
h4 = F.leaky_relu(self.conv2_a(h4))
h4 = F.leaky_relu(self.conv2_b(h4))
return h,h4
学習
学習時の処理は以下のようになっています。改善前の1チャンネルにまとめられた時の出力の予測誤差と19チャンネルに拡大したときの予測誤差の2つの損失を学習に用いています。
予測誤差はそれぞれ過去4枚(実データ)から未来1枚を予測して損失を取っています。
with chainer.using_config('train', 'True'):
result = model(Preds[0],Preds[1],Preds[2],Preds[3]) #(1,1,46,46)の出力と(1,19,46,46)の出力
loss1 = F.mean_squared_error(result[0],label1) #1チャンネルにまとめた時の損失
loss2 = F.mean_squared_error(result[1],label2) #19チャンネルに拡大した後の損失
loss = loss1 + loss2
学習結果
学習結果は以下
学習後の出力データ(それぞれ過去フレーム4枚から次の1枚を予測)
過去4枚が実データで未来1枚を予測する学習だと、未来動作を見てとれる結果を得ることができていると思われます。
予測
0フレームから4枚を取得し、以降のフレームを予測出力させてみました。
学習で行った1枚から先の予測については、だんだんとボケたヒートマップになっており、さらに生成していくと8フレーム程生成した後、ポーズがほぼ変化しなくなってしまいました。これは、より未来のフレームを予測しようとすると、だんだん過去に予測生成したフレームを次の予測に使っていくため、誤差が大きくなっていくのだと予想されます。
結局のところデータ数も少ないので、学習データに対してかなりオーバーフィッティングを起こしていると考えられます。
感想
注目物体間の関係性を考慮した予測モデルには興味がありましたので、何かしらやってみる過程の元勉強できたのは良かったです。もし物体間の物理表現を獲得できるのであれば、物理シミュレーターから出力されたデータを学習して、その結果を元に当てはまる数式を作り出すような処理が作れれば、人が物理理論を構築する過程が再現できたりするのでしょうか?などと妄想を抱いたりしてしまいます。
実際に汎化性能の出るように学習を行うには、データを集めたり、評価を行ったりが非常に大変になるかと思いますので、実用面で扱うにはもう少し理論的にはっきりしているモデルを利用したほうが良いかなとは思いました。
予測モデルのネットワーク部コードまとめ
class MEVIN(chainer.Chain):
insize = 320
def __init__(self):
super(MEVIN, self).__init__(
mlp1_1 = chainer.ChainList(
*[L.Linear(None,46*46)
for i in range(19)]),
mlp1_2 = chainer.ChainList(
*[L.Linear(None,4)
for i in range(19)]),
mlp2_1 = chainer.ChainList(
*[L.Linear(None,4)
for i in range(19*19)]),
mlp2_2 = chainer.ChainList(
*[L.Linear(None,2)
for i in range(19*19)]),
mlp3_1 = L.Linear(None,46*46),
mlp3_2 = L.Linear(None,46*46),
conv1 = L.Convolution2D(in_channels=1, out_channels=19, ksize=3, stride=1, pad=1),
conv1_a = L.Convolution2D(in_channels=19, out_channels=19, ksize=1, stride=1),
conv1_b = L.Convolution2D(in_channels=19, out_channels=19, ksize=1, stride=1),
conv2 = L.Convolution2D(in_channels=19, out_channels=19, ksize=3, stride=1, pad=1),
conv2_a = L.Convolution2D(in_channels=19, out_channels=19, ksize=1, stride=1),
conv2_b = L.Convolution2D(in_channels=19, out_channels=19, ksize=1, stride=1),
conv3 = L.Convolution2D(in_channels=19, out_channels=19, ksize=3, stride=1, pad=1),
conv3_a = L.Convolution2D(in_channels=19, out_channels=19, ksize=1, stride=1),
conv3_b = L.Convolution2D(in_channels=19, out_channels=19, ksize=1, stride=1),
)
def __call__(self, x1,x2,x3,x4):#Pred = [x1,x2,x3,x4]
#print(x1.shape)
h1 = self.Mix_pofe(x1,x2,x3)
h2 = self.Mix_pofe(x2,x3,x4)
#print(len(h1))
h1 = self.Interaction(h1)
h2 = self.Interaction(h2)
h = self.Aggregator(h1,h2) # (1,1,42,42)
return h
def Mix_pofe(self,b_x,af_x,aff_x):
b_x_list = F.split_axis(b_x, 19, axis=0) #[(1,320,320),......]
af_x_list = F.split_axis(af_x, 19, axis=0) #[(1,320,320),......]
aff_x_list = F.split_axis(aff_x, 19, axis=0)
afb_x = []
for i in range(len(b_x_list)):
aff_af = F.concat((af_x_list[i],aff_x_list[i]),axis=1)
afb_x.append(F.expand_dims(F.concat((b_x_list[i],aff_af),axis=1),axis=0)) #(1,3,320,320)
return afb_x
def Interaction(self,afb_x):
F_mlp = []
for i in range(len(afb_x)):
img = F.resize_images(afb_x[i],(46,46))
pred = F.leaky_relu(self.mlp1_1[i](img))#(1,3,46,46) >> (1,46*46)
pred = F.leaky_relu(self.mlp1_2[i](pred))#(1,24*24) >> (1,4)
F_mlp.append(pred)
self_inter = []
inter = 0
for i in range(len(F_mlp)):
for ii in range(len(F_mlp)):
#print(i)
#print(ii)
if i == ii:
h = F.concat((F_mlp[i],F_mlp[ii]),axis=1)
h = F.leaky_relu(self.mlp2_1[i * 19 + ii](h)) #(1,4)
#print("chack",h)
self_inter.append(self.mlp2_2[i * 19 + ii](h)) #(1,2)
elif i != ii:
h = F.concat((F_mlp[i],F_mlp[ii]),axis=1)
h = F.leaky_relu(self.mlp2_1[i * 19 + ii](h)) #(1,4)
#print("test",h)
inter += F.leaky_relu(self.mlp2_2[i * 19 + ii](h))#(1,2)
for iii in range(len(self_inter)):
self_inter[iii] += inter
self_inter_add_inter = F.stack(self_inter, axis=1)
return self_inter_add_inter #(1,19*2)
def Aggregator(self,si_add1,si_add2):
h = F.concat((si_add1,si_add2),axis=1)#(19,2*2)
h = F.leaky_relu(self.mlp3_1(h)) #(1,46*46)
h = F.leaky_relu(self.mlp3_2(h)) #(1,46*46)
h = F.reshape(h,(1,1,46,46))
h2 = F.leaky_relu(self.conv1(h))
h2 = F.leaky_relu(self.conv1_a(h2))
h2 = F.leaky_relu(self.conv1_b(h2))
h3 = F.leaky_relu(self.conv2(h2))
h3 = F.leaky_relu(self.conv2_a(h3))
h3 = F.leaky_relu(self.conv2_b(h3))
h4 = F.leaky_relu(self.conv2(h3 + h2))
h4 = F.leaky_relu(self.conv2_a(h4))
h4 = F.leaky_relu(self.conv2_b(h4))
return h,h4
学習時および予測時のコード
参考