LoginSignup
16
17

More than 1 year has passed since last update.

仮想通貨わらしべ長者大作戦~少額で自動売買~

Last updated at Posted at 2021-04-29

概要

少額の軍資金から自動売買をプログラムして増やしていく企画です.

0.更新履歴

2021年04月29日 初版投稿
2021年04月29日 板情報の記録追加
2021年04月30日 TensorBoardでリアルタイム可視化追加
2021年04月30日 論文読み追加(Bitcoin Price Prediction Based on Deep Learning Methods)
2021年05月02日 GPU環境の構築
2021年05月03日 お手軽時系列予測
2021年06月13日 古典的な簡易自動売買システムの構築
2021年06月29日 β版自動売買プログラムの稼働状況

image.png

1.調査

1.1 ウォレット関係

API Documentation
https://lightning.bitflyer.com/docs?lang=ja

注文数量について
https://bitflyer.com/ja-jp/faq/4-27

Realtime API
https://bf-lightning-api.readme.io/docs/realtime-api

1.2 論文関係

Bitcoin Price Prediction Based on Deep Learning Methods
https://www.researchgate.net/publication/339143532_Bitcoin_Price_Prediction_Based_on_Deep_Learning_Methods
↓↓↓↓↓↓↓
日本語化しました
https://qiita.com/m_tani_july/items/f1220992376008264450

1.3 コード関係

PythonでbitFlyer APIを利用する方法を現役エンジニアが解説【初心者向け】
https://techacademy.jp/magazine/43479

仮想通貨交換所ディーカレット、取引APIの提供を開始
https://crypto.watch.impress.co.jp/docs/news/1243844.html

仮想通貨自動取引入門
https://qiita.com/shionhonda/items/bd2a7aaf143eff4972c4

[仮想通貨取引所]BitlflyerのRealtimeAPIをPythonで呼び出す
https://qiita.com/Avocado/items/55a0dd6aed0d2bbf7b94

TensorBoardの最もシンプルなサンプル
https://qiita.com/yabeenico/items/b0fb24d65de988e59de2

FX予測 : PyTorchのLSTMで時系列データ予測
https://qiita.com/THERE2/items/3c13164c1c82c1dcf4b7

1.4 GPU 関係

NvidiaドライバとCUDAとcuDNNとTensorflow-gpuとPythonのバージョンの対応
https://qiita.com/konzo_/items/a6f2e8818e5e8fcdb896

2.バックテスト環境構築

2.1 板情報の記録

バックテストをするために,apiを用いて板の情報を記録するコードを作成します.
BitlflyerのRealtimeAPIをPythonで呼び出して記録する方法で作成します.
こちらの記事が神なので参考にさせていただきました.
https://qiita.com/Avocado/items/55a0dd6aed0d2bbf7b94

こんなコードで動かしてます.


import json
import websocket
import pprint
import os

from time import sleep
from logging import getLogger,INFO,StreamHandler
from datetime import datetime

logger = getLogger(__name__)
handler = StreamHandler()
handler.setLevel(INFO)
logger.setLevel(INFO)
logger.addHandler(handler)

"""
This program calls Bitflyer real time API JSON-RPC2.0 over Websocket
"""
class RealtimeAPI(object):

    def __init__(self, url, channel):
        self.url = url
        self.channel = channel

        #Define Websocket
        self.ws = websocket.WebSocketApp(self.url,header=None,on_open=self.on_open, on_message=self.on_message, on_error=self.on_error, on_close=self.on_close)
        websocket.enableTrace(True)

    def run(self):
        #ws has loop. To break this press ctrl + c to occur Keyboard Interruption Exception.
        self.ws.run_forever()   
        logger.info('Web Socket process ended.')

    """
    Below are callback functions of websocket.
    """
    # when we get message
    def on_message(self, ws, message):
        output = json.loads(message)['params']
        # logger.info(output)
        # pprint.pprint(output)
        save_dict(output)


    # when error occurs
    def on_error(self, ws, error):
        logger.error(error)

    # when websocket closed.
    def on_close(self, ws):
        logger.info('disconnected streaming server')

    # when websocket opened.
    def on_open(self, ws):
        logger.info('connected streaming server')
        output_json = json.dumps(
            {'method' : 'subscribe',
            'params' : {'channel' : self.channel}
            }
        )
        ws.send(output_json)

################################
# json を保存する関数
#
def save_dict(output):
    save_dir = 'realtime_logs/' + datetime.now().strftime("%Y%m%d") + '/'
    os.makedirs(save_dir, exist_ok=True)

    save_path = save_dir + '/realtime_' + datetime.now().strftime("%Y%m%d-%H%M%S.%f") + '.json'
    print("{:*^25}".format(" save json : " + save_path))
    with open(save_path, mode='wt', encoding='utf-8') as file:
        json.dump(output, file, ensure_ascii=False, indent=2)

if __name__ == '__main__':
    #API endpoint
    url = 'wss://ws.lightstream.bitflyer.com/json-rpc'
    channel = 'lightning_board_snapshot_XLM_JPY'
    # channel = 'lightning_board_XLM_JPY'
    json_rpc = RealtimeAPI(url=url, channel=channel)
    #ctrl + cで終了
    json_rpc.run()


    # while 1:
    #     try:
    #         json_rpc.run()
    #     except:
    #         pritn("ERROR")
    #         pass

結果,こんな感じで取得した板情報をjsonに保存する様にしました.

image.png

途中でこんなエラーが多発する場合は,時間を置いてから再度実行してみてください.
回線速度が細すぎてもエラーがでるそうです.
私のネットワーク環境ではamazonプライム見ながらだとエラーがでます.

(venv) C:\Users\Documents\GIT\mybitflyer>python realtime_api.py
[WinError 10060] 接続済みの呼び出し先が一定の時間を過ぎても正しく応答しなかったため、接続できませんでした。または接続済みのホストが応答しなかったため、確立された接続は失敗しました。
disconnected streaming server
Web Socket process ended.

2.2 TensorBoardでリアルタイム可視化

リアルタイムに可視化するツールは数多く出回っています.しかし,移動平均が標準装備である点やその気になれば画像とかも描画でき,尚且つ表示させたい項目をポチポチ選べるのはTensorBoardだけではないでしょうか?
しかも使い方が簡単!!

と言うことでTensorBoardを使って可視化していこうと思います.
*解析とかはmatplotlibとか使うかもしれません.

TensorBoardを使うためにインストール

pip install tensorflow tensorboard

下記コードでTensorBoardの設定をして

realtime_api.py
################################
# tensorboardの設定
#
logdir = "logs/bitflyer/" + datetime.now().strftime("%Y%m%d-%H%M%S")
file_writer = tf.summary.create_file_writer(logdir + "/metrics")
file_writer.set_as_default()

下記コードで描画をする簡単なコードです.

realtime_api.py
################################
# tensorboardを描画する関数
#
def visial_board(output, i):
    mid_price = output['message']['mid_price']
    print("[{:03d}] mid_price: {}".format(i, mid_price))
    tf.summary.scalar('{}/mid_price'.format(product_code), data=mid_price, step=i)

実行ごとにファイルが作成され,保存されます.

image.png

最終的なソースコードはこちら

realtime_api.py
import json
import websocket
import pprint
import os

import tensorflow as tf
from time import sleep
from logging import getLogger,INFO,StreamHandler
from datetime import datetime

################################
# loggerの設定
#
logger = getLogger(__name__)
handler = StreamHandler()
handler.setLevel(INFO)
logger.setLevel(INFO)
logger.addHandler(handler)

################################
# tensorboardの設定
#
logdir = "logs/bitflyer/" + datetime.now().strftime("%Y%m%d-%H%M%S")
file_writer = tf.summary.create_file_writer(logdir + "/metrics")
file_writer.set_as_default()

################################
# Bitflyerの設定
#
product_code       = 'XLM_JPY'

"""
This program calls Bitflyer real time API JSON-RPC2.0 over Websocket
"""
class RealtimeAPI(object):

    def __init__(self, url, channel):
        self.url = url
        self.channel = channel
        self.i = 0

        #Define Websocket
        self.ws = websocket.WebSocketApp(self.url,header=None,on_open=self.on_open, on_message=self.on_message, on_error=self.on_error, on_close=self.on_close)
        websocket.enableTrace(True)

    def run(self):
        #ws has loop. To break this press ctrl + c to occur Keyboard Interruption Exception.
        self.ws.run_forever()   
        logger.info('Web Socket process ended.')

    """
    Below are callback functions of websocket.
    """
    # when we get message
    def on_message(self, ws, message):
        output = json.loads(message)['params']
        # logger.info(output)
        # pprint.pprint(output)
        save_dict(output)
        visial_board(output, self.i)
        self.i += 1


    # when error occurs
    def on_error(self, ws, error):
        logger.error(error)

    # when websocket closed.
    def on_close(self, ws):
        logger.info('disconnected streaming server')

    # when websocket opened.
    def on_open(self, ws):
        logger.info('connected streaming server')
        output_json = json.dumps(
            {'method' : 'subscribe',
            'params' : {'channel' : self.channel}
            }
        )
        ws.send(output_json)


################################
# json を保存する関数
#
def save_dict(output):
    save_dir = 'realtime_logs/' + datetime.now().strftime("%Y%m%d") + '/'
    os.makedirs(save_dir, exist_ok=True)

    save_path = save_dir + '/realtime_' + datetime.now().strftime("%Y%m%d-%H%M%S.%f") + '.json'
    print("{:*^25}".format(" save json : " + save_path))
    with open(save_path, mode='wt', encoding='utf-8') as file:
        json.dump(output, file, ensure_ascii=False, indent=2)

################################
# tensorboardを描画する関数
#
def visial_board(output, i):
    mid_price = output['message']['mid_price']
    print("[{:03d}] mid_price: {}".format(i, mid_price))
    tf.summary.scalar('{}/mid_price'.format(product_code), data=mid_price, step=i)

if __name__ == '__main__':
    #API endpoint
    url = 'wss://ws.lightstream.bitflyer.com/json-rpc'
    channel = 'lightning_board_snapshot_' + product_code
    # channel = 'lightning_board_XLM_JPY'
    json_rpc = RealtimeAPI(url=url, channel=channel)
    #ctrl + cで終了
    # json_rpc.run()


    while 1:
        try:
            json_rpc.run()
        except:
            pritn("ERROR")
            pass

結果のグラフはこちら.
これで楽々移動平均のグラフが描けますね.

image.png

3. GPU関係の設定

最終的にはdeeplearningを使っていきたいと思いますので,その環境を構築していきます.
PyTorchのサイトから,cuda10.2を使う前提でインストールを進めていきます.

image.png

3.1 nvidiaドライバインストール

こちらのサイトから
RTX2070を使用しているので,こちらの条件で検索します.

image.png

こんな画面がでてきたのでこれをインストールします.

image.png

ここで,cuda10.2にドライバが対応しているかをこちらのサイトで確認しておいてください.
image.png

下記コマンドでインストールの確認ができます.

C:\Windows\System32\nvidia-smi.exe

image.png

3.2 cudaインストール

ここから環境に合った選択肢を選択して,cuda10.2をインストール

image.png

3.3 PyTorchインストール

ここから選択肢を選択して,インストールコードをコピー

image.png

仮想環境にインストール

image.png

3.4 PyTorch + cuda起動確認

下記コードでGPUが設定されているか確認


import numpy as np 
import pandas as pd 
# from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
import pickle
from datetime import datetime
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import gc

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
torch.manual_seed(1)

print("device:{}".format(device))

image.png

ちゃんとcudaになっていますね!
これでGPU環境構築は完了です.

4. お手軽DeepLaring時系列予測

lstmの練習も兼ねて,ネットにあるlstmの時系列予測するソースコードを改変しました.
kaggleのbitcoinのデータセットに適応させ,結果を可視化する簡単なものになります.

4.1 参照サイト

FX予測 : PyTorchのLSTMで時系列データ予測
https://qiita.com/THERE2/items/3c13164c1c82c1dcf4b7

Bitcoin Historical Data
https://www.kaggle.com/mczielinski/bitcoin-historical-data

4.2 ソースコード


import numpy as np 
import pandas as pd 
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
import pickle
from datetime import datetime
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import gc
import os

import tensorflow as tf
from matplotlib import pyplot
import matplotlib.pyplot as plt
from tqdm import tqdm

#######################################
# tensorboard
#
logdir = "logs/lstm_sample/" + datetime.now().strftime("%Y%m%d-%H%M%S")
file_writer = tf.summary.create_file_writer(logdir + "/metrics")
file_writer.set_as_default()


#######################################
# ハイパーパラメータ
#
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
torch.manual_seed(1)

future_num = 60 #何足先を予測するか
feature_num = 8 #volume, open, high, low, closeの5項目
batch_size = 512 * 200
view_size  = 64
time_steps = 60 # lstmのtimesteps
# moving_average_num = 500 # 移動平均を取るCandle数
n_epocs = 300
#データをtrain, testに分割するIndex
# val_idx_from  = 3000000
# test_idx_from = 4000000
val_idx_from  = 4700000
test_idx_from = 4800000

lstm_hidden_dim = 16
target_dim = 1
save_dir = logdir + '/output'
os.makedirs(save_dir, exist_ok=True)


class LSTMClassifier(nn.Module):
    def __init__(self, lstm_input_dim, lstm_hidden_dim, target_dim):
        super(LSTMClassifier, self).__init__()
        self.input_dim = lstm_input_dim
        self.hidden_dim = lstm_hidden_dim
        self.lstm = nn.LSTM(input_size=lstm_input_dim, 
                            hidden_size=lstm_hidden_dim,
                            num_layers=1, #default
                            #dropout=0.2,
                            batch_first=True
                            )
        self.dense = nn.Linear(lstm_hidden_dim, target_dim)

    def forward(self, X_input):
        _, lstm_out = self.lstm(X_input)
        # LSTMの最終出力のみを利用する。
        linear_out = self.dense(lstm_out[0].view(X_input.size(0), -1))
        return torch.sigmoid(linear_out)

def prep_feature_data(batch_idx, time_steps, X_data, feature_num, device):
    feats = torch.zeros((len(batch_idx), time_steps, feature_num), dtype=torch.float, device=device)
    for b_i, b_idx in enumerate(batch_idx):
        # 過去のN足分をtime stepのデータとして格納する。
        b_slc = slice(b_idx + 1 - time_steps ,b_idx + 1)
        feats[b_i, :, :] = X_data[b_slc, :]

    return feats

def main():
    print("\n{:>^30}".format(" init ").upper())
    print(">> {:<15}:{}".format("device", device))

    print("\n{:>^30}".format(" datasets load ").upper())
    datasets_path = r"datasets\bitstampUSD_1-min_data_2012-01-01_to_2021-03-31.csv"
    df = pd.read_csv(datasets_path)
    print(">> {}".format("origin data"))
    print(df)

    print(">> {}".format("replace nan data"))
    df = df.fillna(method='ffill')
    print(df)

    # 2. 教師データの作成
    future_price = df.iloc[future_num:]['Weighted_Price'].values
    curr_price = df.iloc[:-future_num]['Weighted_Price'].values
    y_data_tmp = future_price - curr_price
    y_data = np.zeros_like(y_data_tmp)
    y_data[y_data_tmp > 0] = 1

    # 3. 価格の正規化
    cols = df.columns
    # for col in cols:
    #     # df['Roll_' + col] = df[col].rolling(window=500, min_periods=500).mean()
    #     df[col] = df[col] / df[col] - 1

    #最初の500足分は移動平均データがないため除く。後半の144足分は予測データがないため除く
    X_data = df.iloc[:-future_num][cols].values

    # 4. データの分割、TorchのTensorに変換
    #学習用データ
    X_train = torch.tensor(X_data[:val_idx_from], dtype=torch.float, device=device)
    y_train = torch.tensor(y_data[:val_idx_from], dtype=torch.float, device=device)
    #評価用データ
    X_val   = torch.tensor(X_data[val_idx_from:test_idx_from], dtype=torch.float, device=device)
    y_val   = y_data[val_idx_from:test_idx_from]
    #テスト用データ
    X_test  = torch.tensor(X_data[test_idx_from:], dtype=torch.float, device=device)
    y_test  = y_data[test_idx_from:]

    print(">> {:<15}:{}".format("X_train", X_train.shape))
    print(">> {:<15}:{}".format("X_val", X_val.shape))
    print(">> {:<15}:{}".format("X_test", X_test.shape))

    # Prepare for training
    print("\n{:>^30}".format(" LSTMClassifier ").upper())
    model = LSTMClassifier(feature_num, lstm_hidden_dim, target_dim).to(device)
    loss_function = nn.BCELoss()
    optimizer= optim.Adam(model.parameters(), lr=1e-4)
    step = 0
    train_size = X_train.size(0)
    best_acc_score = 0
    for epoch in range(n_epocs):
        print(">> {:<15}:{}/{}".format("start epoch", epoch, n_epocs))
        # 1. まずはtrainデータのindexをランダムに入れ替える。最初のtime_steps分は使わない。
        perm_idx = np.random.permutation(np.arange(time_steps, train_size))
        # 2. batch size毎にperm_idxの対象のindexを取得
        for t_i in tqdm(range(0, len(perm_idx), batch_size)):
            batch_idx = perm_idx[t_i:(t_i + batch_size)]
            # 3. LSTM入力用の時系列データの準備
            feats = prep_feature_data(batch_idx, time_steps, X_train, feature_num, device)
            y_target = y_train[batch_idx]

            # 4. pytorch LSTMの学習実施
            model.zero_grad()
            train_scores = model(feats) # batch size x time steps x feature_num
            loss = loss_function(train_scores, y_target.view(-1, 1))
            loss.backward()
            optimizer.step()

            tf.summary.scalar('EPOCH/step', data=loss.item(), step=step)
            step = step + 1
            if(t_i == 0):
                print(">> {:<15}:{}".format("feats", feats.shape))
                print(">> {:<15}:{}".format("y_target", y_target.shape))
                print(">> {:<15}:{}".format("train_scores", train_scores.shape))

                #######################################
                # グラフ描画
                #
                fig = plt.figure(figsize=(16.0, 6.0))
                # plt.plot(x, y, label='train')

                x = list(range(view_size))
                plt.bar(x, y_target[:view_size].detach().cpu().numpy() , align="edge", width=-0.3, label='true')
                plt.bar(x, train_scores[:view_size, 0].detach().cpu().numpy() , align="edge", width= 0.3, label='train')
                plt.xlabel("batch")
                plt.ylabel("Judgment [0 or 1]")
                plt.legend()
                save_path = save_dir + "/bar_epoch{:03d}_ind{:03d}.png".format(epoch, t_i)
                print(save_path)
                fig.savefig(save_path)
                # break

        # 5. validationデータの評価
        print('EPOCH ', str(epoch), ' loss :', loss.item())
        # writer.add_scalar("EPOCH/loss", loss.item(), epoch)
        tf.summary.scalar('EPOCH/loss', data=loss.item(), step=epoch)

        with torch.no_grad():
            feats_val = prep_feature_data(np.arange(time_steps, X_val.size(0)), time_steps, X_val, feature_num, device)
            val_scores = model(feats_val)
            tmp_scores = val_scores.view(-1).to('cpu').numpy()
            bi_scores = np.round(tmp_scores)
            acc_score = accuracy_score(y_val[time_steps:], bi_scores)
            roc_score = roc_auc_score(y_val[time_steps:], tmp_scores)
            print('Val ACC Score :', acc_score, ' ROC AUC Score :', roc_score)
            # writer.add_scalar("EPOCH/Val ACC Score", acc_score, epoch)
            # writer.add_scalar("EPOCH/Val ROC AUC Score", roc_score, epoch)
            tf.summary.scalar('EPOCH/Val ACC Score', data=acc_score, step=epoch)
            tf.summary.scalar('EPOCH/Val ROC AUC Score', data=roc_score, step=epoch)

            #######################################
            # グラフ描画
            #
            fig = plt.figure(figsize=(16.0, 6.0))
            # plt.plot(x, y, label='train')
            x = list(range(view_size))
            plt.bar(x, y_val[time_steps:time_steps+view_size] , align="edge", width=-0.3, label='true')
            plt.bar(x, tmp_scores[:view_size] , align="edge", width= 0.3, label='val')
            plt.xlabel("batch")
            plt.ylabel("Judgment [0 or 1]")
            plt.legend()
            save_path = save_dir + "/bar_epoch{:03d}_val.png".format(epoch)
            print(save_path)
            fig.savefig(save_path)

        # 6. validationの評価が良ければモデルを保存
        torch.save(model.state_dict(),'./models/pytorch_epoch{:03d}.mdl'.format(epoch))
        if acc_score > best_acc_score:
            best_acc_score = acc_score
            torch.save(model.state_dict(),'./models/pytorch_v1.mdl')
            print('best score updated, Pytorch model was saved!!', )

    # 7. bestモデルで予測する。
    model.load_state_dict(torch.load('./models/pytorch_v1.mdl'))
    with torch.no_grad():
        feats_test = prep_feature_data(np.arange(time_steps, X_test.size(0)), time_steps, X_test, feature_num, device)
        val_scores = model(feats_test)
        tmp_scores = val_scores.view(-1).to('cpu').numpy()
        bi_scores = np.round(tmp_scores)
        acc_score = accuracy_score(y_test[time_steps:], bi_scores)
        roc_score = roc_auc_score(y_test[time_steps:], tmp_scores)
        print('Test ACC Score :', acc_score, ' ROC AUC Score :', roc_score)

        print(">> {:<15}:{}".format("feats_test", feats_test.shape))
        print(">> {:<15}:{}".format("y_test", y_test.shape))
        print(">> {:<15}:{}".format("tmp_scores", tmp_scores.shape))

        #######################################
        # グラフ描画
        #
        fig = plt.figure(figsize=(16.0, 6.0))
        # plt.plot(x, y, label='train')
        x = list(range(view_size))
        plt.bar(x, y_test[time_steps:time_steps+view_size] , align="edge", width=-0.3, label='true')
        plt.bar(x, tmp_scores[:view_size] , align="edge", width= 0.3, label='val')
        plt.xlabel("batch")
        plt.ylabel("Judgment [0 or 1]")
        plt.legend()
        save_path = save_dir + "/bar_best.png".format()
        fig.savefig(save_path)

if __name__ == "__main__":
    main()

4.3 お手軽時系列予測結果

下図はlossの学習曲線と精度とROCのグラフです.

image.png

下図は次の60分後に価格が上がるか下がるかを予測したものです.
上がれば1,下がれば0です.
valがモデルが予測した値で,trueが正解値です.

bar_epoch299_val.png

結果を見た通り,0.5を出力することでlossを最小限にするというインチキをネットワークが行ってきました.
これは株の予測でも見られるランダムウォークが原因と考えられます.
なので,このモデルから改良が必要そうですね.

4.4 ソースコードの問題点

今回の結果に関わっているかは不明ですが,このコードにはデータローダーがないです.
データローダーがないため,一度に大量のデータをtest,valしようとしています.
そのため,大分データの数を絞りました.
また,読み出しでマルチスレッド化してないのでその辺も学習時間をロスしてそうです.

5. 古典的な簡易自動売買システムの構築

DeepLaringを使用した自動売買はとてもロマンがあります.しかし,実行時間の問題やGPUを食ってしまう点や4.4で説明したちょっとしたバグなど,多々問題があるためとりあえず古典的アルゴリズムを用いた自動売買を構築していこうと思います.
トライで回してみましたがアルゴリズムの工夫次第ではけっこういい線いきそうです.

5.1 価格情報の記録システム

今まではtensorboardを使用した記録システムをもちいていました.リアルタイム可視化にはとてもよいのですが,自分の指標も同時に描画したり,領域を塗りつぶしたりするのは結構骨が折れそうということでmatplotlibベースの記録システムに移行していきます.

ベースとなるソースコードはこちらのサイトからお借りしました.
https://qiita.com/Avocado/items/55a0dd6aed0d2bbf7b94

その結果,改良したコードがこちら


import json
import websocket
import os 
from time import sleep
from logging import getLogger,INFO,StreamHandler
logger = getLogger(__name__)
handler = StreamHandler()
handler.setLevel(INFO)
logger.setLevel(INFO)
logger.addHandler(handler)

import pprint
from datetime import datetime as dt

"""
This program calls Bitflyer real time API JSON-RPC2.0 over Websocket
"""
class RealtimeAPI(object):

    def __init__(self, url, channel):
        self.url = url
        self.channel = channel
        self.save_dir = 'output'
        os.makedirs(self.save_dir, exist_ok=True)

        #Define Websocket
        self.ws = websocket.WebSocketApp(self.url,header=None,on_open=self.on_open, on_message=self.on_message, on_error=self.on_error, on_close=self.on_close)
        websocket.enableTrace(True)

    def run(self):
        #ws has loop. To break this press ctrl + c to occur Keyboard Interruption Exception.
        self.ws.run_forever()   
        logger.info('Web Socket process ended.')

    """
    Below are callback functions of websocket.
    """
    # when we get message
    def on_message(self, ws, message):
        output = json.loads(message)['params']
        # logger.info(output)
        pprint.pprint(output)

        # ---------------------------------------------------------
        # csv write 

        # get date str
        tdatetime = dt.now()
        tstr     = tdatetime.strftime('%Y%m%d')
        tstr2   = tdatetime.strftime('%Y-%m-%d %H:%M:%S.%f')
        t_unix = tdatetime.timestamp()

        # write info 
        mid_price = output['message']['mid_price']
        write_info = "{}, {}, {}".format(t_unix, tstr2, mid_price)

        # create csv files
        with open("{}/{}.csv".format(self.save_dir, tstr), 'a') as f:
            print(write_info, file=f)


    # when error occurs
    def on_error(self, ws, error):
        logger.error(error)

    # when websocket closed.
    def on_close(self, ws):
        logger.info('disconnected streaming server')

    # when websocket opened.
    def on_open(self, ws):
        logger.info('connected streaming server')
        output_json = json.dumps(
            {'method' : 'subscribe',
            'params' : {'channel' : self.channel}
            }
        )
        ws.send(output_json)

if __name__ == '__main__':
    #API endpoint
    url = 'wss://ws.lightstream.bitflyer.com/json-rpc'
    # channel = 'lightning_board_snapshot_BTC_JPY'
    channel = 'lightning_board_snapshot_XLM_JPY'

    json_rpc = RealtimeAPI(url=url, channel=channel)
    #ctrl + cで終了
    json_rpc.run()

mid_priceとUnix時刻,時刻情報をCSVに逐次追記する関数です.下記の画像のようにひたすら追記されます.
ただし,1日ごとにファイルが作成されるようになっています.これで記録部分は完成です.次は,描画部分です.

Screenshot from 2021-06-13 13-44-26.png

5.2 価格情報の描画システム

記録したCSVを読み込んで描画を行います.5.1でも記載したとおり,matplotlibベースで描画していきます.

リアルタイム可視化の参考サイトはこちらです.
https://techacademy.jp/magazine/34049

realtime_visual.py

import seaborn as sns
from matplotlib import pyplot as plt
import pandas as pd

sns.set(style='darkgrid')

class RealtimeVisual(object):

    def __init__(self):
        self.csv_dir = 'output'
        self.rolling_w = 100

    def run(self):
        while 1:
            # --------------------------------------------------------
            # read csv data
            df_mid_price = pd.read_csv('{}/{}'.format(self.csv_dir, '20210613.csv'), header=None, names=('t_unix', 'data_str', 'mid_price'))

            # --------------------------------------------------------
            # rolling
            roll = df_mid_price.mid_price.rolling(self.rolling_w)
            mid_price_roll = roll.agg(['mean', 'std'])

            # --------------------------------------------------------
            # figure 
            plt.figure(num=1, figsize=(6, 6)) #...1
            ax = plt.subplot(111)

            # move mean
            plt.plot(df_mid_price['t_unix'].values, mid_price_roll['mean'], color="g")

            # mid price 
            sns.lineplot(data=df_mid_price, x='t_unix', y='mid_price', ci=95, color='orange')

            # conf area 
            ax.fill_between(df_mid_price['t_unix'].values, mid_price_roll['mean'] - mid_price_roll['std'], mid_price_roll['mean'] + mid_price_roll['std'], alpha=.25)

            # figure setting
            ax.set_xlim([df_mid_price['t_unix'].values[-500], df_mid_price['t_unix'].values[-1]])

            # figure end section
            plt.pause(0.01)
            ax.cla()

if __name__ == '__main__':
    RealVis = RealtimeVisual()
    RealVis.run()

こちらのコードで下記のようなリアルタイムの描画が可能です.

Screenshot from 2021-06-13 14-51-43.png

5.3 バックテストの環境構築

作成中

5.4 β版自動売買プログラムの稼働状況

おわりに

進捗があり次第更新していきます.

16
17
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
16
17