Introduction
In this post, we work through the MNIST examples.
Development environment
mnist-tensorflow-keras
1. We work through this notebook
2. Install the library
%pip install tensorflow
3. Define the functions
def get_dataset(num_classes, rank=0, size=1):
    from tensorflow import keras
    (x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data('MNIST-data-%d' % rank)
    x_train = x_train[rank::size]
    y_train = y_train[rank::size]
    x_test = x_test[rank::size]
    y_test = y_test[rank::size]
    x_train = x_train.reshape(x_train.shape[0], 28, 28, 1)
    x_test = x_test.reshape(x_test.shape[0], 28, 28, 1)
    x_train = x_train.astype('float32')
    x_test = x_test.astype('float32')
    x_train /= 255
    x_test /= 255
    y_train = keras.utils.to_categorical(y_train, num_classes)
    y_test = keras.utils.to_categorical(y_test, num_classes)
    return (x_train, y_train), (x_test, y_test)
def get_model(num_classes):
    from tensorflow.keras import models
    from tensorflow.keras import layers
    model = models.Sequential()
    model.add(layers.Conv2D(32, kernel_size=(3, 3),
                            activation='relu',
                            input_shape=(28, 28, 1)))
    model.add(layers.Conv2D(64, (3, 3), activation='relu'))
    model.add(layers.MaxPooling2D(pool_size=(2, 2)))
    model.add(layers.Dropout(0.25))
    model.add(layers.Flatten())
    model.add(layers.Dense(128, activation='relu'))
    model.add(layers.Dropout(0.5))
    model.add(layers.Dense(num_classes, activation='softmax'))
    return model
# Specify training parameters
batch_size = 128
epochs = 2
num_classes = 10
def train(learning_rate=1.0):
    from tensorflow import keras
    (x_train, y_train), (x_test, y_test) = get_dataset(num_classes)
    model = get_model(num_classes)

    # Specify the optimizer (Adadelta in this example), using the learning rate input parameter of the function
    # so that Horovod can adjust the learning rate during training
    optimizer = keras.optimizers.Adadelta(lr=learning_rate)

    model.compile(optimizer=optimizer,
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    model.fit(x_train, y_train,
              batch_size=batch_size,
              epochs=epochs,
              verbose=2,
              validation_data=(x_test, y_test))
    return model
4. Training
model = train(learning_rate=0.1)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
11501568/11490434 [==============================] - 0s 0us/step
/local_disk0/.ephemeral_nfs/envs/pythonEnv-107bf0ee-693b-4dea-8790-78f3dc70991f/lib/python3.8/site-packages/keras/optimizer_v2/adadelta.py:74: UserWarning: The `lr` argument is deprecated, use `learning_rate` instead.
super(Adadelta, self).__init__(name, **kwargs)
Epoch 1/2
469/469 - 64s - loss: 0.6030 - accuracy: 0.8152 - val_loss: 0.2076 - val_accuracy: 0.9394 - 64s/epoch - 137ms/step
Epoch 2/2
469/469 - 64s - loss: 0.2775 - accuracy: 0.9175 - val_loss: 0.1417 - val_accuracy: 0.9584 - 64s/epoch - 136ms/step
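The UserWarning above appears because the optimizer is created with the deprecated `lr` keyword. As the warning itself suggests, recent tf.keras versions take `learning_rate` instead; a minimal sketch of the change inside train():

# Same optimizer and value as above, just using the non-deprecated keyword name
optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate)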
5. Model evaluation
_, (x_test, y_test) = get_dataset(num_classes)
loss, accuracy = model.evaluate(x_test, y_test, batch_size=128)
print("loss:", loss)
print("accuracy:", accuracy)
79/79 [==============================] - 2s 29ms/step - loss: 0.1417 - accuracy: 0.9584
loss: 0.1416797637939453
accuracy: 0.9584000110626221
6. Training with HorovodRunner
import os
import time
# Remove any existing checkpoint files
dbutils.fs.rm(("/ml/MNISTDemo/train"), recurse=True)
# Create directory
checkpoint_dir = '/dbfs/ml/MNISTDemo/train/{}/'.format(time.time())
os.makedirs(checkpoint_dir)
print(checkpoint_dir)
def train_hvd(checkpoint_path, learning_rate=1.0):
    # Import tensorflow modules to each worker
    from tensorflow.keras import backend as K
    from tensorflow.keras.models import Sequential
    import tensorflow as tf
    from tensorflow import keras
    import horovod.tensorflow.keras as hvd

    # Initialize Horovod
    hvd.init()

    # Pin GPU to be used to process local rank (one GPU per process)
    # These steps are skipped on a CPU cluster
    gpus = tf.config.experimental.list_physical_devices('GPU')
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
    if gpus:
        tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')

    # Call the get_dataset function you created, this time with the Horovod rank and size
    (x_train, y_train), (x_test, y_test) = get_dataset(num_classes, hvd.rank(), hvd.size())
    model = get_model(num_classes)

    # Adjust learning rate based on number of GPUs
    optimizer = keras.optimizers.Adadelta(lr=learning_rate * hvd.size())

    # Use the Horovod Distributed Optimizer
    optimizer = hvd.DistributedOptimizer(optimizer)

    model.compile(optimizer=optimizer,
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    # Create a callback to broadcast the initial variable states from rank 0 to all other processes.
    # This is required to ensure consistent initialization of all workers when training is started
    # with random weights or restored from a checkpoint.
    callbacks = [
        hvd.callbacks.BroadcastGlobalVariablesCallback(0),
    ]

    # Save checkpoints only on worker 0 to prevent conflicts between workers
    if hvd.rank() == 0:
        callbacks.append(keras.callbacks.ModelCheckpoint(checkpoint_path, save_weights_only=True))

    model.fit(x_train, y_train,
              batch_size=batch_size,
              callbacks=callbacks,
              epochs=epochs,
              verbose=2,
              validation_data=(x_test, y_test))
Install sparkdl
%pip install sparkdl
%pip install tensorframes
from sparkdl import HorovodRunner
checkpoint_path = checkpoint_dir + '/checkpoint-{epoch}.ckpt'
learning_rate = 0.1
hr = HorovodRunner(np=2)
hr.run(train_hvd, checkpoint_path=checkpoint_path, learning_rate=learning_rate)
Error
ImportError: cannot import name 'resnet50' from 'keras.applications' (/local_disk0/.ephemeral_nfs/envs/pythonEnv-107bf0ee-693b-4dea-8790-78f3dc70991f/lib/python3.8/site-packages/keras/applications/__init__.py)
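The `%pip install sparkdl` above pulls in the old Deep Learning Pipelines package from PyPI, which tries to import `resnet50` from `keras.applications` and therefore fails against the newer Keras bundled with TensorFlow, producing the ImportError above. On a Databricks Runtime ML cluster, HorovodRunner is already included in the runtime, so a likely fix is to skip the two %pip install cells and import it directly; a sketch, assuming a Databricks Runtime ML cluster:

# HorovodRunner ships with Databricks Runtime ML, so no pip install of sparkdl/tensorframes should be needed
from sparkdl import HorovodRunner

hr = HorovodRunner(np=2)
hr.run(train_hvd, checkpoint_path=checkpoint_path, learning_rate=learning_rate)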
mnist-pytorch
1. We work through this notebook
2. Install the libraries
%pip install torch==1.5.0
%pip install torchvision==0.6.0
3. Define the model and the training functions
import torch
import torch.nn as nn
import torch.nn.functional as F

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x)
# Specify training parameters
batch_size = 100
num_epochs = 3
momentum = 0.5
log_interval = 100
def train_one_epoch(model, device, data_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(data_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % log_interval == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(data_loader) * len(data),
                100. * batch_idx / len(data_loader), loss.item()))
def save_checkpoint(log_dir, model, optimizer, epoch):
    filepath = log_dir + '/checkpoint-{epoch}.pth.tar'.format(epoch=epoch)
    state = {
        'model': model.state_dict(),
        'optimizer': optimizer.state_dict(),
    }
    torch.save(state, filepath)

def load_checkpoint(log_dir, epoch=num_epochs):
    filepath = log_dir + '/checkpoint-{epoch}.pth.tar'.format(epoch=epoch)
    return torch.load(filepath)

def create_log_dir():
    log_dir = os.path.join(PYTORCH_DIR, str(time()), 'MNISTDemo')
    os.makedirs(log_dir)
    return log_dir
import torch.optim as optim
from torchvision import datasets, transforms
from time import time
import os
PYTORCH_DIR = '/dbfs/ml/horovod_pytorch'
single_node_log_dir = create_log_dir()
print("Log directory:", single_node_log_dir)
def train(learning_rate):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    train_dataset = datasets.MNIST(
        'data',
        train=True,
        download=True,
        transform=transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]))
    data_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

    model = Net().to(device)

    optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=momentum)

    for epoch in range(1, num_epochs + 1):
        train_one_epoch(model, device, data_loader, optimizer, epoch)
        save_checkpoint(single_node_log_dir, model, optimizer, epoch)
def test(log_dir):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    loaded_model = Net().to(device)

    checkpoint = load_checkpoint(log_dir)
    loaded_model.load_state_dict(checkpoint['model'])
    loaded_model.eval()

    test_dataset = datasets.MNIST(
        'data',
        train=False,
        download=True,
        transform=transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]))
    data_loader = torch.utils.data.DataLoader(test_dataset)

    test_loss = 0
    for data, target in data_loader:
        data, target = data.to(device), target.to(device)
        output = loaded_model(data)
        test_loss += F.nll_loss(output, target)

    test_loss /= len(data_loader.dataset)
    print("Average test loss: {}".format(test_loss.item()))
Log directory: /dbfs/ml/horovod_pytorch/1651122294.296862/MNISTDemo
train(learning_rate = 0.001)
test(single_node_log_dir)
<command-2886931701770921>:21: UserWarning: Implicit dimension choice for log_softmax has been deprecated. Change the call to include dim=X as an argument.
return F.log_softmax(x)
Average test loss: 0.9360446333885193
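The UserWarning about the implicit dimension refers to the `F.log_softmax(x)` call at the end of `Net.forward`. Since the output has shape (batch, 10), the warning can be silenced by passing the class dimension explicitly; a minimal sketch of the change:

# In Net.forward, make the softmax dimension explicit
return F.log_softmax(x, dim=1)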
Because HorovodRunner raises an error here as well, we skip running it; the code is shown below for reference.
import horovod.torch as hvd
from sparkdl import HorovodRunner
hvd_log_dir = create_log_dir()
print("Log directory:", hvd_log_dir)
def train_hvd(learning_rate):
    # Initialize Horovod
    hvd.init()

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    if device.type == 'cuda':
        # Pin GPU to local rank
        torch.cuda.set_device(hvd.local_rank())

    train_dataset = datasets.MNIST(
        # Use different root directory for each worker to avoid conflicts
        root='data-%d' % hvd.rank(),
        train=True,
        download=True,
        transform=transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))])
    )

    from torch.utils.data.distributed import DistributedSampler

    # Configure the sampler so that each worker gets a distinct sample of the input dataset
    train_sampler = DistributedSampler(train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
    # Use train_sampler to load a different sample of data on each worker
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, sampler=train_sampler)

    model = Net().to(device)

    # The effective batch size in synchronous distributed training is scaled by the number of workers
    # Increase learning_rate to compensate for the increased batch size
    optimizer = optim.SGD(model.parameters(), lr=learning_rate * hvd.size(), momentum=momentum)

    # Wrap the local optimizer with hvd.DistributedOptimizer so that Horovod handles the distributed optimization
    optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters())

    # Broadcast initial parameters so all workers start with the same parameters
    hvd.broadcast_parameters(model.state_dict(), root_rank=0)

    for epoch in range(1, num_epochs + 1):
        train_one_epoch(model, device, train_loader, optimizer, epoch)
        # Save checkpoints only on worker 0 to prevent conflicts between workers
        if hvd.rank() == 0:
            save_checkpoint(hvd_log_dir, model, optimizer, epoch)
hr = HorovodRunner(np=2)
hr.run(train_hvd, learning_rate = 0.001)