1
0

More than 1 year has passed since last update.

Azure Databricksでpetastormやーる

Last updated at Posted at 2022-04-23

はじめに

petastormやっていきまーす

開発環境

10.4 LTS, Standard_DS3_v2
image.png

10.4 LTS ML, Standard_NC6s_v3(※クォータ制限の緩和申請が必要です)
image.png

Spark から TensorFlow へのデータ変換を簡略化する

1.こちらのノートブックをやっていきます

2.ライブラリをインストールします

%pip install petastorm
%pip install tensorflow
%pip install hyperopt
%pip install horovod
%pip install sparkdl
%pip install tensorframes

3.ライブラリをインポートします

from pyspark.sql.functions import col 
from petastorm.spark import SparkDatasetConverter, make_spark_converter
 
import io
import numpy as np
import tensorflow as tf
from PIL import Image
from petastorm import TransformSpec
from tensorflow import keras
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input

from hyperopt import fmin, tpe, hp, SparkTrials, STATUS_OK 

import horovod.tensorflow.keras as hvd
from sparkdl import HorovodRunner

エラー出た

/databricks/python_shell/dbruntime/PythonPackageImportsInstrumentation/__init__.py:167: FutureWarning: pyarrow.LocalFileSystem is deprecated as of 2.0.0, please use pyarrow.fs.LocalFileSystem instead.
  original_result = python_builtin_import(name, globals, locals, fromlist, level)
/local_disk0/.ephemeral_nfs/envs/pythonEnv-30dab267-e5ed-4c28-9b42-2c7adb85cdb4/lib/python3.8/site-packages/petastorm/spark/spark_dataset_converter.py:28: FutureWarning: pyarrow.LocalFileSystem is deprecated as of 2.0.0, please use pyarrow.fs.LocalFileSystem instead.
  from pyarrow import LocalFileSystem
ImportError: cannot import name 'resnet50' from 'keras.applications' (/local_disk0/.ephemeral_nfs/envs/pythonEnv-30dab267-e5ed-4c28-9b42-2c7adb85cdb4/lib/python3.8/site-packages/keras/applications/__init__.py)

4.HorovodRunnerでエラーが出るので、一旦無視させてください

from pyspark.sql.functions import col 
from petastorm.spark import SparkDatasetConverter, make_spark_converter
 
import io
import numpy as np
import tensorflow as tf
from PIL import Image
from petastorm import TransformSpec
from tensorflow import keras
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input

from hyperopt import fmin, tpe, hp, SparkTrials, STATUS_OK 

# import horovod.tensorflow.keras as hvd
# from sparkdl import HorovodRunner
IMG_SHAPE = (224, 224, 3)
BATCH_SIZE = 32
NUM_EPOCHS = 5

5.flowers datasetを読み込みます

df = spark.read.format("delta").load("/databricks-datasets/flowers/delta") \
  .select(col("content"), col("label"))
  
labels = df.select(col("label")).distinct().collect()
label_to_idx = {label: index for index, (label, ) in enumerate(sorted(labels))}
num_classes = len(label_to_idx)
df_train, df_val = df.limit(100).randomSplit([0.9, 0.1], seed=12345)
 
# Make sure the number of partitions is at least the number of workers which is required for distributed training.
df_train = df_train.repartition(2)
df_val = df_val.repartition(2)

6.Petastorm Spark converterを用いて、Spark DataFrameをキャッシュします

# Set a cache directory on DBFS FUSE for intermediate data.
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, "file:///dbfs/tmp/petastorm/cache")
 
converter_train = make_spark_converter(df_train)
converter_val = make_spark_converter(df_val)
/local_disk0/.ephemeral_nfs/envs/pythonEnv-ddc618cd-8ee3-4cca-80e1-acab2cf64950/lib/python3.8/site-packages/petastorm/fs_utils.py:88: FutureWarning: pyarrow.localfs is deprecated as of 2.0.0, please use pyarrow.fs.LocalFileSystem instead.
  self._filesystem = pyarrow.localfs
Converting floating-point columns to float32
The median size 5843009 B (< 50 MB) of the parquet files is too small. Total size: 11231784 B. Increase the median file size by calling df.repartition(n) or df.coalesce(n), which might help improve the performance. Parquet files: file:/dbfs/tmp/petastorm/cache/20220428034250-appid-app-20220428025739-0000-2c111a69-ebdf-4693-a4ff-6d86504c7e4e/part-00000-tid-8225040675238723687-b6257c8e-9b27-4889-9f19-3e301eccb855-7-1-c000.parquet, ...
Converting floating-point columns to float32
The median size 640883 B (< 50 MB) of the parquet files is too small. Total size: 1256216 B. Increase the median file size by calling df.repartition(n) or df.coalesce(n), which might help improve the performance. Parquet files: file:/dbfs/tmp/petastorm/cache/20220428034301-appid-app-20220428025739-0000-1fc4f83a-4281-4d37-8b87-24c9fd7b6f6b/part-00000-tid-7248952313364288857-b84c597c-c3c8-431d-a182-82336fb9f2b5-15-1-c000.parquet, ...
print(f"train: {len(converter_train)}, val: {len(converter_val)}")
train: 92, val: 8

7.MobileNetV2モデルを読み込みます

# First, load the model and inspect the structure of the model.
MobileNetV2(input_shape=IMG_SHAPE, include_top=False, weights='imagenet').summary()

image.png

def get_model(lr=0.001):
 
  # Create the base model from the pre-trained model MobileNet V2
  base_model = MobileNetV2(input_shape=IMG_SHAPE, include_top=False, weights='imagenet')
  # Freeze parameters in the feature extraction layers
  base_model.trainable = False
  
  # Add a new classifier layer for transfer learning
  global_average_layer = keras.layers.GlobalAveragePooling2D()
  prediction_layer = keras.layers.Dense(num_classes)
  
  model = keras.Sequential([
    base_model,
    global_average_layer,
    prediction_layer
  ])
  return model
 
def get_compiled_model(lr=0.001):
  model = get_model(lr=lr)
  model.compile(optimizer=keras.optimizers.SGD(lr=lr, momentum=0.9),
                loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                metrics=['accuracy'])
  return model
get_compiled_model().summary()
/local_disk0/.ephemeral_nfs/envs/pythonEnv-ddc618cd-8ee3-4cca-80e1-acab2cf64950/lib/python3.8/site-packages/keras/optimizer_v2/gradient_descent.py:102: UserWarning: The `lr` argument is deprecated, use `learning_rate` instead.
  super(SGD, self).__init__(name, **kwargs)
Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
=================================================================
 mobilenetv2_1.00_224 (Funct  (None, 7, 7, 1280)       2257984   
 ional)                                                          
                                                                 
 global_average_pooling2d (G  (None, 1280)             0         
 lobalAveragePooling2D)                                          
                                                                 
 dense (Dense)               (None, 5)                 6405      
                                                                 
=================================================================
Total params: 2,264,389
Trainable params: 6,405
Non-trainable params: 2,257,984
_________________________________________________________________

8.ImageNet用に画像を処理します

def preprocess(content):
  """
  Preprocess an image file bytes for MobileNetV2 (ImageNet).
  """
  image = Image.open(io.BytesIO(content)).resize([224, 224])
  image_array = keras.preprocessing.image.img_to_array(image)
  return preprocess_input(image_array)
 
def transform_row(pd_batch):
  """
  The input and output of this function are pandas dataframes.
  """
  pd_batch['features'] = pd_batch['content'].map(lambda x: preprocess(x))
  pd_batch['label_index'] = pd_batch['label'].map(lambda x: label_to_idx[x])
  pd_batch = pd_batch.drop(labels=['content', 'label'], axis=1)
  return pd_batch
 
# The output shape of the `TransformSpec` is not automatically known by petastorm, 
# so you need to specify the shape for new columns in `edit_fields` and specify the order of 
# the output columns in `selected_fields`.
transform_spec_fn = TransformSpec(
  transform_row, 
  edit_fields=[('features', np.float32, IMG_SHAPE, False), ('label_index', np.int32, (), False)], 
  selected_fields=['features', 'label_index']
)

9.学習と評価を行います

def train_and_evaluate(lr=0.001):
  model = get_compiled_model(lr)
  
  with converter_train.make_tf_dataset(transform_spec=transform_spec_fn, 
                                       batch_size=BATCH_SIZE) as train_dataset, \
       converter_val.make_tf_dataset(transform_spec=transform_spec_fn, 
                                     batch_size=BATCH_SIZE) as val_dataset:
    # tf.keras only accept tuples, not namedtuples
    train_dataset = train_dataset.map(lambda x: (x.features, x.label_index))
    steps_per_epoch = len(converter_train) // BATCH_SIZE
 
    val_dataset = val_dataset.map(lambda x: (x.features, x.label_index))
    validation_steps = max(1, len(converter_val) // BATCH_SIZE)
    
    print(f"steps_per_epoch: {steps_per_epoch}, validation_steps: {validation_steps}")
 
    hist = model.fit(train_dataset, 
                     steps_per_epoch=steps_per_epoch,
                     epochs=NUM_EPOCHS,
                     validation_data=val_dataset,
                     validation_steps=validation_steps,
                     verbose=2)
  return hist.history['val_loss'][-1], hist.history['val_accuracy'][-1] 
  
loss, accuracy = train_and_evaluate()
print("Validation Accuracy: {}".format(accuracy))
steps_per_epoch: 2, validation_steps: 1
Epoch 1/5
2/2 - 11s - loss: 1.9956 - accuracy: 0.1719 - val_loss: 2.0597 - val_accuracy: 0.1562 - 11s/epoch - 5s/step
Epoch 2/5
2/2 - 3s - loss: 1.9334 - accuracy: 0.1719 - val_loss: 2.1116 - val_accuracy: 0.1250 - 3s/epoch - 1s/step
Epoch 3/5
2/2 - 3s - loss: 1.7277 - accuracy: 0.2969 - val_loss: 2.0909 - val_accuracy: 0.1250 - 3s/epoch - 1s/step
Epoch 4/5
2/2 - 2s - loss: 1.5971 - accuracy: 0.2969 - val_loss: 2.0318 - val_accuracy: 0.1250 - 2s/epoch - 1s/step
Epoch 5/5
2/2 - 2s - loss: 1.6667 - accuracy: 0.2969 - val_loss: 1.9095 - val_accuracy: 0.1250 - 2s/epoch - 1s/step
Validation Accuracy: 0.125

10.ハイパーパラメータをチューニングします

def train_fn(lr):
  import tensorflow as tf
  from tensorflow import keras
  loss, accuracy = train_and_evaluate()
  return {'loss': loss, 'status': STATUS_OK}
 
search_space = hp.loguniform('lr', -10, -4)
 
argmin = fmin(
  fn=train_fn,
  space=search_space,
  algo=tpe.suggest,
  max_evals=2,
  trials=SparkTrials(parallelism=2))
/databricks/spark/python/pyspark/rdd.py:980: FutureWarning: Deprecated in 3.1, Use pyspark.InheritableThread with the pinned thread mode enabled.
  warnings.warn(

100%|██████████| 2/2 [00:31<00:00, 15.52s/trial, best loss: 1.800481915473938]
Total Trials: 2: 2 succeeded, 0 failed, 0 cancelled.
# See optimized hyperparameters
argmin
Out[12]: {'lr': 0.0024977970136746686}

11.HorovodRunnerでエラーが出ているので、割愛します

def train_and_evaluate_hvd(lr=0.001):
  
  hvd.init()  # Initialize Horovod.
  
  # Horovod: pin GPU to be used to process local rank (one GPU per process)
  gpus = tf.config.experimental.list_physical_devices('GPU')
  for gpu in gpus:
      tf.config.experimental.set_memory_growth(gpu, True)
  if gpus:
      tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')
 
  model = get_model(lr)
  
  # Horovod: adjust learning rate based on number of GPUs.
  optimizer = keras.optimizers.SGD(lr=lr * hvd.size(), momentum=0.9)
  dist_optimizer = hvd.DistributedOptimizer(optimizer)
  
  callbacks = [
    # Horovod: broadcast initial variable states from rank 0 to all other processes.
    # This is necessary to ensure consistent initialization of all workers when
    # training is started with random weights or restored from a checkpoint.
    hvd.callbacks.BroadcastGlobalVariablesCallback(0),
    hvd.callbacks.MetricAverageCallback(),
  ]
  
  # Set experimental_run_tf_function=False in TF 2.x
  model.compile(optimizer=dist_optimizer, 
                loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True), 
                metrics=["accuracy"],
                experimental_run_tf_function=False)
    
  with converter_train.make_tf_dataset(transform_spec=transform_spec_fn, 
                                       cur_shard=hvd.rank(), shard_count=hvd.size(),
                                       batch_size=BATCH_SIZE) as train_dataset, \
       converter_val.make_tf_dataset(transform_spec=transform_spec_fn, 
                                     cur_shard=hvd.rank(), shard_count=hvd.size(),
                                     batch_size=BATCH_SIZE) as val_dataset:
    # tf.keras only accept tuples, not namedtuples
    train_dataset = train_dataset.map(lambda x: (x.features, x.label_index))
    steps_per_epoch = len(converter_train) // (BATCH_SIZE * hvd.size())
 
    val_dataset = val_dataset.map(lambda x: (x.features, x.label_index))
    validation_steps = max(1, len(converter_val) // (BATCH_SIZE * hvd.size()))
    
    hist = model.fit(train_dataset, 
                     steps_per_epoch=steps_per_epoch,
                     epochs=NUM_EPOCHS,
                     validation_data=val_dataset,
                     validation_steps=validation_steps,
                     callbacks=callbacks,
                     verbose=2)
 
  return hist.history['val_loss'][-1], hist.history['val_accuracy'][-1]
hr = HorovodRunner(np=2)  # This assumes the cluster consists of two workers.
hr.run(train_and_evaluate_hvd)

Spark から PyTorch ノートブックへのデータ変換を簡略化する

1.こちらのノートブックをやっていきます

2.ライブラリをインストールします

%pip install petastorm
%pip install torch==1.5.0
%pip install torchvision==0.6.0
%pip install hyperopt
%pip install horovod
%pip install sparkdl
%pip install tensorflow
%pip install tensorframes

3.ライブラリをインポートします

from pyspark.sql.functions import col
 
from petastorm.spark import SparkDatasetConverter, make_spark_converter
 
import io
import numpy as np
import torch
import torchvision
from PIL import Image
from functools import partial 
from petastorm import TransformSpec
from torchvision import transforms
 
from hyperopt import fmin, tpe, hp, SparkTrials, STATUS_OK
 
import horovod.torch as hvd
from sparkdl import HorovodRunner

エラー出た

/databricks/python_shell/dbruntime/PythonPackageImportsInstrumentation/__init__.py:167: FutureWarning: pyarrow.LocalFileSystem is deprecated as of 2.0.0, please use pyarrow.fs.LocalFileSystem instead.
  original_result = python_builtin_import(name, globals, locals, fromlist, level)
/local_disk0/.ephemeral_nfs/envs/pythonEnv-ba09db16-2f6f-4866-949c-4d5aa9fbed19/lib/python3.8/site-packages/petastorm/spark/spark_dataset_converter.py:28: FutureWarning: pyarrow.LocalFileSystem is deprecated as of 2.0.0, please use pyarrow.fs.LocalFileSystem instead.
  from pyarrow import LocalFileSystem
Extension horovod.torch has not been built: /local_disk0/.ephemeral_nfs/envs/pythonEnv-ba09db16-2f6f-4866-949c-4d5aa9fbed19/lib/python3.8/site-packages/horovod/torch/mpi_lib/_mpi_lib.cpython-38-x86_64-linux-gnu.so not found
If this is not expected, reinstall Horovod with HOROVOD_WITH_PYTORCH=1 to debug the build error.
Warning! MPI libs are missing, but python applications are still available.
ImportError: cannot import name 'resnet50' from 'keras.applications' (/local_disk0/.ephemeral_nfs/envs/pythonEnv-ba09db16-2f6f-4866-949c-4d5aa9fbed19/lib/python3.8/site-packages/keras/applications/__init__.py)

4.HorovodRunnerでエラーが出るので、一旦無視させてください

from pyspark.sql.functions import col
 
from petastorm.spark import SparkDatasetConverter, make_spark_converter
 
import io
import numpy as np
import torch
import torchvision
from PIL import Image
from functools import partial 
from petastorm import TransformSpec
from torchvision import transforms
 
from hyperopt import fmin, tpe, hp, SparkTrials, STATUS_OK
 
# import horovod.torch as hvd
# from sparkdl import HorovodRunner
BATCH_SIZE = 32
NUM_EPOCHS = 5

5.flowers datasetを読み込みます

df = spark.read.format("delta").load("/databricks-datasets/flowers/delta") \
  .select(col("content"), col("label"))
  
labels = df.select(col("label")).distinct().collect()
label_to_idx = {label: index for index, (label, ) in enumerate(sorted(labels))}
num_classes = len(label_to_idx)
df_train, df_val = df.limit(100).randomSplit([0.9, 0.1], seed=12345)
 
# Make sure the number of partitions is at least the number of workers which is required for distributed training.
df_train = df_train.repartition(2)
df_val = df_val.repartition(2)

6.Petastorm Spark converterを用いて、Spark DataFrameをキャッシュします

# Set a cache directory on DBFS FUSE for intermediate data.
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, "file:///dbfs/tmp/petastorm/cache")
 
converter_train = make_spark_converter(df_train)
converter_val = make_spark_converter(df_val)
/local_disk0/.ephemeral_nfs/envs/pythonEnv-9cd71ee0-02a4-4aa1-9d51-c404f4a73ae6/lib/python3.8/site-packages/petastorm/fs_utils.py:88: FutureWarning: pyarrow.localfs is deprecated as of 2.0.0, please use pyarrow.fs.LocalFileSystem instead.
  self._filesystem = pyarrow.localfs
Converting floating-point columns to float32
The median size 5843009 B (< 50 MB) of the parquet files is too small. Total size: 11231784 B. Increase the median file size by calling df.repartition(n) or df.coalesce(n), which might help improve the performance. Parquet files: file:/dbfs/tmp/petastorm/cache/20220428040238-appid-app-20220428025739-0000-74a50fe2-1e22-48fd-ac20-cd321bf563c1/part-00000-tid-4917360287480491282-13eb128f-db70-4f2c-99fc-144f79beeb8a-30-1-c000.parquet, ...
Converting floating-point columns to float32
The median size 640883 B (< 50 MB) of the parquet files is too small. Total size: 1256216 B. Increase the median file size by calling df.repartition(n) or df.coalesce(n), which might help improve the performance. Parquet files: file:/dbfs/tmp/petastorm/cache/20220428040246-appid-app-20220428025739-0000-30d0febe-56cd-4dee-a9c0-40239b04c2c3/part-00000-tid-3222136911280864081-12c37ba4-265e-4d47-b460-9e07ce45f6fd-38-1-c000.parquet, ...
print(f"train: {len(converter_train)}, val: {len(converter_val)}")

7.MobileNetV2モデルを読み込みます

# First, load the model and inspect the structure of the model.
torchvision.models.mobilenet_v2(pretrained=True)

image.png

def get_model(lr=0.001):
  # Load a MobileNetV2 model from torchvision
  model = torchvision.models.mobilenet_v2(pretrained=True)
  # Freeze parameters in the feature extraction layers
  for param in model.parameters():
    param.requires_grad = False
    
  # Add a new classifier layer for transfer learning
  num_ftrs = model.classifier[1].in_features
  # Parameters of newly constructed modules have requires_grad=True by default
  model.classifier[1] = torch.nn.Linear(num_ftrs, num_classes)
  
  return model

8.モデルの学習と評価の関数を定義します

def train_one_epoch(model, criterion, optimizer, scheduler, 
                    train_dataloader_iter, steps_per_epoch, epoch, 
                    device):
  model.train()  # Set model to training mode
 
  # statistics
  running_loss = 0.0
  running_corrects = 0
 
  # Iterate over the data for one epoch.
  for step in range(steps_per_epoch):
    pd_batch = next(train_dataloader_iter)
    inputs, labels = pd_batch['features'].to(device), pd_batch['label_index'].to(device)
    
    # Track history in training
    with torch.set_grad_enabled(True):
      # zero the parameter gradients
      optimizer.zero_grad()
 
      # forward
      outputs = model(inputs)
      _, preds = torch.max(outputs, 1)
      loss = criterion(outputs, labels)
 
      # backward + optimize
      loss.backward()
      optimizer.step()
 
    # statistics
    running_loss += loss.item() * inputs.size(0)
    running_corrects += torch.sum(preds == labels.data)
  
  scheduler.step()
 
  epoch_loss = running_loss / (steps_per_epoch * BATCH_SIZE)
  epoch_acc = running_corrects.double() / (steps_per_epoch * BATCH_SIZE)
 
  print('Train Loss: {:.4f} Acc: {:.4f}'.format(epoch_loss, epoch_acc))
  return epoch_loss, epoch_acc
 
def evaluate(model, criterion, val_dataloader_iter, validation_steps, device, 
             metric_agg_fn=None):
  model.eval()  # Set model to evaluate mode
 
  # statistics
  running_loss = 0.0
  running_corrects = 0
 
  # Iterate over all the validation data.
  for step in range(validation_steps):
    pd_batch = next(val_dataloader_iter)
    inputs, labels = pd_batch['features'].to(device), pd_batch['label_index'].to(device)
 
    # Do not track history in evaluation to save memory
    with torch.set_grad_enabled(False):
      # forward
      outputs = model(inputs)
      _, preds = torch.max(outputs, 1)
      loss = criterion(outputs, labels)
 
    # statistics
    running_loss += loss.item()
    running_corrects += torch.sum(preds == labels.data)
  
  # Average the losses across observations for each minibatch.
  epoch_loss = running_loss / validation_steps
  epoch_acc = running_corrects.double() / (validation_steps * BATCH_SIZE)
  
  # metric_agg_fn is used in the distributed training to aggregate the metrics on all workers
  if metric_agg_fn is not None:
    epoch_loss = metric_agg_fn(epoch_loss, 'avg_loss')
    epoch_acc = metric_agg_fn(epoch_acc, 'avg_acc')
 
  print('Validation Loss: {:.4f} Acc: {:.4f}'.format(epoch_loss, epoch_acc))
  return epoch_loss, epoch_acc

9.ImageNet用に画像を処理します

def transform_row(is_train, pd_batch):
  """
  The input and output of this function must be pandas dataframes.
  Do data augmentation for the training dataset only.
  """
  transformers = [transforms.Lambda(lambda x: Image.open(io.BytesIO(x)))]
  if is_train:
    transformers.extend([
      transforms.RandomResizedCrop(224),
      transforms.RandomHorizontalFlip(),
    ])
  else:
    transformers.extend([
      transforms.Resize(256),
      transforms.CenterCrop(224),
    ])
  transformers.extend([
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
  ])
  
  trans = transforms.Compose(transformers)
  
  pd_batch['features'] = pd_batch['content'].map(lambda x: trans(x).numpy())
  pd_batch['label_index'] = pd_batch['label'].map(lambda x: label_to_idx[x])
  pd_batch = pd_batch.drop(labels=['content', 'label'], axis=1)
  return pd_batch
 
def get_transform_spec(is_train=True):
  # The output shape of the `TransformSpec` is not automatically known by petastorm, 
  # so you need to specify the shape for new columns in `edit_fields` and specify the order of 
  # the output columns in `selected_fields`.
  return TransformSpec(partial(transform_row, is_train), 
                       edit_fields=[('features', np.float32, (3, 224, 224), False), ('label_index', np.int32, (), False)], 
                       selected_fields=['features', 'label_index'])

10.学習と評価を行います

def train_and_evaluate(lr=0.001):
  device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
  
  model = get_model(lr=lr)
  model = model.to(device)
 
  criterion = torch.nn.CrossEntropyLoss()
 
  # Only parameters of final layer are being optimized.
  optimizer = torch.optim.SGD(model.classifier[1].parameters(), lr=lr, momentum=0.9)
 
  # Decay LR by a factor of 0.1 every 7 epochs
  exp_lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)
  
  with converter_train.make_torch_dataloader(transform_spec=get_transform_spec(is_train=True), 
                                             batch_size=BATCH_SIZE) as train_dataloader, \
       converter_val.make_torch_dataloader(transform_spec=get_transform_spec(is_train=False), 
                                           batch_size=BATCH_SIZE) as val_dataloader:
    
    train_dataloader_iter = iter(train_dataloader)
    steps_per_epoch = len(converter_train) // BATCH_SIZE
    
    val_dataloader_iter = iter(val_dataloader)
    validation_steps = max(1, len(converter_val) // BATCH_SIZE)
    
    for epoch in range(NUM_EPOCHS):
      print('Epoch {}/{}'.format(epoch + 1, NUM_EPOCHS))
      print('-' * 10)
 
      train_loss, train_acc = train_one_epoch(model, criterion, optimizer, exp_lr_scheduler, 
                                              train_dataloader_iter, steps_per_epoch, epoch, 
                                              device)
      val_loss, val_acc = evaluate(model, criterion, val_dataloader_iter, validation_steps, device)
 
  return val_loss
  
loss = train_and_evaluate()
Epoch 1/5
----------
Train Loss: 1.9021 Acc: 0.0938
Validation Loss: 1.4643 Acc: 0.5000
Epoch 2/5
----------
Train Loss: 1.6339 Acc: 0.2031
Validation Loss: 1.5759 Acc: 0.2500
Epoch 3/5
----------
Train Loss: 1.5566 Acc: 0.2969
Validation Loss: 1.7568 Acc: 0.0000
Epoch 4/5
----------
Train Loss: 1.4249 Acc: 0.3906
Validation Loss: 2.0182 Acc: 0.0000
Epoch 5/5
----------
Train Loss: 1.3955 Acc: 0.4219
Validation Loss: 2.1157 Acc: 0.0000

11.ハイパーパラメータをチューニングします

def train_fn(lr):
  loss = train_and_evaluate(lr)
  return {'loss': loss, 'status': STATUS_OK}
 
search_space = hp.loguniform('lr', -10, -4)
 
argmin = fmin(
  fn=train_fn,
  space=search_space,
  algo=tpe.suggest,
  max_evals=2,
  trials=SparkTrials(parallelism=2))
/databricks/spark/python/pyspark/rdd.py:980: FutureWarning: Deprecated in 3.1, Use pyspark.InheritableThread with the pinned thread mode enabled.
  warnings.warn(

100%|██████████| 2/2 [00:28<00:00, 14.01s/trial, best loss: 1.7172338962554932]
Total Trials: 2: 2 succeeded, 0 failed, 0 cancelled.
# See optimized hyperparameters
argmin

12.HorovodRunnerでエラーが出ているので、割愛します

def metric_average(val, name):
  tensor = torch.tensor(val)
  avg_tensor = hvd.allreduce(tensor, name=name)
  return avg_tensor.item()
 
def train_and_evaluate_hvd(lr=0.001):
  hvd.init()  # Initialize Horovod.
  
  # Horovod: pin GPU to local rank.
  if torch.cuda.is_available():
    torch.cuda.set_device(hvd.local_rank())
    device = torch.cuda.current_device()
  else:
    device = torch.device("cpu")
  
  model = get_model(lr=lr)
  model = model.to(device)
 
  criterion = torch.nn.CrossEntropyLoss()
  
  # Effective batch size in synchronous distributed training is scaled by the number of workers.
  # An increase in learning rate compensates for the increased batch size.
  optimizer = torch.optim.SGD(model.classifier[1].parameters(), lr=lr * hvd.size(), momentum=0.9)
  
  # Broadcast initial parameters so all workers start with the same parameters.
  hvd.broadcast_parameters(model.state_dict(), root_rank=0)
  hvd.broadcast_optimizer_state(optimizer, root_rank=0)
  
  # Wrap the optimizer with Horovod's DistributedOptimizer.
  optimizer_hvd = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters())
 
  exp_lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer_hvd, step_size=7, gamma=0.1)
 
  with converter_train.make_torch_dataloader(transform_spec=get_transform_spec(is_train=True), 
                                             cur_shard=hvd.rank(), shard_count=hvd.size(),
                                             batch_size=BATCH_SIZE) as train_dataloader, \
       converter_val.make_torch_dataloader(transform_spec=get_transform_spec(is_train=False),
                                           cur_shard=hvd.rank(), shard_count=hvd.size(),
                                           batch_size=BATCH_SIZE) as val_dataloader:
    
    train_dataloader_iter = iter(train_dataloader)
    steps_per_epoch = len(converter_train) // (BATCH_SIZE * hvd.size())
    
    val_dataloader_iter = iter(val_dataloader)
    validation_steps = max(1, len(converter_val) // (BATCH_SIZE * hvd.size()))
    
    for epoch in range(NUM_EPOCHS):
      print('Epoch {}/{}'.format(epoch + 1, NUM_EPOCHS))
      print('-' * 10)
 
      train_loss, train_acc = train_one_epoch(model, criterion, optimizer_hvd, exp_lr_scheduler, 
                                              train_dataloader_iter, steps_per_epoch, epoch, 
                                              device)
      val_loss, val_acc = evaluate(model, criterion, val_dataloader_iter, validation_steps,
                                   device, metric_agg_fn=metric_average)
 
  return val_loss
hr = HorovodRunner(np=2)   # This assumes the cluster consists of two workers.
hr.run(train_and_evaluate_hvd)

Spark と Petastorm を使用してディープ ラーニング ノートブック用のデータを準備する

1.こちらのノートブックをやっていきます

2.ライブラリをインストールします

%pip install tensorflow
%pip install petastorm

3.ライブラリをインポートします

import os
import subprocess
import uuid

3.ディレクトリを作成します

# Set a unique working directory for this notebook.
work_dir = os.path.join("/ml/tmp/petastorm", str(uuid.uuid4()))
dbutils.fs.mkdirs(work_dir)
 
def get_local_path(dbfs_path):
  return os.path.join("/dbfs", dbfs_path.lstrip("/"))

4.データセット(mnist)をダウンロードします

data_url = "https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/multiclass/mnist.bz2"
libsvm_path = os.path.join(work_dir, "mnist.bz2")
subprocess.check_output(["wget", data_url, "-O", get_local_path(libsvm_path)])
Out[13]: b''

5.データセットを読み込みます

df = spark.read.format("libsvm") \
  .option("numFeatures", "784") \
  .load(libsvm_path)

6.データを配列に格納します

%scala
 
import org.apache.spark.ml.linalg.Vector
 
val toArray = udf { v: Vector => v.toArray }
spark.udf.register("toArray", toArray)
import org.apache.spark.ml.linalg.Vector
toArray: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$10093/495988232@1dd62a2c,ArrayType(DoubleType,false),List(Some(class[value[0]: vector])),Some(class[value[0]: array<double>]),None,true,true)
res1: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$10093/495988232@1dd62a2c,ArrayType(DoubleType,false),List(Some(class[value[0]: vector])),Some(class[value[0]: array<double>]),Some(toArray),true,true)

7.Parquet形式へ変換します

# Convert sparse vectors to dense arrays and write the data as Parquet.
# Petastorm will sample Parquet row groups into batches.
# Batch size is important for the utilization of both I/O and compute.
# You can use parquet.block.size to control the size.
parquet_path = os.path.join(work_dir, "parquet")
df.selectExpr("toArray(features) AS features", "int(label) AS label") \
  .repartition(10) \
  .write.mode("overwrite") \
  .option("parquet.block.size", 1024 * 1024) \
  .parquet(parquet_path)

8.ライブラリをインポートします

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import models, layers
 
from petastorm import make_batch_reader
from petastorm.tf_utils import make_petastorm_dataset

9.モデルを定義します

def get_model():
  model = models.Sequential()
  model.add(layers.Conv2D(32, kernel_size=(3, 3),
                          activation='relu',
                          input_shape=(28, 28, 1)))
  model.add(layers.Conv2D(64, (3, 3), activation='relu'))
  model.add(layers.MaxPooling2D(pool_size=(2, 2)))
  model.add(layers.Dropout(0.25))
  model.add(layers.Flatten())
  model.add(layers.Dense(128, activation='relu'))
  model.add(layers.Dropout(0.5))
  model.add(layers.Dense(10, activation='softmax'))
  return model

10.parquet形式のデータのパスを定義します

import os
import pyarrow.parquet as pq

underscore_files = [f for f in os.listdir(get_local_path(parquet_path)) if f.startswith("_")]
pq.EXCLUDED_PARQUET_PATHS.update(underscore_files)

11.Parquet形式のデータをmake_batch_readerで読み取り、学習します

# We use make_batch_reader to load Parquet row groups into batches.
# HINT: Use cur_shard and shard_count params to shard data in distributed training.
petastorm_dataset_url = "file://" + get_local_path(parquet_path)
with make_batch_reader(petastorm_dataset_url, num_epochs=100) as reader:
  dataset = make_petastorm_dataset(reader) \
    .map(lambda x: (tf.reshape(x.features, [-1, 28, 28, 1]), tf.one_hot(x.label, 10)))
  model = get_model()
  optimizer = keras.optimizers.Adadelta()
  model.compile(optimizer=optimizer,
                loss='categorical_crossentropy',
                metrics=['accuracy'])
  model.fit(dataset, steps_per_epoch=10, epochs=10)
/local_disk0/.ephemeral_nfs/envs/pythonEnv-480ab404-1e1a-48b7-b616-46583213ddef/lib/python3.8/site-packages/petastorm/fs_utils.py:88: FutureWarning: pyarrow.localfs is deprecated as of 2.0.0, please use pyarrow.fs.LocalFileSystem instead.
  self._filesystem = pyarrow.localfs
Epoch 1/10
10/10 [==============================] - 3s 159ms/step - loss: 42.4297 - accuracy: 0.0952
Epoch 2/10
...

12.ディレクトリを削除します

# Clean up the working directory.
dbutils.fs.rm(work_dir, recurse=True)

13.ディレクトリが残っていた場合の確認

%fs ls /ml/tmp/petastorm
path,name,size,modificationTime
dbfs:/ml/tmp/petastorm/7f3d8aa0-0ecf-4b75-b649-f13a8259f2bd/,7f3d8aa0-0ecf-4b75-b649-f13a8259f2bd/,0,1650708482000

14.uuidを指定して削除します

dbutils.fs.rm("/ml/tmp/petastorm/7f3d8aa0-0ecf-4b75-b649-f13a8259f2bd/", recurse=True)

参考文献

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0