Introduction
Let's get going with Petastorm.
Development environment
Databricks Runtime 10.4 LTS ML, Standard_NC6s_v3 (note: this instance type requires a quota limit increase request)
Simplify data conversion from Spark to TensorFlow
1. I'll work through this notebook.
2. Install the libraries.
%pip install petastorm
%pip install tensorflow
%pip install hyperopt
%pip install horovod
%pip install sparkdl
%pip install tensorframes
3. Import the libraries.
from pyspark.sql.functions import col
from petastorm.spark import SparkDatasetConverter, make_spark_converter
import io
import numpy as np
import tensorflow as tf
from PIL import Image
from petastorm import TransformSpec
from tensorflow import keras
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from hyperopt import fmin, tpe, hp, SparkTrials, STATUS_OK
import horovod.tensorflow.keras as hvd
from sparkdl import HorovodRunner
An error occurred:
/databricks/python_shell/dbruntime/PythonPackageImportsInstrumentation/__init__.py:167: FutureWarning: pyarrow.LocalFileSystem is deprecated as of 2.0.0, please use pyarrow.fs.LocalFileSystem instead.
original_result = python_builtin_import(name, globals, locals, fromlist, level)
/local_disk0/.ephemeral_nfs/envs/pythonEnv-30dab267-e5ed-4c28-9b42-2c7adb85cdb4/lib/python3.8/site-packages/petastorm/spark/spark_dataset_converter.py:28: FutureWarning: pyarrow.LocalFileSystem is deprecated as of 2.0.0, please use pyarrow.fs.LocalFileSystem instead.
from pyarrow import LocalFileSystem
ImportError: cannot import name 'resnet50' from 'keras.applications' (/local_disk0/.ephemeral_nfs/envs/pythonEnv-30dab267-e5ed-4c28-9b42-2c7adb85cdb4/lib/python3.8/site-packages/keras/applications/__init__.py)
4. The HorovodRunner import fails, so let me ignore it for now (those imports are commented out).
from pyspark.sql.functions import col
from petastorm.spark import SparkDatasetConverter, make_spark_converter
import io
import numpy as np
import tensorflow as tf
from PIL import Image
from petastorm import TransformSpec
from tensorflow import keras
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from hyperopt import fmin, tpe, hp, SparkTrials, STATUS_OK
# import horovod.tensorflow.keras as hvd
# from sparkdl import HorovodRunner
IMG_SHAPE = (224, 224, 3)
BATCH_SIZE = 32
NUM_EPOCHS = 5
5. Load the flowers dataset.
df = spark.read.format("delta").load("/databricks-datasets/flowers/delta") \
.select(col("content"), col("label"))
labels = df.select(col("label")).distinct().collect()
label_to_idx = {label: index for index, (label, ) in enumerate(sorted(labels))}
num_classes = len(label_to_idx)
df_train, df_val = df.limit(100).randomSplit([0.9, 0.1], seed=12345)
# Make sure the number of partitions is at least the number of workers which is required for distributed training.
df_train = df_train.repartition(2)
df_val = df_val.repartition(2)
6. Cache the Spark DataFrame using the Petastorm Spark converter.
# Set a cache directory on DBFS FUSE for intermediate data.
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, "file:///dbfs/tmp/petastorm/cache")
converter_train = make_spark_converter(df_train)
converter_val = make_spark_converter(df_val)
/local_disk0/.ephemeral_nfs/envs/pythonEnv-ddc618cd-8ee3-4cca-80e1-acab2cf64950/lib/python3.8/site-packages/petastorm/fs_utils.py:88: FutureWarning: pyarrow.localfs is deprecated as of 2.0.0, please use pyarrow.fs.LocalFileSystem instead.
self._filesystem = pyarrow.localfs
Converting floating-point columns to float32
The median size 5843009 B (< 50 MB) of the parquet files is too small. Total size: 11231784 B. Increase the median file size by calling df.repartition(n) or df.coalesce(n), which might help improve the performance. Parquet files: file:/dbfs/tmp/petastorm/cache/20220428034250-appid-app-20220428025739-0000-2c111a69-ebdf-4693-a4ff-6d86504c7e4e/part-00000-tid-8225040675238723687-b6257c8e-9b27-4889-9f19-3e301eccb855-7-1-c000.parquet, ...
Converting floating-point columns to float32
The median size 640883 B (< 50 MB) of the parquet files is too small. Total size: 1256216 B. Increase the median file size by calling df.repartition(n) or df.coalesce(n), which might help improve the performance. Parquet files: file:/dbfs/tmp/petastorm/cache/20220428034301-appid-app-20220428025739-0000-1fc4f83a-4281-4d37-8b87-24c9fd7b6f6b/part-00000-tid-7248952313364288857-b84c597c-c3c8-431d-a182-82336fb9f2b5-15-1-c000.parquet, ...
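The warning above is harmless for this demo (only 100 rows are used), but on a real dataset you could follow its suggestion and write fewer, larger Parquet files before creating the converter. A minimal sketch that is not executed in this walkthrough; the partition count of 8 is only an illustrative value:
# Not run here: on a large DataFrame with many small partitions,
# coalesce to fewer partitions so the median Parquet file size grows
# (aim for roughly 50 MB-1 GB per file).
converter_train = make_spark_converter(df_train.coalesce(8))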
print(f"train: {len(converter_train)}, val: {len(converter_val)}")
train: 92, val: 8
7. Load the MobileNetV2 model.
# First, load the model and inspect the structure of the model.
MobileNetV2(input_shape=IMG_SHAPE, include_top=False, weights='imagenet').summary()
def get_model(lr=0.001):
# Create the base model from the pre-trained model MobileNet V2
base_model = MobileNetV2(input_shape=IMG_SHAPE, include_top=False, weights='imagenet')
# Freeze parameters in the feature extraction layers
base_model.trainable = False
# Add a new classifier layer for transfer learning
global_average_layer = keras.layers.GlobalAveragePooling2D()
prediction_layer = keras.layers.Dense(num_classes)
model = keras.Sequential([
base_model,
global_average_layer,
prediction_layer
])
return model
def get_compiled_model(lr=0.001):
model = get_model(lr=lr)
model.compile(optimizer=keras.optimizers.SGD(lr=lr, momentum=0.9),
loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=['accuracy'])
return model
get_compiled_model().summary()
/local_disk0/.ephemeral_nfs/envs/pythonEnv-ddc618cd-8ee3-4cca-80e1-acab2cf64950/lib/python3.8/site-packages/keras/optimizer_v2/gradient_descent.py:102: UserWarning: The `lr` argument is deprecated, use `learning_rate` instead.
super(SGD, self).__init__(name, **kwargs)
Model: "sequential"
_________________________________________________________________
Layer (type) Output Shape Param #
=================================================================
mobilenetv2_1.00_224 (Funct (None, 7, 7, 1280) 2257984
ional)
global_average_pooling2d (G (None, 1280) 0
lobalAveragePooling2D)
dense (Dense) (None, 5) 6405
=================================================================
Total params: 2,264,389
Trainable params: 6,405
Non-trainable params: 2,257,984
_________________________________________________________________
8. Preprocess the images for ImageNet.
def preprocess(content):
"""
Preprocess an image file bytes for MobileNetV2 (ImageNet).
"""
image = Image.open(io.BytesIO(content)).resize([224, 224])
image_array = keras.preprocessing.image.img_to_array(image)
return preprocess_input(image_array)
def transform_row(pd_batch):
"""
The input and output of this function are pandas dataframes.
"""
pd_batch['features'] = pd_batch['content'].map(lambda x: preprocess(x))
pd_batch['label_index'] = pd_batch['label'].map(lambda x: label_to_idx[x])
pd_batch = pd_batch.drop(labels=['content', 'label'], axis=1)
return pd_batch
# The output shape of the `TransformSpec` is not automatically known by petastorm,
# so you need to specify the shape for new columns in `edit_fields` and specify the order of
# the output columns in `selected_fields`.
transform_spec_fn = TransformSpec(
transform_row,
edit_fields=[('features', np.float32, IMG_SHAPE, False), ('label_index', np.int32, (), False)],
selected_fields=['features', 'label_index']
)
9. Train and evaluate the model.
def train_and_evaluate(lr=0.001):
model = get_compiled_model(lr)
with converter_train.make_tf_dataset(transform_spec=transform_spec_fn,
batch_size=BATCH_SIZE) as train_dataset, \
converter_val.make_tf_dataset(transform_spec=transform_spec_fn,
batch_size=BATCH_SIZE) as val_dataset:
# tf.keras only accepts tuples, not namedtuples
train_dataset = train_dataset.map(lambda x: (x.features, x.label_index))
steps_per_epoch = len(converter_train) // BATCH_SIZE
val_dataset = val_dataset.map(lambda x: (x.features, x.label_index))
validation_steps = max(1, len(converter_val) // BATCH_SIZE)
print(f"steps_per_epoch: {steps_per_epoch}, validation_steps: {validation_steps}")
hist = model.fit(train_dataset,
steps_per_epoch=steps_per_epoch,
epochs=NUM_EPOCHS,
validation_data=val_dataset,
validation_steps=validation_steps,
verbose=2)
return hist.history['val_loss'][-1], hist.history['val_accuracy'][-1]
loss, accuracy = train_and_evaluate()
print("Validation Accuracy: {}".format(accuracy))
steps_per_epoch: 2, validation_steps: 1
Epoch 1/5
2/2 - 11s - loss: 1.9956 - accuracy: 0.1719 - val_loss: 2.0597 - val_accuracy: 0.1562 - 11s/epoch - 5s/step
Epoch 2/5
2/2 - 3s - loss: 1.9334 - accuracy: 0.1719 - val_loss: 2.1116 - val_accuracy: 0.1250 - 3s/epoch - 1s/step
Epoch 3/5
2/2 - 3s - loss: 1.7277 - accuracy: 0.2969 - val_loss: 2.0909 - val_accuracy: 0.1250 - 3s/epoch - 1s/step
Epoch 4/5
2/2 - 2s - loss: 1.5971 - accuracy: 0.2969 - val_loss: 2.0318 - val_accuracy: 0.1250 - 2s/epoch - 1s/step
Epoch 5/5
2/2 - 2s - loss: 1.6667 - accuracy: 0.2969 - val_loss: 1.9095 - val_accuracy: 0.1250 - 2s/epoch - 1s/step
Validation Accuracy: 0.125
10. Tune the hyperparameters.
def train_fn(lr):
import tensorflow as tf
from tensorflow import keras
  loss, accuracy = train_and_evaluate(lr)
return {'loss': loss, 'status': STATUS_OK}
search_space = hp.loguniform('lr', -10, -4)
argmin = fmin(
fn=train_fn,
space=search_space,
algo=tpe.suggest,
max_evals=2,
trials=SparkTrials(parallelism=2))
/databricks/spark/python/pyspark/rdd.py:980: FutureWarning: Deprecated in 3.1, Use pyspark.InheritableThread with the pinned thread mode enabled.
warnings.warn(
100%|██████████| 2/2 [00:31<00:00, 15.52s/trial, best loss: 1.800481915473938]
Total Trials: 2: 2 succeeded, 0 failed, 0 cancelled.
# See optimized hyperparameters
argmin
Out[12]: {'lr': 0.0024977970136746686}
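To actually use the tuned value, the learning rate returned by Hyperopt can be passed back into training. A short follow-up sketch (my addition, not in the original notebook):
# Retrain with the learning rate selected by Hyperopt.
best_lr = argmin['lr']
loss, accuracy = train_and_evaluate(lr=best_lr)
print("Validation Accuracy with tuned lr: {}".format(accuracy))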
11. HorovodRunner raises an error, so this step is skipped (the code is shown for reference only).
def train_and_evaluate_hvd(lr=0.001):
hvd.init() # Initialize Horovod.
# Horovod: pin GPU to be used to process local rank (one GPU per process)
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')
model = get_model(lr)
# Horovod: adjust learning rate based on number of GPUs.
optimizer = keras.optimizers.SGD(lr=lr * hvd.size(), momentum=0.9)
dist_optimizer = hvd.DistributedOptimizer(optimizer)
callbacks = [
# Horovod: broadcast initial variable states from rank 0 to all other processes.
# This is necessary to ensure consistent initialization of all workers when
# training is started with random weights or restored from a checkpoint.
hvd.callbacks.BroadcastGlobalVariablesCallback(0),
hvd.callbacks.MetricAverageCallback(),
]
# Set experimental_run_tf_function=False in TF 2.x
model.compile(optimizer=dist_optimizer,
loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=["accuracy"],
experimental_run_tf_function=False)
with converter_train.make_tf_dataset(transform_spec=transform_spec_fn,
cur_shard=hvd.rank(), shard_count=hvd.size(),
batch_size=BATCH_SIZE) as train_dataset, \
converter_val.make_tf_dataset(transform_spec=transform_spec_fn,
cur_shard=hvd.rank(), shard_count=hvd.size(),
batch_size=BATCH_SIZE) as val_dataset:
    # tf.keras only accepts tuples, not namedtuples
train_dataset = train_dataset.map(lambda x: (x.features, x.label_index))
steps_per_epoch = len(converter_train) // (BATCH_SIZE * hvd.size())
val_dataset = val_dataset.map(lambda x: (x.features, x.label_index))
validation_steps = max(1, len(converter_val) // (BATCH_SIZE * hvd.size()))
hist = model.fit(train_dataset,
steps_per_epoch=steps_per_epoch,
epochs=NUM_EPOCHS,
validation_data=val_dataset,
validation_steps=validation_steps,
callbacks=callbacks,
verbose=2)
return hist.history['val_loss'][-1], hist.history['val_accuracy'][-1]
hr = HorovodRunner(np=2) # This assumes the cluster consists of two workers.
hr.run(train_and_evaluate_hvd)
Simplify data conversion from Spark to PyTorch
1. I'll work through this notebook.
2. Install the libraries.
%pip install petastorm
%pip install torch==1.5.0
%pip install torchvision==0.6.0
%pip install hyperopt
%pip install horovod
%pip install sparkdl
%pip install tensorflow
%pip install tensorframes
3. Import the libraries.
from pyspark.sql.functions import col
from petastorm.spark import SparkDatasetConverter, make_spark_converter
import io
import numpy as np
import torch
import torchvision
from PIL import Image
from functools import partial
from petastorm import TransformSpec
from torchvision import transforms
from hyperopt import fmin, tpe, hp, SparkTrials, STATUS_OK
import horovod.torch as hvd
from sparkdl import HorovodRunner
An error occurred:
/databricks/python_shell/dbruntime/PythonPackageImportsInstrumentation/__init__.py:167: FutureWarning: pyarrow.LocalFileSystem is deprecated as of 2.0.0, please use pyarrow.fs.LocalFileSystem instead.
original_result = python_builtin_import(name, globals, locals, fromlist, level)
/local_disk0/.ephemeral_nfs/envs/pythonEnv-ba09db16-2f6f-4866-949c-4d5aa9fbed19/lib/python3.8/site-packages/petastorm/spark/spark_dataset_converter.py:28: FutureWarning: pyarrow.LocalFileSystem is deprecated as of 2.0.0, please use pyarrow.fs.LocalFileSystem instead.
from pyarrow import LocalFileSystem
Extension horovod.torch has not been built: /local_disk0/.ephemeral_nfs/envs/pythonEnv-ba09db16-2f6f-4866-949c-4d5aa9fbed19/lib/python3.8/site-packages/horovod/torch/mpi_lib/_mpi_lib.cpython-38-x86_64-linux-gnu.so not found
If this is not expected, reinstall Horovod with HOROVOD_WITH_PYTORCH=1 to debug the build error.
Warning! MPI libs are missing, but python applications are still available.
ImportError: cannot import name 'resnet50' from 'keras.applications' (/local_disk0/.ephemeral_nfs/envs/pythonEnv-ba09db16-2f6f-4866-949c-4d5aa9fbed19/lib/python3.8/site-packages/keras/applications/__init__.py)
4. The HorovodRunner import fails, so let me ignore it for now (those imports are commented out).
from pyspark.sql.functions import col
from petastorm.spark import SparkDatasetConverter, make_spark_converter
import io
import numpy as np
import torch
import torchvision
from PIL import Image
from functools import partial
from petastorm import TransformSpec
from torchvision import transforms
from hyperopt import fmin, tpe, hp, SparkTrials, STATUS_OK
# import horovod.torch as hvd
# from sparkdl import HorovodRunner
BATCH_SIZE = 32
NUM_EPOCHS = 5
5. Load the flowers dataset.
df = spark.read.format("delta").load("/databricks-datasets/flowers/delta") \
.select(col("content"), col("label"))
labels = df.select(col("label")).distinct().collect()
label_to_idx = {label: index for index, (label, ) in enumerate(sorted(labels))}
num_classes = len(label_to_idx)
df_train, df_val = df.limit(100).randomSplit([0.9, 0.1], seed=12345)
# Make sure the number of partitions is at least the number of workers which is required for distributed training.
df_train = df_train.repartition(2)
df_val = df_val.repartition(2)
6. Cache the Spark DataFrame using the Petastorm Spark converter.
# Set a cache directory on DBFS FUSE for intermediate data.
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, "file:///dbfs/tmp/petastorm/cache")
converter_train = make_spark_converter(df_train)
converter_val = make_spark_converter(df_val)
/local_disk0/.ephemeral_nfs/envs/pythonEnv-9cd71ee0-02a4-4aa1-9d51-c404f4a73ae6/lib/python3.8/site-packages/petastorm/fs_utils.py:88: FutureWarning: pyarrow.localfs is deprecated as of 2.0.0, please use pyarrow.fs.LocalFileSystem instead.
self._filesystem = pyarrow.localfs
Converting floating-point columns to float32
The median size 5843009 B (< 50 MB) of the parquet files is too small. Total size: 11231784 B. Increase the median file size by calling df.repartition(n) or df.coalesce(n), which might help improve the performance. Parquet files: file:/dbfs/tmp/petastorm/cache/20220428040238-appid-app-20220428025739-0000-74a50fe2-1e22-48fd-ac20-cd321bf563c1/part-00000-tid-4917360287480491282-13eb128f-db70-4f2c-99fc-144f79beeb8a-30-1-c000.parquet, ...
Converting floating-point columns to float32
The median size 640883 B (< 50 MB) of the parquet files is too small. Total size: 1256216 B. Increase the median file size by calling df.repartition(n) or df.coalesce(n), which might help improve the performance. Parquet files: file:/dbfs/tmp/petastorm/cache/20220428040246-appid-app-20220428025739-0000-30d0febe-56cd-4dee-a9c0-40239b04c2c3/part-00000-tid-3222136911280864081-12c37ba4-265e-4d47-b460-9e07ce45f6fd-38-1-c000.parquet, ...
print(f"train: {len(converter_train)}, val: {len(converter_val)}")
7. Load the MobileNetV2 model.
# First, load the model and inspect the structure of the model.
torchvision.models.mobilenet_v2(pretrained=True)
def get_model(lr=0.001):
# Load a MobileNetV2 model from torchvision
model = torchvision.models.mobilenet_v2(pretrained=True)
# Freeze parameters in the feature extraction layers
for param in model.parameters():
param.requires_grad = False
# Add a new classifier layer for transfer learning
num_ftrs = model.classifier[1].in_features
# Parameters of newly constructed modules have requires_grad=True by default
model.classifier[1] = torch.nn.Linear(num_ftrs, num_classes)
return model
8. Define the training and evaluation functions.
def train_one_epoch(model, criterion, optimizer, scheduler,
train_dataloader_iter, steps_per_epoch, epoch,
device):
model.train() # Set model to training mode
# statistics
running_loss = 0.0
running_corrects = 0
# Iterate over the data for one epoch.
for step in range(steps_per_epoch):
pd_batch = next(train_dataloader_iter)
inputs, labels = pd_batch['features'].to(device), pd_batch['label_index'].to(device)
# Track history in training
with torch.set_grad_enabled(True):
# zero the parameter gradients
optimizer.zero_grad()
# forward
outputs = model(inputs)
_, preds = torch.max(outputs, 1)
loss = criterion(outputs, labels)
# backward + optimize
loss.backward()
optimizer.step()
# statistics
running_loss += loss.item() * inputs.size(0)
running_corrects += torch.sum(preds == labels.data)
scheduler.step()
epoch_loss = running_loss / (steps_per_epoch * BATCH_SIZE)
epoch_acc = running_corrects.double() / (steps_per_epoch * BATCH_SIZE)
print('Train Loss: {:.4f} Acc: {:.4f}'.format(epoch_loss, epoch_acc))
return epoch_loss, epoch_acc
def evaluate(model, criterion, val_dataloader_iter, validation_steps, device,
metric_agg_fn=None):
model.eval() # Set model to evaluate mode
# statistics
running_loss = 0.0
running_corrects = 0
# Iterate over all the validation data.
for step in range(validation_steps):
pd_batch = next(val_dataloader_iter)
inputs, labels = pd_batch['features'].to(device), pd_batch['label_index'].to(device)
# Do not track history in evaluation to save memory
with torch.set_grad_enabled(False):
# forward
outputs = model(inputs)
_, preds = torch.max(outputs, 1)
loss = criterion(outputs, labels)
# statistics
running_loss += loss.item()
running_corrects += torch.sum(preds == labels.data)
# Average the losses across observations for each minibatch.
epoch_loss = running_loss / validation_steps
epoch_acc = running_corrects.double() / (validation_steps * BATCH_SIZE)
# metric_agg_fn is used in the distributed training to aggregate the metrics on all workers
if metric_agg_fn is not None:
epoch_loss = metric_agg_fn(epoch_loss, 'avg_loss')
epoch_acc = metric_agg_fn(epoch_acc, 'avg_acc')
print('Validation Loss: {:.4f} Acc: {:.4f}'.format(epoch_loss, epoch_acc))
return epoch_loss, epoch_acc
9. Preprocess the images for ImageNet.
def transform_row(is_train, pd_batch):
"""
The input and output of this function must be pandas dataframes.
Do data augmentation for the training dataset only.
"""
transformers = [transforms.Lambda(lambda x: Image.open(io.BytesIO(x)))]
if is_train:
transformers.extend([
transforms.RandomResizedCrop(224),
transforms.RandomHorizontalFlip(),
])
else:
transformers.extend([
transforms.Resize(256),
transforms.CenterCrop(224),
])
transformers.extend([
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])
trans = transforms.Compose(transformers)
pd_batch['features'] = pd_batch['content'].map(lambda x: trans(x).numpy())
pd_batch['label_index'] = pd_batch['label'].map(lambda x: label_to_idx[x])
pd_batch = pd_batch.drop(labels=['content', 'label'], axis=1)
return pd_batch
def get_transform_spec(is_train=True):
# The output shape of the `TransformSpec` is not automatically known by petastorm,
# so you need to specify the shape for new columns in `edit_fields` and specify the order of
# the output columns in `selected_fields`.
return TransformSpec(partial(transform_row, is_train),
edit_fields=[('features', np.float32, (3, 224, 224), False), ('label_index', np.int32, (), False)],
selected_fields=['features', 'label_index'])
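Unlike make_tf_dataset, make_torch_dataloader yields plain dicts, so a quick shape check looks like this (again a sanity-check sketch of my own, not part of the original notebook):
# Sanity check: read a single batch from the validation dataloader.
with converter_val.make_torch_dataloader(transform_spec=get_transform_spec(is_train=False),
                                         batch_size=4) as dl:
    batch = next(iter(dl))
    print(batch['features'].shape)   # expected: torch.Size([4, 3, 224, 224])
    print(batch['label_index'])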
10. Train and evaluate the model.
def train_and_evaluate(lr=0.001):
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = get_model(lr=lr)
model = model.to(device)
criterion = torch.nn.CrossEntropyLoss()
# Only parameters of final layer are being optimized.
optimizer = torch.optim.SGD(model.classifier[1].parameters(), lr=lr, momentum=0.9)
# Decay LR by a factor of 0.1 every 7 epochs
exp_lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)
with converter_train.make_torch_dataloader(transform_spec=get_transform_spec(is_train=True),
batch_size=BATCH_SIZE) as train_dataloader, \
converter_val.make_torch_dataloader(transform_spec=get_transform_spec(is_train=False),
batch_size=BATCH_SIZE) as val_dataloader:
train_dataloader_iter = iter(train_dataloader)
steps_per_epoch = len(converter_train) // BATCH_SIZE
val_dataloader_iter = iter(val_dataloader)
validation_steps = max(1, len(converter_val) // BATCH_SIZE)
for epoch in range(NUM_EPOCHS):
print('Epoch {}/{}'.format(epoch + 1, NUM_EPOCHS))
print('-' * 10)
train_loss, train_acc = train_one_epoch(model, criterion, optimizer, exp_lr_scheduler,
train_dataloader_iter, steps_per_epoch, epoch,
device)
val_loss, val_acc = evaluate(model, criterion, val_dataloader_iter, validation_steps, device)
return val_loss
loss = train_and_evaluate()
Epoch 1/5
----------
Train Loss: 1.9021 Acc: 0.0938
Validation Loss: 1.4643 Acc: 0.5000
Epoch 2/5
----------
Train Loss: 1.6339 Acc: 0.2031
Validation Loss: 1.5759 Acc: 0.2500
Epoch 3/5
----------
Train Loss: 1.5566 Acc: 0.2969
Validation Loss: 1.7568 Acc: 0.0000
Epoch 4/5
----------
Train Loss: 1.4249 Acc: 0.3906
Validation Loss: 2.0182 Acc: 0.0000
Epoch 5/5
----------
Train Loss: 1.3955 Acc: 0.4219
Validation Loss: 2.1157 Acc: 0.0000
11. Tune the hyperparameters.
def train_fn(lr):
loss = train_and_evaluate(lr)
return {'loss': loss, 'status': STATUS_OK}
search_space = hp.loguniform('lr', -10, -4)
argmin = fmin(
fn=train_fn,
space=search_space,
algo=tpe.suggest,
max_evals=2,
trials=SparkTrials(parallelism=2))
/databricks/spark/python/pyspark/rdd.py:980: FutureWarning: Deprecated in 3.1, Use pyspark.InheritableThread with the pinned thread mode enabled.
warnings.warn(
100%|██████████| 2/2 [00:28<00:00, 14.01s/trial, best loss: 1.7172338962554932]
Total Trials: 2: 2 succeeded, 0 failed, 0 cancelled.
# See optimized hyperparameters
argmin
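As in the TensorFlow section, the tuned learning rate can be fed straight back into training (a sketch, not executed here):
# Retrain with the best learning rate found by Hyperopt.
best_lr = argmin['lr']
val_loss = train_and_evaluate(lr=best_lr)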
12. HorovodRunner raises an error, so this step is skipped (the code is shown for reference only).
def metric_average(val, name):
tensor = torch.tensor(val)
avg_tensor = hvd.allreduce(tensor, name=name)
return avg_tensor.item()
def train_and_evaluate_hvd(lr=0.001):
hvd.init() # Initialize Horovod.
# Horovod: pin GPU to local rank.
if torch.cuda.is_available():
torch.cuda.set_device(hvd.local_rank())
device = torch.cuda.current_device()
else:
device = torch.device("cpu")
model = get_model(lr=lr)
model = model.to(device)
criterion = torch.nn.CrossEntropyLoss()
# Effective batch size in synchronous distributed training is scaled by the number of workers.
# An increase in learning rate compensates for the increased batch size.
optimizer = torch.optim.SGD(model.classifier[1].parameters(), lr=lr * hvd.size(), momentum=0.9)
# Broadcast initial parameters so all workers start with the same parameters.
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)
# Wrap the optimizer with Horovod's DistributedOptimizer.
optimizer_hvd = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters())
exp_lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer_hvd, step_size=7, gamma=0.1)
with converter_train.make_torch_dataloader(transform_spec=get_transform_spec(is_train=True),
cur_shard=hvd.rank(), shard_count=hvd.size(),
batch_size=BATCH_SIZE) as train_dataloader, \
converter_val.make_torch_dataloader(transform_spec=get_transform_spec(is_train=False),
cur_shard=hvd.rank(), shard_count=hvd.size(),
batch_size=BATCH_SIZE) as val_dataloader:
train_dataloader_iter = iter(train_dataloader)
steps_per_epoch = len(converter_train) // (BATCH_SIZE * hvd.size())
val_dataloader_iter = iter(val_dataloader)
validation_steps = max(1, len(converter_val) // (BATCH_SIZE * hvd.size()))
for epoch in range(NUM_EPOCHS):
print('Epoch {}/{}'.format(epoch + 1, NUM_EPOCHS))
print('-' * 10)
train_loss, train_acc = train_one_epoch(model, criterion, optimizer_hvd, exp_lr_scheduler,
train_dataloader_iter, steps_per_epoch, epoch,
device)
val_loss, val_acc = evaluate(model, criterion, val_dataloader_iter, validation_steps,
device, metric_agg_fn=metric_average)
return val_loss
hr = HorovodRunner(np=2) # This assumes the cluster consists of two workers.
hr.run(train_and_evaluate_hvd)
Prepare data for deep learning using Spark and Petastorm
1. I'll work through this notebook.
2. Install the libraries.
%pip install tensorflow
%pip install petastorm
3. Import the libraries.
import os
import subprocess
import uuid
4. Create a working directory.
# Set a unique working directory for this notebook.
work_dir = os.path.join("/ml/tmp/petastorm", str(uuid.uuid4()))
dbutils.fs.mkdirs(work_dir)
def get_local_path(dbfs_path):
return os.path.join("/dbfs", dbfs_path.lstrip("/"))
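For reference, get_local_path simply maps a DBFS path onto its FUSE mount under /dbfs, so /ml/tmp/petastorm/<uuid> becomes /dbfs/ml/tmp/petastorm/<uuid>:
print(get_local_path(work_dir))  # e.g. /dbfs/ml/tmp/petastorm/<uuid>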
5. Download the dataset (MNIST).
data_url = "https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/multiclass/mnist.bz2"
libsvm_path = os.path.join(work_dir, "mnist.bz2")
subprocess.check_output(["wget", data_url, "-O", get_local_path(libsvm_path)])
Out[13]: b''
6. Load the dataset.
df = spark.read.format("libsvm") \
.option("numFeatures", "784") \
.load(libsvm_path)
7. Convert the feature vectors to arrays.
%scala
import org.apache.spark.ml.linalg.Vector
val toArray = udf { v: Vector => v.toArray }
spark.udf.register("toArray", toArray)
import org.apache.spark.ml.linalg.Vector
toArray: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$10093/495988232@1dd62a2c,ArrayType(DoubleType,false),List(Some(class[value[0]: vector])),Some(class[value[0]: array<double>]),None,true,true)
res1: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$10093/495988232@1dd62a2c,ArrayType(DoubleType,false),List(Some(class[value[0]: vector])),Some(class[value[0]: array<double>]),Some(toArray),true,true)
8. Convert the data to Parquet format.
# Convert sparse vectors to dense arrays and write the data as Parquet.
# Petastorm will sample Parquet row groups into batches.
# Batch size is important for the utilization of both I/O and compute.
# You can use parquet.block.size to control the size.
parquet_path = os.path.join(work_dir, "parquet")
df.selectExpr("toArray(features) AS features", "int(label) AS label") \
.repartition(10) \
.write.mode("overwrite") \
.option("parquet.block.size", 1024 * 1024) \
.parquet(parquet_path)
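If you want to see the effect of parquet.block.size, pyarrow can report the row-group layout of one of the written files. A small check of my own, assuming at least one part file exists in the output directory:
# Inspect the row groups of one written Parquet part file.
import os
import pyarrow.parquet as pq

local_parquet_dir = get_local_path(parquet_path)
part_file = next(f for f in os.listdir(local_parquet_dir) if f.endswith(".parquet"))
metadata = pq.ParquetFile(os.path.join(local_parquet_dir, part_file)).metadata
print(metadata.num_row_groups, metadata.row_group(0).total_byte_size)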
9. Import the libraries.
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import models, layers
from petastorm import make_batch_reader
from petastorm.tf_utils import make_petastorm_dataset
10. Define the model.
def get_model():
model = models.Sequential()
model.add(layers.Conv2D(32, kernel_size=(3, 3),
activation='relu',
input_shape=(28, 28, 1)))
model.add(layers.Conv2D(64, (3, 3), activation='relu'))
model.add(layers.MaxPooling2D(pool_size=(2, 2)))
model.add(layers.Dropout(0.25))
model.add(layers.Flatten())
model.add(layers.Dense(128, activation='relu'))
model.add(layers.Dropout(0.5))
model.add(layers.Dense(10, activation='softmax'))
return model
11. Prepare the Parquet dataset path (exclude the metadata files that start with "_").
import os
import pyarrow.parquet as pq
underscore_files = [f for f in os.listdir(get_local_path(parquet_path)) if f.startswith("_")]
pq.EXCLUDED_PARQUET_PATHS.update(underscore_files)
12. Read the Parquet data with make_batch_reader and train the model.
# We use make_batch_reader to load Parquet row groups into batches.
# HINT: Use cur_shard and shard_count params to shard data in distributed training.
petastorm_dataset_url = "file://" + get_local_path(parquet_path)
with make_batch_reader(petastorm_dataset_url, num_epochs=100) as reader:
dataset = make_petastorm_dataset(reader) \
.map(lambda x: (tf.reshape(x.features, [-1, 28, 28, 1]), tf.one_hot(x.label, 10)))
model = get_model()
optimizer = keras.optimizers.Adadelta()
model.compile(optimizer=optimizer,
loss='categorical_crossentropy',
metrics=['accuracy'])
model.fit(dataset, steps_per_epoch=10, epochs=10)
/local_disk0/.ephemeral_nfs/envs/pythonEnv-480ab404-1e1a-48b7-b616-46583213ddef/lib/python3.8/site-packages/petastorm/fs_utils.py:88: FutureWarning: pyarrow.localfs is deprecated as of 2.0.0, please use pyarrow.fs.LocalFileSystem instead.
self._filesystem = pyarrow.localfs
Epoch 1/10
10/10 [==============================] - 3s 159ms/step - loss: 42.4297 - accuracy: 0.0952
Epoch 2/10
...
13. Delete the working directory.
# Clean up the working directory.
dbutils.fs.rm(work_dir, recurse=True)
14. Check whether the directory is still there.
%fs ls /ml/tmp/petastorm
path,name,size,modificationTime
dbfs:/ml/tmp/petastorm/7f3d8aa0-0ecf-4b75-b649-f13a8259f2bd/,7f3d8aa0-0ecf-4b75-b649-f13a8259f2bd/,0,1650708482000
15. Delete it by specifying the UUID.
dbutils.fs.rm("/ml/tmp/petastorm/7f3d8aa0-0ecf-4b75-b649-f13a8259f2bd/", recurse=True)
References