はじめに
KaggleにHuman Activity Recognition with Smartphonesというデータセットがあったので、CNNを使って分類してみました。ざっくりいうと、被験者が身につけているスマートフォンから加速度データを取得し、被験者の動作が「歩く、階段を上がる、階段を下りる、座る、立つ、横たわる」のどれか分類するというタスクになります。
できたこと
1次元のCNNで、分類問題を解きました。テストデータに対する正解率は95.3%でした。ネットワークは適当に組んだので、この結果は意外でした…。過学習しているかもしれません…。
test accruracy: 0.95351
Kaggle Notebookを公開しています。間違いなどがありましたらごめんなさい。
HAR_EDA
HAR_baseline_CNN_acc0.953
データセットについて
- 被験者(ボランティア)は30人で、年齢は19歳から48歳(データセットには年齢情報なし)
- 腰にスマートフォンを装着し、内部の加速度計とジャイロセンサーを利用して、3軸の加速度と3軸の角速度を50Hzで記録したらしい
- ラベル付けのためにビデオ撮影も行われたらしい
- 動作の正解ラベルは以下の6種類
- WALKING(歩行)
- WALKING_UPSTAIRS(階段上り)
- WALKING_DOWNSTAIRS(階段下り)
- SITTING(座る)
- STANDING(立つ)
- LAYING(横たわる)
- センサー信号は、ノイズフィルターにて前処理済み
- 属性情報 ※サイトには各列の詳しい意味に関する記載は見当たらなかった…
- 加速度計による3軸加速度データ(総加速度)および推定された身体加速度
- ジャイロスコープによる3軸角速度データ
- 時間領域および周波数領域変数を含む561特徴量のベクトル
- 活動ラベル
- 実験を行った被験者の識別子
ライブラリをインポート
# general
import numpy as np
import os
import pandas as pd
import sys
from tqdm.notebook import tqdm
# sklearn
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
# torch
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from torchinfo import summary
# matplotlib
import matplotlib.pyplot as plt
# %matplotlib inline
import japanize_matplotlib
# warnings
import warnings
warnings.filterwarnings('ignore')
# GPUチェック
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)
cuda:0
# 乱数固定
SEED = 123
def seed_everything(seed=42):
#random.seed(seed)
#np.random.seed(seed)
# np.random.RandomState(seed)
# os.environ['PYTHONHASHSEED'] = str(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.backends.cudnn.daterministic = True
torch.use_deterministic_algorithms = True
# torch.backends.cudnn.benchmark = False
seed_everything(SEED)
Load data
train = pd.read_csv('/kaggle/input/human-activity-recognition-with-smartphones/train.csv', )
test = pd.read_csv('/kaggle/input/human-activity-recognition-with-smartphones/test.csv')
EDA
訓練データに対して、Activity列の割合を円グラフにする。
plt.figure(figsize=(10, 6))
plt.pie(train['Activity'].value_counts(), labels=train['Activity'].value_counts().index, autopct='%1.1f%%')
plt.title('[Train] Activity')
plt.show()
テストデータに対して、Activity列の割合を円グラフにする。
plt.figure(figsize=(10, 6))
plt.pie(test['Activity'].value_counts(), labels=test['Activity'].value_counts().index, autopct='%1.1f%%')
plt.title('[Test] Activity')
plt.show()
Activityの偏りはなさそう。続いて、subject(被験者)毎のデータ頻度をヒストグラムで確認する。
col = 'subject'
plt.figure(figsize=(10, 6))
plt.hist([train[col], test[col]], stacked=True, bins=30, rwidth=0.8, label=['Train', 'Test'], range=(0.5, 30.5))
plt.title(f'{col}')
plt.xlabel(col)
plt.ylabel('Frequency')
plt.xticks(list(range(1, 31, 1)))
plt.legend()
plt.show()
訓練データは21人分、テストデータは9人分。被験者ごとにデータ数は多少ばらつきがあるが、不均衡とまでは言えないと思う。次に、目的変数(Activity)と説明変数の関係を可視化する。
for activity in train['Activity'].unique():
print(f"Activity: {activity}")
train_act = train[train['Activity']==activity].reset_index(drop=True)
plt.figure(figsize=(10, 6))
plt.plot(train_act.iloc[:, :-2].values.T, alpha=0.1, color='skyblue')
plt.title(f"{activity}")
plt.xlabel('columns index')
plt.ylabel('values')
plt.show()
横軸は時間ではないことに注意。ある瞬間にセンサーから得られたデータ+時間領域および周波数領域の変数を計算したものが横軸になる。波形に特徴がみられそうな箇所は以下の通り
- 300~350列あたり(fBodyAcc-bandsEnergy()-xx,xx)
- 382~423列あたり(fBodyAccJerk-bandsEnergy()-xx,xx)
- 461~502列あたり(fBodyGyro-bandsEnergy()-xx,xx)
- 0~300列あたりは、STANDINGやSITTING、LAYINGは下限に張り付いているデータが多くみられる。止まっているからか?
前処理
まずはtrainとtestを結合する。
# Concat: 結合
data = pd.concat([train, test], axis=0)
show_df(data)
Activity列を数値に変換したものを追加する。
# labelencoding: ラベルエンコーディング
le = LabelEncoder()
le.fit(data['Activity'])
data['Activity_label'] = le.transform(data['Activity'])
print(le.classes_)
['LAYING' 'SITTING' 'STANDING' 'WALKING' 'WALKING_DOWNSTAIRS' 'WALKING_UPSTAIRS']
データを分割する。
# Split train and test
train_preprocess = data[:len(train)]
test_preprocess = data[len(train):]
さらに訓練データを訓練用と検証用に分割する。今回、本当の意味での未知のデータは「被験者以外の動作データ」になるはずので、subject
列(被験者の識別子)をキーにデータを分割する。train.csv
は21人の被験者から得られたデータなので、そのうち4人分(2割弱)を検証用とする。
# Split tr and val by 'subject'
val_subject = [3, 7, 15, 26] # 被験者21人のうち、subjectが3, 7 , 15, 26の4人(2割弱)を検証用にする。数字は適当。
tr_preprocess = train_preprocess[~train_preprocess['subject'].isin(val_subject)]
val_preprocess = train_preprocess[train_preprocess['subject'].isin(val_subject)]
説明変数と目的変数に分割する。
X_tr = tr_preprocess.drop(columns=['subject', 'Activity', 'Activity_label'])
X_val = val_preprocess.drop(columns=['subject', 'Activity', 'Activity_label'])
X_test = test_preprocess.drop(columns=['subject', 'Activity', 'Activity_label'])
y_tr = tr_preprocess['Activity_label']
y_val = val_preprocess['Activity_label']
y_test = test_preprocess['Activity_label']
今回はPyTorchを使うので、データをTensor形式に変換する。
X_tr_tensor = torch.tensor(X_tr.values, dtype=torch.float32)
X_val_tensor = torch.tensor(X_val.values, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test.values, dtype=torch.float32)
y_tr_tensor = torch.tensor(y_tr.values, dtype=torch.int64)
y_val_tensor = torch.tensor(y_val.values, dtype=torch.int64)
y_test_tensor = torch.tensor(y_test.values, dtype=torch.int64)
データを変形する。今回、nn.Conv1d
を使うので、入力は(バッチサイズ, 入力チャネル数, シーケンス長)
にしなければならない。入力チャネル数は1とし, シーケンス長は特徴量と同じ561とする。上の方で描いた波形を1次元の画像に見立てたイメージ。入力チャネル数とシーケンス長は逆にしても計算は可能な気がするが、畳み込み計算はシーケンス長方向に行うので、シーケンス長を561とした。
# Reshape (n_sample, in_channel, sequence_length)
X_tr_tensor = X_tr_tensor.view(X_tr_tensor.shape[0], 1, -1)
X_val_tensor = X_val_tensor.view(X_val_tensor.shape[0], 1, -1)
X_test_tensor = X_test_tensor.view(X_test_tensor.shape[0], 1, -1)
print(X_tr_tensor.shape, X_val_tensor.shape, X_test_tensor.shape)
torch.Size([5983, 1, 561]) torch.Size([1369, 1, 561]) torch.Size([2947, 1, 561])
データローダーを作成する。バッチサイズは64とする。
# データローダー作成: Create dataloader
batch_size = 64
tr_dataset = TensorDataset(X_tr_tensor, y_tr_tensor)
val_dataset = TensorDataset(X_val_tensor, y_val_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
tr_loader = DataLoader(tr_dataset, batch_size=batch_size, shuffle=True) # train
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False) # validation
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False) # test
訓練&評価用の関数を作成
まずは、損失関数を計算するための関数を作成する。
def eval_loss(loader, device, net, criterion):
"""
損失関数を計算するための関数
Parameters
----------
loader: torch.utils.data.dataloader.DataLoader
データローダー
device: torch.device
テンソルをどのデバイスに割り当てるか(cuda:0, cpu)
net:
学習ネットワーク
criterion:
損失関数(交差エントロピー誤差など)
Return
---------
loss:
損失値
"""
# データローダーから最初の1セットを取得する
for images, labels in loader:
break
# デバイスの割り当て
inputs = images.to(device)
labels = labels.to(device)
# 予測計算
outputs = net(inputs)
# 損失計算
loss = criterion(outputs, labels)
return loss
次に、訓練用の関数を作成する。
def fit(net, optimizer, criterion, num_epochs, train_loader, val_loader, device, history):
"""
分類問題に対する訓練用の関数。base_epochsを設定することで、追加訓練ができるようになっている。
Parameters
-------------
net:
訓練ネットワーク
optimizer:
最適化手法(SGD, Adamなど)
criterion:
損失関数(交差エントロピー誤差など)
num_epochs: int
エポック数
train_loader: torch.utils.data.dataloader.DataLoader
訓練用のデータローダー
val_loader: torch.utils.data.dataloader.DataLoader
検証用のデータローダー
device:
テンソルを割り当てるデバイス(cpu, cuda:0)
history:
訓練&検証結果が格納された配列。最初の訓練の場合は空の配列。例: (エポック数, 訓練損失, 訓練精度, 検証損失, 検証精度)
Returns
--------------
history:
今回の訓練&検証結果を追加した配列
"""
# 既に訓練済みのエポック数
base_epochs = len(history)
# 訓練
for epoch in range(base_epochs, num_epochs+base_epochs):
n_train_acc, n_val_acc = 0, 0 # 1エポックあたりの正解数(精度計算用)
train_loss, val_loss = 0, 0 # 1エポックあたりの累積損失(平均化前)
n_train, n_val = 0, 0 # 1エポックあたりのデータ累積件数
#訓練フェーズ
net.train()
for inputs, labels in tqdm(train_loader):
# データ件数
train_batch_size = len(labels) # 1バッチあたりのデータ件数
n_train += train_batch_size # データ累積件数
# デバイス(cpu, gpu)に割り当てる
inputs = inputs.to(device)
labels = labels.to(device)
# 勾配の初期化
optimizer.zero_grad()
# 予測計算
outputs = net(inputs)
# 損失計算
loss = criterion(outputs, labels)
# 勾配計算
loss.backward()
# パラメータ修正
optimizer.step()
# 予測ラベル導出
predicted = torch.max(outputs, 1)[1]
# 平均前の損失と正解数の計算
# lossは平均計算が行われているので平均前の損失に戻して加算
train_loss += loss.item() * train_batch_size
n_train_acc += (predicted == labels).sum().item()
#予測フェーズ
net.eval()
for inputs_val, labels_val in val_loader:
# データ件数
val_batch_size = len(labels_val) # 1バッチあたりのデータ件数
n_val += val_batch_size # データ累積件数
# GPUヘ転送
inputs_val = inputs_val.to(device)
labels_val = labels_val.to(device)
# 予測計算
outputs_val = net(inputs_val)
# 損失計算
loss_val = criterion(outputs_val, labels_val)
# 予測ラベル導出
predict_val = torch.max(outputs_val, 1)[1]
# 平均前の損失と正解数の計算
# lossは平均計算が行われているので平均前の損失に戻して加算
val_loss += loss_val.item() * val_batch_size
n_val_acc += (predict_val == labels_val).sum().item()
# 精度計算
train_acc = n_train_acc / n_train
val_acc = n_val_acc / n_val
# 損失計算
avg_train_loss = train_loss / n_train
avg_val_loss = val_loss / n_val
# 結果表示
print (f'Epoch [{(epoch+1)}/{num_epochs+base_epochs}], tr_loss: {avg_train_loss:.5f} tr_acc: {train_acc:.5f} val_loss: {avg_val_loss:.5f}, val_acc: {val_acc:.5f}')
# 記録
item = np.array([epoch+1, avg_train_loss, train_acc, avg_val_loss, val_acc])
history = np.vstack((history, item))
return history
三つ目に、訓練&検証の過程を学習曲線として可視化するための関数を作成する。
def check_history(history):
"""
historyを確認するための関数
Parameters
------------------
history: np.array: (繰り返し数, 訓練損失, 訓練精度, 検証損失, 検証精度)
訓練結果が格納されたnumpy配列
Return
------------------
None
Display
------------------
訓練開始時の損失&精度と、訓練終了時の損失&精度
損失に関する学習曲線
精度に関する学習曲線
"""
#損失と精度の確認
print(f'初期状態: 損失: {history[0,3]:.5f} 精度: {history[0,4]:.5f}')
print(f'最終状態: 損失: {history[-1,3]:.5f} 精度: {history[-1,4]:.5f}' )
num_epochs = len(history)
ticks_interval = 5 #num_epochs // 10
# 学習曲線の表示 (損失)
plt.figure(figsize=(8,8))
plt.plot(history[:,0], history[:,1], 'b', label='訓練')
plt.plot(history[:,0], history[:,3], 'k', label='検証')
plt.xticks(np.arange(0, num_epochs+1, ticks_interval))
plt.xlabel('繰り返し回数')
plt.ylabel('損失')
plt.title('学習曲線(損失)')
plt.legend()
plt.grid()
plt.show()
# 学習曲線の表示 (精度)
plt.figure(figsize=(8,8))
plt.plot(history[:,0], history[:,2], 'b', label='訓練')
plt.plot(history[:,0], history[:,4], 'k', label='検証')
plt.xticks(np.arange(0, num_epochs+1, ticks_interval))
plt.xlabel('繰り返し回数')
plt.ylabel('精度')
plt.title('学習曲線(精度)')
plt.legend()
plt.grid()
plt.show()
ネットワークを定義する
CNNを定義する。畳み込み層は2つ、プーリング層はMaxPooling、活性化関数はReLU、全結合層は2つとした。
class CNN(nn.Module):
def __init__(self, in_channel, sequence_length, n_hidden, n_output):
super().__init__()
self.conv1 = nn.Conv1d(in_channel, 32, 3, padding=1)
self.conv2 = nn.Conv1d(32, 64, 3, padding=1)
self.relu = nn.ReLU(inplace=True)
self.maxpool = nn.MaxPool1d(kernel_size=2, stride=2)
self.flatten = nn.Flatten()
self.l1 = nn.Linear(64*(sequence_length//2), n_hidden)
self.l2 = nn.Linear(n_hidden, n_output)
self.features = nn.Sequential(
self.conv1,
self.relu,
self.conv2,
self.relu,
self.maxpool)
self.classifier = nn.Sequential(
self.l1,
self.relu,
self.l2)
def forward(self, x):
x1 = self.features(x)
x2 = self.flatten(x1)
x3 = self.classifier(x2)
return x3
今回の問題特有のパラメータを取得する
入力チャネル数やシーケンス長(特徴量ベクトルの長さ)、出力次元数(分類クラス数)といった、今回の問題特有のパラメータを取得する。
# テンソルの形状を確認
print(X_tr_tensor.shape)
# 分類クラスを確認
classes = le.classes_
print(classes)
torch.Size([5983, 1, 561])
['LAYING' 'SITTING' 'STANDING' 'WALKING' 'WALKING_DOWNSTAIRS' 'WALKING_UPSTAIRS']
# 入力チャネル数
in_channel = X_tr_tensor.shape[1]
# シーケンス長
sequence_length = X_tr_tensor.shape[2]
# 出力次元数
n_output = len(classes)
# 表示
print(f'in_channel: {in_channel}, sequence_length: {sequence_length}, n_output: {n_output}')
in_channel: 1, sequence_length: 561, n_output: 6
ハイパーパラメータをセットする。
バッチサイズ(batch_size)は既に設定済みなので、それ以外のハイパーパラメータを設定する。
# 全結合層の隠れ層のノード数
n_hidden = 128
# エポック数
num_epochs = 30
# 学習率
lr = 0.0001
インスタンス化・損失関数・最適化手法
ネットワークをインスタンス化し、損失関数と最適化手法を設定する。最適化手法にはAdamを使った。
# ネットワークインスタンス作成
net = CNN(in_channel, sequence_length, n_hidden, n_output)
# 損失関数
criterion = nn.CrossEntropyLoss() # 交差エントロピー誤差
# 最適化関数と学習率
optimizer = optim.Adam(net.parameters(), lr=lr)
訓練
いよいよ訓練を行う。記録用の配列history
を作成し、自作関数fit()
を実行する。
# 訓練&検証結果記録用の配列
history = np.zeros((0, 5)) # (繰り返し数, 訓練損失, 訓練精度, 検証損失, 検証精度)
history = fit(
net=net,
optimizer=optimizer,
criterion=criterion,
num_epochs=num_epochs,
train_loader=tr_loader,
val_loader=val_loader,
device=device,
history=history
)
結果を確認する
結果を確認するため、学習曲線を描かせる。
## Train & validation result check
check_history(history)
初期状態: 損失: 0.67588 精度: 0.79182
最終状態: 損失: 0.07743 精度: 0.97297
学習曲線を見る限り、ちゃんと学習できてるっぽい。ネットワークもハイパーパラメータも超適当だったので、逆に不安…。検証データの精度は最終的に、97.2%になった。
テストデータで予測
テストデータ(test.csv)で予測させてみる。
pred_test = np.zeros((0, n_output))
for inputs_test, labels_test in test_loader:
# 予測モード
net.eval()
# デバイスへ割り当て(転送)
inputs_test = inputs_test.to(device)
labels_test = labels_test.to(device)
# 予測計算
outputs_test = net(inputs_test)
outputs_test = outputs_test.cpu().detach().numpy()
# 結果を格納
pred_test = np.vstack((pred_test, outputs_test))
print(f"pred_test shape: {pred_test.shape}")
print(pred_test)
pred_test shape: (2947, 6)
[[ -0.63694131 1.76891136 8.39913654 -6.22003174 -2.34908938
-7.08003855]
[ -0.52387488 3.97222996 8.97561169 -6.58308029 -6.14899445
-8.314394 ]
[ -1.5507524 6.42444754 9.12291622 -3.70531774 -5.36039734
-12.7016573 ]
...
[ -4.61544228 -5.56775904 -3.83927917 0.84662157 1.77309287
8.53026772]
[ -5.25395632 -2.8374095 -4.95572138 1.79179215 1.91084707
6.3403554 ]
[ -5.29117155 -3.92615199 -4.08838272 3.66079497 2.58698273
4.81902313]]
softmax関数を使って、出力値を確率に変換する。
# 確率に変換
proba_test = torch.softmax(torch.tensor(pred_test), dim=1).numpy()
np.set_printoptions(suppress=True, precision=4)
print(f"proba_test shape: {proba_test.shape}")
print(proba_test)
proba_test shape: (2947, 6)
[[0.0001 0.0013 0.9985 0. 0. 0. ]
[0.0001 0.0067 0.9933 0. 0. 0. ]
[0. 0.0631 0.9369 0. 0. 0. ]
...
[0. 0. 0. 0.0005 0.0012 0.9984]
[0. 0.0001 0. 0.0103 0.0117 0.9779]
[0. 0.0001 0.0001 0.2209 0.0755 0.7034]]
ラベルに変換する。
# ラベルに変換
y_pred_test = np.argmax(pred_test, 1)
print(f"y_pred_test shape: {y_pred_test.shape}")
print(y_pred_test)
y_pred_test shape: (2947,)
[2 2 2 ... 5 5 5]
test.csvの行数と同じ2947個の予測ラベルが得られた。最後に、正解率を計算する。
# 精度を確認
acc_test = accuracy_score(y_pred_test, y_test)
print(f'test accuracy: {acc_test:.5f}')
test accuracy: 0.95351
手元の結果では、テストデータの正解率は95.3%だった。検証データの正解率が97%くらいだったので、近い数字が出てしまった…。
おわりに
Kaggle上で公開されているHuman Activity Recognition with Smartphonesのデータセットを使って、動作分類をしてみた。分類クラスは6で、機械学習アルゴリズムにはCNNを使った結果、正解率は95.3%だった。