42
45

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

GCPのサーバレスで機械学習FX売買サービスを実現

Last updated at Posted at 2019-10-20

こんにちは、@THERE2 です。

学習済みの機械学習モデルを使ってFXの自動売買を行うサービスを、GCPのサーバレス環境で実現してみました。

VM(Compute Engine)を使えばローカル環境と同じように運用できるのですが、VMはずっと起動させておくとそれなりに費用がかかってしまいます。
時間起動するFX自動売買であれば、実行時間に応じて課金されるCloud Functionsの方が安く運用でき、最先端のクラウド技術にも触れられるという事で自分でやってみる事にしました。

FX自動売買の機能要件

今回のFX自動売買の機能要件と処理の流れは次のようになります。
物理ファイルや外部API連携、DBアクセスも必要となり、それなりに幅広くクラウドの機能を使う必要がありました。

Oanda APIを使った売買については、先日の記事を参考にして頂ければ幸いです。

  • 月〜金の決まった時間(まずは1日1回、13時)に起動する。
  • Oanda APIを使ってUSD_JPYの最新の価格を取得する。
  • データの前処理を行う。(PandasのDataFrame利用)
  • FXのテクニカル指標を計算する。(TA-Lib利用)
  • 機械学習の学習済みモデルをロードする。(今回はLightGBMのモデル)
  • 学習済みモデルを使って将来の価格が上がるか下がるかの予測を行う。
  • 予測結果に応じて、Oanda APIを使って買い注文または売り注文を行う。
  • 予測結果、注文実施内容をおって分析するためにDBに格納する。
  • 外部から勝手に実行されないように、外から認証無しでは実行できないようにする。

これらの事が、Compute EngineではなくサーバレスのCloud Functionsで実現できるのか心配でしたが、結果的にはGCPの各サービスを活用する事で問題なくできました。
苦労もありましたが、一回実装すれば次からは容易に対応できると思います。

GCPで利用したサービス

上記の機能要件を満たすためには、Cloud Functions以外にもいくつかのサービスを組み合わせる必要がありました。

GCPのクラウドサービス 利用用途
Cloud Storage 機械学習の学習済みモデルを置いておきます。
Cloud Firestore ドキュメントDBです。のちのちの解析用に予測結果、注文実施内容を保存しておきます。
Cloud Scheduler cronの時間起動でイベントを登録するためのサービスです。
Pub/Sub Cloud Schedulerからのイベントを元にCloud Functionsを呼び出すサービスです。
Cloud Functions イベントトリガーでPythonの関数を1回実行します。関数の中身だけ実装すればよいという事で、今回のサーバレスの主役です。
IAMと管理 DBやストレージの認証を行うためのサービスです。

今回はFX予測して売買するためのシンプルな処理ですが、クラウド関数、クラウドストレージ、クラウドDB、クラウドスケジューラー、クラウド認証と幅広くサービスを活用する事になり、大変勉強になりました。

これだけの事を物理的にサーバを立てるところから実施したらかなり大変ですが、GCPでは簡単に実現でき、本当にサービス開発が容易になってきているなと感じました。

各サービスの詳細

今回の実装概要をまとめておきたいと思います。

Cloud Storage

Cloud Functionsで機械学習の学習済みモデルを読み込むためのStorageをセットアップしました。
バケットに名前を付けて保存します。
アクセス制御については、ファイル単位の制御は必要ないので、バケットレベルとしました。
image.png

バケット作成後に、ローカルの機械学習の学習済みモデルをアップロードしておきました。
「ファイルのアップロード」ボタンからローカルのファイルをサクッとアップロードできます。

image.png

Cloud Firestore

処理結果のログをドキュメントDBに残していきたかったので、Firestore DBをセットアップしました。
利用開始には難しい事は何もなく、頭を悩ますような設定もありません。
スキーマレスなのでテーブルの定義、型定義など何もなく、コレクション名を決めるぐらいです。
これで自動でスケールされるのですから、DB管理者は不要ですね。

以下はすでにCloud Functionsからデータを投入済みの状態です。

image.png

Cloud Pub/Sub

Cloud FuncitonsをHTTPトリガーにしてしまうと、外部公開されてしまうということで、Pub/Subを挟んで呼び出す事にしました。
これを使うと一時的に大量のリクエストが発生してもこのキューに溜め込んでCloud Functionsが順に処理できるようになります。
また、Cloud FunctionsをHTTPで外部公開する必要がないため、セキュリティ的にも安心です。

作成はPub/Subの「トピックの作成」からトピックIDを入力して作成すれば完了です。

image.png

Cloud Scheduler

今回のFX自動売買は、月〜金で1日1回、13時に実行することにしました。
cronと同じ形式で実行タイミングをセットできます。
実行時間はどのタイミングで実行するかのタイムゾーンを指定し、ターゲットは上記で指定したPub/Subのfx_topicを指定しました。
ペイロードはいまいちよく分かっていませんが、固定パラメータを渡せるようです。
これで時間になるとPub/Subにトリガーが送られます。

image.png

ジョブを作成した後は、実際にその時間にならなくても、「今すぐ実行」ボタンで実行できるのが便利です。

image.png

Cloud Functions

ここまでで必要な機能が揃ってきたので、いよいよCloud Functionsを実装していきます。

  • メモリは256 MBも割り当てておけば十分かと思います。
  • トリガーにはPub/Subを、トピックにSchedulerと設定したものと同じトピックをセットします。これで、Schedulerにて登録されたトリガーをCloud Functionsが検知してトリガー毎に1回処理できるようになります。
  • ランタイムは「Python 3.7」が選択できます。

Cloud Functionsの主役はNode.jsのようで、Googleのドキュメントではjsのサンプルコードはすぐに見つかるのですがPythonはあまりなく、ところどころ苦労がありました。
ただ、機械学習のライブラリやPandasを使うためにはPythonがないと困りますね。
なお、現時点ではGolangは選択できますが、.netやjavaは選択できないようです。

image.png

ソースコードとしては、main.pyrequirements.txtのみを編集します。
main.pyでは複数の関数を定義できますが、呼び出す関数を一つ定義しておく必要があります。
デフォルトでは、hello_pubsubを呼び出すようになっており、引数にevent, contextを受けるようになっています。

image.png

requirements.txtの内容

ここに必要なpythonパッケージを記入しておくと、デプロイ時に反映してくれます。(pip installが実行されるようです)
Cloud Storageを利用する場合は、google-cloud-storage、firestoreを利用する場合はgoogle-cloud-firestoreを記入しておく必要があります。
pandasnumpyはデータサイエンスには欠かせないライブラリですね。
最強の機械学習アルゴリズムのlightgbmも問題ないくinstallしてくれました。
ta-libはバイナリがなくてimportエラーになり詰んだ、、と思ったのですが、talib-binaryの方は問題なくimportできて助かりました。

# Function dependencies, for example:
# package>=version
pandas
numpy
lightgbm
oandapyV20
talib-binary
google-cloud-storage
google-cloud-firestore
jpholiday

main.pyの内容

ライブラリのインポート、DB、Storageの利用準備

まずは利用する必要なライブラリをimportします。

import base64
import requests
import json
import datetime
import jpholiday
from pandas.tseries.holiday import USFederalHolidayCalendar as uscalendar
import talib
import pandas as pd
import numpy as np
import lightgbm as lgb

Cloud Storage、Cloud Firestoreにアクセスするための準備です。
次のようにclientを設定しておくことで簡単に扱う事ができます。

from google.cloud import storage
storage_client = storage.Client()

from google.cloud import firestore 
db = firestore.Client()

価格データの取得、前処理の実施

続いて最新の価格データを取得して前処理を行う部分ですが、今回の記事の範囲を超えるため詳細を割愛します。

#1. 最新データの取得
def get_candledata_from_oanda():
    # oandaからデータを取得してpandas DataFrameに格納する処理。
    # 今回の記事の範囲を超えるため、詳細は割愛。
    return df

# 2. データ加工
def preprocess_data(df):
    # 前処理の実施。
    # 日付を加工したり、価格を前日比にしたり、テクニカル指標を追加したり。
    # 今回の記事の範囲を超えるため、詳細は割愛。

    return df

機械学習モデルによる予測の実行

ここで、Cloud Storageに保存しておいた学習済みのモデルをblobとして取り出し、いったん/tmpに保存します。
/tmpはメモリ上のCloud Functionsの一時ファイル保存用で、実行が終わると消えてしまいますが、今回のように一度Cloud Storageから取り出したファイルを置いておくのに便利です。
LightGBMのBoosterにそのモデルファイルを読み込ませ、LightGBMで予測を行います。

#3. 予測の実行
def run_prediction(df):
    bucket = storage_client.get_bucket('there2_ml_model')
    blob = bucket.get_blob('lightgbm_model_v1.mdl')

    blob.download_to_filename("/tmp/tmpmodel.mdl")

    # lightGBMのトレーニング済みのモデルの読み込み
    clf = lgb.Booster(model_file="/tmp/tmpmodel.mdl") 

    # 予測の実行
    y_pred = clf.predict(df, num_iteration=clf.best_iteration) 

    return y_pred

予測結果を元に注文を指示し、予測結果と注文結果をドキュメントDBに格納

上記までの予測結果を元に、売買の注文を行います。
Oanda APIを使ってpythonから直接売買の指示を出すことが出来ます。

具体的なやり方は先日の記事に記載しました。
機械学習でFX:Oanda APIを使ってPythonから自動売買する

その後、ドキュメントDBにJSON形式で内容をセットすればDBに格納されます。簡単過ぎますね・・

# 4. 注文の実施
def execute_transaction(y_pred):

    # 予測結果を元に売り買いの指示を出します。
    # 今回の記事の範囲を超えるため、詳細は割愛。
    # ログをDBに出力
    doc_ref = db.collection(u'cloud_function_log').document()
    doc_ref.set({
        u'datetime' : firestore.SERVER_TIMESTAMP, 
        u'y_pred' : y_pred[0].astype(np.float),
        u'order' : {
            u'accountBalance' : 3000000,
            u'accountID' : u'dummy3',
            u'instrument' : u'USD_JPY',
            u'orderID' : u'99999',
            u'price' : 110.222,
            u'reason' : u'MARKET_ORDER',
            u'units' : 10000
        }
    })
    return "OK"

全体の実行

hello_pubsubが最初に呼ばれるようになっていますので、そこからmain_processを呼び、main_processが各関数を順に呼ぶようにしています。

def main_process():
    df = get_candledata_from_oanda()
    df = preprocess_data(df)
    y_pred = run_prediction(df)
    oanda_res = execute_transaction(y_pred)
    
    return "OK"

def hello_pubsub(event, context):
    """Triggered from a message on a Cloud Pub/Sub topic.
    Args:
         event (dict): Event payload.
         context (google.cloud.functions.Context): Metadata for the event.
    """
    res = main_process()
    print(res)

実装が完了したら、Pub/Subからトリガーしなくても「テスト」タブから関数を実行して試してみる事ができます。
エラーになった場合はログ表示からエラーの内容を確認できます。

image.png

IAMと管理

もしかしたら必要なかったのかもしれませんが、cloud functionsがstorageを参照できるような権限を設定しました。
まず、cloud functionsの概要から「サービスアカウント」を確認します。

image.png

cloud storageのバケットの詳細から上記の「サービスカウント」をメンバーとして追加し、ストレージの閲覧者、作成者の権限を付与しました。

image.png

まとめ

以上でGCPのサービスをフル活用して最小限のコーディングで機械学習の自動売買サービスを作成する事ができました。
費用もこれぐらいの使い方であれば月に数ドルで収まるように思います。
1年で無料枠を使い切ることもないでしょう。

いままでインフラ担当者とか基盤担当者が苦労して構築してきた事が、クラウドサービスを使えばほぼ問題なく実現できてしまうのは、凄くいい時代になりましたね。
もちろん大規模なサーバ運用ではそうもいかないもかもしれませんが、スモールサービスの構築であればワードプレスぐらいのノリで労力かけずにそれなりの物が作れてしまえる、という印象を受けました。

今後は、IT技術者がシステムの運用よりも、IT知識を活かして新しいサービスを生み出していく事が求められていくのかもしれませんね。

@THERE2

42
45
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
42
45

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?