2020/5/24追記
昔書いた拙いコードが今も読まれているようなのでこちらに簡単にまとめなおしましたので合わせて参考にしてください。
https://alumi-labo.com/article/study/ゼロから始めるBitcoinの自動取引①
#初めに
こんにちは。プログラミング初心者です。
仮想通貨の取引をFXで以前やっていたんですけど大負けしてしまって懲りて撤退してました。
が、しかし人間というのは喉元過ぎれば熱さ忘れる動物なわけで、今度は自動取引でやってみよう、と思い実装することにしました。
その動機としては
①以前も1日あたり5000~10000くらい儲けられていたが、1月の大暴落時に損切りできず焼け死んだ。ならばエントリーのルールを変えずに損切りを機械的に行えればちゃんと儲けられのではないか?と思ったから
②自動取引であるので(損得に関わらず)少なくとも自分の時間は取られない
あたりです。
ということで、システムトレードの実装目指してコードをコツコツ書きました。
解説サイトが多かったのでbitFlyerのAPIを利用したものにしました。
言語はRubyかPythonで結構迷いました。RubyはAPI playgroundのサンプルコードがあるため楽だろうとは思いましたが、最終的にpandasやnumpyなどのライブラリを使いたくなり、Pythonにしました。
###参考にさせていただいたサイト、教材
https://ryota-trade.com
http://nbviewer.jupyter.org/url/forex.toyolab.com/ipynb/TA_ohlc.ipynb
https://www.udemy.com/ruby-bitcoin/learn/v4/overview
勉強になりました。ありがとうございました。
#importするライブラリ
import hashlib
import hmac
import requests
import datetime
import json
from key import API_KEY,API_SECRET
import ccxt
from pprint import pprint
import time
import numpy as np
import pandas as pd
ここらへんですね。書いてる途中でいらなくなったのも残ってるかも。pprintとか。
keyに関しては、自分のAPIキーとAPIシークレットを別のファイルで格納してるのでimportしてます。
#取引所での操作系の関数
#bitFlyerの終値のデータをpandasのSeriesにして返す関数
def get_price_data():
response = requests.get("https://api.cryptowat.ch/markets/bitflyer/btcfxjpy/ohlc",params = { "periods" : period })
response = response.json()
close_data = []
for i in range(data_n):
close_data.insert(0,response["result"][str(period)][-i-1][4])
arr = np.array(close_data)
return pd.Series(arr)
#IFDOCO
#sideは"BUY"or"SELL",他は数値
def IFDOCO_order(IFD_side,size,OCO_limit_price,OCO_stop_price):
api_key = API_KEY
api_secret = API_SECRET
if IFD_side == "BUY":
other_side = "SELL"
else:other_side = "BUY"
base_url = "https://api.bitflyer.jp"
path_url = "/v1/me/sendparentorder"
method = "POST"
timestamp = str(datetime.datetime.today())
param = {
"order_method": "IFDOCO",
"minute_to_expire": 10000,
"time_in_force": "GTC",
"parameters": [{
"product_code": "FX_BTC_JPY",
"condition_type": "MARKET",
#"price": price,
"side": IFD_side,
"size": size
},
{
"product_code": "FX_BTC_JPY",
"condition_type": "LIMIT",
"side": other_side,
"price": OCO_limit_price,
"size": size
},
{
"product_code": "FX_BTC_JPY",
"condition_type": "STOP",
"side": other_side,
"trigger_price": OCO_stop_price,
"size": size
}]
}
body = json.dumps(param)
message = timestamp + method + path_url + body
signature = hmac.new(bytearray(api_secret.encode('utf-8')), message.encode('utf-8') , digestmod = hashlib.sha256 ).hexdigest()
headers = {
'ACCESS-KEY' : api_key,
'ACCESS-TIMESTAMP' : timestamp,
'ACCESS-SIGN' : signature,
'Content-Type' : 'application/json'
}
response = requests.post( base_url + path_url , data = body , headers = headers)
print( response.status_code )
print( response.json() )
#ポジション確認
def position():
api_key = API_KEY
api_secret = API_SECRET
base_url = "https://api.bitflyer.jp"
path_url = "/v1/me/getparentorders?product_code=FX_BTC_JPY"
method = "GET"
timestamp = str(datetime.datetime.today())
message = timestamp + method + path_url
signature = hmac.new(bytearray(api_secret.encode('utf-8')), message.encode('utf-8') , digestmod = hashlib.sha256 ).hexdigest()
headers = {
'ACCESS-KEY' : api_key,
'ACCESS-TIMESTAMP' : timestamp,
'ACCESS-SIGN' : signature,
'Content-Type' : 'application/json'
}
response = requests.get( base_url + path_url , headers = headers)
#pprint(response.json())
return response.json()
#現在の注文をIDとともにprintする関数
def now_order():
bitflyer = ccxt.bitflyer()
bitflyer.apiKey = API_KEY
bitflyer.secret = API_SECRET
orders = bitflyer.fetch_open_orders(
symbol = "BTC/JPY",
params = { "product_code" : "FX_BTC_JPY" })
for o in orders:
pprint( o["id"] )
一つ一つ解説していきます。
まずget_price_data()
ですね。これは正確にはbitFlyerのAPIではなくcryptowatchというサイトのAPIを利用したものです。bitFlyerでは過去のデータを取得できないっぽかったので、こうしました。多分どの解説サイトも同様にそうしているんじゃないかと思います。終値のデータのみを抽出し、扱いやすい形のデータにして返しています。period
みたいにたまに定義されてない変数が出てきますが、後で定義するのでとりあえず気にしないでください。ちなみにperiod
は何分足のデータにするかを指定する変数で、秒単位で数値化します。(5分足であれば300)
次にIFDOCO_order(変数)
です。これは名前の通りIFDOCO注文を行う関数です。注意点としてはIFD_sideを設定するとOCO注文のsideは自動で設定されます。また、sizeも共通の値にしました。自分の取引ルールに合わせて、IFDは成行注文、OCOは利確が指値、損切りはストップ注文(トリガー指値+成行)に設定していますが、そこはお好みで変えて貰えばいいと思います。
position()
は後々のアルゴリズムでポジションを確認するために作った関数ですが、中身は名前と少しずれていて、親注文(特殊注文)の状態を取得します。細かいことは実際に動かしてみたほうが早いので試してみてください。
now_order()
はccxtライブラリという便利なものを使って現在の注文を返す関数です。まあ、そこまで重要なやつじゃないです。作らなくてもいい。
追記:now_order()
でたまにタイムアウトしてエラー吐かれて止まってしまいますね。timeout値設定しようにもccxt内部のことでよくわからなかったので、そんなに重要でもないし、今は外すことにしてます。さよなら。
#テクニカル指標系関数
次にテクニカル指標の実装系関数です。
ただし注意ですが、僕が思うなりに作ったものなので間違っている可能性に留意してください。気づいたら教えていただけると嬉しいです。ほんとにプログラミング初心者なので・・・
#EMA
#EMA_periodは期間、nはろうそく何本分前の値か
def EMA(EMA_period,n):
EMA_data = []
for i in range(2*EMA_period):
EMA_data.insert(0,close[data_n-1-i])
if n == 0:
arr = np.array(EMA_data)[-EMA_period:]
else:
arr = np.array(EMA_data)[-n-EMA_period:-n]
#print(arr)
EMA = pd.Series(arr).ewm(span=EMA_period).mean()
#print(EMA)
return EMA[EMA_period-1]
#MACD
#a=短期EMA_period,b=長期EMA_period,s=シグナル期間
def MACD_and_signal(a,b,s):
MACD = []
for i in range(a):
MACD.insert(0,EMA(a,i)-EMA(b,i))
arr = np.array(MACD)[-s:]
Signal = pd.Series(arr).rolling(s).mean()
return MACD,Signal
#ATR
#nは期間、n=14が普通
def ATR(n):
data = []
for i in range(2*n-1):
p1 = response["result"][str(period)][-i-1][2]-response["result"][str(period)][-i-1][3] #当日高値-当日安値
p2 = response["result"][str(period)][-i-1][2]-response["result"][str(period)][-i-2][4] #当日高値-前日終値
p3 = response["result"][str(period)][-i-1][3]-response["result"][str(period)][-i-2][4] #当日安値-前日終値
tr = max(abs(p1),abs(p2),abs(p3))
data.insert(0,tr)
arr = np.array(data)[-n:]
#print(arr)
ATR = pd.Series(arr).ewm(span=n).mean()
#print(ATR)
return ATR[n-1]
#BB
#pは期間,nは偏差の倍率
def BB(p,n):
Bands_period = p
Deviation = n
Base = close.rolling(Bands_period).mean()
sigma = close.rolling(Bands_period).std(ddof=0)
Upper = Base+sigma*Deviation
Lower = Base-sigma*Deviation
print(Upper)
print(Lower)
return Base,Upper,Lower
#RSI
#pは期間
def RSI(p):
RSI_period = p
#RSI_data =
diff = close.diff(1)
positive = diff.clip_lower(0).ewm(alpha=1.0/RSI_period).mean()
negative = diff.clip_upper(0).ewm(alpha=1.0/RSI_period).mean()
RSI = 100-100/(1-positive/negative)
return RSI
ここは特別な説明は省こうと思います。EMA,MACD,ボリンジャーバンド,ATR,RSIを実装しました。たまにコメントでprintするのが混じってますが、エラー解消に試行錯誤した痕跡なので気にしないでください。わからないことがあればコメントで聞いてください。
さて、肝心のエントリーシグナルですが、今回はあえて公開しないでおきます。バックテストをしておらず、まだ成績が良いかもわからないので。buy_signal()
,sell_signal()
という名前の、booleanを返す関数にしてます。つまり、エントリーするかしないかの2択です。シグナルの強弱は意識してないシンプルなやつです。
#実際に動く部分
さて、最後にいよいよ実際に取引を行っている部分です。
period = 300 #何秒足か
data_n = 100 #終値配列の長さ
flag = {
"check":True,
"position":False
} #注文状況のflag
time_count = 0 #時間計測(s)
limit = 3600 #動かす最大時間(s)
while time_count < limit:
#30秒おきに価格データを取得してシグナルを満たせば注文する
while flag["check"]:
close = get_price_data()
response = requests.get("https://api.cryptowat.ch/markets/bitflyer/btcfxjpy/ohlc",params = { "periods" : period })
response = response.json()
if buy_signal():
print("買い注文をします")
#order_ccxt("buy",0.01)
IFDOCO_order("BUY",0.01,int(close[data_n-1]+2*ATR(14)),int(close[data_n-1]-2*ATR(14)))
flag["position"] = True
flag["check"] = False
else:print("買い注文しません")
if sell_signal():
print("売り注文をします")
#order_ccxt("sell",0.01)
IFDOCO_order("SELL",0.01,int(close[data_n-1]-2*ATR(14)),int(close[data_n-1]+2*ATR(14)))
flag["position"] = True
flag["check"] = False
else:print("売り注文しません")
time.sleep(30)
time_count += 30
if time_count > limit:
break
#30秒おきにポジションが残っているか確認する
while flag["position"]:
if position()[0]["parent_order_state"] == "ACTIVE":
pass
elif position()[1]["parent_order_state"] == "ACTIVE":
pass
elif position()[2]["parent_order_state"] == "ACTIVE":
pass
else:
flag["position"] = False
flag["check"] = True
print("注文中")
now_order()
time.sleep(30)
time_count += 30
if time_count > limit:
break
おおまかに説明すると、オープンな注文がないときは、30秒ごとに5分足のデータを利用したシグナルを計算して、Trueが返ってきたときにエントリーします。IFDOCO注文にして、初めから利確と損切りラインを決めておき、あとはひたすら祈るのみ。利確幅と損切り幅は今回は両方2ATRにしています。
注文中はACTIVEな注文がなくなるまで何もしません。
全体のループは一定時間でループを止める用です。
さて、ここまで読んでもらってわかると思いますが初心者でも1週間もかからずにこれくらいはかけるのでけっこう簡単です。みなさんもぜひやってみてください。
今後はcryptowatchのデータを使ってバックテストなどもしていきたいです。
あとは、利確損切りの値幅にG.A.を用いることも計画中です。
最後まで読んでいただきありがとうございました。