2
2

More than 1 year has passed since last update.

SageMaker Studio上でGluonTSを使った時系列予測の解説(前半)

Last updated at Posted at 2022-02-02

今回はAmazon SageMaker Studio上で、GluonTSというPythonの時系列予測用ライブラリにある複数のアルゴリズムを使い、比較検証する方法について紹介したいと思います。
AWSが用意しているチュートリアルについて自分なりに噛み砕きたいので、コードの中身を一部アレンジして記事にしてみました。以下のGitがチュートリアルになります。

GluonTSとは

AmazonがリリースしたPythonライブラリで、深層学習をベースにした時系列モデルを構築、評価できます。

GluonTSの特徴は以下の4つになります。

・新規モデル構築のための高レベルなコンポーネント。Sequence to Sequence モデルなどの一般的なニューラルネットワーク構造、モデリングや変動確率分布のためのコンポーネントを含みます
・時系列データのためのデータ取り込みと反復処理。モデル適用前のデータを変換する機構を含みます
・複数の先進的ニューラル予測モデルの実装参考例
・予測モデルの評価および比較を行うツール

また、GlueonTSでは数えた限り24個の様々な時系列予測モデルが用意されています。

GluonTSの予測モデル一覧

今回はDeepAR、SFeedFwd、LSTNet、Seq2Seqの4つのアルゴリズムを使用するので、それぞれのアルゴリズムについて簡単に説明します。

DeepAR

再帰型ニューラルネットワーク (RNN) を使用してスカラー (1 次元) 時系列を予測するための、教師あり学習アルゴリズム

SFeedFwd(Simple Feed Foward)

前のタイムステップが与えられたとき、次のターゲットタイムステップを予測するというとても単純なMLP(多層パーセプトロン)モデル

LSTNet(Long and Short term Time-series Network)

CNNとRNNを用いて、短期的な変数間の局所依存パターンを抽出し、時系列トレンドの長期的パターンを発見することができる多変量時系列予測アルゴリズム

Seq2Seq(Sequence To Sequence)

こちらもRNNを利用したアルゴリズムで、ある系列を受け取り(エンコーダー)、別の系列を返す(デコーダー)という構成のモデル

これらのアルゴリズムの理解は重要ですが、理解する前にひとまず手を動かしてみましょう。

ハンズオン

ここからは、SageMaker Studio上でAWSで用意されているサンプルコードを一つずつ実行していきます。その際に、わからない部分については随時補足していきたいと思います。サンプルデータは、2014年の1月~11月末までのとある各家庭3世帯の1時間ごとの電力使用量になります。

前半のハンズオンの流れ

チュートリアルの数字に合わせています。

0.環境構築
1.データの準備
2.アルゴリズムとハイパーパラメータ行列の作成
3.GluonTSのトレーニングスクリプトのセットアップ
4.SageMaker Experimentのセットアップ

0. 環境構築

AWSのSageMaker Studioを開き、ターミナル上で以下を実行してください。なお、Kernelは「Python3 (Data Science)」で良いです。

git clone https://github.com/aws-samples/amazon-sagemaker-gluonts-timeseriesforecasting-with-debuggerandexperiments

すると、「amazon-sagemaker-gluonts-timeseriesforecasting-with-debuggerandexperiments」というフォルダが作られるので、その中にある「Amazon SageMaker GluonTS time series forecasting.ipynb」を開いてJupyter Labを起動しましょう。

次に、以下のコードを実行して必要なパッケージをインストールしてください。

! pip install gluonts
! pip install --upgrade sagemaker
! pip install sagemaker-experiments
! pip install --upgrade smdebug-rulesconfig

1. データの準備

PandasのDataframeを使って、電力需要データをデータフレーム化します。

import pandas as pd
url = "https://raw.githubusercontent.com/aws-samples/amazon-forecast-samples/master/notebooks/common/data/item-demand-time.csv"
raw_df = pd.read_csv(url, header=None, names=["date", "usage", "client"])
raw_df.to_csv("item-demand-time.csv")
raw_df

image.png
Clientは12,10,111の3世帯です。

次に、トレーニングデータ、テストデータをS3に保存します。ここでは、SageMakerで使っているデフォルトのS3バケットを利用します。

import boto3
import sagemaker 

s3_client = boto3.client('s3')
s3res = boto3.resource('s3')

sess = sagemaker.Session()
bucket = sess.default_bucket()

pref = 'electricity-forecast-experiment/gluonts'
s3_train_channel = "s3://" + bucket + "/" + pref + "/train.csv"
s3_test_channel = "s3://" + bucket + "/" + pref + "/test.csv"
print(s3_train_channel)
print(s3_test_channel)

最後に、10/31までのトレーニングデータと11月のみのテストデータに分けて、s3に保存します。

df_train = raw_df.query('date <= "2014-10-31 23:00:00"').copy()
df_train.to_csv("train.csv")
s3_client.upload_file("train.csv", bucket, pref+"/train.csv")

df_test = raw_df.query('date >= "2014-11-01 00:00:00"').copy()
df_test.to_csv("test.csv")
s3_client.upload_file("test.csv", bucket, pref+"/test.csv")

2. アルゴリズムとハイパーパラメータ行列の作成

次に、使用するアルゴリズムとそのパラメータの組み合わせをDataFrameで定義します。
・epochs:学習回数
・num_batches_per_epoch:一回の学習に対してどれだけのデータを用いるか
今回は24時間周期で見ていきます。
・leraning_rate:学習率、重みづけの係数
値が大きいほど過学習を起こしやすい。1e-2と1e-4を採用
・hybridize:命令型プログラミング(python)でコーディングし、実行はシンボリックプログラミングという抽象的で効率的なものに変換
・prediction_length:今回は24h*7 = 168h = 1週間の予測を行う

import pandas as pd
d = {'epochs': [25,25,50,50], 'algo': ["DeepAR", "seq2seq","SFeedFwd","lstnet"], 'num_batches_per_epoch': [24, 24, 24, 24], 'learning_rate':[1e-2,1e-2,1e-4,1e-4], 'hybridize':[True, True,True,True]}
df_hps = pd.DataFrame(data=d)
df_hps['prediction_length'] = [168, 168, 168, 168]

その後、product関数によって定義したデータフレームの全ての組み合わせを生成し、各行がトレーニングに使う設定に対応します。
今回は16通りの設定が生成されます。

from itertools import product

prod = product(df_hps['epochs'].unique(), df_hps['algo'].unique(), df_hps['num_batches_per_epoch'].unique(), df_hps['learning_rate'].unique(), df_hps['hybridize'].unique(), df_hps['prediction_length'].unique())

df_hps_combo = pd.DataFrame([list(p) for p in prod],
                   columns=list(['epochs', 'algo', 'num_batches_per_epoch', 'learning_rate', 'hybridize', 'prediction_length']))

df_hps_combo['jobnumber'] = df_hps_combo.index
df_hps_combo

image.png

3. GluonTSのトレーニングスクリプトのセットアップ

Pythonのエントリースクリプトを使用して、必要なGluonTSライブラリをインポートし、使いたいアルゴリズムのパッケージを使用してgluonts estimatorを設定します。ここで注意が必要なので、チュートリアルに書かれているモジュールをインポートするコードにミスが存在します。こういったスクリプトのミスがあると、後にトレーニングを実行するとAmazon SageMakerのコンソールサイドバーにある「処理中」、「ジョブの処理」を見るとジョブが「failed」になります。どの部分でエラーが発生しているのか、詳細を見たい場合は各ジョブの詳細に飛び、「ログを表示」からCloudWatch Logsを見ればわかります。

私が実行した時に間違っていた部分は「from gluonts.mx.trainer import Trainer」で、チュートリアルでは「from gluonts.trainer import Trainer」になっていました。

blog_train_algos.py
import os
os.system('pip install pandas')
os.system('pip install gluonts')
import pandas as pd
import pathlib
import gluonts
import numpy as np
import argparse
import json
import boto3
from mxnet import gpu, cpu
from mxnet.context import num_gpus
from gluonts.dataset.util import to_pandas
from gluonts.model.deepar import DeepAREstimator
from gluonts.model.simple_feedforward import SimpleFeedForwardEstimator
from gluonts.model.lstnet import LSTNetEstimator
from gluonts.model.seq2seq import MQCNNEstimator
from gluonts.model.transformer import TransformerEstimator
from gluonts.evaluation.backtest import make_evaluation_predictions, backtest_metrics
from gluonts.evaluation import Evaluator
from gluonts.model.predictor import Predictor
from gluonts.dataset.common import ListDataset
from gluonts.mx.trainer import Trainer#←ここ
from gluonts.dataset.multivariate_grouper import MultivariateGrouper
from smdebug.mxnet import Hook

s3 = boto3.client("s3")

def uploadDirectory(model_dir,prefix,bucket):
    for root,dirs,files in os.walk(model_dir):
        for file in files:
            print(os.path.join(root,file))
            print(prefix+file)
            s3.upload_file(os.path.join(root,file),bucket,prefix+file)




def train(bucket, seq, algo, freq, prediction_length, epochs, learning_rate, hybridize, num_batches_per_epoch):

    #create train dataset
    df = pd.read_csv(filepath_or_buffer=os.environ['SM_CHANNEL_TRAIN'] + "/train.csv", header=0, index_col=0)

    training_data = ListDataset([{"start": df.index[0], 
                                  "target": df.usage[:],
                                 "item_id": df.client[:]}], 
                                  freq=freq)


    #create test dataset
    df = pd.read_csv(filepath_or_buffer=os.environ['SM_CHANNEL_TEST'] + "/test.csv", header=0, index_col=0)

    test_data = ListDataset([{"start": df.index[0], 
                              "target": df.usage[:],
                              "item_id": 'client_12'}], 
                              freq=freq)

    hook = Hook.create_from_json_file()
    #determine estimators##################################
    if algo == "DeepAR":
        estimator = DeepAREstimator(freq=freq,  
                                       prediction_length=prediction_length,
                                       context_length=1,
                                       trainer=Trainer(ctx="cpu",
                                            epochs=epochs,
                                            learning_rate=learning_rate,
                                            hybridize=hybridize,
                                            num_batches_per_epoch=num_batches_per_epoch
                                         ))

        #train the model
        predictor = estimator.train(training_data=training_data)
        print("DeepAR training is complete SUCCESS")
    elif algo == "SFeedFwd":
        estimator = SimpleFeedForwardEstimator(freq=freq,  
                                       prediction_length=prediction_length,
                                       trainer=Trainer(ctx="cpu",
                                            epochs=epochs,
                                            learning_rate=learning_rate,
                                            hybridize=hybridize,
                                            num_batches_per_epoch=num_batches_per_epoch
                                         ))

        #train the model
        predictor = estimator.train(training_data=training_data)
        print("training is complete SUCCESS")
    elif algo == "lstnet":
        # Needed for LSTNet ONLY
        grouper = MultivariateGrouper(max_target_dim=6)
        training_data = grouper(training_data)
        test_data = grouper(test_data)
        context_length = prediction_length
        num_series = 1
        skip_size = 1
        ar_window = 1
        channels = 4

        estimator = LSTNetEstimator(freq=freq,  
                                       prediction_length=prediction_length,
                                       context_length=context_length,
                                       num_series=num_series,
                                       skip_size=skip_size,
                                       ar_window=ar_window,
                                       channels=channels, 
                                       trainer=Trainer(ctx="cpu",
                                            epochs=epochs,
                                            learning_rate=learning_rate,
                                            hybridize=hybridize,
                                            num_batches_per_epoch=num_batches_per_epoch
                                         ))

        #train the model
        predictor = estimator.train(training_data=training_data)
        print("training is complete SUCCESS")
    elif algo == "seq2seq":
        estimator = MQCNNEstimator(freq=freq,  
                                       prediction_length=prediction_length,
                                       trainer=Trainer(ctx="cpu",
                                            epochs=epochs,
                                            learning_rate=learning_rate,
                                            hybridize=hybridize,
                                            num_batches_per_epoch=num_batches_per_epoch
                                         ))

        #train the model
        predictor = estimator.train(training_data=training_data)
        print("training is complete SUCCESS")
    else:
        estimator = TransformerEstimator(freq=freq,  
                                       prediction_length=prediction_length,
                                       trainer=Trainer(ctx="cpu",
                                            epochs=epochs,
                                            learning_rate=learning_rate,
                                            hybridize=hybridize,
                                            num_batches_per_epoch=num_batches_per_epoch
                                         ))

        #train the model
        predictor = estimator.train(training_data=training_data)
        print("training is complete SUCCESS")

    ###################################################

    #evaluate trained model on test data
    forecast_it, ts_it = make_evaluation_predictions(test_data, predictor,  num_samples=100)
    print("EVALUATION is complete SUCCESS")
    forecasts = list(forecast_it)
    tss = list(ts_it)
    evaluator = Evaluator(quantiles=[0.1, 0.5, 0.9])
    agg_metrics, item_metrics = evaluator(iter(tss), iter(forecasts), num_series=len(test_data))
    print("METRICS retrieved SUCCESS")
    #bucket = "bwp-sandbox"

    mainpref = "gluonts/blog-models/"
    prefix = mainpref + str(seq) + "/"
    agg_df = pd.DataFrame(agg_metrics, index=[0])
    file = "metrics"+str(seq)+".csv"
    os.system('mkdir metrics')
    cspath = os.path.join('metrics', file)
    agg_df.to_csv(cspath)
    s3.upload_file(cspath,bucket,mainpref+"metrics/"+file)


    hook.save_scalar("MAPE", agg_metrics["MAPE"], sm_metric=True)
    hook.save_scalar("RMSE", agg_metrics["RMSE"], sm_metric=True)
    hook.save_scalar("MASE", agg_metrics["MASE"], sm_metric=True)
    hook.save_scalar("MSE", agg_metrics["MSE"], sm_metric=True)

    print("MAPE:", agg_metrics["MAPE"])

    #save the model
    predictor.serialize(pathlib.Path(os.environ['SM_MODEL_DIR'])) 


    uploadDirectory(os.environ['SM_MODEL_DIR'], prefix, bucket)

    return predictor





def model_fn(model_dir):
    path = pathlib.Path(model_dir)   
    predictor = Predictor.deserialize(path)

    return predictor


def transform_fn(model, data, content_type, output_content_type):

    data = json.loads(data)
    df = pd.DataFrame(data)

    test_data = ListDataset([{"start": df.index[0], 
                              "target": df.usage[:]}], 
                               freq=freq)

    forecast_it, ts_it = make_evaluation_predictions(test_data, model,  num_samples=100)  
    #agg_metrics, item_metrics = Evaluator()(ts_it, forecast_it, num_series=len(test_data))
    #response_body = json.dumps(agg_metrics)
    response_body = json.dumps({'predictions':list(forecast_it)[0].samples.tolist()[0]})
    return response_body, output_content_type


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--bucket', type=str, default="")
    parser.add_argument('--seq', type=str, default="sample")
    parser.add_argument('--algo', type=str, default="DeepAR")
    parser.add_argument('--freq', type=str, default='D')
    parser.add_argument('--prediction_length', type=int, default=30)
    parser.add_argument('--epochs', type=int, default=10)
    parser.add_argument('--learning_rate', type=float, default=1e-3)
    parser.add_argument('--hybridize', type=bool, default=True)
    parser.add_argument('--num_batches_per_epoch', type=int, default=10)    

    return parser.parse_args()

if __name__ == '__main__':
    args = parse_args()
    train(args.bucket, args.seq, args.algo, args.freq, args.prediction_length, args.epochs, args.learning_rate, args.hybridize, args.num_batches_per_epoch)

こちらのpythonコードをimportします

```py
#Let's take a look at the training script
!pygmentize blog_train_algos.py

このスクリプトによってS3にアップロードしたtrain.csvとtest.csvのデータを使用して、学習と評価に使用するgluontsデータセットを作成します。学習が完了すると評価を行い、後に評価指標の最も良いモデルを選択するためにSageMaker Debugger Hook関数を使用してs3に保存されます。さらに、分析するためにSageMaker Trial Component analyticsからもメトリクスを入手できます。モデルは検索しやすいようにシリアル化されてs3に保存されます。

4. SageMaker Experimentのセットアップ

SageMaker Experimentとは、モデルのバージョン、評価の管理を行う機能で、どのモデルが良いか、その時の設定は何だったかを管理できます。
各モデルの性能を検証するために、smexperimentsというライブラリを使用します。

from datetime import datetime
from smexperiments.experiment import Experiment

sagemaker_boto_client = boto3.client("sagemaker")

#name of experiment
timestep = datetime.now()
timestep = timestep.strftime("%d-%m-%Y-%H-%M-%S")
experiment_name = timestep + "-timeseries-models"

#create experiment
Experiment.create(
    experiment_name=experiment_name, 
    description="Timeseries models", 
    sagemaker_boto_client=sagemaker_boto_client)

また、ワークフロー内の個々のアクティビティを保存するために以下を設定します。

from smexperiments.trial import Trial

trial = Trial.create(
    experiment_name=experiment_name,
    sagemaker_boto_client=sagemaker_boto_client
)
print(trial)

最後にexperimentの設定を定義します。

experiment_config = { "ExperimentName": experiment_name, 
                      "TrialName":  trial.trial_name,
                      "TrialComponentDisplayName": "Training"}

最後に

以上が前半の解説になります。
ここまではモデル評価のための準備みたいなものですね。
次回は、実際にトレーニングをして、どのモデルが一番良いかを検証したいと思います。

2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2