概要
少額の軍資金から自動売買をプログラムして増やしていく企画です.
0.更新履歴
2021年04月29日 初版投稿
2021年04月29日 板情報の記録追加
2021年04月30日 TensorBoardでリアルタイム可視化追加
2021年04月30日 論文読み追加(Bitcoin Price Prediction Based on Deep Learning Methods)
2021年05月02日 GPU環境の構築
2021年05月03日 お手軽時系列予測
2021年06月13日 古典的な簡易自動売買システムの構築
2021年06月29日 β版自動売買プログラムの稼働状況
1.調査
1.1 ウォレット関係
API Documentation
https://lightning.bitflyer.com/docs?lang=ja
注文数量について
https://bitflyer.com/ja-jp/faq/4-27
Realtime API
https://bf-lightning-api.readme.io/docs/realtime-api
1.2 論文関係
Bitcoin Price Prediction Based on Deep Learning Methods
https://www.researchgate.net/publication/339143532_Bitcoin_Price_Prediction_Based_on_Deep_Learning_Methods
↓↓↓↓↓↓↓
日本語化しました
https://qiita.com/m_tani_july/items/f1220992376008264450
1.3 コード関係
PythonでbitFlyer APIを利用する方法を現役エンジニアが解説【初心者向け】
https://techacademy.jp/magazine/43479
仮想通貨交換所ディーカレット、取引APIの提供を開始
https://crypto.watch.impress.co.jp/docs/news/1243844.html
仮想通貨自動取引入門
https://qiita.com/shionhonda/items/bd2a7aaf143eff4972c4
[仮想通貨取引所]BitlflyerのRealtimeAPIをPythonで呼び出す
https://qiita.com/Avocado/items/55a0dd6aed0d2bbf7b94
TensorBoardの最もシンプルなサンプル
https://qiita.com/yabeenico/items/b0fb24d65de988e59de2
FX予測 : PyTorchのLSTMで時系列データ予測
https://qiita.com/THERE2/items/3c13164c1c82c1dcf4b7
1.4 GPU 関係
NvidiaドライバとCUDAとcuDNNとTensorflow-gpuとPythonのバージョンの対応
https://qiita.com/konzo_/items/a6f2e8818e5e8fcdb896
2.バックテスト環境構築
2.1 板情報の記録
バックテストをするために,apiを用いて板の情報を記録するコードを作成します.
BitlflyerのRealtimeAPIをPythonで呼び出して記録する方法で作成します.
こちらの記事が神なので参考にさせていただきました.
https://qiita.com/Avocado/items/55a0dd6aed0d2bbf7b94
こんなコードで動かしてます.
import json
import websocket
import pprint
import os
from time import sleep
from logging import getLogger,INFO,StreamHandler
from datetime import datetime
logger = getLogger(__name__)
handler = StreamHandler()
handler.setLevel(INFO)
logger.setLevel(INFO)
logger.addHandler(handler)
"""
This program calls Bitflyer real time API JSON-RPC2.0 over Websocket
"""
class RealtimeAPI(object):
def __init__(self, url, channel):
self.url = url
self.channel = channel
#Define Websocket
self.ws = websocket.WebSocketApp(self.url,header=None,on_open=self.on_open, on_message=self.on_message, on_error=self.on_error, on_close=self.on_close)
websocket.enableTrace(True)
def run(self):
#ws has loop. To break this press ctrl + c to occur Keyboard Interruption Exception.
self.ws.run_forever()
logger.info('Web Socket process ended.')
"""
Below are callback functions of websocket.
"""
# when we get message
def on_message(self, ws, message):
output = json.loads(message)['params']
# logger.info(output)
# pprint.pprint(output)
save_dict(output)
# when error occurs
def on_error(self, ws, error):
logger.error(error)
# when websocket closed.
def on_close(self, ws):
logger.info('disconnected streaming server')
# when websocket opened.
def on_open(self, ws):
logger.info('connected streaming server')
output_json = json.dumps(
{'method' : 'subscribe',
'params' : {'channel' : self.channel}
}
)
ws.send(output_json)
################################
# json を保存する関数
#
def save_dict(output):
save_dir = 'realtime_logs/' + datetime.now().strftime("%Y%m%d") + '/'
os.makedirs(save_dir, exist_ok=True)
save_path = save_dir + '/realtime_' + datetime.now().strftime("%Y%m%d-%H%M%S.%f") + '.json'
print("{:*^25}".format(" save json : " + save_path))
with open(save_path, mode='wt', encoding='utf-8') as file:
json.dump(output, file, ensure_ascii=False, indent=2)
if __name__ == '__main__':
#API endpoint
url = 'wss://ws.lightstream.bitflyer.com/json-rpc'
channel = 'lightning_board_snapshot_XLM_JPY'
# channel = 'lightning_board_XLM_JPY'
json_rpc = RealtimeAPI(url=url, channel=channel)
#ctrl + cで終了
json_rpc.run()
# while 1:
# try:
# json_rpc.run()
# except:
# pritn("ERROR")
# pass
結果,こんな感じで取得した板情報をjsonに保存する様にしました.
途中でこんなエラーが多発する場合は,時間を置いてから再度実行してみてください.
回線速度が細すぎてもエラーがでるそうです.
私のネットワーク環境ではamazonプライム見ながらだとエラーがでます.
(venv) C:\Users\Documents\GIT\mybitflyer>python realtime_api.py
[WinError 10060] 接続済みの呼び出し先が一定の時間を過ぎても正しく応答しなかったため、接続できませんでした。または接続済みのホストが応答しなかったため、確立された接続は失敗しました。
disconnected streaming server
Web Socket process ended.
2.2 TensorBoardでリアルタイム可視化
リアルタイムに可視化するツールは数多く出回っています.しかし,移動平均が標準装備である点やその気になれば画像とかも描画でき,尚且つ表示させたい項目をポチポチ選べるのはTensorBoardだけではないでしょうか?
しかも使い方が簡単!!
と言うことでTensorBoardを使って可視化していこうと思います.
*解析とかはmatplotlibとか使うかもしれません.
TensorBoardを使うためにインストール
pip install tensorflow tensorboard
下記コードでTensorBoardの設定をして
################################
# tensorboardの設定
#
logdir = "logs/bitflyer/" + datetime.now().strftime("%Y%m%d-%H%M%S")
file_writer = tf.summary.create_file_writer(logdir + "/metrics")
file_writer.set_as_default()
下記コードで描画をする簡単なコードです.
################################
# tensorboardを描画する関数
#
def visial_board(output, i):
mid_price = output['message']['mid_price']
print("[{:03d}] mid_price: {}".format(i, mid_price))
tf.summary.scalar('{}/mid_price'.format(product_code), data=mid_price, step=i)
実行ごとにファイルが作成され,保存されます.
最終的なソースコードはこちら
import json
import websocket
import pprint
import os
import tensorflow as tf
from time import sleep
from logging import getLogger,INFO,StreamHandler
from datetime import datetime
################################
# loggerの設定
#
logger = getLogger(__name__)
handler = StreamHandler()
handler.setLevel(INFO)
logger.setLevel(INFO)
logger.addHandler(handler)
################################
# tensorboardの設定
#
logdir = "logs/bitflyer/" + datetime.now().strftime("%Y%m%d-%H%M%S")
file_writer = tf.summary.create_file_writer(logdir + "/metrics")
file_writer.set_as_default()
################################
# Bitflyerの設定
#
product_code = 'XLM_JPY'
"""
This program calls Bitflyer real time API JSON-RPC2.0 over Websocket
"""
class RealtimeAPI(object):
def __init__(self, url, channel):
self.url = url
self.channel = channel
self.i = 0
#Define Websocket
self.ws = websocket.WebSocketApp(self.url,header=None,on_open=self.on_open, on_message=self.on_message, on_error=self.on_error, on_close=self.on_close)
websocket.enableTrace(True)
def run(self):
#ws has loop. To break this press ctrl + c to occur Keyboard Interruption Exception.
self.ws.run_forever()
logger.info('Web Socket process ended.')
"""
Below are callback functions of websocket.
"""
# when we get message
def on_message(self, ws, message):
output = json.loads(message)['params']
# logger.info(output)
# pprint.pprint(output)
save_dict(output)
visial_board(output, self.i)
self.i += 1
# when error occurs
def on_error(self, ws, error):
logger.error(error)
# when websocket closed.
def on_close(self, ws):
logger.info('disconnected streaming server')
# when websocket opened.
def on_open(self, ws):
logger.info('connected streaming server')
output_json = json.dumps(
{'method' : 'subscribe',
'params' : {'channel' : self.channel}
}
)
ws.send(output_json)
################################
# json を保存する関数
#
def save_dict(output):
save_dir = 'realtime_logs/' + datetime.now().strftime("%Y%m%d") + '/'
os.makedirs(save_dir, exist_ok=True)
save_path = save_dir + '/realtime_' + datetime.now().strftime("%Y%m%d-%H%M%S.%f") + '.json'
print("{:*^25}".format(" save json : " + save_path))
with open(save_path, mode='wt', encoding='utf-8') as file:
json.dump(output, file, ensure_ascii=False, indent=2)
################################
# tensorboardを描画する関数
#
def visial_board(output, i):
mid_price = output['message']['mid_price']
print("[{:03d}] mid_price: {}".format(i, mid_price))
tf.summary.scalar('{}/mid_price'.format(product_code), data=mid_price, step=i)
if __name__ == '__main__':
#API endpoint
url = 'wss://ws.lightstream.bitflyer.com/json-rpc'
channel = 'lightning_board_snapshot_' + product_code
# channel = 'lightning_board_XLM_JPY'
json_rpc = RealtimeAPI(url=url, channel=channel)
#ctrl + cで終了
# json_rpc.run()
while 1:
try:
json_rpc.run()
except:
pritn("ERROR")
pass
結果のグラフはこちら.
これで楽々移動平均のグラフが描けますね.
3. GPU関係の設定
最終的にはdeeplearningを使っていきたいと思いますので,その環境を構築していきます.
PyTorchのサイトから,cuda10.2を使う前提でインストールを進めていきます.
3.1 nvidiaドライバインストール
こちらのサイトから
RTX2070を使用しているので,こちらの条件で検索します.
こんな画面がでてきたのでこれをインストールします.
ここで,cuda10.2にドライバが対応しているかをこちらのサイトで確認しておいてください.
下記コマンドでインストールの確認ができます.
C:\Windows\System32\nvidia-smi.exe
3.2 cudaインストール
ここから環境に合った選択肢を選択して,cuda10.2をインストール
3.3 PyTorchインストール
ここから選択肢を選択して,インストールコードをコピー
仮想環境にインストール
3.4 PyTorch + cuda起動確認
下記コードでGPUが設定されているか確認
import numpy as np
import pandas as pd
# from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
import pickle
from datetime import datetime
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import gc
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
torch.manual_seed(1)
print("device:{}".format(device))
ちゃんとcudaになっていますね!
これでGPU環境構築は完了です.
4. お手軽DeepLaring時系列予測
lstmの練習も兼ねて,ネットにあるlstmの時系列予測するソースコードを改変しました.
kaggleのbitcoinのデータセットに適応させ,結果を可視化する簡単なものになります.
4.1 参照サイト
FX予測 : PyTorchのLSTMで時系列データ予測
https://qiita.com/THERE2/items/3c13164c1c82c1dcf4b7
Bitcoin Historical Data
https://www.kaggle.com/mczielinski/bitcoin-historical-data
4.2 ソースコード
import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
import pickle
from datetime import datetime
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import gc
import os
import tensorflow as tf
from matplotlib import pyplot
import matplotlib.pyplot as plt
from tqdm import tqdm
#######################################
# tensorboard
#
logdir = "logs/lstm_sample/" + datetime.now().strftime("%Y%m%d-%H%M%S")
file_writer = tf.summary.create_file_writer(logdir + "/metrics")
file_writer.set_as_default()
#######################################
# ハイパーパラメータ
#
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
torch.manual_seed(1)
future_num = 60 #何足先を予測するか
feature_num = 8 #volume, open, high, low, closeの5項目
batch_size = 512 * 200
view_size = 64
time_steps = 60 # lstmのtimesteps
# moving_average_num = 500 # 移動平均を取るCandle数
n_epocs = 300
#データをtrain, testに分割するIndex
# val_idx_from = 3000000
# test_idx_from = 4000000
val_idx_from = 4700000
test_idx_from = 4800000
lstm_hidden_dim = 16
target_dim = 1
save_dir = logdir + '/output'
os.makedirs(save_dir, exist_ok=True)
class LSTMClassifier(nn.Module):
def __init__(self, lstm_input_dim, lstm_hidden_dim, target_dim):
super(LSTMClassifier, self).__init__()
self.input_dim = lstm_input_dim
self.hidden_dim = lstm_hidden_dim
self.lstm = nn.LSTM(input_size=lstm_input_dim,
hidden_size=lstm_hidden_dim,
num_layers=1, #default
#dropout=0.2,
batch_first=True
)
self.dense = nn.Linear(lstm_hidden_dim, target_dim)
def forward(self, X_input):
_, lstm_out = self.lstm(X_input)
# LSTMの最終出力のみを利用する。
linear_out = self.dense(lstm_out[0].view(X_input.size(0), -1))
return torch.sigmoid(linear_out)
def prep_feature_data(batch_idx, time_steps, X_data, feature_num, device):
feats = torch.zeros((len(batch_idx), time_steps, feature_num), dtype=torch.float, device=device)
for b_i, b_idx in enumerate(batch_idx):
# 過去のN足分をtime stepのデータとして格納する。
b_slc = slice(b_idx + 1 - time_steps ,b_idx + 1)
feats[b_i, :, :] = X_data[b_slc, :]
return feats
def main():
print("\n{:>^30}".format(" init ").upper())
print(">> {:<15}:{}".format("device", device))
print("\n{:>^30}".format(" datasets load ").upper())
datasets_path = r"datasets\bitstampUSD_1-min_data_2012-01-01_to_2021-03-31.csv"
df = pd.read_csv(datasets_path)
print(">> {}".format("origin data"))
print(df)
print(">> {}".format("replace nan data"))
df = df.fillna(method='ffill')
print(df)
# 2. 教師データの作成
future_price = df.iloc[future_num:]['Weighted_Price'].values
curr_price = df.iloc[:-future_num]['Weighted_Price'].values
y_data_tmp = future_price - curr_price
y_data = np.zeros_like(y_data_tmp)
y_data[y_data_tmp > 0] = 1
# 3. 価格の正規化
cols = df.columns
# for col in cols:
# # df['Roll_' + col] = df[col].rolling(window=500, min_periods=500).mean()
# df[col] = df[col] / df[col] - 1
#最初の500足分は移動平均データがないため除く。後半の144足分は予測データがないため除く
X_data = df.iloc[:-future_num][cols].values
# 4. データの分割、TorchのTensorに変換
#学習用データ
X_train = torch.tensor(X_data[:val_idx_from], dtype=torch.float, device=device)
y_train = torch.tensor(y_data[:val_idx_from], dtype=torch.float, device=device)
#評価用データ
X_val = torch.tensor(X_data[val_idx_from:test_idx_from], dtype=torch.float, device=device)
y_val = y_data[val_idx_from:test_idx_from]
#テスト用データ
X_test = torch.tensor(X_data[test_idx_from:], dtype=torch.float, device=device)
y_test = y_data[test_idx_from:]
print(">> {:<15}:{}".format("X_train", X_train.shape))
print(">> {:<15}:{}".format("X_val", X_val.shape))
print(">> {:<15}:{}".format("X_test", X_test.shape))
# Prepare for training
print("\n{:>^30}".format(" LSTMClassifier ").upper())
model = LSTMClassifier(feature_num, lstm_hidden_dim, target_dim).to(device)
loss_function = nn.BCELoss()
optimizer= optim.Adam(model.parameters(), lr=1e-4)
step = 0
train_size = X_train.size(0)
best_acc_score = 0
for epoch in range(n_epocs):
print(">> {:<15}:{}/{}".format("start epoch", epoch, n_epocs))
# 1. まずはtrainデータのindexをランダムに入れ替える。最初のtime_steps分は使わない。
perm_idx = np.random.permutation(np.arange(time_steps, train_size))
# 2. batch size毎にperm_idxの対象のindexを取得
for t_i in tqdm(range(0, len(perm_idx), batch_size)):
batch_idx = perm_idx[t_i:(t_i + batch_size)]
# 3. LSTM入力用の時系列データの準備
feats = prep_feature_data(batch_idx, time_steps, X_train, feature_num, device)
y_target = y_train[batch_idx]
# 4. pytorch LSTMの学習実施
model.zero_grad()
train_scores = model(feats) # batch size x time steps x feature_num
loss = loss_function(train_scores, y_target.view(-1, 1))
loss.backward()
optimizer.step()
tf.summary.scalar('EPOCH/step', data=loss.item(), step=step)
step = step + 1
if(t_i == 0):
print(">> {:<15}:{}".format("feats", feats.shape))
print(">> {:<15}:{}".format("y_target", y_target.shape))
print(">> {:<15}:{}".format("train_scores", train_scores.shape))
#######################################
# グラフ描画
#
fig = plt.figure(figsize=(16.0, 6.0))
# plt.plot(x, y, label='train')
x = list(range(view_size))
plt.bar(x, y_target[:view_size].detach().cpu().numpy() , align="edge", width=-0.3, label='true')
plt.bar(x, train_scores[:view_size, 0].detach().cpu().numpy() , align="edge", width= 0.3, label='train')
plt.xlabel("batch")
plt.ylabel("Judgment [0 or 1]")
plt.legend()
save_path = save_dir + "/bar_epoch{:03d}_ind{:03d}.png".format(epoch, t_i)
print(save_path)
fig.savefig(save_path)
# break
# 5. validationデータの評価
print('EPOCH ', str(epoch), ' loss :', loss.item())
# writer.add_scalar("EPOCH/loss", loss.item(), epoch)
tf.summary.scalar('EPOCH/loss', data=loss.item(), step=epoch)
with torch.no_grad():
feats_val = prep_feature_data(np.arange(time_steps, X_val.size(0)), time_steps, X_val, feature_num, device)
val_scores = model(feats_val)
tmp_scores = val_scores.view(-1).to('cpu').numpy()
bi_scores = np.round(tmp_scores)
acc_score = accuracy_score(y_val[time_steps:], bi_scores)
roc_score = roc_auc_score(y_val[time_steps:], tmp_scores)
print('Val ACC Score :', acc_score, ' ROC AUC Score :', roc_score)
# writer.add_scalar("EPOCH/Val ACC Score", acc_score, epoch)
# writer.add_scalar("EPOCH/Val ROC AUC Score", roc_score, epoch)
tf.summary.scalar('EPOCH/Val ACC Score', data=acc_score, step=epoch)
tf.summary.scalar('EPOCH/Val ROC AUC Score', data=roc_score, step=epoch)
#######################################
# グラフ描画
#
fig = plt.figure(figsize=(16.0, 6.0))
# plt.plot(x, y, label='train')
x = list(range(view_size))
plt.bar(x, y_val[time_steps:time_steps+view_size] , align="edge", width=-0.3, label='true')
plt.bar(x, tmp_scores[:view_size] , align="edge", width= 0.3, label='val')
plt.xlabel("batch")
plt.ylabel("Judgment [0 or 1]")
plt.legend()
save_path = save_dir + "/bar_epoch{:03d}_val.png".format(epoch)
print(save_path)
fig.savefig(save_path)
# 6. validationの評価が良ければモデルを保存
torch.save(model.state_dict(),'./models/pytorch_epoch{:03d}.mdl'.format(epoch))
if acc_score > best_acc_score:
best_acc_score = acc_score
torch.save(model.state_dict(),'./models/pytorch_v1.mdl')
print('best score updated, Pytorch model was saved!!', )
# 7. bestモデルで予測する。
model.load_state_dict(torch.load('./models/pytorch_v1.mdl'))
with torch.no_grad():
feats_test = prep_feature_data(np.arange(time_steps, X_test.size(0)), time_steps, X_test, feature_num, device)
val_scores = model(feats_test)
tmp_scores = val_scores.view(-1).to('cpu').numpy()
bi_scores = np.round(tmp_scores)
acc_score = accuracy_score(y_test[time_steps:], bi_scores)
roc_score = roc_auc_score(y_test[time_steps:], tmp_scores)
print('Test ACC Score :', acc_score, ' ROC AUC Score :', roc_score)
print(">> {:<15}:{}".format("feats_test", feats_test.shape))
print(">> {:<15}:{}".format("y_test", y_test.shape))
print(">> {:<15}:{}".format("tmp_scores", tmp_scores.shape))
#######################################
# グラフ描画
#
fig = plt.figure(figsize=(16.0, 6.0))
# plt.plot(x, y, label='train')
x = list(range(view_size))
plt.bar(x, y_test[time_steps:time_steps+view_size] , align="edge", width=-0.3, label='true')
plt.bar(x, tmp_scores[:view_size] , align="edge", width= 0.3, label='val')
plt.xlabel("batch")
plt.ylabel("Judgment [0 or 1]")
plt.legend()
save_path = save_dir + "/bar_best.png".format()
fig.savefig(save_path)
if __name__ == "__main__":
main()
4.3 お手軽時系列予測結果
下図はlossの学習曲線と精度とROCのグラフです.
下図は次の60分後に価格が上がるか下がるかを予測したものです.
上がれば1,下がれば0です.
valがモデルが予測した値で,trueが正解値です.
結果を見た通り,0.5を出力することでlossを最小限にするというインチキをネットワークが行ってきました.
これは株の予測でも見られるランダムウォークが原因と考えられます.
なので,このモデルから改良が必要そうですね.
4.4 ソースコードの問題点
今回の結果に関わっているかは不明ですが,このコードにはデータローダーがないです.
データローダーがないため,一度に大量のデータをtest,valしようとしています.
そのため,大分データの数を絞りました.
また,読み出しでマルチスレッド化してないのでその辺も学習時間をロスしてそうです.
5. 古典的な簡易自動売買システムの構築
DeepLaringを使用した自動売買はとてもロマンがあります.しかし,実行時間の問題やGPUを食ってしまう点や4.4で説明したちょっとしたバグなど,多々問題があるためとりあえず古典的アルゴリズムを用いた自動売買を構築していこうと思います.
トライで回してみましたがアルゴリズムの工夫次第ではけっこういい線いきそうです.
5.1 価格情報の記録システム
今まではtensorboardを使用した記録システムをもちいていました.リアルタイム可視化にはとてもよいのですが,自分の指標も同時に描画したり,領域を塗りつぶしたりするのは結構骨が折れそうということでmatplotlibベースの記録システムに移行していきます.
ベースとなるソースコードはこちらのサイトからお借りしました.
https://qiita.com/Avocado/items/55a0dd6aed0d2bbf7b94
その結果,改良したコードがこちら
import json
import websocket
import os
from time import sleep
from logging import getLogger,INFO,StreamHandler
logger = getLogger(__name__)
handler = StreamHandler()
handler.setLevel(INFO)
logger.setLevel(INFO)
logger.addHandler(handler)
import pprint
from datetime import datetime as dt
"""
This program calls Bitflyer real time API JSON-RPC2.0 over Websocket
"""
class RealtimeAPI(object):
def __init__(self, url, channel):
self.url = url
self.channel = channel
self.save_dir = 'output'
os.makedirs(self.save_dir, exist_ok=True)
#Define Websocket
self.ws = websocket.WebSocketApp(self.url,header=None,on_open=self.on_open, on_message=self.on_message, on_error=self.on_error, on_close=self.on_close)
websocket.enableTrace(True)
def run(self):
#ws has loop. To break this press ctrl + c to occur Keyboard Interruption Exception.
self.ws.run_forever()
logger.info('Web Socket process ended.')
"""
Below are callback functions of websocket.
"""
# when we get message
def on_message(self, ws, message):
output = json.loads(message)['params']
# logger.info(output)
pprint.pprint(output)
# ---------------------------------------------------------
# csv write
# get date str
tdatetime = dt.now()
tstr = tdatetime.strftime('%Y%m%d')
tstr2 = tdatetime.strftime('%Y-%m-%d %H:%M:%S.%f')
t_unix = tdatetime.timestamp()
# write info
mid_price = output['message']['mid_price']
write_info = "{}, {}, {}".format(t_unix, tstr2, mid_price)
# create csv files
with open("{}/{}.csv".format(self.save_dir, tstr), 'a') as f:
print(write_info, file=f)
# when error occurs
def on_error(self, ws, error):
logger.error(error)
# when websocket closed.
def on_close(self, ws):
logger.info('disconnected streaming server')
# when websocket opened.
def on_open(self, ws):
logger.info('connected streaming server')
output_json = json.dumps(
{'method' : 'subscribe',
'params' : {'channel' : self.channel}
}
)
ws.send(output_json)
if __name__ == '__main__':
#API endpoint
url = 'wss://ws.lightstream.bitflyer.com/json-rpc'
# channel = 'lightning_board_snapshot_BTC_JPY'
channel = 'lightning_board_snapshot_XLM_JPY'
json_rpc = RealtimeAPI(url=url, channel=channel)
#ctrl + cで終了
json_rpc.run()
mid_priceとUnix時刻,時刻情報をCSVに逐次追記する関数です.下記の画像のようにひたすら追記されます.
ただし,1日ごとにファイルが作成されるようになっています.これで記録部分は完成です.次は,描画部分です.
5.2 価格情報の描画システム
記録したCSVを読み込んで描画を行います.5.1でも記載したとおり,matplotlibベースで描画していきます.
リアルタイム可視化の参考サイトはこちらです.
https://techacademy.jp/magazine/34049
import seaborn as sns
from matplotlib import pyplot as plt
import pandas as pd
sns.set(style='darkgrid')
class RealtimeVisual(object):
def __init__(self):
self.csv_dir = 'output'
self.rolling_w = 100
def run(self):
while 1:
# --------------------------------------------------------
# read csv data
df_mid_price = pd.read_csv('{}/{}'.format(self.csv_dir, '20210613.csv'), header=None, names=('t_unix', 'data_str', 'mid_price'))
# --------------------------------------------------------
# rolling
roll = df_mid_price.mid_price.rolling(self.rolling_w)
mid_price_roll = roll.agg(['mean', 'std'])
# --------------------------------------------------------
# figure
plt.figure(num=1, figsize=(6, 6)) #...1
ax = plt.subplot(111)
# move mean
plt.plot(df_mid_price['t_unix'].values, mid_price_roll['mean'], color="g")
# mid price
sns.lineplot(data=df_mid_price, x='t_unix', y='mid_price', ci=95, color='orange')
# conf area
ax.fill_between(df_mid_price['t_unix'].values, mid_price_roll['mean'] - mid_price_roll['std'], mid_price_roll['mean'] + mid_price_roll['std'], alpha=.25)
# figure setting
ax.set_xlim([df_mid_price['t_unix'].values[-500], df_mid_price['t_unix'].values[-1]])
# figure end section
plt.pause(0.01)
ax.cla()
if __name__ == '__main__':
RealVis = RealtimeVisual()
RealVis.run()
こちらのコードで下記のようなリアルタイムの描画が可能です.
5.3 バックテストの環境構築
作成中
5.4 β版自動売買プログラムの稼働状況
おわりに
進捗があり次第更新していきます.