1. 概要
1.1 前編のおさらい
前編ではWaveNetを用いてビットコインの1時間足の収益率を予測するモデルを作成し、バックテストを行った。その結果WaveNetによる予測がベンチマークを上回ることがわかった。後編ではWaveNetを用いて自動売買を行うロジックを書いていく。
1.2 後編の内容
前回作成した予測モデルで予測を行い、実際に注文をする仕組みを記述していく。
使用するコードの詳細については下記のGitHubを参照。
https://github.com/SY122095/BTC_bot
1.3 開発環境と使用ツール
- Python 3.10
- SQLite3
取引結果を格納するために使用 - GMOコインアカウント & APIキー
データの取得やビットコインの注文に使用 - LINE Notify API
取引が行われたときにLINEに通知を送るために使用
以下のURLから登録
https://notify-bot.line.me/ja/
2. 実装する取引の流れ
取引は1時間間隔で行う。流れは以下の通り
- ポジションを決済する
- 最新データを取得してモデルの入力形式に変換する
- WaveNetにより収益率を予測する
- 予測値が正なら買い、負なら売りとしてポジションを決める
- 注文を出す
- 現在までの利益と残高をLINEに通知
- 60分待機
3. コードの詳細
3.1 ディレクトリ
┣━ main.py # bot本体を記述
┣━ src
┃ ┣━ data
┃ ┃ ┗━ make_datasets.py # 学習等データと予測用データ作成する関数を記述
┃ ┣━ features
┃ ┃ ┗━ build_features.py # 特徴量を作成する関数を記述
┃ ┗━ models
┃ ┣━ predict_model.py
┃ ┗━ train_model.py # モデル構築のための関数を記述
┣━ trading
┃ ┗━ trading.py # GMOコインにて取引を行うための関数を記述
┣━ models
┃ ┗━ model.h5 # WaveNetモデル
┣━ line
┃ ┗━ line_notify # LINEにメッセージを送るための関数を記述
┗━ sql
┗━ trading.db # 取引結果を記録するデータベース
3.2 LINEに通知
こちらの記事を参考に作成。
取引のポジション(ショートorロング)や決済後の利益を知らせるために用いる。
import requests
class LineNotify:
def __init__(self):
# LineNotify登録時に取得したTOKENを指定
self.line_notify_token = LINE_NOTIFY_TOKEN
self.line_notify_api = "https://notify-api.line.me/api/notify"
self.headers = {
"Authorization": f"Bearer {self.line_notify_token}"
}
def send(self, msg):
'''lineにメッセージを送るメソッド'''
msg = { "message": f" {msg}" }
requests.post(self.line_notify_api, headers = self.headers, data = msg)
3.3 取引の実行と結果の取得
APIを使って取引を行うための関数を作成する。
GMOコインの公式ドキュメントとブログ記事を参考に作成。
決済注文を行う関数、予測値に応じてレバレッジ取引を行う関数、データベースへ格納するために取引結果を返す関数を定義している。
作成した関数のコード
import hashlib
import hmac
import json
import requests
import time
import pandas as pd
from pytz import timezone
from datetime import datetime
from line.line_notify import LineNotify
line_notify = LineNotify()
api_Key = # 取得したAPIキーを入れる
secretKey = # 取得したシークレットキーを入れる
def get_availableAmount():
'''取引余力を取得する関数'''
timestamp = '{0}000'.format(int(time.mktime(datetime.now().timetuple())))
method = 'GET'
endPoint = 'https://api.coin.z.com/private'
path = '/v1/account/margin'
text = timestamp + method + path
sign = hmac.new(bytes(secretKey.encode('ascii')), bytes(text.encode('ascii')), hashlib.sha256).hexdigest()
headers = {
"API-KEY": api_Key,
"API-TIMESTAMP": timestamp,
"API-SIGN": sign
}
res = requests.get(endPoint + path, headers=headers)
r = json.dumps(res.json(), indent=2)
return json.loads(r)['data']['availableAmount']
def build_position(symbol, side, executionType, size, price='', losscutPrice='', timeInForce='FAK'):
'''
ポジションを決める関数
'''
timestamp = '{0}000'.format(int(time.mktime(datetime.now().timetuple())))
method = 'POST'
endPoint = 'https://api.coin.z.com/private'
path = '/v1/order'
reqBody = {
"symbol": symbol,
"side": side,
"executionType": executionType,
"timeInForce": timeInForce,
"price": price,
"losscutPrice": losscutPrice,
"size": size
}
text = timestamp + method + path + json.dumps(reqBody)
sign = hmac.new(bytes(secretKey.encode('ascii')), bytes(text.encode('ascii')), hashlib.sha256).hexdigest()
headers = {
"API-KEY": api_Key,
"API-TIMESTAMP": timestamp,
"API-SIGN": sign
}
res = requests.post(endPoint + path, headers=headers, data=json.dumps(reqBody))
return res.json()
def get_position():
'''建玉一覧を取得'''
timestamp = '{0}000'.format(int(time.mktime(datetime.now().timetuple())))
method = 'GET'
endPoint = 'https://api.coin.z.com/private'
path = '/v1/openPositions'
text = timestamp + method + path
sign = hmac.new(bytes(secretKey.encode('ascii')), bytes(text.encode('ascii')), hashlib.sha256).hexdigest()
parameters = {
"symbol": "BTC_JPY",
"page": 1,
"count": 100
}
headers = {
"API-KEY": api_Key,
"API-TIMESTAMP": timestamp,
"API-SIGN": sign
}
res = requests.get(endPoint + path, headers=headers, params=parameters)
return res.json()
def close_position(ticker, side, size, executionType, position_id):
'''決済注文を出す'''
timestamp = '{0}000'.format(int(time.mktime(datetime.now().timetuple())))
method = 'POST'
endPoint = 'https://api.coin.z.com/private'
path = '/v1/closeOrder'
reqBody = {
"symbol": ticker,
"side": side,
"executionType": executionType,
"timeInForce": "",
"price": "",
"settlePosition": [
{
"positionId": position_id,
"size": size
}
]
}
text = timestamp + method + path + json.dumps(reqBody)
sign = hmac.new(bytes(secretKey.encode('ascii')), bytes(text.encode('ascii')), hashlib.sha256).hexdigest()
headers = {
"API-KEY": api_Key,
"API-TIMESTAMP": timestamp,
"API-SIGN": sign
}
res = requests.post(endPoint + path, headers=headers, data=json.dumps(reqBody))
return res.json()
def exe_all_position():
'''すべてのポジションを決済する'''
position = get_position()
if position['data'] == {}:
print('ポジションはありません')
else:
for i in position['data']['list']:
if i['side'] == 'BUY':
close_res = close_position(i['symbol'], 'SELL', i['size'], 'MARKET', i['positionId'])
if close_res['status'] == 0:
print('レバレッジ取引(買い注文)は決済されました')
line_notify.send('レバレッジ取引(買い注文)は決済されました')
else:
print(close_res)
elif i['side'] == 'SELL':
close_res = close_position(i['symbol'], 'BUY', i['size'], 'MARKET', i['positionId'])
if close_res['status'] == 0:
print('レバレッジ取引(売り注文)は決済されました')
line_notify.send('レバレッジ取引(買い注文)は決済されました')
else:
print(close_res)
def order_process(symbol, side, executionType, size, price='', losscutPrice='', timeInForce='FAK'):
'''注文を出す'''
if side == 'BUY':
build_position(symbol, side, executionType, size, price, losscutPrice, timeInForce='FAK')
time.sleep(1)
print('ビットコインを' + str(get_position()['data']['list'][0]['price']) + '円でロングしました')
line_notify.send('ビットコインを' + str(get_position()['data']['list'][0]['price']) + '円でロングしました')
elif side == 'SELL':
build_position(symbol, side, executionType, size, price, losscutPrice, timeInForce='FAK')
time.sleep(1)
print('ビットコインを' + str(get_position()['data']['list'][0]['price']) + '円でショートしました')
line_notify.send('ビットコインを' + str(get_position()['data']['list'][0]['price']) + '円でショートしました')
def get_trade_result():
'''取引の記録を取得'''
timestamp = '{0}000'.format(int(time.mktime(datetime.now().timetuple())))
method = 'GET'
endPoint = 'https://api.coin.z.com/private'
path = '/v1/latestExecutions'
text = timestamp + method + path
sign = hmac.new(bytes(secretKey.encode('ascii')), bytes(text.encode('ascii')), hashlib.sha256).hexdigest()
parameters = {
"symbol": "BTC_JPY",
"page": 1,
"count": 2
}
headers = {
"API-KEY": api_Key,
"API-TIMESTAMP": timestamp,
"API-SIGN": sign
}
res = requests.get(endPoint + path, headers=headers, params=parameters)
time_ = res.json()['data']['list'][1]['timestamp']
time_ = pd.Timestamp(time_)
time_ = time_.astimezone(timezone('Asia/Tokyo'))
year = time_.year
month = time_.month
day = time_.day
hour = time_.hour
date = str(year) + '-' + str(month)+ '-' + str(day)+ ' ' + str(hour) + ':00:00'
side = res.json()['data']['list'][1]['side']
if side == 'SELL':
position = -1
else:
position = 1
order_price = int(res.json()['data']['list'][1]['price'])
close_price = int(res.json()['data']['list'][0]['price'])
loss_gain = int(res.json()['data']['list'][0]['lossGain'])
id = int(res.json()['data']['list'][0]['executionId'])
return id, date, position, order_price, close_price, loss_gain
3.4 予測に使うデータの作成
今回はシーケンスの幅を20とするWaveNetにより予測値を算出するため、過去20時間のビットコインとイーサリアムの価格を取得する関数を作成する。
特徴量に関してはWindow normalizationを用いて正規化を行う。
また、WaveNetの入力形式に合わせて次元を追加する。
データセットを作成する関数
import json
import numpy as np
import pandas as pd
import requests
from datetime import datetime
from datetime import timedelta
from pandas import json_normalize
def get_data(symbol='BTC_JPY', interval='1hour', date=''):
'''1日分の1時間足データを取得する'''
endPoint = 'https://api.coin.z.com/public'
path = f'/v1/klines?symbol={symbol}&interval={interval}&date={date}'
response = requests.get(endPoint + path)
r = json.dumps(response.json(), indent=2)
r2 = json.loads(r)
df = json_normalize(r2['data'])
if len(df):
date = []
for i in df['openTime']:
i = int(i)
tsdate = int (i / 1000)
loc = datetime.utcfromtimestamp(tsdate)
date.append(loc)
df.index = date
df.index = df.index.tz_localize('UTC')
df.index = df.index.tz_convert('Asia/Tokyo')
df.drop('openTime', axis=1, inplace=True)
return df
def get_today():
'''YYYYMMDD形式で本日の日付を取得'''
now_time = datetime.now()
current_year = now_time.year
current_month = now_time.month
current_day = now_time.day
if current_month >= 10 and current_day >= 10:
today = str(current_year) + str(current_month) + str(current_day)
elif current_month < 10 and current_day >= 10:
today = str(current_year) + '0' + str(current_month) + str(current_day)
elif current_month < 10 and current_day < 10:
today = str(current_year) + '0' + str(current_month) + '0' + str(current_day)
elif current_month >= 10 and current_day < 10:
today = str(current_year) + str(current_month) + '0' + str(current_day)
return today
def data_for_prediction():
'''モデルへの入力のため過去20時間分のデータを用意する関数'''
today = get_today()
yesterday = datetime.strptime(today, '%Y%m%d')
yesterday -= timedelta(days=1)
yesterday = str(yesterday)
yesterday = yesterday.replace('-', '')
yesterday = yesterday.replace(' 00:00:00', '')
two_days_before = datetime.strptime(yesterday, '%Y%m%d')
two_days_before -= timedelta(days=1)
two_days_before = str(two_days_before)
two_days_before = two_days_before.replace('-', '')
two_days_before = two_days_before.replace(' 00:00:00', '')
if datetime.now().hour > 6:
btc_today = get_data(symbol='BTC_JPY', interval='1hour', date=today)
eth_today = get_data(symbol='ETH_JPY', interval='1hour', date=today)
btc_yesterday = get_data(symbol='BTC_JPY', interval='1hour', date=yesterday)
eth_yesterday = get_data(symbol='ETH_JPY', interval='1hour', date=yesterday)
btc_data = pd.concat([btc_yesterday, btc_today], axis=0)
eth_data = pd.concat([eth_yesterday, eth_today], axis=0)
eth_data.columns = ['eth_open', 'eth_high', 'eth_low', 'eth_close', 'eth_volume']
df = pd.concat([btc_data, eth_data], axis=1)
df = df.tail(20)
else:
btc_yesterday = get_data(symbol='BTC_JPY', interval='1hour', date=yesterday)
eth_yesterday = get_data(symbol='ETH_JPY', interval='1hour', date=yesterday)
btc_day2 = get_data(symbol='BTC_JPY', interval='1hour', date=two_days_before)
eth_day2 = get_data(symbol='ETH_JPY', interval='1hour', date=two_days_before)
btc_data = pd.concat([btc_day2, btc_yesterday], axis=0)
eth_data = pd.concat([eth_day2, eth_yesterday], axis=0)
eth_data.columns = ['eth_open', 'eth_high', 'eth_low', 'eth_close', 'eth_volume']
df = pd.concat([btc_data, eth_data], axis=1)
df = df.tail(20)
return df
特徴量を生成する関数
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view
from sklearn.model_selection import train_test_split
def normalise_windows(window_data, single_window=False):
''' window normalization'''
normalised_data = [] # 正規化したデータを格納
window_data = [window_data] if single_window else window_data
for window in window_data:
normalised_window = []
for col_i in range(window.shape[1]): # Windowの幅
# 各値を初期の値で割る
normalised_col = [((float(p) / float(window[0, col_i])) - 1) for p in window[:, col_i]]
normalised_window.append(normalised_col)
# reshape and transpose array back into original multidimensional format
normalised_window = np.array(normalised_window).T
normalised_data.append(normalised_window)
return np.array(normalised_data)
def create_x_for_prediction(df, seqence_width=20):
'''予測値算出のためのデータセット作成'''
open = df['open'].values
close = df['close'].values
high = df['high'].values
low = df['low'].values
eth_open = df['eth_open'].values
eth_close = df['eth_close'].values
seqence_width = seqence_width
open_df = sliding_window_view(open, seqence_width)
close_df = sliding_window_view(close, seqence_width)
high_df = sliding_window_view(high, seqence_width)
low_df = sliding_window_view(low, seqence_width)
eth_open_df = sliding_window_view(eth_open, seqence_width)
eth_close_df = sliding_window_view(eth_close, seqence_width)
x_open = open_df[:, :, np.newaxis]
x_close = close_df[:, :, np.newaxis]
x_high = high_df[:, :, np.newaxis]
x_low = low_df[:, :, np.newaxis]
x_eth_open = eth_open_df[:, :, np.newaxis]
x_eth_close = eth_close_df[:, :, np.newaxis]
x_data = np.concatenate([x_open, x_close], axis=2)
x_data = np.concatenate([x_data, x_high], axis=2)
x_data = np.concatenate([x_data, x_low], axis=2)
x_data = np.concatenate([x_data, x_eth_open], axis=2)
x_data = np.concatenate([x_data, x_eth_close], axis=2)
x_data = normalise_windows(x_data)
return x_data
3.5 Bot本体
モデルの更新やBotを停止する条件などは割愛しているため、詳細はGitHubを参照されたい。
import pandas as pd
import sqlite3
import tensorflow as tf
import time
from datetime import datetime
from line.line_notify import LineNotify
from src.data.make_dataset import data_for_prediction
from src.features.build_features import create_x_for_prediction
from src.models.train_model import mish
from trading.trading import get_availableAmount, exe_all_position, order_process, get_trade_result
####-----------------------------初期設定-----------------------------####
ticker = "BTC_JPY" # 売買対象の仮想通貨(ビットコインのレバレッジ取引)
exe_type = 'MARKET' # 注文方式(成行)
default_balance = float(get_availableAmount()) # デフォルトの残高
line_notify = LineNotify() # 取引発生時にLINEで知らせるためのインスタンス
line_notify.send('取引ボットの稼働を開始します。') # ボットの稼働開始を知らせる
trade_num = 0 # 取引回数
profit = 0 # 利益を格納する
tf.keras.utils.get_custom_objects().update({'mish': mish})
dbname = 'sql/trading.db' # 取引結果を格納するデータベース
####-----------------------------Bot本体の処理-----------------------------####
while True:
# ポジションを決済
exe_all_position()
##--------ポジションを決めるための予測を行う--------##
df = data_for_prediction()
x = create_x_for_prediction(df)
model = tf.keras.models.load_model('.\\models\\model.h5', custom_objects={'mish': mish})
prediction = model.predict(x)
print(prediction)
if prediction > 0:
side = 'BUY'
tmp_position = 'long'
hour = datetime.now().hour
elif prediction <= 0:
side = 'SELL'
tmp_position = 'short'
hour = datetime.now().hour
else:
continue
print(str(datetime.now().year) + '年' + str(datetime.now().month) + '月' + str(datetime.now().day) + '日' + str(datetime.now().hour) + '時のポジションは' + tmp_position + 'です。')
time.sleep(1)
##--------取引結果をデータベースに登録--------##
if trade_num != 0:
tmp_id, tmp_date, tmp_position_, tmp_order_price, tmp_close_price, tmp_loss_gain = get_trade_result() # 取引記録の取得
tmp_df = pd.DataFrame(columns=['id', 'date', 'position', 'order_price', 'close_price', 'loss_gain'],
data=[[tmp_id, tmp_date, tmp_position_, tmp_order_price, tmp_close_price, tmp_loss_gain]])
result_df = pd.concat([result_df, tmp_df])
profit += tmp_loss_gain
# データベースに登録
conn = sqlite3.connect(dbname)
cur = conn.cursor()
cur.execute('INSERT INTO trading values(?, ?, ?, ?, ?, ?)', (tmp_id, tmp_date, tmp_position_, tmp_order_price, tmp_close_price, tmp_loss_gain))
conn.commit()
conn.close()
print('取引結果をデータベースに登録しました。\n')
##--------利益と余力をLINEに通知--------##
available = int(get_availableAmount())
profit_loss = available - default_balance
profit_rate = profit / default_balance
if profit_loss > 0:
line_notify.send('現在の残高は' + str(available) + '円で、' + str(profit_loss) + '円の利益です')
elif profit_loss == 0:
line_notify.send('現在の残高は' + str(available) + '円で、' + '損益無しです')
else:
line_notify.send('現在の残高は' + str(available) + '円で、' + str(-profit_loss) + '円の損失です')
##--------注文を出す--------##
order_process(symbol=ticker, side=side, executionType=exe_type, size=0.01)
time.sleep(1)
trade_num += 1
##--------1時間経過するまで待つ--------##
minutes = 60 - datetime.now().minute
sleep_time = 60 * minutes
print(f'{minutes}分スリープします。\n')
time.sleep(sleep_time)
4. まとめ
前編と後編の2回にわたりビットコインの自動売買について書きました。
やはり収益を生み出すロジックを考える部分が最も重要かつ困難だと感じました。
特徴量の作成やモデルの選定などまだまだ改善点は多いため引き続き試行錯誤していこうと思います。