はじめに
この記事は、仮想通貨botter Advent Calendar 2024 の23日目の記事です。
はじめまして!
カモネギと申します。好きな言葉は「一物二価」です。よろしくお願いします!
もともと株と為替をちょこちょこやっていて、一年くらい前から仮想通貨botter活動を開始しました。自分も何かアウトプットしたい。けどポロリはしたくない。という思いから、これからbotを作る人向けに、CEX-botのひな型くらいは紹介できるかなと考えて、簡単な記事を書いてみました。
対象読者
- トレードアイデアはある
- APIラッパーはインストールした
- で、これをどうやって自動で動かすの?
くらいの方を想定しています。
あくまで自分の作り方の一例ではありますが、これからbotを作る際の参考になれば幸いです。
ちなみに、もっと高度なbotは、hayayu0(林)さんから紹介されております。
クラウド上のサーバーレスのbotを支えるサービス
botの基本構成
まずは構成を説明して、そのあとに具体的なコードを紹介したいと思います。
見ての通り、botの構成は、初期化処理、実行中処理、終了処理の3つです。
botと取引所の間には取引所APIラッパーがあります。
取引所とのやりとりは主に2つ、REST APIとWebSocketで行うのですが、botからみると、取引所APIラッパーが持っている関数を実行してその結果を取得することになります。
botの構成について、順にみていきましょう。
初期化処理
ここでは以下のような処理を実行して、準備が整ったら実行中処理に移ります。
- APIキーとシークレットを環境変数から読み込む
- 取引所のキャッシュ、コインの残高を取得する
- WebSocketの接続を待つ
実行中処理
実行中処理の中では複数の無限ループが非同期で実行されます。絵の中には代表して3つのループを書きましたが、トレード中に実行し続けたい処理をここに入れます。
- マーケットデータの更新
- プライベート情報の更新
- トレードロジック
- ヘルスチェック
- ユーザー入力待ち
終了処理
ユーザーが終了コマンドを入力したり、エラーが発生すると実行中処理を抜けて終了処理が走ります。
APIラッパーについて
突然ですが、みなさん、どのAPIラッパーを使っていますか?
cctxとpybottersが有名どころですかね。ゼロから自作している方もいるかもしれませんね。
自分は、ドースーさんのラッパーです!
シンプルで飽きのこない設計で、取引所に応じて自分で好きにカスタマイズできます。まだどのラッパーを使うか迷っていたら、ネットで探せばすぐ出てくるので、パクって使いましょう参考にさせてもらいましょう!
これから紹介する作成例でもドースーさんのラッパーを使っています。
他のラッパーでも同じことができるはずです。
実際のbot作成例
ここでは、(約)1分毎にltpの±2%に指し続けるトレードを行うbotを作ってみたいと思います
いきなりですが、全体のコードをまず紹介します。
行数減らしたくて変な書き方しているのと、エラートラップも書いてないですが、一応動くと思います。
言わずもがなですが、自己責任でお願いします。
import asyncio
from gmocoin import GMOCoin
suspend_event = asyncio.Event()
api_key = 'your_api_key'
api_secret = 'your_api_secret'
class Bot():
private_queue = asyncio.Queue()
public_queue = asyncio.Queue()
suspend_queue = asyncio.Queue()
def __init__(self):
self.symbol = 'BTC'
self.ltp_dict = {'ltp': 0, 'timestamp': 0}
self.inventory = {'cash': 0, 'coin': 0}
self.gmocoin = GMOCoin(api_key, api_secret)
async def get_ltp(self):
while True:
data = await self.public_queue.get()
self.ltp_dict['ltp'] = data['ltp']
self.ltp_dict['timestamp'] = data['timestamp']
async def trade_routine(self):
while True:
await asyncio.sleep(5)
if self.ltp_dict['timestamp'] > 0:
break
suspend_event.set()
print('trade_routine start')
while True:
self.gmocoin.cancelBulkOrders([self.symbol])
asyncio.create_task(self.gmocoin.send())
await suspend_event.wait()
asyncio.create_task(self.open_order())
await asyncio.sleep(60)
async def open_order(self):
sell_price = int(self.ltp_dict['ltp'] * 1.02)
buy_price = int(self.ltp_dict['ltp'] * 0.98)
size = 0.0001
if self.inventory['cash'] > bid * size:
self.gmocoin.order(self.symbol, 'BUY', 'LIMIT', buy_price, size)
if self.inventory['coin'] >= size:
self.gmocoin.order(self.symbol, 'SELL', 'LIMIT', sell_price, self.inventory['coin'])
await self.gmocoin.send()
async def process_executions(self):
while True:
data = await self.private_queue.get()
if data['symbol'] == self.symbol and data.get('execution'):
size = float(data["execution"]["executionSize"])
price = float(data["execution"]["executionPrice"])
side = data['execution']['side']
self.inventory['coin'] += size if side == 'BUY' else -size
self.inventory['cash'] -= size * price if side == 'BUY' else -size * price
async def health_check(self):
while True:
await self.suspend_queue.get()
suspend_event.clear()
self.gmocoin.cancelBulkOrders([self.symbol])
await asyncio.sleep(30)
suspend_event.set()
async def run(self):
try:
self.gmocoin.assets()
resp = await self.gmocoin.send()
for d in resp[0]['data']:
if d['symbol'] == self.symbol:
self.inventory['coin'] = float(d['available'])
if d['symbol'] == 'JPY':
self.inventory['cash'] = float(d['available'])
self.tasks = asyncio.gather(
self.gmocoin.public_ws_run(self.symbol, self.public_queue, self.suspend_queue),
self.gmocoin.private_ws_run(self.symbol, self.private_queue, self.suspend_queue),
self.gmocoin.extend_token(),
self.process_executions(),
self.trade_routine(),
self.get_ltp(),
self.health_check(),
)
await self.tasks
except asyncio.CancelledError:
print('CancelledError')
finally:
self.gmocoin.cancelBulkOrders([self.symbol])
await self.gmocoin.send()
if __name__ == "__main__":
bot = Bot()
asyncio.run(bot.run())
ここからは簡単な説明です。
from gmocoin import GMOCoin
これがラッパーです。
REST API、WebSocketともにGMOCoin
内の関数を実行してその結果を受け取ることになります。
private_queue = asyncio.Queue()
public_queue = asyncio.Queue()
suspend_queue = asyncio.Queue()
オリジナルのラッパーはWebSocketでデータが配信されたらコールバック関数を呼び出して、その中で取得したデータの処理を行っていますが、コールバック関数の代わりにasyncio.Queue
を使っています。
async def health_check(self):
while True:
await self.suspend_queue.get()
suspend_event.clear()
self.gmocoin.cancelBulkOrders([self.symbol])
await asyncio.sleep(30)
suspend_event.set()
private_queueとpublic_queueの使い道は何となく伝わると思いますが、suspend_queueはWebSocketが切断されたときにキューを入れて使います。
botがWebSocketの切断を検知してトレードロジックを止めたりできます。(某取引所はブツブツ切れます。なんで?pybottersだと切れないっぽい。。。)
async def run(self):
try:
self.gmocoin.assets()
resp = await self.gmocoin.send()
for d in resp[0]['data']:
if d['symbol'] == self.symbol:
self.inventory['coin'] = float(d['available'])
if d['symbol'] == 'JPY':
self.inventory['cash'] = float(d['available'])
self.tasks = asyncio.gather(
self.gmocoin.public_ws_run(self.symbol, self.public_queue, self.suspend_queue),
self.gmocoin.private_ws_run(self.symbol, self.private_queue, self.suspend_queue),
self.gmocoin.extend_token(),
self.process_executions(),
self.trade_routine(),
self.get_ltp(),
self.health_check(),
)
await self.tasks
except asyncio.CancelledError:
print('CancelledError')
finally:
self.gmocoin.cancelBulkOrders([self.symbol])
await self.gmocoin.send()
ここがメインで実行されるところです。
最初に残高を取得して、self.tasks
は非同期で実行する無限ループを並べて実行しています。
CTRL+C
でタスクがキャンセルされるので、最後にオーダーをキャンセルしてbotを終了する流れです。
以上です。良きbotterライフを!
ところで一つの取引所で複数botを動かすとき、レートリミットとか残高の管理が難しくないですか?
と思って、ラッパーをサーバー化して複数botで共有しようということをしています。
来年まで生き残っていたら書くかも。