背景
AWS SageMakerを勉強していると、機械学習AIで利用するデータ(特徴量)を一元管理出来るものとして、SageMaker Feature Storeがありましたので、触ってみました。
環境
sagemaker:2.219.0
試した事(概要)
SageMaker StudioのJupyterLabにて、Nishikaのコンペの訓練(train)データを対象にして、SageMaker Feature Storeを触ってみました。
試した事(詳細)
1. 前準備
1.1. S3を用意
S3バケット内に、SageMaker Feature Store用のフォルダを作成します。
1.2. JupyterLabを起動
SageMaker Studioにアクセスします。
そして、JupyterLabにアクセスして、Notebookインスタンスを新規に作成、もしくは以前に作成済みのNotebookインスタンスをRunボタンで起動します。
起動したら、Openボタンをクリックして、NotebookインスタンスのJupyterLabの画面を開きます。
開いたら、まずは必要なライブラリーをインポートします。
from datetime import datetime
import os
import numpy as np
import pandas as pd
import boto3
import sagemaker
from sagemaker.feature_store.feature_group import FeatureGroup
1.3. データを用意
こちらのサイトから、train.zipをダウンロードします。
zipファイルを解凍すると、trainフォルダ内に多数のcsvファイルが入っていますので、それらcsvファイルを、pandasを使って1つのcsvファイルに纏めて、train.csvとします。
train_df_list = []
for csv_file in glob.glob("./data/Nishika_ApartmentPrice/train/*"):
df = pd.read_csv(filepath_or_buffer=csv_file,
converters={"面積(㎡)": str},
encoding="utf-8")
train_df_list.append(df)
train_df = pd.concat(objs=train_df_list,
axis=0,
ignore_index=True)
print(train_df.columns)
Index(['ID', '種類', '地域', '市区町村コード', '都道府県名', '市区町村名', '地区名', '最寄駅:名称',
'最寄駅:距離(分)', '間取り', '面積(㎡)', '土地の形状', '間口', '延床面積(㎡)', '建築年', '建物の構造',
'用途', '今後の利用目的', '前面道路:方位', '前面道路:種類', '前面道路:幅員(m)', '都市計画', '建ぺい率(%)',
'容積率(%)', '取引時点', '改装', '取引の事情等', '取引価格(総額)_log'],
dtype='object')
1.4. データを前処理
見やすくするために、カラムを少し減らそうと思います。
今回はこちらの記事の1.4.と同じ形の前処理にします。
train_feature_df = train_df[["市区町村コード", "最寄駅:距離(分)", "面積(㎡)", "建ぺい率(%)", "容積率(%)", "取引価格(総額)_log"]].dropna(how="any").copy()
train_feature_df["面積(㎡)"] = train_feature_df["面積(㎡)"].apply(lambda x: "2000" if x == "2000㎡以上" else x)
train_feature_df["面積(㎡)"] = train_feature_df["面積(㎡)"].astype("int")
train_feature_df["最寄駅:距離(分)"] = train_feature_df["最寄駅:距離(分)"].apply(lambda x: "45" if x == "30分?60分" else "75" if x == "1H?1H30" else "105" if x == "1H30?2H" else "120" if x == "2H?" else x)
train_feature_df["最寄駅:距離(分)"] = train_feature_df["最寄駅:距離(分)"].astype("int")
train_feature_df["建ぺい率(%)"] = train_feature_df["建ぺい率(%)"].astype("int")
train_feature_df["容積率(%)"] = train_feature_df["容積率(%)"].astype("int")
print(train_feature_df)
市区町村コード 最寄駅:距離(分) 面積(㎡) 建ぺい率(%) 容積率(%) 取引価格(総額)_log
0 30201 45 45 80 300 6.875061
1 30201 8 75 80 400 7.397940
2 30201 6 75 80 400 6.880814
3 30201 29 60 80 300 6.869232
4 30201 9 65 80 400 7.255273
... ... ... ... ... ... ...
637346 41201 24 70 60 200 6.944483
637347 41201 6 65 80 400 7.000000
637348 41201 26 35 80 400 6.643453
637349 41201 28 15 80 400 6.431364
637350 41201 8 75 60 200 7.146128
[600296 rows x 6 columns]
続いて、ここからはSageMaker Feature Storeのお作法のための前処理になります。
まずは、SageMaker Feature Storeでは「レコードを一意に識別出来る特徴量」と「レコードの作成や更新日時を示す特徴量」の2つが必要になります。
前者は所謂「レコードID」のようなもので、後者は「レコード作成更新日時」のようなものになります。
(後者はyyyy-MM-ddTHH:MM:SSZもしくはyyyy-MM-ddTHH:MM:SS.SSSZの形式である必要がありそうです。)
この2つのカラムをデータに追加します。
train_feature_df["レコードID"] = train_feature_df.index
train_feature_df["レコード作成日時"] = datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ")
print(train_feature_df.head(3))
市区町村コード 最寄駅:距離(分) 面積(㎡) 建ぺい率(%) 容積率(%) 取引価格(総額)_log レコードID \
0 30201 45 45 80 300 6.875061 0
1 30201 8 75 80 400 7.397940 1
2 30201 6 75 80 400 6.880814 2
レコード作成日時
0 2024-07-01T06:26:27Z
1 2024-07-01T06:26:27Z
2 2024-07-01T06:26:27Z
次に、データ型を変えます。
SageMaker Feature Storeでは、「string」「float」「integer」のデータ型がサポートされているようですが、今回のデータの文字列のカラムは「object」になっています。
print(train_feature_df.dtypes)
市区町村コード int64
最寄駅:距離(分) int64
面積(㎡) int64
建ぺい率(%) int64
容積率(%) int64
取引価格(総額)_log float64
レコードID int64
レコード作成日時 object
dtype: object
そのため、「object」を「string」に変えます。
for label in train_feature_df.columns:
if train_feature_df[label].dtype == "object":
train_feature_df[label] = train_feature_df[label].astype("string")
print(train_feature_df.dtypes)
市区町村コード int64
最寄駅:距離(分) int64
面積(㎡) int64
建ぺい率(%) int64
容積率(%) int64
取引価格(総額)_log float64
レコードID int64
レコード作成日時 string[python]
dtype: object
そして、さらにカラム名を変更します。
カラム名は英数字である必要があるらしく、このままのカラム名だと、SageMaker Feature Storeを触る際にエラーとなります。よって、カラム名を英数字の形に変更します。
train_feature_df = train_feature_df.rename(columns={"市区町村コード": "code",
"最寄駅:距離(分)": "distance",
"面積(㎡)": "area",
"建ぺい率(%)": "building_rate",
"容積率(%)": "volume",
"取引価格(総額)_log": "price",
"レコードID": "ID",
"レコード作成日時": "RECORD_DATE"})
print(train_feature_df.dtypes)
code int64
distance int64
area int64
building_rate int64
volume int64
price float64
ID int64
RECORD_DATE string[python]
dtype: object
データの前処理はここまでになります。
2. SageMaker Feature Storeを試す
2.1. 初期変数を設定
まずは初期変数として、いくつか変数を設定します。
ROLE = sagemaker.get_execution_role()
SAGEMAKER_SESSION = sagemaker.Session()
REGION = boto3.Session().region_name
S3_BUCKET = SAGEMAKER_SESSION.default_bucket()
S3_PREFIX = "data-for-machine-learning/for_sagemaker_feature_store"
FEATURE_STORE_GROUP_NAME = "test_20240701"
変数ROLEやSAGEMAKER_SESSIONやREGIONやS3_BUCKETはSageMakerを扱う時のお決まりの変数になります。
変数S3_PREFIXは先程作成したS3のフォルダ、変数FEATURE_STORE_GROUP_NAMEは今回SageMaker Feature Storeに設定する名前となります。
2.2. Feature Storeのグループを作成
SageMaker Feature Storeは「グループ」を一つの単位として扱います。
そのため、まずはFeatureGroupクラスからFeature Groupのオブジェクトを作成します。
feature_store_group = FeatureGroup(name=FEATURE_STORE_GROUP_NAME,
sagemaker_session=SAGEMAKER_SESSION)
続いて、作成したFeature Groupのオブジェクトに、今回のtrain.csvのスキーマを設定します。イメージとしては、train.csvのカラム情報を設定するイメージになります。ちなみに、この設定で、train.csvが持っている「600,296行 x 6列」のデータがFeature Groupのオブジェクトに保存されるような雰囲気はありますが、保存されません(笑)カラム情報だけが設定されて、中身は空っぽの形です。
feature_store_group.load_feature_definitions(data_frame=train_feature_df)
[FeatureDefinition(feature_name='code', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>, collection_type=None),
FeatureDefinition(feature_name='distance', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>, collection_type=None),
FeatureDefinition(feature_name='area', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>, collection_type=None),
FeatureDefinition(feature_name='building_rate', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>, collection_type=None),
FeatureDefinition(feature_name='volume', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>, collection_type=None),
FeatureDefinition(feature_name='price', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>, collection_type=None),
FeatureDefinition(feature_name='ID', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>, collection_type=None),
FeatureDefinition(feature_name='RECORD_DATE', feature_type=<FeatureTypeEnum.STRING: 'String'>, collection_type=None)]
では、ここまでの設定を踏まえて、Feature Groupのオブジェクトのcreateメソッドを使って、Feature Storeにグループを作成します。
feature_store_group.create(s3_uri="s3://{a}".format(a=os.path.join(S3_BUCKET, S3_PREFIX)),
record_identifier_name="ID",
event_time_feature_name="RECORD_DATE",
role_arn=ROLE,
enable_online_store=True,
description="test for sagemaker feature store")
{'FeatureGroupArn': 'arn:aws:sagemaker:ap-northeast-1:*****:feature-group/test_20240701',
'ResponseMetadata': {'RequestId': '*****',
'HTTPStatusCode': 200,
'HTTPHeaders': {'x-amzn-requestid': '*****',
'content-type': 'application/x-amz-json-1.1',
'content-length': '95',
'date': 'Mon, 01 Jul 2024 06:38:36 GMT'},
'RetryAttempts': 0}}
Feature Storeにグループが作成されました。作成されたグループは、Feature Groupのオブジェクトのdescribeメソッドで確認出来ます。
print(feature_store_group.describe())
{'FeatureGroupArn': 'arn:aws:sagemaker:ap-northeast-1:*****:feature-group/test_20240701', 'FeatureGroupName': 'test_20240701', 'RecordIdentifierFeatureName': 'ID', 'EventTimeFeatureName': 'RECORD_DATE', 'FeatureDefinitions': [{'FeatureName': 'code', 'FeatureType': 'Integral'}, {'FeatureName': 'distance', 'FeatureType': 'Integral'}, {'FeatureName': 'area', 'FeatureType': 'Integral'}, {'FeatureName': 'building_rate', 'FeatureType': 'Integral'}, {'FeatureName': 'volume', 'FeatureType': 'Integral'}, {'FeatureName': 'price', 'FeatureType': 'Fractional'}, {'FeatureName': 'ID', 'FeatureType': 'Integral'}, {'FeatureName': 'RECORD_DATE', 'FeatureType': 'String'}], 'CreationTime': datetime.datetime(2024, 7, 1, 6, 38, 36, 183000, tzinfo=tzlocal()), 'OnlineStoreConfig': {'EnableOnlineStore': True}, 'OfflineStoreConfig': {'S3StorageConfig': {'S3Uri': 's3://sagemaker-ap-northeast-1-*****/data-for-machine-learning/for_sagemaker_feature_store', 'ResolvedOutputS3Uri': 's3://sagemaker-ap-northeast-1-*****/data-for-machine-learning/for_sagemaker_feature_store/*****/sagemaker/ap-northeast-1/offline-store/test_20240701-*****/data'}, 'DisableGlueTableCreation': False, 'DataCatalogConfig': {'TableName': 'test_20240701_*****', 'Catalog': 'AwsDataCatalog', 'Database': 'sagemaker_featurestore'}}, 'ThroughputConfig': {'ThroughputMode': 'OnDemand'}, 'RoleArn': 'arn:aws:iam::*****:role/service-role/AmazonSageMaker-ExecutionRole-*****', 'FeatureGroupStatus': 'Created', 'OfflineStoreStatus': {'Status': 'Active'}, 'Description': 'test for sagemaker feature store', 'OnlineStoreTotalSizeBytes': 0, 'ResponseMetadata': {'RequestId': '*****', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '*****', 'content-type': 'application/x-amz-json-1.1', 'content-length': '2040', 'date': 'Mon, 01 Jul 2024 07:41:32 GMT'}, 'RetryAttempts': 0}}
また、SageMaker StudioのGUI画面でも、Feature Storeに新しいグループが作成された事を確認出来ます。
これで、Feature Storeにグループが作成出来ましたが、先程も少し記載した通り、このグループにはまだデータが何も入っていません。
そのため、試しに訓練(train)データの最初の100レコードを、このグループに保存してみます。
train_feature_df_100 = train_feature_df.iloc[:100, :].copy()
print(train_feature_df_100)
code distance area building_rate volume price ID \
0 30201 45 45 80 300 6.875061 0
1 30201 8 75 80 400 7.397940 1
2 30201 6 75 80 400 6.880814 2
3 30201 29 60 80 300 6.869232 3
4 30201 9 65 80 400 7.255273 4
.. ... ... ... ... ... ... ...
96 30201 45 40 60 200 7.079181 96
97 30201 8 70 80 300 7.322219 97
98 30201 3 50 60 200 6.763428 98
99 30201 14 65 80 600 7.079181 99
100 30201 9 80 80 400 7.301030 100
RECORD_DATE
0 2024-07-01T06:26:27Z
1 2024-07-01T06:26:27Z
2 2024-07-01T06:26:27Z
3 2024-07-01T06:26:27Z
4 2024-07-01T06:26:27Z
.. ...
96 2024-07-01T06:26:27Z
97 2024-07-01T06:26:27Z
98 2024-07-01T06:26:27Z
99 2024-07-01T06:26:27Z
100 2024-07-01T06:26:27Z
[100 rows x 8 columns]
Feature Groupのオブジェクトのingestメソッドを使って、データをグループに保存します。
feature_store_group.ingest(data_frame=train_feature_df_100,
max_workers=1,
wait=True)
IngestionManagerPandas(feature_group_name='test_20240701', feature_definitions={'code': {'FeatureName': 'code', 'FeatureType': 'Integral'}, 'distance': {'FeatureName': 'distance', 'FeatureType': 'Integral'}, 'area': {'FeatureName': 'area', 'FeatureType': 'Integral'}, 'building_rate': {'FeatureName': 'building_rate', 'FeatureType': 'Integral'}, 'volume': {'FeatureName': 'volume', 'FeatureType': 'Integral'}, 'price': {'FeatureName': 'price', 'FeatureType': 'Fractional'}, 'ID': {'FeatureName': 'ID', 'FeatureType': 'Integral'}, 'RECORD_DATE': {'FeatureName': 'RECORD_DATE', 'FeatureType': 'String'}}, sagemaker_fs_runtime_client_config=<botocore.config.Config object at *****>, sagemaker_session=<sagemaker.session.Session object at *****>, max_workers=1, max_processes=1, profile_name=None, _async_result=None, _processing_pool=None, _failed_indices=[])
2.3. Feature Storeのグループからデータを取得
では、特徴量を一元管理出来るSageMaker Feature Storeからデータを取得してみます。
行う事は、別のipynbノートブックにて、先程作成して100レコードを保存したFeature Storeのグループからデータ(レコード)を取得してみます。
まずは、別のipynbノートブックを立ち上げて、ライブラリーのインポートや、初期変数を設定します。
from datetime import datetime
import os
import numpy as np
import pandas as pd
import boto3
import sagemaker
from sagemaker.feature_store.feature_group import FeatureGroup
ROLE = sagemaker.get_execution_role()
SAGEMAKER_SESSION = sagemaker.Session()
REGION = boto3.Session().region_name
S3_BUCKET = SAGEMAKER_SESSION.default_bucket()
S3_PREFIX = "data-for-machine-learning/for_sagemaker_feature_store"
FEATURE_STORE_GROUP_NAME = "test_20240701"
Feature Groupのオブジェクトを作成します。引数nameには、先程作成したグループの名前を設定します。
feature_store_group_2 = FeatureGroup(name=FEATURE_STORE_GROUP_NAME,
sagemaker_session=SAGEMAKER_SESSION)
ここからデータ(レコード)を取得していきます。データ取得の流れは、
「Feature GroupのオブジェクトからAthena Queryのオブジェクトを作成」
->「Athena Queryのオブジェクトのrunメソッドを使って、Feature Storeのグループからデータ(レコード)を取得」
になります。
まずは、athena_queryクラスで、Athena Queryのオブジェクトを作成します。
feature_store_group_2_query = feature_store_group_2.athena_query()
続いて、引数query_stringにクエリを入れる形で、runメソッドを行います。
runメソッドの引数の一覧はこちらになります。
クエリの内容は、テーブルから全てのカラムを最大1000レコード取得する、という内容になります。
このクエリは、Feature StoreのGUI画面のここから引用しました。
また、runメソッドの引数output_locationは、クエリの結果(今回の場合はテーブルから取得したレコード)を保存するS3フォルダを設定します。
query = 'SELECT * FROM "sagemaker_featurestore"."test_20240701_*****" LIMIT 1000'
feature_store_group_2_query.run(query_string=query,
output_location="s3://{a}/result_athena_query".format(a=os.path.join(S3_BUCKET, S3_PREFIX)))
feature_store_group_2_query.wait()
runメソッドの完了後、as_dataframeメソッドを使って、取得したレコードをDataFrameの形で表示してみます。
print(feature_store_group_2_query.as_dataframe())
code distance area building_rate volume price id \
0 30201 26 65 60 200 7.041393 28
1 30201 45 55 50 100 6.845098 19
2 30201 45 55 60 200 7.079181 24
3 30201 4 55 60 200 6.755875 26
4 30201 5 65 60 200 6.832509 46
.. ... ... ... ... ... ... ..
95 30201 11 70 80 400 7.278754 95
96 30201 7 55 80 400 7.301030 32
97 30201 20 45 80 400 6.462398 41
98 30201 45 80 60 200 7.462398 44
99 30201 11 70 80 400 7.301030 51
record_date write_time api_invocation_time \
0 2024-07-01T06:26:27Z 2024-07-01 07:38:11.514 2024-07-01 07:33:11.000
1 2024-07-01T06:26:27Z 2024-07-01 07:38:11.554 2024-07-01 07:33:10.000
2 2024-07-01T06:26:27Z 2024-07-01 07:38:11.394 2024-07-01 07:33:10.000
3 2024-07-01T06:26:27Z 2024-07-01 07:38:11.554 2024-07-01 07:33:11.000
4 2024-07-01T06:26:27Z 2024-07-01 07:38:11.554 2024-07-01 07:33:11.000
.. ... ... ...
95 2024-07-01T06:26:27Z 2024-07-01 07:38:01.368 2024-07-01 07:33:12.000
96 2024-07-01T06:26:27Z 2024-07-01 07:38:01.368 2024-07-01 07:33:11.000
97 2024-07-01T06:26:27Z 2024-07-01 07:38:01.368 2024-07-01 07:33:11.000
98 2024-07-01T06:26:27Z 2024-07-01 07:38:01.368 2024-07-01 07:33:11.000
99 2024-07-01T06:26:27Z 2024-07-01 07:38:01.368 2024-07-01 07:33:11.000
is_deleted
0 False
1 False
2 False
3 False
4 False
.. ...
95 False
96 False
97 False
98 False
99 False
[100 rows x 11 columns]
別のipynbノートブックにて、Feature Storeから特徴量のデータを取得出来ました。
以上になります。
まとめ
Feature Storeに特徴量データを保存して、別のノートブックからその特徴量データを取得してみました。S3に保存しているcsvファイルを皆がアクセスするよりも、Feature Storeに保存しているとくちょうりょうデータを皆がアクセスする、という形の方が管理面でも有用そうと思いました。
参考