0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【Kaggleじゃないけど】CNNで可変長データを分類する【Python】

Posted at

はじめに

前回、Human Activity Recognition with SmartphonesとCNNを使って分類問題を解きました。センサーデータから得られた561個の特徴量を固定長のシーケンスに見立てましたが、実際の問題だとシーケンス長が固定じゃない場合が多そうだなと思い、可変長データの分類をやってみようと思います。

ただ、可変長のデータセットがうまく見つからなかったので、適当にデータを作り、それを分類するという自作自演になってしまいました…。

とりあえずやったことの紹介

sinとcosをベースに適当に振幅を変えたり、ノイズを加えたりして、3種類のデータを作りました。
生成したデータ - コピー.png

CNNは前回とほぼ同じです。ただ、可変長シーケンスに対応するため、細かいところを修正しなければならず、苦労しました。学習曲線は以下の通りで、初期の学習がうまく進んでいませんが、お許しください(誰にw?)。

初期状態: 損失: 1.06633 精度: 0.72667
最終状態: 損失: 0.04075 精度: 0.99667

学習曲線_損失 - コピー.png
学習曲線_精度 - コピー.png

そもそもデータが適当なので、正解率などの数値は参考程度かと思います。個別の予測結果(の一部)を図示すると以下のような感じです。タイトル欄にある左側の数字は正解ラベル、右側が予測値です。上の図の赤い線が0、青い線が1、緑の線が2です。
結果 - コピー.png

4行4列目が予測を外していますが、これは人の目で見ても間違えそうです。正解が1なので、本来は振幅がもっと小さいはずですが、乱数の影響でたまたま振幅が大きくなってしまったようです。ちなみに、波形の右側が平らになっているのは、元のデータが可変長だからです。0埋めされています。

ライブラリをインポート

では、実装です。

import numpy as np
import random
from tqdm.notebook import tqdm

# torch
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, random_split

# sklearn
from sklearn.metrics import accuracy_score

# matplotlib
import matplotlib.pyplot as plt
%matplotlib inline
import japanize_matplotlib

# warnings
import warnings
warnings.filterwarnings('ignore')

準備

乱数固定や定数の設定、GPUのチェックを行います。

SEED = 123
def seed_everything(seed=42):
    random.seed(seed)
    #np.random.seed(seed)
    np.random.RandomState(seed)
    # os.environ['PYTHONHASHSEED'] = str(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.Generator().manual_seed(seed)
    torch.backends.cudnn.daterministic = True
    torch.use_deterministic_algorithms = True
    # torch.backends.cudnn.benchmark = False
seed_everything(SEED)
# データ生成
subject = 1000
min_seq_len = 24
max_seq_len = 48
num_channel = 1
num_class = 3

# 訓練
batch_size = 32 # バッチサイズ
n_hidden = 16 # 全結合層の隠れ層のノード数
lr = 0.0002 #0.0001 # 学習率
num_epochs = 30 # 30 # エポック数
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

データ生成

データを作成します。今回3種類の波形データを作ります。sinやcosに適当な振幅を掛け、ノイズを加えただけです。

def wave_0(T):
    a = np.random.uniform(0.8, 1.2)
    return [(a*np.sin(t))+(np.random.randn()/8) for t in T]

def wave_1(T):
    a = np.random.uniform(0.3, 0.7)
    return [(a*np.sin(t))+(np.random.randn()/8) for t in T]

def wave_2(T):
    a = np.random.uniform(0.8, 1.2)
    return [(a*np.cos(t))+(np.random.randn()/8) for t in T]

3種類の波形を1000通り、合計3000個のデータを作成しました。

X = []
y = []
seq_lengths = []
for i in range(subject):
    seq_len = np.random.randint(min_seq_len, max_seq_len)
    T = np.linspace(0, 2*2*np.pi, max_seq_len+1)[:seq_len]
    # wave_0
    X.append(np.array(wave_0(T)).reshape(num_channel, -1)) # (channel, sequence_length)
    y.append(0)
    seq_lengths.append(seq_len)
    # wave_2
    X.append(np.array(wave_1(T)).reshape(num_channel, -1))
    y.append(1)
    seq_lengths.append(seq_len)
    # wave_3
    X.append(np.array(wave_2(T)).reshape(num_channel, -1))
    y.append(2)
    seq_lengths.append(seq_len)

print(f"""
X length: {len(X)},
y length: {len(y)},
sequence lenght: {len(seq_lengths)}
""")

作成した波形データをプロットすると以下のようになります。赤い線がwave_0、青い線がwave_1、緑の線がwave_2です。今回シーケンス長を24~48にしたので、線は途中で途切れてます。後半につれ、線の色が薄くなっているのはそのためです。

# 可視化
for i, label in enumerate(y):
    if label==0: plt.plot(X[i][0], alpha=0.01, color='red')
    elif label==1: plt.plot(X[i][0], alpha=0.01, color='blue')
    elif label==2: plt.plot(X[i][0], alpha=0.01, color='green')
plt.xticks(np.linspace(0, max_seq_len, 9))
plt.xlabel('sequence')
plt.grid()
plt.show()

生成したデータ.png

データローダーを作成

データローダーを作ります。

# データセットクラスを定義
class TimeSeriesDataset(Dataset):
    def __init__(self, data, labels):
        self.data = [torch.tensor(d, dtype=torch.float32) for d in data]
        self.labels = torch.tensor(labels, dtype=torch.long)

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]

# インスタンス化
dataset = TimeSeriesDataset(X, y)
print(len(dataset))
3000

(繰り返しになりますが)今回のデータは可変長なので、シーケンス長をそろえるための関数を定義します。このあたりは生成AIに相談しながら実装したので、もっと良い解法があるのかもしれません…。

# 可変シーケンス対応
# パディングを適用してすべてのシーケンスを同じ長さにする関数
def pad_collate_fn(batch):
    max_length = max([x[0].shape[1] for x in batch])
    #print(f"max_length: {max_length}")
    padded_data = []
    labels = []
    for data, label in batch:
        # シーケンスの最後に0を追加してパディング
        #print(data.shape)
        #print(data[0])
        padded_seq = nn.functional.pad(data, (0, max_length - data.shape[1], 0, 0))  # (channels, padding_length)
        padded_data.append(padded_seq)
        labels.append(label)
    return torch.stack(padded_data), torch.tensor(labels)

# 訓練用と検証用に分割する
train_size = int(len(dataset) * 0.8)
val_size = len(dataset) - train_size
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

# データローダーを作成
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=pad_collate_fn)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, collate_fn=pad_collate_fn)

データローダーから最初の1バッチを取り出し、グラフ化します。

# 最初の1セットを取り出し、可視化
for X_train, y_train in train_loader:
    break
    
plt.figure()
for i, label in enumerate(y_train):
    if label==0: plt.plot(X_train[i][0], alpha=0.1, color='red')
    elif label==1: plt.plot(X_train[i][0], alpha=0.1, color='blue')
    elif label==2: plt.plot(X_train[i][0], alpha=0.1, color='green')
plt.xticks(np.linspace(0, max_seq_len, 9))
plt.xlabel('sequence')
plt.grid()
plt.show()

データローダーから取り出したデータ.png

シーケンス方向のパディングは出来てそうですし、大丈夫な気がします。

関数を定義

訓練に入る前に、必要な処理を関数化しておきます。

def eval_loss(loader, device, net, criterion):
    """
    損失関数を計算するための関数

    Parameters
    ----------
    loader: torch.utils.data.dataloader.DataLoader
        データローダー
    device: torch.device
        テンソルをどのデバイスに割り当てるか(cuda:0, cpu)
    net:
        学習ネットワーク
    criterion: 
        損失関数(交差エントロピー誤差など)

    Return
    ---------
    loss:
        損失値
    """
    # データローダーから最初の1セットを取得する
    for images, labels in loader:
        break
    # デバイスの割り当て
    inputs = images.to(device)
    labels = labels.to(device)
    # 予測計算
    outputs = net(inputs)
    #  損失計算
    loss = criterion(outputs, labels)
    return loss
def fit(net, optimizer, criterion, num_epochs, train_loader, val_loader, device, history):
    """
    分類問題に対する訓練用の関数。base_epochsを設定することで、追加訓練ができるようになっている。

    Parameters
    -------------
    net: 
        訓練ネットワーク
    optimizer:
        最適化手法(SGD, Adamなど)
    criterion:
        損失関数(交差エントロピー誤差など)
    num_epochs: int
        エポック数
    train_loader: torch.utils.data.dataloader.DataLoader
        訓練用のデータローダー
    val_loader: torch.utils.data.dataloader.DataLoader
        検証用のデータローダー
    device: 
        テンソルを割り当てるデバイス(cpu, cuda:0)
    history:
        訓練&検証結果が格納された配列。最初の訓練の場合は空の配列。例: (エポック数, 訓練損失, 訓練精度, 検証損失, 検証精度)

    Returns
    --------------
    history:
        今回の訓練&検証結果を追加した配列
    """
    # 既に訓練済みのエポック数
    base_epochs = len(history)

    # 訓練
    for epoch in range(base_epochs, num_epochs+base_epochs):
        n_train_acc, n_val_acc = 0, 0 # 1エポックあたりの正解数(精度計算用)
        train_loss, val_loss = 0, 0 # 1エポックあたりの累積損失(平均化前)
        n_train, n_val = 0, 0 # 1エポックあたりのデータ累積件数

        #訓練フェーズ
        net.train()
        for inputs, labels in tqdm(train_loader):
            # データ件数
            train_batch_size = len(labels) # 1バッチあたりのデータ件数
            n_train += train_batch_size  # データ累積件数
            
            # デバイス(cpu, gpu)に割り当てる
            inputs = inputs.to(device)
            labels = labels.to(device)

            # 勾配の初期化
            optimizer.zero_grad()

            # 予測計算
            #print(inputs.shape, labels.shape)
            outputs = net(inputs)

            # 損失計算
            loss = criterion(outputs, labels)

            # 勾配計算
            loss.backward()

            # パラメータ修正
            optimizer.step()

            # 予測ラベル導出
            predicted = torch.max(outputs, 1)[1]

            # 平均前の損失と正解数の計算
            # lossは平均計算が行われているので平均前の損失に戻して加算
            train_loss += loss.item() * train_batch_size 
            n_train_acc += (predicted == labels).sum().item() 

        #予測フェーズ
        net.eval()
        for inputs_val, labels_val in val_loader:
            # データ件数
            val_batch_size = len(labels_val) # 1バッチあたりのデータ件数
            n_val += val_batch_size # データ累積件数

            # GPUヘ転送
            inputs_val = inputs_val.to(device)
            labels_val = labels_val.to(device)

            # 予測計算
            outputs_val = net(inputs_val)

            # 損失計算
            loss_val = criterion(outputs_val, labels_val)
 
            # 予測ラベル導出
            predict_val = torch.max(outputs_val, 1)[1]

            #  平均前の損失と正解数の計算
            # lossは平均計算が行われているので平均前の損失に戻して加算
            val_loss +=  loss_val.item() * val_batch_size
            n_val_acc +=  (predict_val == labels_val).sum().item()

        # 精度計算
        train_acc = n_train_acc / n_train
        val_acc = n_val_acc / n_val
        # 損失計算
        avg_train_loss = train_loss / n_train
        avg_val_loss = val_loss / n_val
        # 結果表示
        print (f'Epoch [{(epoch+1)}/{num_epochs+base_epochs}], tr_loss: {avg_train_loss:.5f} tr_acc: {train_acc:.5f} val_loss: {avg_val_loss:.5f}, val_acc: {val_acc:.5f}')
        # 記録
        item = np.array([epoch+1, avg_train_loss, train_acc, avg_val_loss, val_acc])
        history = np.vstack((history, item))
    return history
def check_history(history):
    """
    historyを確認するための関数

    Parameters
    ------------------
    history: np.array: (繰り返し数, 訓練損失, 訓練精度, 検証損失, 検証精度)
        訓練結果が格納されたnumpy配列

    Return
    ------------------
    None

    Display
    ------------------
    訓練開始時の損失&精度と、訓練終了時の損失&精度
    損失に関する学習曲線
    精度に関する学習曲線
    """
    #損失と精度の確認
    print(f'初期状態: 損失: {history[0,3]:.5f} 精度: {history[0,4]:.5f}') 
    print(f'最終状態: 損失: {history[-1,3]:.5f} 精度: {history[-1,4]:.5f}' )

    num_epochs = len(history)
    ticks_interval = 5 #num_epochs // 10

    # 学習曲線の表示 (損失)
    plt.figure(figsize=(8,8))
    plt.plot(history[:,0], history[:,1], 'b', label='訓練')
    plt.plot(history[:,0], history[:,3], 'k', label='検証')
    plt.xticks(np.arange(0, num_epochs+1, ticks_interval))
    plt.xlabel('繰り返し回数')
    plt.ylabel('損失')
    plt.title('学習曲線(損失)')
    plt.legend()
    plt.grid()
    plt.show()

    # 学習曲線の表示 (精度)
    plt.figure(figsize=(8,8))
    plt.plot(history[:,0], history[:,2], 'b', label='訓練')
    plt.plot(history[:,0], history[:,4], 'k', label='検証')
    plt.xticks(np.arange(0, num_epochs+1, ticks_interval))
    plt.xlabel('繰り返し回数')
    plt.ylabel('精度')
    plt.title('学習曲線(精度)')
    plt.legend()
    plt.grid()
    plt.show()

問題特有のパラメータ

in_channel = num_channel
sequence_length = max_seq_len-1
n_output = num_class

# 表示
print(f'in_channel: {in_channel}, sequence_length: {sequence_length}, n_output: {n_output}')
in_channel: 1, sequence_length: 47, n_output: 3

モデル作成

モデルを定義します。畳み込み層が2つ、活性化関数はReLU、プーリング層にはAdaptive Max Poolingを使います。最初は全結合層を2層にしていたのですが、学習が早く終わりすぎてしまったので、1層に減らしました。

class CNN(nn.Module):
    def __init__(self, in_channel, sequence_length, n_hidden, n_output):
        super().__init__()
        self.conv1 = nn.Conv1d(in_channel, 16, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv1d(16, 32, kernel_size=3, stride=1, padding=1)
        self.relu = nn.ReLU(inplace=True)
        #self.maxpool = nn.MaxPool1d(kernel_size=2, stride=2)
        self.adpool = nn.AdaptiveMaxPool1d(1)  # 最後のプーリング層
        self.flatten = nn.Flatten()
        #self.l1 = nn.Linear(64*(sequence_length//2), n_hidden)
        self.l1 = nn.Linear(64, n_hidden)
        self.l2 = nn.Linear(n_hidden, n_output)
        self.l3 = nn.Linear(32, n_output)
    
        self.features = nn.Sequential(
            self.conv1,
            self.relu,
            self.conv2,
            self.relu,
            self.adpool
        )
        
        self.classifier = nn.Sequential(
            # case 1
            # self.l1,
            # self.relu,
            # self.l2
            
            # case 2
            self.l3
        )

    def forward(self, x):
        x1 = self.features(x)
        x2 = self.flatten(x1)
        x3 = self.classifier(x2)
        return x3  

損失関数は交差エントロピー誤差、最適化手法はAdamです。

# インスタンス作成
net = CNN(in_channel, sequence_length, n_hidden, n_output).to(device)

# 損失関数
criterion = nn.CrossEntropyLoss() # 交差エントロピー誤差

# 最適化関数と学習率
optimizer = optim.Adam(net.parameters(), lr=lr)

訓練

訓練と検証を行います。

# 訓練&検証結果記録用の配列
history = np.zeros((0, 5)) # (繰り返し数, 訓練損失, 訓練精度, 検証損失, 検証精度)

history = fit(
    net=net,
    optimizer=optimizer,
    criterion=criterion,
    num_epochs=num_epochs,
    train_loader=train_loader,
    val_loader=val_loader,
    device=device,
    history=history
)

学習曲線

学習曲線を表示させます。

check_history(history)
初期状態: 損失: 1.06633 精度: 0.72667
最終状態: 損失: 0.04075 精度: 0.99667

学習曲線_損失.png

学習曲線_精度.png

初期段階の学習がうまく進んでおらず、損失の減少ペースが緩やかなのが気になりますが、とりあえずコードは実行できました。今回、検証データと訓練データの分割はちゃんとできていない(そもそもデータ自体が適当に自作したもの…)ので、過学習についてはわかりませんでした。

予測結果

予測結果を見てみます。検証データを使って、波形と正解ラベルと予測ラベルを表示します。

# 可視化
for inputs_val, labels_val in val_loader:
    

    # GPUヘ転送
    inputs_val = inputs_val.to(device)
    labels_val = labels_val.to(device)
    
    # 予測計算
    outputs_val = net(inputs_val)
    predicts_val = torch.max(outputs_val, 1)[1]
        
    plt.figure(figsize=(16, 8))
    for i in range(batch_size):
        y_pred = predicts_val[i]
        y_true = labels_val[i]
        
        ax = plt.subplot(4, 8, i+1)
        ax.tick_params(labelbottom=False, labelleft=False, labelright=False, labeltop=False)
        plt.plot(inputs_val[i][0].cpu())
        plt.ylim(-1.5, 1.5)
        if y_pred == y_true:
            c = 'k'
        else:
            c = 'b'
        plt.title(f"{y_true}:{y_pred}", color=c)
    plt.show()

結果.png
タイトル欄にある左側の数字が正解ラベルで、右側の数字が予測ラベルです。4行4列目のデータは予測が外れています。が、これは人の目で見ても間違えそうな気がします。理由は冒頭に書いたので、割愛。

おわりに

可変シーケンスのデータをCNNで分類する問題に取り組みました。いい感じのデータが見つからず、データを自作するところから始めたため、精度に関してはわかりませんが、とりあえず実装は出来たような気がしています。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?