23
17

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Pythonその4Advent Calendar 2019

Day 21

自動ざくざく、ビットコイン。Python初心者がコインチェックの1分足チャートを作った話

Last updated at Posted at 2019-12-18

概要

仮想通貨取引(ビットコイン)の自動売買が普通になってきたので,コインチェックで自動売買をするため,Python初心者が1分足を自前で生成する話.コインチェックで公開されているAPIでは1分足を取得できなかったので,自作した.コアとなるロジックを簡単に説明した後,全ソースコードと実行結果を載せる

準備

コインチェックが提供しているAPIを確認する.コインチェックではRESTでAPIを提供しているが,一部WebSocketを使ったAPIも提供している.現在のところ,WebSocketで取得できるのは

  • 取引履歴
  • 板情報

であり,1分足を作成するには取引履歴を使う.取引履歴の使い方を見ると,

REQUEST
{
  "type": "subscribe",
  "channel": "[pair]-trades"
}

がリクエストのフォーマット.

RESPONSE
[
  "ID",
  "取引ペア",
  "注文のレート",
  "注文での量",
  "注文方法"
]

がレスポンスのフォーマットであることが確認できる.今回,ビットコインの1分足を作るので,[pair]はbtc_jpyであるため,リクエストの指定には"btc_jpn-trades"を利用する.
また,pythonでwebsocketを使うため,websocket-clientをpipでインストールしておく

基本設計

1分足の作成は,基本的に以下の手順で行う

  1. WebSocketから取引履歴を逐次受信
  2. 1分で区切り,最初の取引履歴の注文レートをopenとする.
  3. 取引履歴を受信しながら,注文レートを比較し,highとlowを更新する.そして,closeは常に更新し続ける.
  4. その他,注文量,注文方法で買いと売りのvolumeを加算する

以下,この手順を実装したコード断片

	def __update_candle_data(self, trade):
        last_candle = self.__candle[-1]        
        _dt = datetime.now().replace(second=0, microsecond=0) # 1分をつくるため,secondとmicrosecondは0にする
        mark_ts = _dt.timestamp()
        last_ts = last_candle["timestamp"]
        if last_ts == mark_ts:       # 1分の範囲内なら更新
            last_candle["high"] = max(last_candle["high"], trade["price"])
            last_candle["low"] = min(last_candle["low"], trade["price"])
            last_candle["close"] = trade["price"]
            last_candle["volume"] += trade["volume"]
            last_candle["buy"] += trade["volume"] if trade["type"] == 'buy' else 0
            last_candle["sell"] += trade["volume"] if trade["type"] == 'sell' else 0            
        else:                        # 1分の範囲外なら新しく足を追加
            self.__candle.append(
                {
                    "timestamp": mark_ts,
                    "open": trade["price"],
                    "high": trade["price"],
                    "low": trade["price"],
                    "close": trade["price"],
                    "volume": trade["volume"],
                    "buy": trade["volume"] if trade["type"] == 'buy' else 0,
                    "sell": trade["volume"] if trade["type"] == 'sell' else 0,
                }
            )

なお,この関数はWebSocketで取引履歴を受信するたびに呼び出される.取引履歴を受信するために,コールバックの設定を行う.以下,コード断片


self.ws = websocket.WebSocketApp(CoinCheckWebSocket.URL,
                                        on_message = self.__on_message, # 取引情報を受信するコールバック
                                        on_close = self.__on_close,
                                        on_open = self.__on_open, # WebSocketが開いた直後に呼ばれるコールバック
                                        on_error = self.__on_error)

def __on_open(self, ws): 
    self.ws.send(json.dumps({   # このメソッド内部で受信したい通貨ペアを送信する
        "type": "subscribe",
        "channel": "btc_jpy-trades"
    }))   

def __on_message(self, ws, message):
        if self.__connected == False:
            self.__connected = True

        trade = self.__make_trade(message)
        if self.__init_flg == False:
            self.__init_candle_data(trade)
            self.__init_flg = True
        else:
            self.__update_candle_data(trade) # ここで足のデータを更新する

データの補完

ここまでの実装で基本的には1分足データは作れるが,取引履歴がなければ1分足も作れないことになる.そのため,仮に1分間まったく取引がなければ,1分間データが欠落してしまう.そこで,30秒毎にデータをチェックし,欠損が起きないようにする.以下がそのコード断片.なお,この処理は定期的に行う必要があるため,別スレッドで実行する.


	while self.ws.sock and self.ws.sock.connected:
            
            time.sleep(1)
            if _timer_count < 30:   # 30秒待つ
                print("wait until 30")
                _timer_count += 1
                continue

            self.__thread_lock()

            _dt = datetime.now().replace(second=0, microsecond=0)
            mark_ts = _dt.timestamp()
            last_candle = self.__candle[-1]
            last_ts = last_candle["timestamp"]
            # 現在時刻の1分範囲じゃない
            if last_ts != mark_ts:
                self.__candle.append(  # 最後の1分足のデータを使って,新しい1分足データを作る
                    {
                        "timestamp": mark_ts,
                        "open": last_candle["close"],
                        "high": last_candle["close"],
                        "low": last_candle["close"],
                        "close": last_candle["close"],
                        "volume": 0,
                        "buy": 0,
                        "sell": 0,
                    }
                )
            self.__thread_unlock()
            _timer_count = 0

接続切れ対応

実際にやってみると,コインチェックのWebSocketはよく切断される.切断されるときには,その前にon_errorが呼ばれる.また,何回か切れるタイミングを観察していると,そもそもon_openが呼ばれずに,そのままタイムアウトする場合があることがわかった.切れた途端に1分足が作れなくなってしまうのは困るので,切断されたらこれまで作成した足データはそのままに,再接続したい.さらに,on_openが呼ばれない場合,100%切れるので,3秒以内に再接続を試みたい.ということで,以下の処理を追加する


    def __on_error(self, ws, error):
        self.__reconnect()  # エラーが起きたら再接続

    def __candle_thread_terminate(self):
        self.__connected = True # データ補完用スレッドが,接続待ちのビジーループになっている場合,ループを抜ける
        time.sleep(1.5)


    def __exit(self):
        self.ws.close()    # WebSocketを切断
        self.__candle_thread_terminate() # データ補完用のスレッドを終了する
        

    def __reconnect(self):
        self.__exit()  # 明示的に接続を切る
        time.sleep(2)
        self.__connect() # コインチェックWebSocketで接続

# データ補完用スレッドが実行するメソッド
def __check_candle(self, args):
    _error_count = 0
    while True:
        if not self.__connected: #WebSocketの接続が確立されていない(on_messageが1度も呼ばれない)場合待つ
            _error_count += 1 
            self._logger.debug("wait 1 sec")
            time.sleep(1)

            if not self.__opened and _error_count > 3:  # 3秒待ってもon_openが呼ばれない場合,再接続する
                self.ws.on_error = None # 再接続時にon_errorが呼び出されて2回接続しに行かないようにする
                term_thread = threading.Thread(target=lambda: self.__reconnect()) # 別スレッドでやらないと,このスレッドを終了できない
                term_thread.start()
                break           

        else:
            break        

基本的には,ソースコード中のコメント通りの処理をしているが,3秒待っても接続できない場合,データ補完用のスレッドから直接self.__reconnect()を呼び出してしまうと,そのスレッド自身を終了できないため,別スレッドで実行する必要がある,というのがハマりどころだった.別スレッドで実行しないと,切断されるたびにデータ補完用スレッドが倍々で増えていくという,恐ろしい状態になった

全ソースコードと実行結果

coincheck_websocket
import websocket
import json
import threading
import time

from datetime import datetime

class CoinCheckWebSocket:

    URL = "wss://ws-api.coincheck.com/"
    MAX_CANDLE_LEN = 24 * 60

    def __init__(self):
        self.__candle = []
        self.__init_flg = False

        self.__f = open("coin1m.log", 'w')

        self.__lock = threading.Lock()

        self.__connect()
    

    def __connect(self):
        self.__connected = False
        self.__opened = False
        self.ws = websocket.WebSocketApp(CoinCheckWebSocket.URL,
                                        on_message = self.__on_message,
                                        on_close = self.__on_close,
                                        on_open = self.__on_open,
                                        on_error = self.__on_error)

        self.wst = threading.Thread(target=lambda: self.ws.run_forever())
        self.wst.daemon = True
        self.wst.start()


        # Candle Thread
        self._check_candle_thread = threading.Thread(
            target = self.__check_candle, args=("check_candle",)
        )
        self._check_candle_thread.daemon = True
        self._check_candle_thread.start()
        print("check candle thread start")
    

        

    def __on_open(self, ws):
        print("open")
        
        self.ws.send(json.dumps({
            "type": "subscribe",
            "channel": "btc_jpy-trades"
        }))        

        self.__opened = True

    def __on_error(self, ws, error):
        print("error")
        print(error)
        self.__reconnect()

    def __on_close(self, ws):
        print("close, why close")

    def __on_message(self, ws, message):
        #print(message)
        if self.__connected == False:
            self.__connected = True

        trade = self.__make_trade(message)
        #print(trade)
        if self.__init_flg == False:
            self.__init_candle_data(trade)
            self.__init_flg = True
        else:
            self.__update_candle_data(trade)

    def __candle_thread_terminate(self):
        print("__candle_thread_terminate invoked")
        self.__connected = True # waitで止まっていることもあるため
        time.sleep(1.5)


    def __exit(self):
        print("__exit invoked")
        self.ws.close()
        self.__candle_thread_terminate()
        

    def __reconnect(self):
        print("__reconnect invoked")
        self.__exit()
        time.sleep(2)
        self.__connect()

    def __make_trade(self, message):
        elements = message.split(',')
        trade = {
            "id": int(elements[0][1:len(elements[0])-1]),
            "price": float(elements[2][1:len(elements[2])-1]),
            "volume": float(elements[3][1:len(elements[3])-1]),
            "type": elements[4][1:len(elements[4])-2]
        }    
        return trade

    def __thread_lock(self):
        _count = 0
        while self.__lock.acquire(blocking=True, timeout=1) == False:
            _count += 1
            if _count > 3:
                print("lock acquite timeout")
                return False
        return True

    def __thread_unlock(self):
        try:
            self.__lock.release()
        except Exception as e:
            print("lock release a {}".format(e))
            return False
        return True

    def __format_candle(self, candle):
        dt = datetime.fromtimestamp(candle["timestamp"])
        s_str = "{0:%Y-%m-%d %H:%M:%S}".format(dt)
        fmt_str = "%s  %.1f  %.1f  %.1f  %.1f   %.6f   %.6f   %.6f" % (s_str, 
                                candle["open"],
                                candle["high"],
                                candle["low"],
                                candle["close"],
                                candle["volume"],
                                candle["buy"],
                                candle["sell"],
                                ) 
        return fmt_str

    def _write_log(self, candle):
        fmt_str = self.__format_candle(candle)
        fmt_str += '\r\n'        
        self.__f.write(fmt_str)
        self.__f.flush()

    def _print_log(self, candle):
        fmt_str = self.__format_candle(candle)
        print(fmt_str)
        

    def __init_candle_data(self, trade):
        _dt = datetime.now().replace(second=0, microsecond=0)
        _stamp = _dt.timestamp()
        self.__candle.append(
            {
                "timestamp": _stamp,
                "open": trade["price"],
                "high": trade["price"],
                "low": trade["price"],
                "close": trade["price"],
                "volume": trade["volume"],
                "buy": trade["volume"] if trade["type"] == 'buy' else 0,
                "sell": trade["volume"] if trade["type"] == 'sell' else 0,
            }
        )                

    def __update_candle_data(self, trade):
        last_candle = self.__candle[-1]        
        _dt = datetime.now().replace(second=0, microsecond=0)
        mark_ts = _dt.timestamp()
        last_ts = last_candle["timestamp"]
        if last_ts == mark_ts:
            print("append")
            last_candle["high"] = max(last_candle["high"], trade["price"])
            last_candle["low"] = min(last_candle["low"], trade["price"])
            last_candle["close"] = trade["price"]
            last_candle["volume"] += trade["volume"]
            last_candle["buy"] += trade["volume"] if trade["type"] == 'buy' else 0
            last_candle["sell"] += trade["volume"] if trade["type"] == 'sell' else 0 
            
            self._print_log(last_candle)
        else:
            print("add new")
            self._write_log(last_candle)
            self.__candle.append(
                {
                    "timestamp": mark_ts,
                    "open": trade["price"],
                    "high": trade["price"],
                    "low": trade["price"],
                    "close": trade["price"],
                    "volume": trade["volume"],
                    "buy": trade["volume"] if trade["type"] == 'buy' else 0,
                    "sell": trade["volume"] if trade["type"] == 'sell' else 0,
                }
            )
    def get_candle(self, type=0):
        self.__thread_lock()
        if type == 0:
            candle = self.__candle[:-1]
        else:
            candle = self.__candle[:]
        self.__thread_unlock()
        return candle

    def __check_candle(self, args):
        _error_count = 0
        while True:
            if not self.__connected:
                _error_count += 1                
                print("wait 1 sec")
                time.sleep(1)

                if not self.__opened and _error_count > 3:
                    #print("nonono reconnect!!!")
                    self.ws.on_error = None # 2回呼ばれることを回避
                    term_thread = threading.Thread(target=lambda: self.__reconnect()) # 別スレッドでやらないと,このスレッドを終了できない
                    term_thread.start()
                    break           

            else:
                break        
        
                
        _timer_count = 0
        while self.ws.sock and self.ws.sock.connected:
            
            time.sleep(1)
            if _timer_count < 30:
                print("wait until 30")
                _timer_count += 1
                continue

            print(">>>>>>check candle")


            self.__thread_lock()

            _dt = datetime.now().replace(second=0, microsecond=0)
            mark_ts = _dt.timestamp()
            last_candle = self.__candle[-1]
            last_ts = last_candle["timestamp"]
            # 現在時刻の1分範囲じゃない
            if last_ts != mark_ts:
                print("---->>>>>>>  new in check candle")
                self._write_log(last_candle)
                self.__candle.append(
                    {
                        "timestamp": mark_ts,
                        "open": last_candle["close"],
                        "high": last_candle["close"],
                        "low": last_candle["close"],
                        "close": last_candle["close"],
                        "volume": 0,
                        "buy": 0,
                        "sell": 0,
                    }
                )
            if len(self.__candle) > (CoinCheckWebSocket.MAX_CANDLE_LEN * 1.5):
                self.__candle = self.__candle[-CoinCheckWebSocket.MAX_CANDLE_LEN:]

            self.__thread_unlock()
            _timer_count = 0

        print("check candle end")

if __name__ == "__main__":    
    chs = CoinCheckWebSocket()

    while True:
        time.sleep(60)

以下のように実行すると,coin1m.logというファイルを生成し,その中に1分足のデータを書き出す

>python3 coincheck_websocket.py
>cat coin1m.log
2019-12-19 03:30:00  750169.0  750169.0  749685.0  749714.0   1.265700   0.000000   1.265700
2019-12-19 03:31:00  749685.0  750428.0  749685.0  750415.0   0.348400   0.169315   0.179085
2019-12-19 03:32:00  750481.0  750481.0  750152.0  750152.0   0.347950   0.050000   0.297950

以上,細かい処理の説明は省きましたが,何か間違いや疑問点などあれば何でもどうぞ.ただし,Python初心者です。そして投資は自己責任w

感想

明示的な型がほしいヽ(´・ω・)ノ

23
17
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
23
17

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?