はじめに
以前の記事で動的モード分解(Dyanmic Mode Decomposition: DMD)で、動画内の動体を分離した例を示しました。今回の記事では、Principal Component Pursuit(PCP)アルゴリズム[1]のRobust Principal Component Analysis(Robust PCA) で動体分離を試します。
環境
- Windows 10 home edition
- Python (3.7.6)
- Numpy(1.18.5)
- Opencv(4.1.1)
Robust PCA via PCP
Robust PCAは、低ランク構造と、スパース構造に分解する手法です。動体の場合、背景が低ランク構造で、動体がスパース構造となります。今回試すのは、PCPアルゴリズムのRobust PCAです。元のデータ行列$M$を低ランク行列$L$とスパース行列$S$に分解します。
M = L + S
PCPでは、次の拡張ラグランジアン$l$を最小化することで、$L$と$S$に分解します。
l(L, S, Y) = \| L \|_* + \lambda\|S\|_1 +<Y, M - L - S> + \frac{\mu}{2}\|M-L-s\|^2_F
$L$は低ランク行列、$S$はスパース行列、$Y$はラグランジュ乗数行列、$\lambda$と$\mu$はPCPのパラメータです。$< * , * >$は内積です。各変数の次元は、$M,L,S,Y\in\mathbb{R}^{n_1\times n_2}$です。
それぞれのノルムは、$||\cdot||_*$ はトレースノルム、$||\cdot|| _1$はL1ノルム、$||\cdot||^2_F$はフロベニウスノルムで、$A\in\mathbb{R}^{n_1 \times n_2}$の特異値成分を$\sigma$とすると、
\begin{align}
&||A||_* = \mathrm{tr}(\sqrt{A^\mathrm{T}A})= \sum^{\mathrm{min}(n_1,n_2)}_{i=1} \sigma_i\\
&||A||_1 = \underset{1\leq j \leq n_2}{\mathrm{max}}\sum^{n_1}_{i=1}|a_{ij}|\\
&||A||_F = \sqrt{\sum^{n_1}_{i=1}\sum^{n_2}_{j=1}|a_{ij}|^2} = \sqrt{\mathrm{tr}(A^\mathrm{T}A)} = \sqrt{\sum^{\mathrm{min}(n_1,n_2)}_{i=1}\sigma^2_i}
\end{align}
となります。添え字の$\mathrm{T}$は転置行列を表します。ノルムを述する式は、Wikipediaの記載を参考にしました。
スパース行列$S$は、shrinkage operatorで、低ランク行列$L$は、singular value thresholding operatorで、更新します。パラメータ$\tau$のshrinkage opertor $S_\tau$は、
\begin{align}
&S_\tau(x) = \mathrm{sgn}(x) \mathrm{max}(|x|-\tau, 0)
\end{align}
と表します。singular value thresholding operator $D_\tau$は、shrinkage oprator $S_\tau$と、特異値分解を利用して、
D_{\tau}(X) = US_\tau(\Sigma)V^\mathrm{T}, \
\mathrm{where} \ X = U\Sigma V^\mathrm{T}
となります。$U, \Sigma, V$はそれぞれ、$X$の左特異ベクトル、特異値、右特異ベクトルです。
shrinkage operator $S_\tau$と、singular value thresholding operator $D_\tau$を利用して、低ランク行列$L$とスパース行列$S$ を更新します。更新の式は、変数$X$の更新前を$X_k$、更新後を$X_{k+1}$と表記し、PCPのパラメータ$\tau, \mu$を入力して書くと、
\begin{align}
&L_{k+1} = D_{1/\mu}(M - S_k - \mu^{-1}Y_k) \\
&S_{k+1} = S_{\lambda/\mu}(M - L_{k+1} + \mu^{-1}Y_k)
\end{align}
となります。$S$記号が紛らわしいですが、随伴行列を転置行列と言い換えている以外は、文献[1]の表記に従っています。あと、shrinkage operator $S_\tau$とsingular value thresholding operator $D_\tau$のパラメータ$\mu, \lambda$の設定方法が、arXivに置かれている2009年版のPCPの論文は間違っているので、注意が必要です(2009年arXiv版は、$D_\mu, S_{\lambda\mu}$となっていますが、正しくは$D_{1/\mu},S_{\lambda/\mu}$のようです)。
ラグランジュ乗数行列$Y$は、拡張ラグランジュ法の計算方法により、
Y_{k+1} = Y_k + \mu(M - L_{k+1} - S_{k+1})
と更新します。
PCPのパラメータ$\lambda$と$\mu$は、文献[1]では、データ行列$M \in\mathbb{R}^{n_1 \times n_2}$として、それぞれ、
\begin{align}
&\lambda = \frac{1}{\sqrt{\mathrm{max}(n_1,n_2)}} \\
&\mu = \frac{n_1 n_2}{4\|M\|_1}
\end{align}
と設定しています。
PCPの計算手順をまとめると、以下になります。
\begin{align}
&\mathrm{initialize:} S_0 = 0, Y_0 =0 \\
& \mathrm{while}\ \mathrm{not}\ \mathrm{converged}:\\
&\qquad L_{k+1} = D_{1/\mu}(M - S_k + \mu^{-1}Y_k) \\
&\qquad S_{k+1} = S_{\lambda/\mu}(M - L_{k+1} + \mu^{-1}Y_k) \\
&\qquad Y_{k+1} = Y_k + \mu(M - L_{k+1} - S_{k+1}) \\
&\mathrm{end} \ \mathrm{while} \\
&\mathrm{output:}L,S.
\end{align}
収束の判定は、次のフロベニウスノルムで行います。
\|M -L - S\|_F \leq \delta\|M\|_F \quad \mathrm{with} \ \delta=10^{-7}
以上のPCPアルゴリズムのRobust PCAをPythonでコードにします。
import numpy as np
import numpy.linalg as LA
def rpca(M, max_iter=800,p_interval=50):
def shrinkage_operator(x, tau):
return np.sign(x) * np.maximum((np.abs(x) - tau), np.zeros_like(x))
def svd_thresholding_operator(X, tau):
U, S, Vh = LA.svd(X, full_matrices=False)
return U @ np.diag(shrinkage_operator(S, tau)) @ Vh
i = 0
S = np.zeros_like(M)
Y = np.zeros_like(M)
error = np.Inf
tol = 1e-7 * LA.norm(M, ord="fro")
mu = M.shape[0] * M.shape[1]/(4 * LA.norm(M, ord=1))
mu_inv = 1/mu
lam = 1/np.sqrt(np.max(M.shape))
while i < max_iter:
L = svd_thresholding_operator(M - S + mu_inv * Y, mu_inv)
S = shrinkage_operator(M - L + mu_inv * Y, lam * mu_inv)
Y = Y + mu * (M - L - S)
error = LA.norm(M - L - S, ord='fro')
if i % p_interval == 0:
print("step:{} error:{}".format(i, error))
if error <= tol:
print("converted! error:{}".format(error))
break
i+=1
else:
print("Not converged")
return L, S
動画
前回の記事同様に、ここのdatasetのAtriumを利用します。120frameから170frameを利用しています。歩いている人と背景をRobust PCAを使って分離します。
import cv2
def prepare_frames_to_use(video_path="./atrium_video.avi",#画像のパス
start_frame=120,#最初のフレーム
end_frame=170,#最後のフレーム
r=4,#画像縮小パラメータ
):
#画像読み込み
cap = cv2.VideoCapture(video_path)
#画像の解像度、フレームレート、フレーム数取得
wid = cap.get(cv2.CAP_PROP_FRAME_WIDTH)#横
wid_r = int(wid/r)
hei = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)#縦
hei_r = int(hei/r)
fps = cap.get(cv2.CAP_PROP_FPS)#フレームレート
count = cap.get(cv2.CAP_PROP_FRAME_COUNT)#フレーム数
dt = 1/fps#フレーム間秒数
print(f"wid:{wid}", f" hei:{hei}", f" fps:{fps}", f" count:{count}", f"dt:{dt}")
#対象フレーム抜き出し
cat_frames = []
cap.set(cv2.CAP_PROP_POS_FRAMES,start_frame)
for i in range(end_frame - start_frame):
ret, img = cap.read()
if not ret:
print("no image")
break
buf = cv2.cvtColor(cv2.resize(img,(wid_r, hei_r)),
cv2.COLOR_BGR2GRAY).flatten()#
cat_frames.append(buf)
cap.release()
cat_frames = np.array(cat_frames).T
return cat_frames, wid_r, hei_r, fps
Robust PCAによる動体分離
Robust PCAを利用して、低ランク行列とスパース行列を求めます。動画のデータ行列をrpca関数に入力するだけです。
def detect_moving_object():
#計算に利用するフレームの取得
frames, wid, hei, fps = prepare_frames_to_use()
#Robust pcaの実行
L, S = rpca(frames)
#スパース構造の動画出力
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
writer = cv2.VideoWriter("sparse_video.mp4",
fourcc, fps, (int(wid), int(hei)))
for i in range(S.shape[1]):
s_buf = S[:,i]
#輝度調整
lum_min = np.abs(s_buf.min())
lum_max = np.abs(s_buf.max())
lum_w = lum_max + lum_min
s_buf = (s_buf + lum_min)/lum_w * 255
s_buf = cv2.cvtColor(s_buf.reshape(int(hei), -1).astype('uint8'),
cv2.COLOR_GRAY2BGR)
writer.write(s_buf)
writer.release()
結果
右図のスパース行列を動画にしたものを見ると、歩いている人がきれいに分離できていることがわかります。
参考文献
- [1]: Candes, E. J., Li, X., Ma, Y., and Wright, J. 2011. Robust principal component analysis? J. ACM 58, 3, ` Article 11 (May 2011), 37 pages.
- [2]: Jodoin, J.-P., Bilodeau, G.-A., Saunier, N., Urban Tracker: Multiple Object Tracking in Urban Mixed Traffic, Accepted for IEEE Winter conference on Applications of Computer Vision (WACV14), Steamboat Springs, Colorado, USA, March 24-26, 2014.
全ソースコード
import numpy as np
import numpy.linalg as LA
def rpca(M, max_iter=800,p_interval=50):
def shrinkage_operator(x, tau):
return np.sign(x) * np.maximum((np.abs(x) - tau), np.zeros_like(x))
def svd_thresholding_operator(X, tau):
U, S, Vh = LA.svd(X, full_matrices=False)
return U @ np.diag(shrinkage_operator(S, tau)) @ Vh
i = 0
S = np.zeros_like(M)
Y = np.zeros_like(M)
error = np.Inf
tol = 1e-7 * LA.norm(M, ord="fro")
mu = M.shape[0] * M.shape[1]/(4 * LA.norm(M, ord=1))
mu_inv = 1/mu
lam = 1/np.sqrt(np.max(M.shape))
while i < max_iter:
L = svd_thresholding_operator(M - S + mu_inv * Y, mu_inv)
S = shrinkage_operator(M - L + mu_inv * Y, lam * mu_inv)
Y = Y + mu * (M - L - S)
error = LA.norm(M - L - S, ord='fro')
if i % p_interval == 0:
print("step:{} error:{}".format(i, error))
if error <= tol:
print("converted! error:{}".format(error))
break
i+=1
else:
print("Not converged")
return L, S
import cv2
def prepare_frames_to_use(video_path="./atrium_video.avi",#画像のパス
start_frame=120,#最初のフレーム
end_frame=170,#最後のフレーム
r=4,#画像縮小パラメータ
):
#画像読み込み
cap = cv2.VideoCapture(video_path)
#画像の解像度、フレームレート、フレーム数取得
wid = cap.get(cv2.CAP_PROP_FRAME_WIDTH)#横
wid_r = int(wid/r)
hei = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)#縦
hei_r = int(hei/r)
fps = cap.get(cv2.CAP_PROP_FPS)#フレームレート
count = cap.get(cv2.CAP_PROP_FRAME_COUNT)#フレーム数
dt = 1/fps#フレーム間秒数
print(f"wid:{wid}", f" hei:{hei}", f" fps:{fps}", f" count:{count}", f"dt:{dt}")
#対象フレーム抜き出し
cat_frames = []
cap.set(cv2.CAP_PROP_POS_FRAMES,start_frame)
for i in range(end_frame - start_frame):
ret, img = cap.read()
if not ret:
print("no image")
break
buf = cv2.cvtColor(cv2.resize(img,(wid_r, hei_r)),
cv2.COLOR_BGR2GRAY).flatten()#
cat_frames.append(buf)
cap.release()
cat_frames = np.array(cat_frames).T
return cat_frames, wid_r, hei_r, fps
def detect_moving_object():
#計算に利用するフレームの取得
frames, wid, hei, fps = prepare_frames_to_use()
#Robust pcaの実行
L, S = rpca(frames)
#スパース構造の動画出力
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
writer = cv2.VideoWriter("sparse_video.mp4",
fourcc, fps, (int(wid), int(hei)))
for i in range(S.shape[1]):
s_buf = S[:,i]
#輝度調整
lum_min = np.abs(s_buf.min())
lum_max = np.abs(s_buf.max())
lum_w = lum_max + lum_min
s_buf = (s_buf + lum_min)/lum_w * 255
s_buf = cv2.cvtColor(s_buf.reshape(int(hei), -1).astype('uint8'),
cv2.COLOR_GRAY2BGR)
writer.write(s_buf)
writer.release()
if __name__ == "__main__":
detect_moving_object()