はじめに
- 大きめなデータでk-meansを実行したかったが、ローカルでscikit-learnで実行しようとするとリソースが足りなかったので、Glueを試してみた。
Glueとは
- DataCatalog と ETL という2つのコンポーネントから構成されるサービス。
- 「S3のデータファイルから、AWS Redshift Spectrum と AWS Athenaのテーブルを作成する」 ツール。
- GlueでDataCatalogと呼んでいるテーブル定義は、Apache Hiveのテーブルのこと。 = Glueは、S3のファイルからHive Tableを作るツール。
- Glueはクローリングによるテーブル定義作成・更新に加えて、Apache Sparkを使って、プログラミングにより、ユーザーがより細かくデータ加工することもできる。
- Sparkについてはこちらを参照。
Glue利用手順
- AWS Glue や Amazon Athena を用いたサーバーレスな Machine Learning 環境 をベースに進めて行きます。
- データ概要
- ニューヨーク市のタクシーに関するデータセット
- データ格納先:
s3://serverless-analytics/glue-blog
- 2016 年 1 月のタクシー乗車に関する緑色のタイプから構成されたテーブル(green)を使用する
- 処理概要
- タクシーの座標に基づき100通りのクラスターに分割する
- -> 後続で、Athenaで乗車数・各クラスターのおおよその地域をクエリしたり、最も乗車数の多かった4つの地域の座標を割り出したりしたい。
①ローカルでscikit-learnでk-means手順(参考)
まず、参考までに(比較のために)、いつも通りローカルで実行してみる。
データセット取得
- データセットをダウンロードにダウンロードする
ディレクトリ構成
kmeans_demo/
├── inputs
│ ├── green_tripdata_2016-01.csv
│ ├── run-1505565929137-part-r-00000
│ ├── ...
├── outputs
└── src
└── MLkmeansLocal.ipynb
# ダウンロード
$ aws s3 cp s3://serverless-analytics/glue-blog/green/ . --recursive
# ファイルサイズ確認
$ ls -lh green_tripdata_2016-01.csv
-rw-rw-r-- 1 cloudshell-user cloudshell-user 221M Aug 1 2017 green_tripdata_2016-01.csv
$ cat * | wc -l
12352171
# カラム確認
$ head -1 green_tripdata_2016-01.csv
VendorID,lpep_pickup_datetime,Lpep_dropoff_datetime,Store_and_fwd_flag,RateCodeID,Pickup_longitude,Pickup_latitude,Dropoff_longitude,Dropoff_latitude,Passenger_count,Trip_distance,Fare_amount,Extra,MTA_tax,Tip_amount,Tolls_amount,Ehail_fee,improvement_surcharge,Total_amount,Payment_type,Trip_type
データセット準備
csvファイルを1つにまとめる
MLkmeansLocal(1/3)
# 事前準備(csvファイルの結合)
import pandas as pd
import glob
# パスで指定したファイルの一覧をリスト形式で取得.
csv_files = glob.glob('../inputs/*')
#csvファイルの中身を追加していくリストを用意
data_list = []
#読み込むファイルのリストを走査
for file in csv_files:
data_list.append(pd.read_csv(file))
#リストを全て行方向に結合
#axis=0:行方向に結合, sort
df_tmp = pd.concat(data_list, axis=0, sort=True)
print(len(df_tmp))
df_tmp.to_csv("../inputs/green.csv",index=False)
scikit-learnスクリプト開発・実行
MLkmeansLocal(2/3)
import pandas as pd
from sklearn.cluster import KMeans
import time
start_time = time.time()
# INPUT / OUTPUT DATA
# source="../inputs/green.csv"
source="../inputs/run-1505565929137-part-r-00000"
destination = "../outputs/ClusterResults3.csv"
DataFrame1 =pd.read_csv(source)
print("len(DataFrame1): " + str(len(DataFrame1)))
#Load data and select fields
DataFrame0 =pd.read_csv(source)
# DataFrame0 = DataFrame0[["Trip_distance","Fare_amount","Pickup_longitude","Pickup_latitude" ]]
DataFrame0 = DataFrame0[["trip_distance","fare_amount","pickup_longitude","pickup_latitude" ]]
#Filter some unwanted values
# DataFrameFiltered = DataFrame0.query('Pickup_latitude > 40.472278 and Pickup_latitude < 41.160886 and Pickup_longitude > -74.300074 and Pickup_longitude < -71.844077')
DataFrameFiltered = DataFrame0.query('pickup_latitude > 40.472278 and pickup_latitude < 41.160886 and pickup_longitude > -74.300074 and pickup_longitude < -71.844077')
# #Select features and convert to SparkML required format
# features = ["Pickup_longitude","Pickup_latitude"]
# assembler = VectorAssembler(inputCols=features,outputCol='features')
# assembled_df = assembler.transform(DataFrameFiltered)
#Fit and Run Kmeans
start_time_kmeans = time.time()
kmeans = KMeans(n_clusters=100, random_state=1)
model = kmeans.fit(DataFrameFiltered)
end_time_kmeans = time.time()
print("exec time kmeans: " + str(end_time_kmeans - start_time_kmeans))
print("len(model.labels_): " + str(len(model.labels_)))
DataFrameFiltered['prediction']=model.labels_
#Save data to destination
DataFrameFiltered.to_csv(destination)
end_time = time.time()
print("exec time total: " + str(end_time - start_time))
実行結果確認
MLkmeansLocal(2/3)
# check result
print(DataFrameFiltered[0:10])
print("len(DataFrameFiltered): " + str(len(DataFrameFiltered)))
print(DataFrameFiltered.groupby('prediction').size().sort_values(ascending=False)[0:10])
②AWS環境でGlueでk-means実行する手順(コンソール)
データセット取得
- AWSがS3に格納してくれているデータを利用するので、今回は対応不要
データセット準備
Athenaテーブルを作成する
- AWSコンソールログイン→「Glue」
- データベースの追加
- Name: kmeans_demo_db
- クローラの追加
- Name: kmeans_demo_crawler
- Crawler source type: Data Stores
- Repeat crawls of S3 data stores: Crawl all folders
- データストアの選択: S3
- クロールするデータのパス: 別のアカウントで指定されたパス
- Include path: s3://serverless-analytics/glue-blog
- IAMロール: AWSGlueServiceRole-Demo
- 頻度: オンデマンドで実行
- Database: kmeans_demo_db
- クローラの実行
- テーブル作成確認
- 「テーブル」で、greenのテーブル等が作成されていることを確認する
Sparkスクリプト開発
-
MLkmeans.pyをアップロードする
s3://kmeans-demo-bucket-XXX/src/MLkmeans.py
- 事前にS3バケットを作成しておくこと
MLkmeans.py
import sys
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.transforms import SelectFields
from awsglue.transforms import RenameField
from awsglue.dynamicframe import DynamicFrame, DynamicFrameReader, DynamicFrameWriter, DynamicFrameCollection
from pyspark.context import SparkContext
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.clustering import KMeans
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
#JOB INPUT DATA
# destination = "s3://luiscarosnaprds/gluescripts/results/ClusterResults3.parquet"
destination = "s3://kmeans_demo_bucket_XXX/outputs/ClusterResults3.parquet"
# namespace = "nyc-transportation-version2"
namespace = "kmeans_demo_db"
tablename = "green"
sc = SparkContext()
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
#Load table and select fields
datasource0 = glueContext.create_dynamic_frame.from_catalog(name_space = namespace, table_name = tablename)
SelectFields0 = SelectFields.apply(frame = datasource0, paths=["trip_distance","fare_amount","pickup_longitude","pickup_latitude" ])
DataFrame0 = DynamicFrame.toDF(SelectFields0)
#Filter some unwanted values
DataFrameFiltered = DataFrame0.filter("pickup_latitude > 40.472278 AND pickup_latitude < 41.160886 AND pickup_longitude > -74.300074 AND pickup_longitude < -71.844077")
#Select features and convert to SparkML required format
features = ["pickup_longitude","pickup_latitude"]
assembler = VectorAssembler(inputCols=features,outputCol='features')
assembled_df = assembler.transform(DataFrameFiltered)
#Fit and Run Kmeans
kmeans = KMeans(k=100, seed=1)
model = kmeans.fit(assembled_df)
transformed = model.transform(assembled_df)
#Save data to destination
transformed.write.mode('overwrite').parquet(destination)
job.commit()
- (参考)argsに入ってくる値
args: {'job_bookmark_option': 'job-bookmark-disable', 'job_bookmark_from': None, 'job_bookmark_to': None, 'JOB_ID': 'j_b3dXXX', 'JOB_RUN_ID': 'jr_ad6XXX', 'SECURITY_CONFIGURATION': None, 'encryption_type': None, 'enable_data_lineage': None, 'RedshiftTempDir': 's3://kmeans-demo-bucket-XXX/tmp/', 'TempDir': 's3://kmeans-demo-bucket-XXX/tmp/', 'JOB_NAME': 'kmeans_demo_job'}
args['JOB_NAME']: kmeans_demo_job
Sparkジョブ実行
- ジョブの追加
- Name: kmeans_demo_job
- Type: Spark
- Glue version:
- このジョブ実行: ユーザーが提供する既存のスクリプト
- スクリプトが保存されている S3 パス: s3://kmeans-demo-bucket-XXX/src/MLkmeans.py
- 一時ディレクトリ: s3://kmeans-demo-bucket-XXX/tmp
- 作業者タイプ: 標準
- 作業者数: 2(最小が2)
- ジョブのタイムアウト: 30
- ジョブを保存してスクリプトを修正する
destination = "s3://kmeans-demo-bucket-XXX/outputs/"
namespace = "kmeansDemoDB"
-
ジョブの実行
- 事前にIAMロールにS3のアクセス権限(IAMポリシー)を付与しておくこと
-
ジョブの正常終了確認
- ジョブの実行ステータスを確認する
- 送信先のパスで parquet 形式のファイルが作成されているか確認する
- エラーが出た場合は、CloudWatchLogsを確認する(Glueの画面にリンクがある)
実行結果確認
出力されたファイルをAthenaテーブルとして取り込んで確認する
- クローラの作成
- Name: kmeans_demo_crawler_outputs
- Crawler source type: Data Stores
- Repeat crawls of S3 data stores: Crawl all folders
- データストアの選択: S3
- クロールするデータのパス: 別のアカウントで指定されたパス
- Include path: s3://kmeans-demo-bucket-XXX/outputs/ClusterResults3.parquet
- IAMロール: AWSGlueServiceRole-Demo
- 頻度: オンデマンドで実行
- Database: kmeans_demo_db
- クローラの実行
- クエリ実行
- 「Athena」
- S3設定
- クエリ実行
-
prediction
の列が、 k-means のアルゴリズムにより追加されたもので、各行に指定されたクラスター ID を表す整数 - 0-99の100分類
-
SELECT *
FROM kmeans_demo_db.clusterresults3_parquet
limit 10;
SELECT count(*)
FROM kmeans_demo_db.clusterresults3_parquet
SELECT count(*) AS count,
prediction
FROM kmeans_demo_db.clusterresults3_parquet
GROUP BY prediction
ORDER BY count DESC limit 10;