概要
仮想通貨取引(ビットコイン)の自動売買が普通になってきたので,コインチェックで自動売買をするため,Python初心者が1分足を自前で生成する話.コインチェックで公開されているAPIでは1分足を取得できなかったので,自作した.コアとなるロジックを簡単に説明した後,全ソースコードと実行結果を載せる
準備
コインチェックが提供しているAPIを確認する.コインチェックではRESTでAPIを提供しているが,一部WebSocketを使ったAPIも提供している.現在のところ,WebSocketで取得できるのは
- 取引履歴
- 板情報
であり,1分足を作成するには取引履歴を使う.取引履歴の使い方を見ると,
{
"type": "subscribe",
"channel": "[pair]-trades"
}
がリクエストのフォーマット.
[
"ID",
"取引ペア",
"注文のレート",
"注文での量",
"注文方法"
]
がレスポンスのフォーマットであることが確認できる.今回,ビットコインの1分足を作るので,[pair]はbtc_jpy
であるため,リクエストの指定には"btc_jpn-trades"を利用する.
また,pythonでwebsocketを使うため,websocket-clientをpipでインストールしておく
基本設計
1分足の作成は,基本的に以下の手順で行う
- WebSocketから取引履歴を逐次受信
- 1分で区切り,最初の取引履歴の注文レートをopenとする.
- 取引履歴を受信しながら,注文レートを比較し,highとlowを更新する.そして,closeは常に更新し続ける.
- その他,注文量,注文方法で買いと売りのvolumeを加算する
以下,この手順を実装したコード断片
def __update_candle_data(self, trade):
last_candle = self.__candle[-1]
_dt = datetime.now().replace(second=0, microsecond=0) # 1分をつくるため,secondとmicrosecondは0にする
mark_ts = _dt.timestamp()
last_ts = last_candle["timestamp"]
if last_ts == mark_ts: # 1分の範囲内なら更新
last_candle["high"] = max(last_candle["high"], trade["price"])
last_candle["low"] = min(last_candle["low"], trade["price"])
last_candle["close"] = trade["price"]
last_candle["volume"] += trade["volume"]
last_candle["buy"] += trade["volume"] if trade["type"] == 'buy' else 0
last_candle["sell"] += trade["volume"] if trade["type"] == 'sell' else 0
else: # 1分の範囲外なら新しく足を追加
self.__candle.append(
{
"timestamp": mark_ts,
"open": trade["price"],
"high": trade["price"],
"low": trade["price"],
"close": trade["price"],
"volume": trade["volume"],
"buy": trade["volume"] if trade["type"] == 'buy' else 0,
"sell": trade["volume"] if trade["type"] == 'sell' else 0,
}
)
なお,この関数はWebSocketで取引履歴を受信するたびに呼び出される.取引履歴を受信するために,コールバックの設定を行う.以下,コード断片
self.ws = websocket.WebSocketApp(CoinCheckWebSocket.URL,
on_message = self.__on_message, # 取引情報を受信するコールバック
on_close = self.__on_close,
on_open = self.__on_open, # WebSocketが開いた直後に呼ばれるコールバック
on_error = self.__on_error)
def __on_open(self, ws):
self.ws.send(json.dumps({ # このメソッド内部で受信したい通貨ペアを送信する
"type": "subscribe",
"channel": "btc_jpy-trades"
}))
def __on_message(self, ws, message):
if self.__connected == False:
self.__connected = True
trade = self.__make_trade(message)
if self.__init_flg == False:
self.__init_candle_data(trade)
self.__init_flg = True
else:
self.__update_candle_data(trade) # ここで足のデータを更新する
データの補完
ここまでの実装で基本的には1分足データは作れるが,取引履歴がなければ1分足も作れないことになる.そのため,仮に1分間まったく取引がなければ,1分間データが欠落してしまう.そこで,30秒毎にデータをチェックし,欠損が起きないようにする.以下がそのコード断片.なお,この処理は定期的に行う必要があるため,別スレッドで実行する.
while self.ws.sock and self.ws.sock.connected:
time.sleep(1)
if _timer_count < 30: # 30秒待つ
print("wait until 30")
_timer_count += 1
continue
self.__thread_lock()
_dt = datetime.now().replace(second=0, microsecond=0)
mark_ts = _dt.timestamp()
last_candle = self.__candle[-1]
last_ts = last_candle["timestamp"]
# 現在時刻の1分範囲じゃない
if last_ts != mark_ts:
self.__candle.append( # 最後の1分足のデータを使って,新しい1分足データを作る
{
"timestamp": mark_ts,
"open": last_candle["close"],
"high": last_candle["close"],
"low": last_candle["close"],
"close": last_candle["close"],
"volume": 0,
"buy": 0,
"sell": 0,
}
)
self.__thread_unlock()
_timer_count = 0
接続切れ対応
実際にやってみると,コインチェックのWebSocketはよく切断される.切断されるときには,その前にon_error
が呼ばれる.また,何回か切れるタイミングを観察していると,そもそもon_open
が呼ばれずに,そのままタイムアウトする場合があることがわかった.切れた途端に1分足が作れなくなってしまうのは困るので,切断されたらこれまで作成した足データはそのままに,再接続したい.さらに,on_open
が呼ばれない場合,100%切れるので,3秒以内に再接続を試みたい.ということで,以下の処理を追加する
def __on_error(self, ws, error):
self.__reconnect() # エラーが起きたら再接続
def __candle_thread_terminate(self):
self.__connected = True # データ補完用スレッドが,接続待ちのビジーループになっている場合,ループを抜ける
time.sleep(1.5)
def __exit(self):
self.ws.close() # WebSocketを切断
self.__candle_thread_terminate() # データ補完用のスレッドを終了する
def __reconnect(self):
self.__exit() # 明示的に接続を切る
time.sleep(2)
self.__connect() # コインチェックWebSocketで接続
# データ補完用スレッドが実行するメソッド
def __check_candle(self, args):
_error_count = 0
while True:
if not self.__connected: #WebSocketの接続が確立されていない(on_messageが1度も呼ばれない)場合待つ
_error_count += 1
self._logger.debug("wait 1 sec")
time.sleep(1)
if not self.__opened and _error_count > 3: # 3秒待ってもon_openが呼ばれない場合,再接続する
self.ws.on_error = None # 再接続時にon_errorが呼び出されて2回接続しに行かないようにする
term_thread = threading.Thread(target=lambda: self.__reconnect()) # 別スレッドでやらないと,このスレッドを終了できない
term_thread.start()
break
else:
break
基本的には,ソースコード中のコメント通りの処理をしているが,3秒待っても接続できない場合,データ補完用のスレッドから直接self.__reconnect()
を呼び出してしまうと,そのスレッド自身を終了できないため,別スレッドで実行する必要がある,というのがハマりどころだった.別スレッドで実行しないと,切断されるたびにデータ補完用スレッドが倍々で増えていくという,恐ろしい状態になった
全ソースコードと実行結果
import websocket
import json
import threading
import time
from datetime import datetime
class CoinCheckWebSocket:
URL = "wss://ws-api.coincheck.com/"
MAX_CANDLE_LEN = 24 * 60
def __init__(self):
self.__candle = []
self.__init_flg = False
self.__f = open("coin1m.log", 'w')
self.__lock = threading.Lock()
self.__connect()
def __connect(self):
self.__connected = False
self.__opened = False
self.ws = websocket.WebSocketApp(CoinCheckWebSocket.URL,
on_message = self.__on_message,
on_close = self.__on_close,
on_open = self.__on_open,
on_error = self.__on_error)
self.wst = threading.Thread(target=lambda: self.ws.run_forever())
self.wst.daemon = True
self.wst.start()
# Candle Thread
self._check_candle_thread = threading.Thread(
target = self.__check_candle, args=("check_candle",)
)
self._check_candle_thread.daemon = True
self._check_candle_thread.start()
print("check candle thread start")
def __on_open(self, ws):
print("open")
self.ws.send(json.dumps({
"type": "subscribe",
"channel": "btc_jpy-trades"
}))
self.__opened = True
def __on_error(self, ws, error):
print("error")
print(error)
self.__reconnect()
def __on_close(self, ws):
print("close, why close")
def __on_message(self, ws, message):
#print(message)
if self.__connected == False:
self.__connected = True
trade = self.__make_trade(message)
#print(trade)
if self.__init_flg == False:
self.__init_candle_data(trade)
self.__init_flg = True
else:
self.__update_candle_data(trade)
def __candle_thread_terminate(self):
print("__candle_thread_terminate invoked")
self.__connected = True # waitで止まっていることもあるため
time.sleep(1.5)
def __exit(self):
print("__exit invoked")
self.ws.close()
self.__candle_thread_terminate()
def __reconnect(self):
print("__reconnect invoked")
self.__exit()
time.sleep(2)
self.__connect()
def __make_trade(self, message):
elements = message.split(',')
trade = {
"id": int(elements[0][1:len(elements[0])-1]),
"price": float(elements[2][1:len(elements[2])-1]),
"volume": float(elements[3][1:len(elements[3])-1]),
"type": elements[4][1:len(elements[4])-2]
}
return trade
def __thread_lock(self):
_count = 0
while self.__lock.acquire(blocking=True, timeout=1) == False:
_count += 1
if _count > 3:
print("lock acquite timeout")
return False
return True
def __thread_unlock(self):
try:
self.__lock.release()
except Exception as e:
print("lock release a {}".format(e))
return False
return True
def __format_candle(self, candle):
dt = datetime.fromtimestamp(candle["timestamp"])
s_str = "{0:%Y-%m-%d %H:%M:%S}".format(dt)
fmt_str = "%s %.1f %.1f %.1f %.1f %.6f %.6f %.6f" % (s_str,
candle["open"],
candle["high"],
candle["low"],
candle["close"],
candle["volume"],
candle["buy"],
candle["sell"],
)
return fmt_str
def _write_log(self, candle):
fmt_str = self.__format_candle(candle)
fmt_str += '\r\n'
self.__f.write(fmt_str)
self.__f.flush()
def _print_log(self, candle):
fmt_str = self.__format_candle(candle)
print(fmt_str)
def __init_candle_data(self, trade):
_dt = datetime.now().replace(second=0, microsecond=0)
_stamp = _dt.timestamp()
self.__candle.append(
{
"timestamp": _stamp,
"open": trade["price"],
"high": trade["price"],
"low": trade["price"],
"close": trade["price"],
"volume": trade["volume"],
"buy": trade["volume"] if trade["type"] == 'buy' else 0,
"sell": trade["volume"] if trade["type"] == 'sell' else 0,
}
)
def __update_candle_data(self, trade):
last_candle = self.__candle[-1]
_dt = datetime.now().replace(second=0, microsecond=0)
mark_ts = _dt.timestamp()
last_ts = last_candle["timestamp"]
if last_ts == mark_ts:
print("append")
last_candle["high"] = max(last_candle["high"], trade["price"])
last_candle["low"] = min(last_candle["low"], trade["price"])
last_candle["close"] = trade["price"]
last_candle["volume"] += trade["volume"]
last_candle["buy"] += trade["volume"] if trade["type"] == 'buy' else 0
last_candle["sell"] += trade["volume"] if trade["type"] == 'sell' else 0
self._print_log(last_candle)
else:
print("add new")
self._write_log(last_candle)
self.__candle.append(
{
"timestamp": mark_ts,
"open": trade["price"],
"high": trade["price"],
"low": trade["price"],
"close": trade["price"],
"volume": trade["volume"],
"buy": trade["volume"] if trade["type"] == 'buy' else 0,
"sell": trade["volume"] if trade["type"] == 'sell' else 0,
}
)
def get_candle(self, type=0):
self.__thread_lock()
if type == 0:
candle = self.__candle[:-1]
else:
candle = self.__candle[:]
self.__thread_unlock()
return candle
def __check_candle(self, args):
_error_count = 0
while True:
if not self.__connected:
_error_count += 1
print("wait 1 sec")
time.sleep(1)
if not self.__opened and _error_count > 3:
#print("nonono reconnect!!!")
self.ws.on_error = None # 2回呼ばれることを回避
term_thread = threading.Thread(target=lambda: self.__reconnect()) # 別スレッドでやらないと,このスレッドを終了できない
term_thread.start()
break
else:
break
_timer_count = 0
while self.ws.sock and self.ws.sock.connected:
time.sleep(1)
if _timer_count < 30:
print("wait until 30")
_timer_count += 1
continue
print(">>>>>>check candle")
self.__thread_lock()
_dt = datetime.now().replace(second=0, microsecond=0)
mark_ts = _dt.timestamp()
last_candle = self.__candle[-1]
last_ts = last_candle["timestamp"]
# 現在時刻の1分範囲じゃない
if last_ts != mark_ts:
print("---->>>>>>> new in check candle")
self._write_log(last_candle)
self.__candle.append(
{
"timestamp": mark_ts,
"open": last_candle["close"],
"high": last_candle["close"],
"low": last_candle["close"],
"close": last_candle["close"],
"volume": 0,
"buy": 0,
"sell": 0,
}
)
if len(self.__candle) > (CoinCheckWebSocket.MAX_CANDLE_LEN * 1.5):
self.__candle = self.__candle[-CoinCheckWebSocket.MAX_CANDLE_LEN:]
self.__thread_unlock()
_timer_count = 0
print("check candle end")
if __name__ == "__main__":
chs = CoinCheckWebSocket()
while True:
time.sleep(60)
以下のように実行すると,coin1m.logというファイルを生成し,その中に1分足のデータを書き出す
>python3 coincheck_websocket.py
>cat coin1m.log
2019-12-19 03:30:00 750169.0 750169.0 749685.0 749714.0 1.265700 0.000000 1.265700
2019-12-19 03:31:00 749685.0 750428.0 749685.0 750415.0 0.348400 0.169315 0.179085
2019-12-19 03:32:00 750481.0 750481.0 750152.0 750152.0 0.347950 0.050000 0.297950
以上,細かい処理の説明は省きましたが,何か間違いや疑問点などあれば何でもどうぞ.ただし,Python初心者です。そして投資は自己責任w
感想
明示的な型がほしいヽ(´・ω・)ノ