Python
MachineLearning
DeepLearning
DAAL

CPUに最適化した計算ライブラリ

More than 1 year has passed since last update.

ChainerのCPU最適化版のIntel Chainerが先月公開されたようなのでGithubを眺めていたところ、Intelから科学計算用に様々なIntel製品に最適化されたライブラリが出ていることに気づき調査してみました。はじめにIntel Chainerの紹介を一応しますが、そのあと全く触れませんのでご了承くださいw

Intel Chainerとは

PFNが開発したChainerをIntelが主に自社のプロダクトに最適化した(GPUはサポートされています)深層学習フレームワーク。公式のGithubページを見るとMKL-DNNというライブラリを使ってXeonやXeon Phiプロセッサでの演算に最適化している模様。

MKL-DNN

解説ページGithubページを見ると、IntelのXeonやXeon Phiに特化した深層学習用のライブラリのよう。72コアとかを上手に使いきるように書くのは難しい...という部分をうまく処理してくれるみたいです。
解説ページにはMKL-DNNの他にもDAALとか別のライブラリについても触れられています。どんなものがあるのか一応調べてみました。

Intelの計算高速化ライブラリ群

Intelが出している高速化ライブラリについてXLSOFTの解説ページを見るとMKL-DNN以外だと以下があるようです。

  • インテル® MKL
    • 最適化済みの数学ライブラリ
    • conda installでインストール可能
    • 高度にベクトル化、スレッド化された関数を使用できCPUの性能を最大限発揮できるように最適化されている
  • インテル® DAAL
    • MKLよりも高次のライブラリ
    • 前処理、変換、解析、モデル化、検証、意思決定用のアルゴリズムを提供
    • Intelのプロセッサーの命令セット、レジスター、メモリー・アーキテクチャー向けに高度に最適化
    • Hadoop、Spark、SQL、Non-SQL、ファイル、インメモリーDBに簡単に接続でき、高いスループットを実現できる(らしい)
    • 普通にSparkを実行するよりもPCAの計算でマルチノード計算で4.5倍程度早くなるらしい(こちらの資料参考)
  • インテル® MPI ライブラリ
    • マルチコア/マルチクラスタの並列計算最適化用(メッセージパッシング)ライブラリ
    • マルチコア・プラットフォームの共有メモリーパスの最適化
    • マルチレール機能による高帯域幅、プロセス間通信の向上
  • インテル® TBB
    • こちらもマルチコアにおける並列計算の最適化用ライブラリ
    • MPIよりはソフトウェアよりっぽい
    • ループ処理なんかをスレッドセーフな形で最適化してくれる感じかな?

これらのライブラリの関係性について主にMKLとDAALについてまとめてある資料がありましたので、そちらのリンクを貼り付けておきます(2015年なので古いのですが最近の資料にあまり良い図が見当たらなかったのでご了承ください)。
MKLが下地でその上にDAALが乗っているようです。DAALはMKLの他、TBBとMPIも使用しています。

各深層学習フレームワークと上のライブラリ群の対応

XLSOFTのページで各種フレームワークの対応が書かれています。が、ChainerはGithub上ではMKL-DNN対応しているようなので、ちょっと情報が古いような気も...

本当はMKL-DNNを実行したいのですが、Xeon Phiなど持っているはずがないので、データを扱う人間にダイレクトに恩恵がありそうなDAALについて調査しました。

DAAL

環境構築

DAALのAPIはJava、C++、Pythonサポートしており、PyDAALというものがあります。これはCondaを使って簡単にPyDAALに対応した環境を作ることができます。PyDAALについてはこちらをご覧ください。

環境は以下のコマンドでcondaで構築できます。

conda config --add channels intel
conda create --name idp intelpython3_full python=3

LinuxとMacユーザの方はsource activate idp、windowsでBoW使ってない方はactivate idpでDAAL環境を使用できます。

Benchmark

PyDAAL tutorialをもとにSVMを対象に速度を計測しました。PyDAALとsklearnで計測しています。

条件

  • データ
    • sklearnのdigitsデータ
    • 10クラス(0~9までの数字)
    • 訓練1,697枚、テスト100枚
  • モデル
    • カーネルは線形
    • マルチクラスの計算にはOVOを使用
  • CPU
    • i7 7500U(2コア、4スレッド)

サンプルコード

ほぼtutorialそのままですが、tutorialが作成された時とAPIが変わってるいるようですので一部書き換えています。

データロード

PyDAALではデータアクセスを高速化するためにHomogenNumericTableクラスを使用して、メモリへの格納位置を最適化しています。

from sklearn.datasets import load_digits
import numpy as np
from daal.data_management import HomogenNumericTable
# HomogenNumericTableは数値データマネージメント用のクラス
# メモリアクセスを最適にするようにデータを格納してくれる


def load_data_for_daal():
    digits = load_digits()

    n_samples = len(digits.images)
    data = np.ascontiguousarray(digits.data, dtype = np.double)
    labels = np.ascontiguousarray(digits.target.reshape(n_samples,1), dtype = np.double)

    train_data = HomogenNumericTable(data[:-100])
    train_labels = HomogenNumericTable(labels[:-100])

    test_data = HomogenNumericTable(data[-100:])
    test_labels = HomogenNumericTable(labels[-100:])

    return train_data, train_labels, test_data, test_labels



def load_data_for_sklearn():
    digits = load_digits()

    n_samples = len(digits.images)
    data = np.ascontiguousarray(digits.data, dtype = np.double)
    labels = np.ascontiguousarray(digits.target.reshape(n_samples, ), dtype = np.double)

    train_data = data[:-100]
    train_labels = labels[:-100]

    test_data = data[-100:]
    test_labels = labels[-100:]

    return train_data, train_labels, test_data, test_labels

DAAL

DAALではまずマルチクラスSVMを定義する必要があります。以下記載します。trainingとpredictionで別々にオブジェクトを作る必要があるので、少し野暮ったい感じはあります。。

from daal.algorithms.svm import training as svm_training
from daal.algorithms.svm import prediction as svm_prediction
from daal.algorithms.kernel_function import linear, rbf
from daal.algorithms.multi_class_classifier import training as multiclass_training
from daal.algorithms.multi_class_classifier import prediction as multiclass_prediction
from daal.algorithms.classifier import training as training_params
from daal.algorithms.classifier import prediction as prediction_params

class MulticlassSVM:
    def __init__(self, nclasses):
        # クラス数
        self._nclasses = nclasses
        # 訓練用2値分類用のSVM(後程多値分類用に使う)
        # バッチ処理
        # Batch_Float64DefaultDenseでFP64で中間演算を行い、バッチ処理をすることを宣言
        # Online_Float64DefaultDense とかにすればonlineアルゴリズムも使えるようです
        # が、現時点ではSVMはバッチ処理にしか対応していないように見えます
        # 詳しくはドキュメントのlow_order_momentsを参照、
        # self._svm_training_alg = svm_training.Batch_Float64DefaultDense()
        self._svm_training_alg = svm_training.Batch_Float64Boser()
        # 推定用2値分類用のSVM(後程多値分類用に使う)
        # バッチ処理
        self._svm_prediction_alg = svm_prediction.Batch_Float64DefaultDense()

    def set_svm_params(self,
            cachesize = 1000000000,
            C = 1.0,
            sigma = 1.0,  # rbfの係数
            kernel = linear.Batch_Float64DefaultDense(),  # defaultでlinearカーネル
            shrinking = False):

        # 2値分類SVMにパラメタをセット
        self._svm_training_alg.parameter.cacheSize = cachesize    # kernel matrixのキャッシュサイズ
        self._svm_training_alg.parameter.C = C
        if getattr(kernel.parameter, 'sigma', None):
            kernel.parameter.sigma = sigma
        self._svm_training_alg.parameter.kernel = kernel
        self._svm_prediction_alg.parameter.kernel = kernel
        self._svm_training_alg.parameter.doShrinking = shrinking  # 収縮最適化を行うかどうかのフラグ


    def fit(self, data, labels):
        '''
        マルチクラスSVMによる学習
        '''

        # 2値分類と同様に訓練用のオブジェクトを用意
        # バッチ処理、中間演算はFP64、one-against-oneアルゴリズムを使用
        multiclass_training_alg = multiclass_training.Batch_Float64OneAgainstOne()
        multiclass_training_alg.parameter.nClasses = self._nclasses
        # 中で使用する2値分類器をセット
        multiclass_training_alg.parameter.training = self._svm_training_alg
        multiclass_training_alg.parameter.prediction = self._svm_prediction_alg

        # 訓練に使用する特徴量とラベルをセット
        multiclass_training_alg.input.set(training_params.data, data)
        multiclass_training_alg.input.set(training_params.labels, labels)

        # モデル構築
        return multiclass_training_alg.compute().get(training_params.model)

    def predict(self, model, testdata):
        # 訓練時と同じくマルチクラスSVMモデルのオブジェクトを生成
        multiclass_prediction_alg = multiclass_prediction.Batch_Float64MultiClassClassifierWuOneAgainstOne()
        multiclass_prediction_alg.parameter.nClasses = self._nclasses
        multiclass_prediction_alg.parameter.training = self._svm_training_alg
        multiclass_prediction_alg.parameter.prediction = self._svm_prediction_alg

        # 学習済みモデルのセット
        multiclass_prediction_alg.input.setModel(prediction_params.model, model)
        # テストデータのセット
        multiclass_prediction_alg.input.setTable(prediction_params.data, testdata)

        # 推定
        results = multiclass_prediction_alg.compute()
        return results.get(prediction_params.prediction)

次にデータを当てはめてfitとpredictを行い時間を計測します。100回の平均値を評価しました。

import sys
sys.path.append('data_loader')
sys.path.append('daal')
import time
import numpy as np
from daal.algorithms.kernel_function import linear, rbf
from quality_metrics import ClassifierQualityMetrics

from daal_svm import MulticlassSVM
from digits_loader import load_data_for_daal


nclasses = 10
classifier = MulticlassSVM(nclasses)

classifier.set_svm_params(
        cachesize = 32000000,
        kernel = linear.Batch_Float64DefaultDense(),
        shrinking = True)

train_data, train_labels, test_data, test_labels = load_data_for_daal()

train_times = []
test_times = []
for _ in range(100):
    start = time.time()
    svm_model = classifier.fit(train_data, train_labels)
    end = time.time()
    # print('train time:{}'.format(end-start))
    train_times.append(end-start)

    start = time.time()
    predictions = classifier.predict(svm_model, test_data)
    end = time.time()
    # print('infer time:{}'.format(end-start))
    test_times.append(end-start)

    # quality = ClassifierQualityMetrics(test_labels, predictions, nclasses)
    # print('Average accuracy: {:.2f}%'.format(quality.get('accuracy')*100))

print('average train time:{}'.format(np.mean(train_times)))
print('average test time:{}'.format(np.mean(test_times)))

結果は以下の通りです。
average train time:0.01678(s)
average test time:0.00199(s)

sklearn

次にsklearnのコードです。特に説明の必要はないと思いますので、コードと結果を以下に記載します。

import sys
sys.path.append('data_loader')
import time
from sklearn import svm, metrics
import numpy as np

from digits_loader import load_data_for_sklearn


train_data, train_labels, test_data, test_labels = load_data_for_sklearn()

sklearn_classifier = svm.SVC(kernel='linear', decision_function_shape='ovo')

train_times = []
test_times = []
for _ in range(100):
    start = time.time()
    sklearn_classifier.fit(train_data, train_labels)
    end = time.time()
    # print('train time:{}'.format(end-start))
    train_times.append(end-start)

    start = time.time()
    sklearn_predictions = sklearn_classifier.predict(test_data)
    end = time.time()
    # print('infer time:{}'.format(end-start))
    test_times.append(end-start)

    # sklearn_quality = metrics.accuracy_score(test_labels, sklearn_predictions)
    # print('Average accuracy: {:.2f}%'.format(sklearn_quality*100))

print('average train time:{}'.format(np.mean(train_times)))
print('average test time:{}'.format(np.mean(test_times)))

結果は以下の通りです。
average train time:0.04020(s)
average test time:0.00291(s)

比較結果

trainingで約2.4倍、testで約1.5倍DAALの方が早い結果になりました。4スレッドのCPUで計算した結果でこれですので、DAALの最適化が生きる条件で行えばさらに差が広がると考えられます。

コメント

データを扱う際にありがたいライブラリをIntelがオープンソースで公開されているようですので、使って幸せになる場合は積極的に使っていこうと思います。