5
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

仮想通貨botterAdvent Calendar 2024

Day 23

CEX-botのひな型

Last updated at Posted at 2024-12-22

はじめに

この記事は、仮想通貨botter Advent Calendar 2024 の23日目の記事です。

はじめまして!
カモネギと申します。好きな言葉は「一物二価」です。よろしくお願いします!

もともと株と為替をちょこちょこやっていて、一年くらい前から仮想通貨botter活動を開始しました。自分も何かアウトプットしたい。けどポロリはしたくない。という思いから、これからbotを作る人向けに、CEX-botのひな型くらいは紹介できるかなと考えて、簡単な記事を書いてみました。

対象読者

  • トレードアイデアはある
  • APIラッパーはインストールした
  • で、これをどうやって自動で動かすの?

くらいの方を想定しています。
あくまで自分の作り方の一例ではありますが、これからbotを作る際の参考になれば幸いです。
ちなみに、もっと高度なbotは、hayayu0(林)さんから紹介されております。
クラウド上のサーバーレスのbotを支えるサービス

botの基本構成

まずは構成を説明して、そのあとに具体的なコードを紹介したいと思います。

image.png

見ての通り、botの構成は、初期化処理、実行中処理、終了処理の3つです。
botと取引所の間には取引所APIラッパーがあります。
取引所とのやりとりは主に2つ、REST APIとWebSocketで行うのですが、botからみると、取引所APIラッパーが持っている関数を実行してその結果を取得することになります。

botの構成について、順にみていきましょう。

初期化処理

ここでは以下のような処理を実行して、準備が整ったら実行中処理に移ります。

  • APIキーとシークレットを環境変数から読み込む
  • 取引所のキャッシュ、コインの残高を取得する
  • WebSocketの接続を待つ

実行中処理

実行中処理の中では複数の無限ループが非同期で実行されます。絵の中には代表して3つのループを書きましたが、トレード中に実行し続けたい処理をここに入れます。

  • マーケットデータの更新
  • プライベート情報の更新
  • トレードロジック
  • ヘルスチェック
  • ユーザー入力待ち

終了処理

ユーザーが終了コマンドを入力したり、エラーが発生すると実行中処理を抜けて終了処理が走ります。

APIラッパーについて

突然ですが、みなさん、どのAPIラッパーを使っていますか?
cctxとpybottersが有名どころですかね。ゼロから自作している方もいるかもしれませんね。
自分は、ドースーさんのラッパーです!
シンプルで飽きのこない設計で、取引所に応じて自分で好きにカスタマイズできます。まだどのラッパーを使うか迷っていたら、ネットで探せばすぐ出てくるので、パクって使いましょう参考にさせてもらいましょう!

これから紹介する作成例でもドースーさんのラッパーを使っています。
他のラッパーでも同じことができるはずです。

実際のbot作成例

ここでは、(約)1分毎にltpの±2%に指し続けるトレードを行うbotを作ってみたいと思います

いきなりですが、全体のコードをまず紹介します。
行数減らしたくて変な書き方しているのと、エラートラップも書いてないですが、一応動くと思います。
言わずもがなですが、自己責任でお願いします。

import asyncio
from gmocoin import GMOCoin

suspend_event = asyncio.Event()
api_key = 'your_api_key'
api_secret = 'your_api_secret'

class Bot():
    private_queue = asyncio.Queue()
    public_queue = asyncio.Queue()
    suspend_queue = asyncio.Queue()

    def __init__(self):
        self.symbol = 'BTC'
        self.ltp_dict = {'ltp': 0, 'timestamp': 0}
        self.inventory = {'cash': 0, 'coin': 0}
        self.gmocoin = GMOCoin(api_key, api_secret)

    async def get_ltp(self):
        while True:
            data = await self.public_queue.get()
            self.ltp_dict['ltp'] = data['ltp']
            self.ltp_dict['timestamp'] = data['timestamp']

    async def trade_routine(self):
        while True:
            await asyncio.sleep(5)
            if self.ltp_dict['timestamp'] > 0:
                break

        suspend_event.set()
        print('trade_routine start')

        while True:
            self.gmocoin.cancelBulkOrders([self.symbol])
            asyncio.create_task(self.gmocoin.send())
            await suspend_event.wait()
            asyncio.create_task(self.open_order())
            await asyncio.sleep(60)

    async def open_order(self):
        sell_price = int(self.ltp_dict['ltp'] * 1.02)
        buy_price = int(self.ltp_dict['ltp'] * 0.98)
        size = 0.0001

        if self.inventory['cash'] > bid * size:
            self.gmocoin.order(self.symbol, 'BUY', 'LIMIT', buy_price, size)

        if self.inventory['coin'] >= size:
            self.gmocoin.order(self.symbol, 'SELL', 'LIMIT', sell_price, self.inventory['coin'])

        await self.gmocoin.send()

    async def process_executions(self):
        while True:
                data = await self.private_queue.get()

                if data['symbol'] == self.symbol and data.get('execution'):
                    size = float(data["execution"]["executionSize"])
                    price = float(data["execution"]["executionPrice"])
                    side = data['execution']['side']
                    self.inventory['coin'] += size if side == 'BUY' else -size
                    self.inventory['cash'] -= size * price if side == 'BUY' else -size * price

    async def health_check(self):
        while True:
            await self.suspend_queue.get()
            suspend_event.clear()
            self.gmocoin.cancelBulkOrders([self.symbol])
            await asyncio.sleep(30)
            suspend_event.set()

    async def run(self):
        try:
            self.gmocoin.assets()
            resp = await self.gmocoin.send()

            for d in resp[0]['data']:
                if d['symbol'] == self.symbol:
                    self.inventory['coin'] = float(d['available'])
                if d['symbol'] == 'JPY':
                    self.inventory['cash'] = float(d['available'])

            self.tasks = asyncio.gather(
                self.gmocoin.public_ws_run(self.symbol, self.public_queue, self.suspend_queue),
                self.gmocoin.private_ws_run(self.symbol, self.private_queue, self.suspend_queue),
                self.gmocoin.extend_token(),
                self.process_executions(),
                self.trade_routine(),
                self.get_ltp(),
                self.health_check(),
            )
            await self.tasks
        except asyncio.CancelledError:
            print('CancelledError')
        finally:
            self.gmocoin.cancelBulkOrders([self.symbol])
            await self.gmocoin.send()

if __name__ == "__main__":
    bot = Bot()
    asyncio.run(bot.run())

ここからは簡単な説明です。

from gmocoin import GMOCoin

これがラッパーです。
REST API、WebSocketともにGMOCoin内の関数を実行してその結果を受け取ることになります。

private_queue = asyncio.Queue()
public_queue = asyncio.Queue()
suspend_queue = asyncio.Queue()

オリジナルのラッパーはWebSocketでデータが配信されたらコールバック関数を呼び出して、その中で取得したデータの処理を行っていますが、コールバック関数の代わりにasyncio.Queueを使っています。

async def health_check(self):
    while True:
        await self.suspend_queue.get()
        suspend_event.clear()
        self.gmocoin.cancelBulkOrders([self.symbol])
        await asyncio.sleep(30)
        suspend_event.set()

private_queueとpublic_queueの使い道は何となく伝わると思いますが、suspend_queueはWebSocketが切断されたときにキューを入れて使います。
botがWebSocketの切断を検知してトレードロジックを止めたりできます。(某取引所はブツブツ切れます。なんで?pybottersだと切れないっぽい。。。)

async def run(self):
    try:
        self.gmocoin.assets()
        resp = await self.gmocoin.send()

        for d in resp[0]['data']:
            if d['symbol'] == self.symbol:
                self.inventory['coin'] = float(d['available'])
            if d['symbol'] == 'JPY':
                self.inventory['cash'] = float(d['available'])

        self.tasks = asyncio.gather(
            self.gmocoin.public_ws_run(self.symbol, self.public_queue, self.suspend_queue),
            self.gmocoin.private_ws_run(self.symbol, self.private_queue, self.suspend_queue),
            self.gmocoin.extend_token(),
            self.process_executions(),
            self.trade_routine(),
            self.get_ltp(),
            self.health_check(),
        )
        await self.tasks
    except asyncio.CancelledError:
        print('CancelledError')
    finally:
        self.gmocoin.cancelBulkOrders([self.symbol])
        await self.gmocoin.send()

ここがメインで実行されるところです。
最初に残高を取得して、self.tasksは非同期で実行する無限ループを並べて実行しています。
CTRL+Cでタスクがキャンセルされるので、最後にオーダーをキャンセルしてbotを終了する流れです。

以上です。良きbotterライフを!




ところで一つの取引所で複数botを動かすとき、レートリミットとか残高の管理が難しくないですか?
と思って、ラッパーをサーバー化して複数botで共有しようということをしています。
来年まで生き残っていたら書くかも。

5
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?