70
40

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

NTTドコモ サービスイノベーション部Advent Calendar 2019

Day 22

NVIDIA RAPIDSを使って前処理・機械学習・位置情報分析を高速化しよう

Last updated at Posted at 2019-12-22

NVIDIA RAPIDSを使ったデータ分析と位置情報分析の入門

皆さん、こんにちは、こんばんは。石黒慎と申します。
この記事では、NVIDIA RAPIDSを使ったデータ分析について、ご紹介させていただきます。

RAPIDSを使うと、データサイエンスに必要な前処理〜機械学習までを簡単に高速化できます。
本記事ではRAPIDSの導入方法・利用方法から、
RAPIDSを用いた位置情報データ分析までをご紹介します。

本記事を参考に、RAPIDSを用いた様々な分析にトライして頂けると幸いです。

本記事の目標: 読者にお持ち帰りいただきたいもの

  • RAPIDSとはなにか?
  • RAPIDSの導入方法
  • RAPIDSの各種機能のご紹介
  • RAPIDSを使ったデータ分析入門 (タクシーデータに対して、dask-XGBoostを用いて機械学習)
  • cuSpatialを使った位置情報データ分析

RAPIDSとはなにか?

RAPIDSは、NVIDIAが中心となってOpen Sourceで開発を進めている、
データサイエンスのための様々な処理を一貫してGPUで行うためのプラットフォームです。

データサイエンスの処理フローは、ざっくり、前処理→機械学習→後処理というケースが多いと思います。

もう少し具体的には、

RDSやBigQueryでSQLを使った簡単な前処理
→ numpyやpandasで複雑な前処理
→ Scikit-LearnやTensorflowなどで機械学習
→ またnumpyやpandasで必要に応じて後処理 

というようなフローで行われることが多いと思います。

分析者の悩みのタネは、このフローの途中途中で、
様々なライブラリ・プラットフォームを行き来して利用する必要があることです。
計算HWを跨ぐと、その分データの移動時間が必要です。
さらに大きな悩みは、前処理のデファクトスタンダードである「pandasの計算処理が遅い!!!」ことです。

この問題を解決するため、RAPIDSでは、前処理→機械学習→後処理というフロー全てを、
GPUを用いた一つのエコシステムの中で、高速に計算することを可能としています。

補足

また、このような一般的な問題に加えて、位置情報データ分析では特有の悩みもあります。
すなわち、大量の位置情報を用いたデータ分析は
SQLやpandasによる前処理の計算負荷が重すぎて、しばしば計算が困難となることです。

この問題の原因は、多くのSQL基盤やpandasが、GPU上での計算処理に対応していないこと、
そして空間データ構造を考慮した空間演算に対応していないことにあります。

RAPIDSでは、cuspatialという機能で空間演算の機能が実装され始めています。
また、omniscidbなど、RAPIDSのcumlなどを用いることによって、
位置情報データの高速なSQL処理を実現している基盤もあります。

※空間演算を高速化するためには、特別なデータ構造でデータを持つことで近似的に空間演算を行う必要があります。
こちらは昨年のAdvent Calendarでもご紹介させていただいていますので、ご参考ください。

https://qiita.com/shin_ishiguro/items/2429038b2c4c99c10837

RAPIDSの導入方法

まずは、下記URLから、自分の環境に合わせてインストールしましょう。
https://rapids.ai/start.html

私は記事作成時点で、Stable (0.10)のバージョンをcondaからインストールしました。
記事投稿日の12月22日では、0.11がstableリリースされています。

RAPIDSを既存のconda環境で試すと、他のライブラリと依存関係が衝突したので、
私は下記の用にRAPIDS専用に環境をクリーンインストールしました。

conda create -n rapids_0_11 python3.7 anaconda
source activate rapids_0_11
conda install -c rapidsai -c nvidia -c conda-forge -c defaults rapids=0.11 python=3.7

stableではなく、nightlyの方は開発者向けで、実験的なリリースです。
使ってみると今の所は様々エラーに遭遇し、まともに使えないので、
RAPIDSそのものの開発に興味がある方以外は、stableリリース版のインストールをおすすめします。

RAPIDSの各機能の紹介

RAPIDSを構成するライブラリは多数あります。ここでは以下のライブラリを使ってみます。

  • cuDF: cudaを利用することで、DataFrameをGPU上で扱えるようにする。pandasライクな動作
  • Dask-cuDF: Daskの機能を利用することで、cuDFをマルチGPU化して扱えるようにする。また、cuDFではGPUメモリが扱えるデータ量の上限となるが、Dask-cuDFを使うことでより大きな容量のデータが扱えるようになる。
  • cuML: cudaを利用することで、GPU上で機械学習をできるようにする。scikit-learnライクな動作
  • Dask-XGBoost: 機械学習の中でも特にXGBoostがよく使われるので、Dask化して高速に学習できるようにされたもの。
  • cuspatial: 位置情報データ(点群データや移動軌跡データ)に対して、様々な処理を行うためのもの

RAPIDSを用いたデータ分析入門

RAPIDSを用いて簡単な機械学習を試してみます。
機械学習の手法は既にいくつもRAPIDSに実装されています。
手法毎に開発のフェーズが異なっていて、Single GPUのみ対応のものと、
Multi GPUに対応しているものとが入り混じっているのが、現在の状況です。
将来的にはMulti-nodeクラスタ化した機械学習にも対応していく予定とのことなので、期待してます。

以下に、12月22日現在、cumlで使える手法を列挙します。既にかなりたくさんの手法を利用できますね。

  • DBSCAN
  • K-Means (dask経由のMulti GPUに対応)
  • PCA (dask経由のMulti GPUに対応)
  • tSVD (dask経由のMulti GPUに対応)
  • UMAP
  • TSNE
  • Linear Regression
  • Linear Regression with Lasso、Ridge正規化
  • ElasticNet
  • Logistic Regression (conda CUDA10はMulti GPUに対応)
  • SGD
  • Random Forest Classification, Regression (実験的にdask経由のMulti GPUに対応)
  • KNN Classification, Regression (nightly 0.12からMulti GPU対応済み)
  • SVM 分類器
  • Kalman Filter
  • Wolt-Winters Exponential Smoothing
  • ARIMA

また、XGBoostは、cumlには含められてはいないのですが、
RAPIDSのエコシステムとの連携・統合が進められており、利用可能です。

タクシーデータを用いた機械学習実践

それでは、RAPIDSによる機械学習の方法について見ていきましょう。
今回は↓のNYCのYellowタクシーのトリップデータを使ってみます。

データは下記のようにダウンロードしてください。
wget https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2019-01.csv

このデータには、乗車したお客様を目的地に運ぶまでの1トリップについて、
乗車時刻、降車時刻、乗車エリア、降車エリア、乗客数、料金レート、
支払い方法、客車走行距離、料金、追加料金、税金、料金所通行料、
チップ代、サービス改善用サーチャージ、混雑時追加料金、合計金額といったデータが含まれています。

今回はそれ以外のデータから、チップ代を推定する回帰問題をxgboostで解いてみます。

以下の公式チュートリアルを参考にコードを改造して、
Dask-cuDFとDask-XGBoostを組み合わせた機械学習を実施してみます。

まずは前処理です。cudfを駆使してデータを読み込み、特徴量に変換していきます。

import cudf
import cuml
import datetime as dt
import pandas as pd

file_path = 'yellow_tripdata_2019-01.csv'
cdf = cudf.io.csv.read_csv(file_path) # cudf dataframeへのcsvファイル読み込み処理
df = pd.read_csv(file_path) # pandas dataframeへのcsvファイル読み込み処理

cdf = cdf.drop(columns='tpep_pickup_datetime')
cdf = cdf.drop(columns='tpep_dropoff_datetime')
cdf['pickup_date'] = pd.to_datetime(df.tpep_pickup_datetime)
cdf['dropoff_date'] = pd.to_datetime(df.tpep_dropoff_datetime)

# 時系列データをcudfで処理する方法がわからなかったので(まだ無いのかな?)、
# pandasで型変換してからcudfに変換してます。
# もし、pd.to_datatime()をpandas dataframeではなく、
# cudf dataframeを入力としてしまうと超長時間の処理が走ってしまうので終わりません。
search_date = dt.datetime.strptime('2019-01-01', '%Y-%m-%d')
cdf = cdf.query('pickup_date >= @search_date')
search_date = dt.datetime.strptime('2019-01-05', '%Y-%m-%d')
cdf = cdf.query('pickup_date < @search_date')
cdf = cdf.reset_index(drop=True) # rapids0.11から inplace=Trueが使えるようになったようです。
                                 # 0.10では使えませんでした

cdf['pickup_date_day'] = cdf.pickup_date.dt.day
cdf['pickup_date_weekday'] = cdf.pickup_date.dt.weekday
cdf['pickup_date_hour'] = cdf.pickup_date.dt.hour
cdf['dropoff_date_day'] = cdf.dropoff_date.dt.day
cdf['dropoff_date_weekday'] = cdf.dropoff_date.dt.weekday
cdf['dropoff_date_hour'] = cdf.dropoff_date.dt.hour

le = cuml.preprocessing.LabelEncoder()
cdf['store_and_fwd_flag'] = le.fit_transform(cdf.store_and_fwd_flag)
# cumlは、sklearn同様カテゴリ変数のラベル特徴量化の前処理等もできます。

cdf.congestion_surcharge.fillna(0, inplace=True)

_columns = ['VendorID', 'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 
            'DOLocationID', 'payment_type', 
            'pickup_date_day', 'pickup_date_weekday', 'pickup_date_hour', 
            'dropoff_date_day', 'dropoff_date_weekday', 'dropoff_date_hour']

cdf = cudf.core.reshape.get_dummies(cdf, columns=_columns)
# 様々なカテゴリ変数を1-hot特徴量に変換しています。

for c in _columns:
    cdf = cdf.drop(columns=c)

cdf.fillna(0, inplace=True)

次に、計算機クラスターの設定を行っていきます。
複数GPUを使うため、Dask-CUDAという、
Daskのクラスターをセットアップするためのユーティリティライブラリを使います。

import dask
import dask_cudf
import dask_xgboost

from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster

import subprocess

cmd = "hostname --all-ip-addresses"
process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
output, error = process.communicate()
IPADDR = str(output.decode()).split()[0]

cluster = LocalCUDACluster(ip=IPADDR)
client = Client(cluster) #processes=False)#
client

今回は、TITAN-X(Pascal版)が2つ搭載されたワークステーションで動作させたため、Workerの数が2つになっています。

image-20191203061345505のコピー.png

次に、これまで前処理を行ってきたcudf dataframeから、dask dataframeに変換を行います。
データ変換することで初めて、Multi GPUでの機械学習が可能になります。
※前処理は、dask cudfでMulti GPUで実行することも可能です

_npartitions = 8
search_date = dt.datetime.strptime('2019-01-03', '%Y-%m-%d')
# rapids0.10だとdask_dataframeでdropしようとすると、inplace周りにバグがあるため、エラーが起きる
# このため、前処理で使えるメモリが減るが、cudf_dataframeの段階で前処理を済ませることにした

cdf = cdf.drop(columns='dropoff_date')
cdf_train = cdf.query('pickup_date < @search_date')
cdf_test  = cdf.query('pickup_date >= @search_date')
cdf_train = cdf_train.drop(columns='pickup_date')
cdf_test  = cdf_test.drop(columns='pickup_date')

ddf_train = dask_cudf.from_cudf(cdf_train, npartitions=_npartitions)
y_train   = ddf_train[['tip_amount']]
x_train   = ddf_train[ddf_train.columns.difference(['tip_amount'])]

ddf_test  = dask_cudf.from_cudf(cdf_test, npartitions=_npartitions)
y_test    = ddf_test[['tip_amount']]
x_test    = ddf_test[ddf_test.columns.difference(['tip_amount'])]

パラメータを定義して、XGBoostの学習を行います。

params = {
    'num_rounds': 100, # 学習ラウンド数です。多いほどデータセットにフィットします
    'max_depth': 8,
    'max_leaves': 2**8,
    'n_gpus': 1, # 1つのGPUでは1つのプロセスで処理を行うため、n_gpusは1に固定して使うことが必須。

                 # Dask側でMulti GPU Processの設定をしているので、ちゃんと複数で計算してくれています。

    'tree_method': 'gpu_hist',
    'objective': 'reg:squarederror',
    'grow_policy': 'lossguide'
}

bst = dask_xgboost.train(client, params, x_train, y_train, num_boost_round=params['num_rounds'])

# ここで私の場合は、謎のリスタートが生じることが何度かありました。
# エラー内容が表示されませんでしたが、どうやらメモリ容量不足のOOMエラーの様でした。
# エラーが起きてプロセスがリスタートされると計算が一生終わらないです。
# メモリ管理は工夫する必要があるようです。
# 今回は単純にデータ量を絞ることで学習を完遂させています。

最後に予測処理と評価です。うまく学習が出来たようです。

pred = dask_xgboost.predict(client, bst, x_test)
test = dask.dataframe.multi.concat([pred], axis=1)

test['squared_error'] = (test[0] - y_test['tip_amount'])**2

# 予測出力結果は、dask.dataframe.multi.concatを用いることで、
# [dask_cudf.Series]から、dask_cudf.DataFrameに変換を行っています。

rmse = np.sqrt(test.squared_error.mean().compute())
print('rmse value:', rmse)

# rmse value: 0.7319588230100298

cuspatialを使った位置情報分析の入門

次に、RAPIDSのcuspatialというライブラリを紹介します。
cuspatialはcudfの機能を使って、位置情報データを簡単かつ高速に扱えるようにするためのライブラリです。

現状使える機能のリスト

cuspatial/core/gis.py
direcrted_hausdorff_distance
トラジェクトリデータを入力として、全件のdirected Hausdorff distanceを計算する

haversine_distance
地球の形状を考慮し、球面上で2点間の距離を算出

lonlat_to_xy_km_coordinates
点群を、ある基準点の緯度経度からのkm距離で表したx, y座標へ変換

point_in_polygon_bitmap
点群とポリゴンを入力として、各点がポリゴンに含まれるかを計算しbool値を返却する

window_points
点群を入力として、指定のバウンディングボックスの領域内にある点群の部分集合を返却する

cuspatial/core/trajectory.py

spatial_bounds
トラジェクトリーを外包するバウンディングボックスを返却

derive(x_coords, y_coords, object_ids, timestamps)
緯度経度、ID、タイムスタンプからトラジェクトリを抽出

distance_and_speed
derive()によって抽出されたトラジェクトリーから、距離と速さを算出

cuspatialの性能評価

それでは、cuspatialを試してみましょう。
ここでは、Microsoft Researchが過去に公開しているT-driveという
タクシートリップのトラジェクトリーデータを用いて、hausdorff distanceの計算を行ってみます。

T-drive
https://www.microsoft.com/en-us/research/publication/t-drive-trajectory-data-sample/

hausdorff distanceは点群の集合と点群の集合の距離を計算するものです。
今回入力するのはタクシーのトラジェクトリ = 移動軌跡データです。
頂点数がかなりの数になりますが、hausdorff distanceを求めるには頂点の組み合わせの数だけ、
全て距離を計算する必要があるため、かなり計算量が重いタスクと言えると思います。

それでは実際に計算してみましょう。
データの読み取りです。

import pandas as pd

csv = 'T-drive/tdrive.txt'
data = pd.read_csv(csv, header=None)
data.columns = ['id', 'time', 'lon', 'lat']
data0 = data[data['id'].isin(range(1,100))]

単コアCPUの場合の処理と速度

%%timeit
import numpy as np
from scipy.spatial.distance import directed_hausdorff

for i in range(1, 100):
    point0 = np.array(data0[data0['id'] == i][['lon', 'lat']])
    for j in range(1, 100):
        point1 = np.array(data0[data0['id'] == j][['lon', 'lat']])
        sklearn_distance = directed_hausdorff(point0, point1)
33.5 s ± 133 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

複数コアCPUの場合の処理と速度
ここでは、8コア16スレッドのintel CPUを使っています。

%%timeit
import numpy as np
from concurrent import futures
from scipy.spatial.distance import directed_hausdorff

future_list = []
with futures.ProcessPoolExecutor(max_workers=16) as executor:
    for i in range(1, 100):
        point0 = np.array(data0[data0['id'] == i][['lon', 'lat']])
        for j in range(1, 100):
            point1 = np.array(data0[data0['id'] == j][['lon', 'lat']])
            future = executor.submit(fn=directed_hausdorff, p0=point0, p1=point1)
            future_list.append(future)
    _ = futures.as_completed(fs=future_list)
21.9 s ± 116 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

workerの数を様々変更して計算してみたのですが、
単コアCPUと比べて思ったよりも計算時間が縮みませんでした。
負荷を観察した所、個々のプロセスはCPUに対する負荷は少ないものの、
しかし大量に距離計算を繰り返す必要があるため、CPUの逐次処理性能を余している印象を受けました。

やはり、大量の頂点の距離計算は、並列計算が得意なプロセッサに向いているタスクだ、ということかと思います。

RAPIDS cuspatialによるGPUを使った計算速度
2つのTITAN-Xを使っています。

%%timeit
import numpy as np
import pandas as pd
import cuspatial
from cudf import Series

# pandas, numpyからcudfに変換して利用します
cnt = Series(data0.groupby('id').count().iloc[:,0])
lon = Series(data0.lon)
lat = Series(data0.lat)
distance = cuspatial.directed_hausdorff_distance(lon, lat, cnt)
450 ms ± 4.14 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

RAPIDSを用いた処理は、CPUマルチスレッド処理の50倍くらい高速という結果となりました。
さすがにGPUは速いですね!

cuspatialはRAPIDSの中でも機能的に新しいため、
まだまだ機能は僅かしかないのですが、今後に期待したいと思います。

cudf, cumlは、かなり機能も充実し始めているため、
機械学習の計算速度に悩みを抱えている方は試してみてはいかがでしょうか?

それでは、ありがとうございました。

70
40
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
70
40

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?