QuantXを初めて触った時、これローカルで開発したいんだけど!!!って思ってたのですが、聞いたらどうも密かにその機能がリリースされてるっぽいとのことで、自分でやってみたので以下手順と若干詰まったところなどできるだけ分かりやすくまとめていきます。
#前準備
とりかかる前の自分の環境は
- Jupyter
- Docker
があるという状況でした。
「JupyterもDockerもまだ入れてないよ」という人は、
- Jupyter Lab のインストール、設定から動かすまで
-
DockerをMacにインストールする
などを参照してもらえると良いかと思います。
#docker-composeをインストールする
docker-composeをインストールして、quantx-labを実行しましょう。
Docker Compose - インストールを参照して、docker-composeをインストールしてください。
#quantx-labを起動する
dockerでquantx-labを立ち上げます。
ターミナルを立ち上げて、
docker-compose up lab
もしくは
docker run -p 8888:8888 smartradev/quantx-lab
を入力してください。
ダウンロード後、「このURLをブラウザ上にコピペしてね!」というメッセージが出てくると思うので、記載されているURLをコピペしましょう。
おそらく、http://(********* or 127.0.0.1):8888?token=**************
みたいな感じになると思うので、()内を直してコピペしましょう。
#ログイン
ログイン画面が出てくると思うので、先ほどのtoken=*************
の値部分だけをコピペしてログインしましょう。
上手く行くと、jupyterが立ち上がると思います。
すでにトークンを作成している人はエラーが出るみたいです。ターミナルでjupyter notebookを立ち上げた時に表示されるtokenを参照しましょう。
#早速動かしてみる
早速QuantX SDKを使ってみます。
###アクセスキーの取得
SDKを使うためには、アクセスキーとシークレットキーが必要です。QuantXにログインして、https://factory.quantx.io/settings/tokenから取得してください。トークン名はなんでもいいです。一度しか発行されないので、忘れずに保管しましょう。
###新規プロジェクトを作成
# public_keyとsecret_keyは各自ご自身のキーで置き換えてください
access_key = "************************************"
secret_key = "************************************"
# sdk をインポート
from quantx_sdk import QX
# 新規プロジェクト作成
qx = QX(access_key, secret_key)
# 新規プロジェクト名。5文字以上の適当な文字列
new_name = "New Project"
# 新規プロジェクトを作成。
prj = qx.new_project(new_name)
先ほど取得したアクセスキーとシークレットキーを変数内に格納して、新規プロジェクトを作成してください。今回はNew Projectというプロジェクト名にしました。実行後、ブラウザで新規プロジェクトが作成されていることが確認できると思います。
###ハッシュ番号を取得
# ハッシュ番号を格納,確認.
prjct_hash = prj.project_hash
QuantXではプロジェクトは名前ではなくハッシュ番号で管理しているようです。コードをプロジェクトにプッシュする上で、プッシュ先のハッシュ番号を間違えないように気をつけましょう。
###コード取得
新規作成したプロジェクトには、サンプルコードが与えられています。
prj.source()
で取得できます。
#自分のアルゴリズムをアップロードして実行する。
では、自分で書いたアルゴルズムをQuantXにアップロードしてバックテストを行ってみます。
###ソースコードをアップロードする
今回は、前回僕が書いたMACDのアルゴリズムを使ってみます。
アルゴリズムは文字列で渡します。
source = """import pandas as pd
import numpy as np
import talib as ta
def initialize(ctx):
# 設定
ctx.logger.debug("initialize() called")
ctx.configure(
channels=
{
"jp.stock": {
"columns": [
"close_price_adj"
],
"symbols": [
"jp.stock.1414", "jp.stock.2269",
"jp.stock.2292", "jp.stock.2371",
"jp.stock.2379", "jp.stock.2413",
"jp.stock.2427", "jp.stock.2702",
"jp.stock.2782", "jp.stock.2809",
"jp.stock.3003",
"jp.stock.3053",
"jp.stock.3092", "jp.stock.3141",
"jp.stock.3382", "jp.stock.3407",
"jp.stock.3665", "jp.stock.3676",
"jp.stock.4063", "jp.stock.4343",
"jp.stock.4452", "jp.stock.4519",
"jp.stock.4543", "jp.stock.4704",
"jp.stock.4751", "jp.stock.4911",
"jp.stock.4974", "jp.stock.5108",
"jp.stock.6028", "jp.stock.6098",
"jp.stock.6194", "jp.stock.6503",
"jp.stock.6645", "jp.stock.6758",
"jp.stock.6866", "jp.stock.7269",
"jp.stock.7272", "jp.stock.7606",
"jp.stock.7936", "jp.stock.8020",
"jp.stock.8252", "jp.stock.8591",
"jp.stock.8929", "jp.stock.9007",
"jp.stock.9086", "jp.stock.9433",
"jp.stock.9613"
]
}
}
)
def _my_signal(data):
cp=data["close_price_adj"].fillna(method='ffill')
macd=pd.DataFrame(data=0,columns=cp.columns,index=cp.index)
macdsignal=pd.DataFrame(data=0,columns=cp.columns,index=cp.index)
macdhist=pd.DataFrame(data=0,columns=cp.columns,index=cp.index)
dmacd = pd.DataFrame(data=0,columns=cp.columns,index=cp.index)
dmacdsignal = pd.DataFrame(data=0,columns=cp.columns, index=cp.index)
for (sym,val) in cp.items():
macd[sym], macdsignal[sym], macdhist[sym] = ta.MACD(cp[sym].values.astype(np.double), fastperiod=9, slowperiod=26, signalperiod=9)
dmacd[sym] = np.gradient(macd[sym])
dmacdsignal[sym] = np.gradient(macdsignal[sym])
#通常のMACD
#buy_sig = ((macd > macdsignal) & (macd.shift(1) < macdsignal.shift(1)))
#sell_sig= ((macd < macdsignal) & (macd.shift(1) > macdsignal.shift(1)))
#改良版
buy_sig = ((macd > macdsignal) & (macd.shift(1) < macdsignal.shift(1)) & (dmacd/dmacdsignal>5.0) & (abs(dmacdsignal) > 1.0))
sell_sig = ((macd < macdsignal) & (macd.shift(1) > macdsignal.shift(1)) & (dmacd/dmacdsignal>5.0) & (abs(dmacdsignal) > 1.0))
return {
"macd": macd,
"macdsignal": macdsignal,
"hist":macdhist,
"dmacd":dmacd,
"dmacdsignal":dmacdsignal,
"buy:sig": buy_sig,
"sell:sig": sell_sig,
}
# シグナル登録
ctx.regist_signal("my_signal", _my_signal)
def handle_signals(ctx, date, current):
'''
current: pd.DataFrame
'''
df = current.copy()
# 買いシグナル
df_long = df[df["buy:sig"]]
if not df_long.empty:
for (sym, val) in df_long.iterrows():
#ctx.logger.info(val)
sec = ctx.getSecurity(sym)
msg = "買いシグナル"
sec.order_target_percent(0.10, comment= msg)
# 売りシグナル
df_sell = df[df["sell:sig"]]
if not df_sell.empty:
for (sym, val) in df_sell.iterrows():
sec = ctx.getSecurity(sym)
msg = "売りシグナル"
sec.order_target_percent(0, comment= msg)
"""
次に、プロジェクトを指定してソースコードをアップロードします。先ほど作ったprjct_hashを使って、プロジェクトオブジェクトを作り、そのオブジェクトに対してupload_source()メソッドでコードをUploadします。
# hash番号を与えてプロジェクトオブジェクトを作ります
prj = qx.project(prjct_hash)
# プロジェクトにコードをUpload
prj.upload_source(source)
'code': 200が返ってくれば成功です。
###バックテストを実行する。
# バックテストパラメータ
bt_parameter = {
'engine': 'maron-0.0.1b',
'from_date': '2015-12-01',
'to_date': '2018-12-01',
'capital_base': 10000000}
# backtest の実行.
bt = prj.backtest(bt_parameter)
## backtest result インスタンスを作成
join = bt.join()
## 結果のサマリーを取得
summary = join.summary()
使用するエンジン、期間、資金額などのパラメータを指定してあげて、バックテストを実行してください。
summary内に結果が出力されているはずです。
#おわりに
JupyterでQuantXを実行してみました。ローカルで開発できるようになり、かなりアルゴリズムが書きやすくなったのではないでしょうか。pyfolioなどのライブラリを使えば、もっと精緻なバックテストの分析ができるみたいです。