LoginSignup
21
15

More than 5 years have passed since last update.

Cloud Dataflow(Python)でXGBoostを動かす

Posted at

前回の記事に引き続きCloud Dataflowで機械学習パイプラインを動かす手順の備忘録。

やりたいこと

前回はCloud Dataflowで使われるノードにプリインストールされていたscikit-learnとpandasを使って機械学習を行いましたが、実際の機械学習のパイプラインではOpenCVなど前処理加工するライブラリを入れたり、好きな機械学習ライブラリをインストールして分析してみたくなると思います。
今回はCloud Dataflowへの任意ライブラリのインストール手順の例としてKaggleでも人気のXGBoostをインストールして動かす方法を書いていきたいと思います。

Dataflowへのライブラリのインストール手段

Cloud Dataflow(Python)で好きなライブラリをインストールする手段としては公式ドキュメントにあるとおり、大きく以下3つの手段があるようです。

  • PyPIサーバで公開されているPythonライブラリ
    • 該当ライブラリをrequirements.txtに書き出す
    • 実行時にオプション--requirements_fileで指定する
  • PyPIサーバにはないPythonライブラリ
    • ローカルでインストールされたパッケージのパスを確認する
    • 実行時にオプション--extra_packageで指定する
    • (複数存在する場合は以下setup.pyのinstall_requiresで指定する)
  • Python以外の依存が存在するライブラリ
    • 該当ライブラリインストール手順を記述したカスタムsetup.pyを作る
    • 実行時にオプション--setup_fileで指定する

XBGoostのようなライブラリは3つめの手順が必要になります。
以下XBGoost用のカスタムsetup.pyの作り方をみていきます。

カスタムsetup.pyの作り方

公式リポジトリのexampleコードにもあるようにsetuptoolsのお作法に従って記述していきます。Python依存ライブラリがあればinstall_requiresに記述しますが、インストール用に独自シェルコマンドを実行したい場合はsetuptoolsのCommandクラスを継承したコマンドクラスを作りコマンドを実行手順を記述していきます。
以下は上記公式exampleコードを手本にXGBoostのインストールコマンドを書いたものです。基本的にはXGBoostのインストール手順通りですがsubprocess.Popenで実行するためにディレクトリ移動を伴うコマンドについては実行パスを指定するようにしています。

setup.py
from distutils.command.build import build as _build
import subprocess
import setuptools


class build(_build):
    sub_commands = _build.sub_commands + [('CustomCommands', None)]

CUSTOM_COMMANDS = [(["sudo","apt-get","update"],"."),
                   (["sudo","apt-get","install","git","build-essential","libatlas-base-dev","-y"],"."), #"python-dev","gfortran"
                   (["git","clone","--recursive","https://github.com/dmlc/xgboost"],"."),
                   (["sudo","make"],"xgboost"),
                   (["sudo","python","setup.py","install"],"xgboost/python-package"),
                   (["sudo","pip","install","xgboost"],".")]


class CustomCommands(setuptools.Command):

    def initialize_options(self):
        pass

    def finalize_options(self):
        pass

    def RunCustomCommand(self, command_list):
        print 'Running command: %s' % command_list[0]
        p = subprocess.Popen(command_list[0], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=command_list[1])
        stdout_data, _ = p.communicate()
        print 'Command output: %s' % stdout_data
        if p.returncode != 0:
            raise RuntimeError('Command %s failed: exit code: %s' % (command_list[0], p.returncode))

    def run(self):
        for command in CUSTOM_COMMANDS:
            self.RunCustomCommand(command)


REQUIRED_PACKAGES = []

setuptools.setup(
    name='xgboost-install-examble',
    version='0.0.1',
    description='xgboost workflow packages.',
    packages=setuptools.find_packages(),
    #install_requires=REQUIRED_PACKAGES,
    cmdclass={
        'build': build,
        'CustomCommands': CustomCommands,
    }
)

setup.pyを作成したらpipelineのsetup_fileオプションで指定します。
以下はPythonコード内でのオプション指定例です。
(自分の環境ではfullパスで指定しないと動かないようでした)

xgboost_pipeline.py
setup_options = options.view_as(beam.utils.pipeline_options.SetupOptions)
setup_options.setup_file = "/home/orfeon/dataflow-sample/python/xgboost/setup.py" # set fullpath

オプションで指定してPipelineを実行し、インストールが通っていればパイプライン内でxgboostのコードが動きます。自分は前回の記事のコードでsklearnのregressorを使っていた部分をxgboostに置き換えた以下のコードが動きました。

xgboost_pipeline.py(learn_predict内部)
中略

year_max = data["year"].max()
train = data[data["year"] <  year_max]
test  = data[data["year"] == year_max]

dtrain = xgb.DMatrix(train[learn_attrs].values, label=train[target_attr].values, feature_names=learn_attrs)
dtest  = xgb.DMatrix(test[learn_attrs].values, label=test[target_attr].values, feature_names=learn_attrs)

evals_result = {}
watchlist = [(dtrain, 'train'),(dtest, 'test')]
best_params = {'objective': 'reg:linear',
               'eval_metric': 'rmse',
               'learning_rate': 0.01,
               'max_depth': 3,
               'colsample_bytree': 0.65,
               'subsample': 0.55,
               'min_child_weight': 7.0,
               'reg_alpha': 0.6,'reg_lambda': 0.7,'gamma': 1.2}

model = xgb.train(best_params, dtrain,
                  num_boost_round=1000,
                  evals=watchlist,
                  evals_result=evals_result,
                  verbose_eval=True)

test.loc[:, "predict"] = model.predict(dtest)

おわりに

カスタムsetup.pyを作ればいろいろなライブラリをDataflow上で動かすことができるようになります。お気に入りの機械学習ライブラリを使った大量実行だけでなく、画像加工ライブラリを使った画像データの加工など大量の計算機リソースを必要とする前処理の手軽な実行などにもDataflowの使い道が広がるのではないかと思います。
Python2系でしか動かない、streamingモードで動かないなどまだ一部制約もありますが徐々に対応されていくと思われます。

※分散版XGBoostのDataflow対応

今回動かしたXGboostはそれぞれ分散するものの学習自体は単一ノード上で動作するものでしたが、複数のノードに分散して学習するアルゴリズムが動くバージョン(XGBoost4J)も開発されております。実行エンジンとして現在はSpark,Flink上で動くようですが、開発ロードマップではCloud Dataflow(Apache Beam)対応も含まれているようで今後が楽しみです。

21
15
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
21
15