LoginSignup
18
11

More than 3 years have passed since last update.

Robust PCAで動体分離

Last updated at Posted at 2020-11-07

はじめに

 以前の記事で動的モード分解(Dyanmic Mode Decomposition: DMD)で、動画内の動体を分離した例を示しました。今回の記事では、Principal Component Pursuit(PCP)アルゴリズム[1]のRobust Principal Component Analysis(Robust PCA) で動体分離を試します。

環境

  • Windows 10 home edition
  • Python (3.7.6)
  • Numpy(1.18.5)
  • Opencv(4.1.1)

Robust PCA via PCP

 Robust PCAは、低ランク構造と、スパース構造に分解する手法です。動体の場合、背景が低ランク構造で、動体がスパース構造となります。今回試すのは、PCPアルゴリズムのRobust PCAです。元のデータ行列$M$を低ランク行列$L$とスパース行列$S$に分解します。

M = L + S

 PCPでは、次の拡張ラグランジアン$l$を最小化することで、$L$と$S$に分解します。

l(L, S, Y) = \| L \|_* + \lambda\|S\|_1 +<Y, M - L - S> + \frac{\mu}{2}\|M-L-s\|^2_F

 $L$は低ランク行列、$S$はスパース行列、$Y$はラグランジュ乗数行列、$\lambda$と$\mu$はPCPのパラメータです。$< * , * >$は内積です。各変数の次元は、$M,L,S,Y\in\mathbb{R}^{n_1\times n_2}$です。
 それぞれのノルムは、$||\cdot||_*$ はトレースノルム、$||\cdot|| _1$はL1ノルム、$||\cdot||^2_F$はフロベニウスノルムで、$A\in\mathbb{R}^{n_1 \times n_2}$の特異値成分を$\sigma$とすると、

\begin{align}
&||A||_* = \mathrm{tr}(\sqrt{A^\mathrm{T}A})= \sum^{\mathrm{min}(n_1,n_2)}_{i=1} \sigma_i\\
&||A||_1 = \underset{1\leq j \leq n_2}{\mathrm{max}}\sum^{n_1}_{i=1}|a_{ij}|\\
&||A||_F = \sqrt{\sum^{n_1}_{i=1}\sum^{n_2}_{j=1}|a_{ij}|^2} = \sqrt{\mathrm{tr}(A^\mathrm{T}A)} = \sqrt{\sum^{\mathrm{min}(n_1,n_2)}_{i=1}\sigma^2_i}
\end{align}

となります。添え字の$\mathrm{T}$は転置行列を表します。ノルムを述する式は、Wikipediaの記載を参考にしました。
 スパース行列$S$は、shrinkage operatorで、低ランク行列$L$は、singular value thresholding operatorで、更新します。パラメータ$\tau$のshrinkage opertor $S_\tau$は、

\begin{align}
&S_\tau(x) = \mathrm{sgn}(x) \mathrm{max}(|x|-\tau, 0)
\end{align}

と表します。singular value thresholding operator $D_\tau$は、shrinkage oprator $S_\tau$と、特異値分解を利用して、

D_{\tau}(X) = US_\tau(\Sigma)V^\mathrm{T}, \
\mathrm{where} \ X = U\Sigma V^\mathrm{T}

となります。$U, \Sigma, V$はそれぞれ、$X$の左特異ベクトル、特異値、右特異ベクトルです。
 shrinkage operator $S_\tau$と、singular value thresholding operator $D_\tau$を利用して、低ランク行列$L$とスパース行列$S$ を更新します。更新の式は、変数$X$の更新前を$X_k$、更新後を$X_{k+1}$と表記し、PCPのパラメータ$\tau, \mu$を入力して書くと、

\begin{align}
&L_{k+1} = D_{1/\mu}(M - S_k - \mu^{-1}Y_k) \\
&S_{k+1} = S_{\lambda/\mu}(M - L_{k+1} + \mu^{-1}Y_k)
\end{align}

となります。$S$記号が紛らわしいですが、随伴行列を転置行列と言い換えている以外は、文献[1]の表記に従っています。あと、shrinkage operator $S_\tau$とsingular value thresholding operator $D_\tau$のパラメータ$\mu, \lambda$の設定方法が、arXivに置かれている2009年版のPCPの論文は間違っているので、注意が必要です(2009年arXiv版は、$D_\mu, S_{\lambda\mu}$となっていますが、正しくは$D_{1/\mu},S_{\lambda/\mu}$のようです)。
 ラグランジュ乗数行列$Y$は、拡張ラグランジュ法の計算方法により、

Y_{k+1} = Y_k + \mu(M - L_{k+1} - S_{k+1})

と更新します。
 PCPのパラメータ$\lambda$と$\mu$は、文献[1]では、データ行列$M \in\mathbb{R}^{n_1 \times n_2}$として、それぞれ、

\begin{align}
&\lambda = \frac{1}{\sqrt{\mathrm{max}(n_1,n_2)}} \\
&\mu = \frac{n_1 n_2}{4\|M\|_1}
\end{align}

と設定しています。
 PCPの計算手順をまとめると、以下になります。

\begin{align}
&\mathrm{initialize:} S_0 = 0, Y_0 =0 \\
& \mathrm{while}\  \mathrm{not}\ \mathrm{converged}:\\
&\qquad L_{k+1} = D_{1/\mu}(M - S_k + \mu^{-1}Y_k) \\
&\qquad S_{k+1} = S_{\lambda/\mu}(M - L_{k+1} + \mu^{-1}Y_k) \\
&\qquad Y_{k+1} = Y_k + \mu(M - L_{k+1} - S_{k+1}) \\
&\mathrm{end} \ \mathrm{while} \\
&\mathrm{output:}L,S.
\end{align}

 収束の判定は、次のフロベニウスノルムで行います。

\|M -L - S\|_F \leq \delta\|M\|_F \quad \mathrm{with} \ \delta=10^{-7}

 以上のPCPアルゴリズムのRobust PCAをPythonでコードにします。

import numpy as np
import numpy.linalg as LA
def rpca(M, max_iter=800,p_interval=50):
    def shrinkage_operator(x, tau):
        return np.sign(x) * np.maximum((np.abs(x) - tau), np.zeros_like(x))

    def svd_thresholding_operator(X, tau):
        U, S, Vh = LA.svd(X, full_matrices=False)
        return U @ np.diag(shrinkage_operator(S, tau)) @ Vh

    i = 0
    S = np.zeros_like(M)
    Y = np.zeros_like(M)
    error = np.Inf
    tol = 1e-7 * LA.norm(M, ord="fro")
    mu = M.shape[0] * M.shape[1]/(4 * LA.norm(M, ord=1))
    mu_inv = 1/mu
    lam = 1/np.sqrt(np.max(M.shape))

    while i < max_iter:
        L = svd_thresholding_operator(M - S + mu_inv * Y, mu_inv)
        S = shrinkage_operator(M - L + mu_inv * Y, lam * mu_inv)
        Y = Y + mu * (M - L - S)
        error = LA.norm(M - L - S, ord='fro')
        if i % p_interval == 0:
            print("step:{} error:{}".format(i, error))

        if error <= tol:
            print("converted! error:{}".format(error))
            break
        i+=1
    else:
        print("Not converged")

    return L, S

動画

 前回の記事同様に、ここのdatasetのAtriumを利用します。120frameから170frameを利用しています。歩いている人と背景をRobust PCAを使って分離します。
original.png

import cv2
def prepare_frames_to_use(video_path="./atrium_video.avi",#画像のパス
                          start_frame=120,#最初のフレーム
                          end_frame=170,#最後のフレーム
                          r=4,#画像縮小パラメータ
                          ):

    #画像読み込み
    cap = cv2.VideoCapture(video_path)

    #画像の解像度、フレームレート、フレーム数取得
    wid = cap.get(cv2.CAP_PROP_FRAME_WIDTH)#横
    wid_r = int(wid/r)
    hei = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)#縦
    hei_r = int(hei/r)
    fps = cap.get(cv2.CAP_PROP_FPS)#フレームレート
    count = cap.get(cv2.CAP_PROP_FRAME_COUNT)#フレーム数
    dt = 1/fps#フレーム間秒数
    print(f"wid:{wid}", f" hei:{hei}", f" fps:{fps}", f" count:{count}", f"dt:{dt}")

    #対象フレーム抜き出し
    cat_frames = []
    cap.set(cv2.CAP_PROP_POS_FRAMES,start_frame)
    for i in range(end_frame - start_frame):
        ret, img = cap.read()
        if not ret:
            print("no image")
            break
        buf = cv2.cvtColor(cv2.resize(img,(wid_r, hei_r)),
                           cv2.COLOR_BGR2GRAY).flatten()#
        cat_frames.append(buf)

    cap.release()
    cat_frames = np.array(cat_frames).T
    return cat_frames, wid_r, hei_r, fps

Robust PCAによる動体分離

 Robust PCAを利用して、低ランク行列とスパース行列を求めます。動画のデータ行列をrpca関数に入力するだけです。

def detect_moving_object():
    #計算に利用するフレームの取得
    frames, wid, hei, fps = prepare_frames_to_use()
    #Robust pcaの実行
    L, S = rpca(frames)

    #スパース構造の動画出力
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    writer = cv2.VideoWriter("sparse_video.mp4",
                             fourcc, fps, (int(wid), int(hei)))
    for i in range(S.shape[1]):
        s_buf = S[:,i]
        #輝度調整
        lum_min = np.abs(s_buf.min())
        lum_max = np.abs(s_buf.max())
        lum_w = lum_max + lum_min
        s_buf = (s_buf + lum_min)/lum_w * 255
        s_buf = cv2.cvtColor(s_buf.reshape(int(hei), -1).astype('uint8'),
                             cv2.COLOR_GRAY2BGR)
        writer.write(s_buf)
    writer.release()

結果

右図のスパース行列を動画にしたものを見ると、歩いている人がきれいに分離できていることがわかります。

参考文献

  • [1]: Candes, E. J., Li, X., Ma, Y., and Wright, J. 2011. Robust principal component analysis? J. ACM 58, 3, ` Article 11 (May 2011), 37 pages.
  • [2]: Jodoin, J.-P., Bilodeau, G.-A., Saunier, N., Urban Tracker: Multiple Object Tracking in Urban Mixed Traffic, Accepted for IEEE Winter conference on Applications of Computer Vision (WACV14), Steamboat Springs, Colorado, USA, March 24-26, 2014.
  • [3]: https://ksknw.hatenablog.com/entry/2018/11/17/182112
  • [4]: https://github.com/dganguli/robust-pca

全ソースコード

import numpy as np
import numpy.linalg as LA
def rpca(M, max_iter=800,p_interval=50):
    def shrinkage_operator(x, tau):
        return np.sign(x) * np.maximum((np.abs(x) - tau), np.zeros_like(x))

    def svd_thresholding_operator(X, tau):
        U, S, Vh = LA.svd(X, full_matrices=False)
        return U @ np.diag(shrinkage_operator(S, tau)) @ Vh

    i = 0
    S = np.zeros_like(M)
    Y = np.zeros_like(M)
    error = np.Inf
    tol = 1e-7 * LA.norm(M, ord="fro")
    mu = M.shape[0] * M.shape[1]/(4 * LA.norm(M, ord=1))
    mu_inv = 1/mu
    lam = 1/np.sqrt(np.max(M.shape))

    while i < max_iter:
        L = svd_thresholding_operator(M - S + mu_inv * Y, mu_inv)
        S = shrinkage_operator(M - L + mu_inv * Y, lam * mu_inv)
        Y = Y + mu * (M - L - S)
        error = LA.norm(M - L - S, ord='fro')
        if i % p_interval == 0:
            print("step:{} error:{}".format(i, error))

        if error <= tol:
            print("converted! error:{}".format(error))
            break
        i+=1
    else:
        print("Not converged")

    return L, S

import cv2
def prepare_frames_to_use(video_path="./atrium_video.avi",#画像のパス
                          start_frame=120,#最初のフレーム
                          end_frame=170,#最後のフレーム
                          r=4,#画像縮小パラメータ
                          ):

    #画像読み込み
    cap = cv2.VideoCapture(video_path)

    #画像の解像度、フレームレート、フレーム数取得
    wid = cap.get(cv2.CAP_PROP_FRAME_WIDTH)#横
    wid_r = int(wid/r)
    hei = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)#縦
    hei_r = int(hei/r)
    fps = cap.get(cv2.CAP_PROP_FPS)#フレームレート
    count = cap.get(cv2.CAP_PROP_FRAME_COUNT)#フレーム数
    dt = 1/fps#フレーム間秒数
    print(f"wid:{wid}", f" hei:{hei}", f" fps:{fps}", f" count:{count}", f"dt:{dt}")

    #対象フレーム抜き出し
    cat_frames = []
    cap.set(cv2.CAP_PROP_POS_FRAMES,start_frame)
    for i in range(end_frame - start_frame):
        ret, img = cap.read()
        if not ret:
            print("no image")
            break
        buf = cv2.cvtColor(cv2.resize(img,(wid_r, hei_r)),
                           cv2.COLOR_BGR2GRAY).flatten()#
        cat_frames.append(buf)

    cap.release()
    cat_frames = np.array(cat_frames).T
    return cat_frames, wid_r, hei_r, fps

def detect_moving_object():
    #計算に利用するフレームの取得
    frames, wid, hei, fps = prepare_frames_to_use()
    #Robust pcaの実行
    L, S = rpca(frames)

    #スパース構造の動画出力
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    writer = cv2.VideoWriter("sparse_video.mp4",
                             fourcc, fps, (int(wid), int(hei)))
    for i in range(S.shape[1]):
        s_buf = S[:,i]
        #輝度調整
        lum_min = np.abs(s_buf.min())
        lum_max = np.abs(s_buf.max())
        lum_w = lum_max + lum_min
        s_buf = (s_buf + lum_min)/lum_w * 255
        s_buf = cv2.cvtColor(s_buf.reshape(int(hei), -1).astype('uint8'),
                             cv2.COLOR_GRAY2BGR)
        writer.write(s_buf)
    writer.release()

if __name__ == "__main__":
    detect_moving_object()
18
11
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
18
11