# 0.概要

CNNを用いて心電図の1拍分の波形から不整脈を分類します。(参考論文で紹介されていたモデルの実験的な実装です。実用性等は考えていないのでモデルのチューニングや比較検証はしません。)

# 1.不整脈とECG

### Dataset

• Python: 3.6.7

0: N, 1: S, 2: V, 3: F, 4: Q
それぞれのクラスに以下のようにいくつかの不整脈が含まれます。

Code
```import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import time
import copy
​
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, random_split
from torch.optim.lr_scheduler import ReduceLROnPlateau
​
from sklearn.metrics import f1_score, confusion_matrix, roc_curve, auc
​
import warnings
warnings.filterwarnings('ignore')
​
plt.style.use('default')
sns.set()
sns.set_style('whitegrid')

print('Train data')
print(train_df.info())
print('Type Count')
print(train_df.iloc[:, 187].value_counts())
print('--------------------')
print('Test data')
print(test_df.info())
print('Type Count')
print(test_df.iloc[:, 187].value_counts())
```

Class 0 1 2 3 4
Train 72471 2223 5788 641 6431
Test 18118 556 1448 162 1608

Code
```train_X = train_df.values[:, :-1]
train_y = train_df.values[:, -1].astype(int)
test_X = test_df.values[:, :-1]
test_y = test_df.values[:, -1].astype(int)

train_X = np.concatenate([train_X, np.zeros((train_X.shape[0], 5))], axis=1)
test_X = np.concatenate([test_X, np.zeros((test_X.shape[0], 5))], axis=1)

train_y_onehot = np.zeros((train_X.shape[0], 5))
for i in range(train_X.shape[0]):
train_y_onehot[i, train_y[i]] = 1
test_y_onehot = np.zeros((test_X.shape[0], 5))
for i in range(test_X.shape[0]):
test_y_onehot[i, test_y[i]] = 1

x = np.arange(0, 192)*8/1000
classes = ['N', 'S', 'V', 'F', 'Q']

fig = plt.figure(figsize=(8, 16))

for i, ax in enumerate([ax1, ax2, ax3, ax4, ax5]):
ax.plot(x, train_X[train_y==i, :][0], label=classes[i], color='green')
ax.set_xlabel("Time")
ax.set_ylabel("Amplitude")
ax.set_title(classes[i])
ax.legend()
fig.tight_layout()
plt.savefig('ecg_beats.png')
```

それでは早速分類していきましょう。

# 2.CNN

• Pytorch: 1.1.0
• 12クラスのECG波形からの不整脈分類の研究[３]で用いられていた1d-CNNモデルを参考にします。

### Dataset

• 検証セットは無し(訓練とテストのみ)。
• Data Augmentationは無し。
• Test Time Augmentationを用いる。

Code
```# ---Hyperparameters---
num_tta = 5
ngpu = 1
device = torch.device("cuda:0" if (torch.cuda.is_available() and ngpu > 0) else "cpu")

# ---Transforms---
## If needed.

# --- Dataset---
class TrainECGDataset(Dataset):
### In order to inherit Dataset, define __len__ and __getitem__!

def __init__(self, train_X, train_y, transforms=None):
super().__init__()
self.X = train_X
self.y = train_y
self.transforms = transforms

def __len__(self):
return len(self.X)

def __getitem__(self, idx):
sequence = self.X[idx]
if self.transforms is not None:
sequence = self.transforms(sequence)
sequence = torch.unsqueeze(torch.from_numpy(sequence).float(), 0)
target = torch.tensor(self.y[idx]).long()

return sequence, target

class TestECGDataset(Dataset):
### Implement TTA (Test Time Augmentation).

def __init__(self, test_X, test_y, transforms=None, tta=num_tta):
super().__init__()
self.X = test_X
self.y = test_y
self.transforms = transforms
self.tta = tta

def __len__(self):
return len(self.X) * self.tta

def __getitem__(self, idx):
new_idx = idx % len(self.X)
sequence = self.X[new_idx]
if self.transforms is not None:
sequence = self.transforms(sequence)
sequence = torch.unsqueeze(torch.from_numpy(sequence).float(), 0)
target = torch.tensor(self.y[new_idx]).long()

return sequence, target

## If needed.
```

### Model

• Residual connectionを組み込む(今回のネットワークは深くないので実際不要。)。
• Pre-activationとDropoutを用いる。
• Shortcut部分はダウンサンプリングのために、[1x1Conv1d, BatchNormalization, Max Pooling]を用いる。

Code
```def weights_init(m):
if isinstance(m, nn.Conv1d):
nn.init.kaiming_normal_(m.weight)
if m.bias is not None:
nn.init.zeros_(m.bias)

class BaseBlock(nn.Module):

def __init__(self, in_channels, out_channels):
super(BaseBlock, self).__init__()
self.deeppath = nn.Sequential(
nn.BatchNorm1d(in_channels),
nn.PReLU(),
nn.Conv1d(in_channels, out_channels, kernel_size=4, stride=2, padding=1, bias=False),
nn.BatchNorm1d(out_channels),
nn.PReLU(),
nn.Dropout(p=0.2, inplace=True),
nn.Conv1d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
)
self.shortcut = nn.Sequential(
nn.Conv1d(in_channels, out_channels, kernel_size=1, stride=1, bias=False),
nn.BatchNorm1d(out_channels),
nn.MaxPool1d(kernel_size=2)
)

def forward(self, x):
dp = self.deeppath(x)
sc = self.shortcut(x)
return dp + sc

class Flatten(nn.Module):

def __init__(self):
super(Flatten, self).__init__()

def forward(self, x):
return x.view(x.size(0), -1)

class ECGResNet(nn.Module):

def __init__(self, block, num_classes=5):
super(ECGResNet, self).__init__()
self.net = nn.Sequential(
nn.Conv1d(1, 16,　kernel_size=4, stride=2, padding=1, bias=False),
nn.BatchNorm1d(16),
nn.PReLU(),
nn.Conv1d(16, 64, kernel_size=4, stride=2, padding=1, bias=False),
nn.BatchNorm1d(64),
nn.PReLU(),
nn.Dropout(p=0.2, inplace=True),
nn.Conv1d(64, 64, kernel_size=3, stride=1, padding=1, bias=False),
block(64, 128),
nn.BatchNorm1d(128),
nn.PReLU(),
Flatten(),
nn.Linear(128 * 24, num_classes)
)

def forward(self, x):
return self.net(x)

model = ECGResNet(BaseBlock).to(device)
model.apply(weights_init)
```

### Training

• 今回はハイパーパラメータのチューニング等はしないので、検証データセットは作成せずに訓練させ、テストデータのF1スコアが最大になるようなモデルを求めます(K-Fold CVをする場合は訓練データのクラスが偏っているので、scikit-learnの`StratifiedKFold`などを使って層別分割します(`random_split`ダメ、絶対))。
• loss: `CrossEntropyLoss`
• optimizer: `Adam`
• schedular: `ReduceLROnPlateau`

Code
```lr = 0.01
num_epochs = 4
batch_size=128

softmax = nn.Softmax()
criterion = nn.CrossEntropyLoss()
schedular = ReduceLROnPlateau(optimizer, 'min', factor=0.1, patience=2, verbose=True)

train_losses = []
test_losses =[]
train_f1s = []
test_f1s = []

train_preds = []
test_preds = []

best_model_wts = copy.deepcopy(model.state_dict())
best_f1 = 0
best_probs = None
best_preds = None

datasets = {
'train': TrainECGDataset(train_X, train_y),
'test': TestECGDataset(test_X, test_y)
}

}

# ---Training and Evaluation---
start_time = time.time()

for epoch in range(num_epochs):
print('Epoch {}/{}'.format(epoch, num_epochs - 1))
print('-' * 20)

for phase in ['test', 'train']:
# 訓練無しでの性能を調べたいので先にテストセットの評価
running_loss = 0 # of train or test
epoch_loss = 0
epoch_train_f1 = 0
epoch_test_f1 = 0
epoch_probs = None
epoch_preds = None
labels_list = None

if phase == 'train':
model.train()
else:
model.eval()

inputs = inputs.to(device)
labels = labels.to(device)

outputs = model(inputs)
loss = criterion(outputs, labels)
if phase == 'train':
loss.backward()
optimizer.step()

_, preds = torch.max(outputs, 1)
probs = softmax(outputs).detach().cpu().numpy()
preds = preds.detach().cpu().numpy()
labels = labels.cpu().numpy()
if epoch_probs is None:
epoch_probs = probs
epoch_preds = preds
epoch_labels = labels
else:
epoch_probs = np.concatenate([epoch_probs, probs])
epoch_preds = np.concatenate([epoch_preds, preds])
epoch_labels = np.concatenate([epoch_labels, labels])

running_loss += loss.item() * inputs.size(0)

epoch_loss = running_loss / len(datasets[phase])

if phase == 'train':
schedular.step(epoch_test_f1)
train_losses.append(epoch_loss)
epoch_train_preds = epoch_preds
train_preds.append(epoch_train_preds)
epoch_train_f1 = f1_score(epoch_labels, epoch_train_preds, average='macro')
train_f1s.append(epoch_train_f1)
print('{} | Loss: {} | F1: {}'.format(phase, epoch_loss, epoch_train_f1))
else:
test_losses.append(epoch_loss)
tta_probs = np.zeros((int(len(datasets[phase]) / num_tta), 5))
for i in range(num_tta):
tta_probs += epoch_probs[int(len(datasets[phase]) / num_tta) * i:int(len(datasets[phase]) / num_tta) * (i + 1)]
tta_probs /= num_tta
tta_preds = np.argmax(tta_probs, axis=1)
test_preds.append(tta_preds)
epoch_test_f1 = f1_score(test_y, tta_preds, average='macro')
test_f1s.append(epoch_test_f1)
print('{} | Loss: {} | F1: {}'.format(phase, epoch_loss, epoch_test_f1))

if epoch_test_f1 > best_f1:
best_f1 = epoch_test_f1
best_probs = tta_probs
best_preds = tta_preds
best_model_wts = copy.deepcopy(model.state_dict())

time_elapsed  = time.time() - start_time
print('Training complete in {}m {}s.'.format(time_elapsed // 60, time_elapsed % 60))
print('Best f1: {}'.format(best_f1))

```

### Results

Code
```#---Analysis---

## Loss and F1
fig = plt.figure(figsize=(10, 5))

ax1.plot(train_losses, label='train')
ax1.plot(test_losses, label='test')
ax2.plot(train_f1s, label='train')
ax2.plot(test_f1s, label='test')
ax1.set_xlabel("Epoch")
ax2.set_xlabel("Epoch")
ax1.set_ylabel("Loss")
ax2.set_ylabel("F1-score")
ax1.legend()
ax2.legend()
fig.tight_layout()
plt.savefig('loss_f1.png')

## Prediction
fig = plt.figure(figsize=(10, 8))

for i, ax in enumerate([ax1, ax2, ax3, ax4]):
ax.plot(test_preds[i], label='pred')
ax.plot(test_y, label='true')
ax.set_xlabel('Data')
ax.set_ylabel('Class')
ax.set_title('Epoch: {}'.format(str(i)))
ax.legend()
fig.tight_layout()
plt.savefig('test_classes.png')

## ROC
from sklearn.metrics import auc
classes = ['N', 'S', 'V', 'F', 'Q']

fig = plt.figure(figsize=(12, 8))

for i, ax in enumerate([ax1, ax2, ax3, ax4, ax5]):
fpr, tpr, _ = roc_curve(test_y_onehot[:, i], tta_probs[:, i])
score = round(auc(fpr, tpr), 3)
ax.plot(fpr, tpr, label='ROC curve (auc = {})'.format(score), color='red')
ax.set_xlabel("False Positive Rate")
ax.set_ylabel("True Positive Rate")
ax.set_title(classes[i])
ax.legend()
fig.tight_layout()
plt.savefig('roc.png')

## Confusion matrix
cm = confusion_matrix(test_y, tta_preds)
cm = cm / np.sum(test_y_onehot, axis=0).reshape(-1, 1)
df_cm = pd.DataFrame(cm, index=classes, columns=classes)
plt.figure(figsize = (6,5))
sns.heatmap(df_cm, cmap='Blues')
plt.show()
plt.savefig('confusion_matrix.png')
```

# 5.次回予告

• 今回作成したモデルの解釈(Activation mapping)
• GBDTでECGデータ分類(CatBoost)

# 6.参考

