28
25
記事投稿キャンペーン 「2024年!初アウトプットをしよう」

ハイリキ(Hyperliquid)の取引量をbotで増幅させてみた件

Last updated at Posted at 2024-01-25

勉強がてらハイリキのbotを作ってみたので忘れないように以下に記録します。
コードはご自由にお使いください。

環境設定(コピペでOK)

環境設定が一番難しいです。

以下のコードを実行する場合は全て自己責任でお願い致します。
本記事は教育目的での共有であり、投資を勧誘するものではありません。

Macでの設定

仮想環境を用意します。

仮想環境を用意する方法は以下の通りです。
ターミナルを開いて以下のコマンドをコピペします。

pip install virtualenv
virtualenv myenv
source myenv/bin/activate

次に必要なライブラリをインストールします。

pip install --upgrade pip
pip install hyperliquid-python-sdk
pip install --upgrade eth_abi parsimonious
pip install matplotlib
pip install ta
pip install pandas

コードを保存するディレクトリを作成します。

mkdir hyperliquid
cd hyperliquid
touch websocket_manager.py
touch market_order.py
touch check_positions.py
touch main.py
touch config.json
touch utils.py

Metamaskでの設定

Arbitramネットワークの追加方法はコインゲッコーを参照。
https://www.coingecko.com/learn/add-arbitrum-to-metamask

Import tokensをクリックします。
Screenshot 2024-01-25 at 22.33.25.png

Token contract addressの部分にUSDCのコントラクト情報を記入してNextを押します。

USDCのコントラクト情報
0xaf88d065e77c8cC2239327C5EDb3A432268e5831

Screenshot 2024-01-25 at 22.33.34.png

botのコード(コピペでOK)

file_nameの箇所をそれぞれのPythonファイルの名前に置き換えてコマンドを打ちます。

nano file_name

サンプル
Screenshot 2024-01-25 at 22.04.53.png

以下のような画面が出てくるので該当のコードをコピペします。
その後にCtrl + xを入力すると保存するかどうか聞かれるのでyを入力して保存します。
Screenshot 2024-01-25 at 22.05.07.png

Websocket用コード

Websocketは初回に一度だけでクライアントとサーバーを接続して常時繋がった状態にする機能です。これを使ってハイリキから直接価格情報を常に取得し続けるようにセットします。

websocket_manager.py
import json
import logging
import threading
import time
from collections import defaultdict
import websocket
from hyperliquid.utils.types import Any, Callable, Dict, List, NamedTuple, Optional, Subscription, Tuple, WsMsg

ActiveSubscription = NamedTuple("ActiveSubscription", [("callback", Callable[[Any], None]), ("subscription_id", int)])

def subscription_to_identifier(subscription: Subscription) -> str:
    if subscription["type"] == "allMids":
        return "allMids"
    elif subscription["type"] == "l2Book":
        return f'l2Book:{subscription["coin"].lower()}'
    elif subscription["type"] == "trades":
        return f'trades:{subscription["coin"].lower()}'
    elif subscription["type"] == "userEvents":
        return "userEvents"

def ws_msg_to_identifier(ws_msg: WsMsg) -> Optional[str]:
    if ws_msg["channel"] == "pong":
        return "pong"
    elif ws_msg["channel"] == "allMids":
        return "allMids"
    elif ws_msg["channel"] == "l2Book":
        return f'l2Book:{ws_msg["data"]["coin"].lower()}'
    elif ws_msg["channel"] == "trades":
        trades = ws_msg["data"]
        if len(trades) == 0:
            return None
        else:
            return f'trades:{trades[0]["coin"].lower()}'
    elif ws_msg["channel"] == "user":
        return "userEvents"

class WebsocketManager(threading.Thread):
    def __init__(self, base_url):
        super().__init__()
        self.subscription_id_counter = 0
        self.ws_ready = False
        self.queued_subscriptions: List[Tuple[Subscription, ActiveSubscription]] = []
        self.active_subscriptions: Dict[str, List[ActiveSubscription]] = defaultdict(list)
        ws_url = base_url  # Use the provided WebSocket URL directly
        self.ws = websocket.WebSocketApp(ws_url, on_message=self.on_message, on_open=self.on_open)
        self.ping_sender = threading.Thread(target=self.send_ping)

    def run(self):
        self.ping_sender.start()
        self.ws.run_forever()

    def send_ping(self):
        while True:
            time.sleep(50)
            logging.debug("Websocket sending ping")
            self.ws.send(json.dumps({"method": "ping"}))

    def on_message(self, _ws, message):
        if message == "Websocket connection established.":
            logging.debug(message)
            return
        logging.debug(f"on_message {message}")
        ws_msg: WsMsg = json.loads(message)
        identifier = ws_msg_to_identifier(ws_msg)
        if identifier == "pong":
            logging.debug("Websocket received pong")
            return
        if identifier is None:
            logging.debug("Websocket not handling empty message")
            return
        active_subscriptions = self.active_subscriptions[identifier]
        if len(active_subscriptions) == 0:
            print("Websocket message from an unexpected subscription:", message, identifier)
        else:
            for active_subscription in active_subscriptions:
                active_subscription.callback(ws_msg)

    def on_open(self, _ws):
        logging.debug("on_open")
        self.ws_ready = True
        for (subscription, active_subscription) in self.queued_subscriptions:
            self.subscribe(subscription, active_subscription.callback, active_subscription.subscription_id)

    def subscribe(
        self, subscription: Subscription, callback: Callable[[Any], None], subscription_id: Optional[int] = None
    ) -> int:
        if subscription_id is None:
            self.subscription_id_counter += 1
            subscription_id = self.subscription_id_counter
        if not self.ws_ready:
            logging.debug("enqueueing subscription")
            self.queued_subscriptions.append((subscription, ActiveSubscription(callback, subscription_id)))
        else:
            logging.debug("subscribing")
            identifier = subscription_to_identifier(subscription)
            if subscription["type"] == "userEvents":
                # TODO: ideally the userEvent messages would include the user so that we can support multiplexing them
                if len(self.active_subscriptions[identifier]) != 0:
                    raise NotImplementedError("Cannot subscribe to UserEvents multiple times")
            self.active_subscriptions[identifier].append(ActiveSubscription(callback, subscription_id))
            self.ws.send(json.dumps({"method": "subscribe", "subscription": subscription}))
        return subscription_id

    def unsubscribe(self, subscription: Subscription, subscription_id: int) -> bool:
        if not self.ws_ready:
            raise NotImplementedError("Can't unsubscribe before websocket connected")
        identifier = subscription_to_identifier(subscription)
        active_subscriptions = self.active_subscriptions[identifier]
        new_active_subscriptions = [x for x in active_subscriptions if x.subscription_id != subscription_id]
        if len(new_active_subscriptions) == 0:
            self.ws.send(json.dumps({"method": "unsubscribe", "subscription": subscription}))
        self.active_subscriptions[identifier] = new_active_subscriptions
        return len(active_subscriptions) != len(active_subscriptions)

if __name__ == "__main__":
    # Use the provided WebSocket URL
    base_url = "wss://api.hyperliquid.xyz/ws"
    # Create a WebSocket manager
    websocket_manager = WebsocketManager(base_url)
    # Start the WebSocket manager thread
    websocket_manager.start()
    # Subscribe to AllMids updates
    allmids_subscription = {
        "type": "allMids"
    }
    subscription_id = websocket_manager.subscribe(allmids_subscription)
    # Keep the main thread alive to continue receiving updates
    while True:
        pass

成行注文

指定された銘柄とポジションサイズで成行注文を行うコードです。

market_order.py
import argparse
import eth_account
import utils
from eth_account.signers.local import LocalAccount
from hyperliquid.exchange import Exchange
from hyperliquid.utils import constants

def place_market_order(coin, is_buy, sz):
    config = utils.get_config()
    account: LocalAccount = eth_account.Account.from_key(config["secret_key"])
    exchange = Exchange(account, constants.MAINNET_API_URL)

    print(f"We try to Market {'Buy' if is_buy else 'Sell'} {sz} {coin}.")

    order_result = exchange.market_open(coin, is_buy, sz)
    if order_result["status"] == "ok":
        for status in order_result["response"]["data"]["statuses"]:
            try:
                filled = status["filled"]
                print(f'Order #{filled["oid"]} filled {filled["totalSz"]} @{filled["avgPx"]}')
            except KeyError:
                print(f'Error: {status["error"]}')

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Market Order Script')
    parser.add_argument('coin', type=str, help='Cryptocurrency for the order (e.g., DOGE)')
    parser.add_argument('order_type')
    parser.add_argument('sz', type=float, help='Size of the order')
    args = parser.parse_args()

    place_market_order(args.coin, is_buy, args.sz)

ポジション確認

都度セットしたアカウントのポジション情報を表示するためのコードです。

check_positions.py
import json
import eth_account
import utils
from eth_account.signers.local import LocalAccount
from hyperliquid.exchange import Exchange
from hyperliquid.info import Info
from hyperliquid.utils import constants

def check_positions():
    config = utils.get_config()
    account: LocalAccount = eth_account.Account.from_key(config["secret_key"])
    info = Info(constants.MAINNET_API_URL, skip_ws=True)
    user_state = info.user_state(account.address)
    positions = []
    for position in user_state["assetPositions"]:
        if float(position["position"]["szi"]) != 0:
            positions.append(position["position"])
    if len(positions) > 0:
        print("position data:")
        for position in positions:
            print(json.dumps(position, indent=2))
            coin = position["coin"]
            quantity = float(position["szi"])
    else:
        print("No open positions")

メインアプリ

botを起動する際にメインとなるコードです。csvへの価格情報の保存、価格情報の取得、チャートの形成、トレードストラテジーのロジックなどが記載されています。

main.py
import time
import csv
import pandas as pd
import os
import argparse
from ta.trend import SMAIndicator
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from websocket_manager import WebsocketManager
from market_order import place_market_order
from check_positions import check_positions

def update_csv_with_sma(timestamp, price, coin):
    csv_file_name = f'price_data_{coin}.csv'
    if not os.path.exists(csv_file_name):
        with open(csv_file_name, 'w', newline='') as file:
            writer = csv.writer(file)
            writer.writerow(["Timestamp", "Price"])
    df = pd.read_csv(csv_file_name)
    df['Timestamp'] = pd.to_datetime(df['Timestamp'])
    df.set_index('Timestamp', inplace=True)
    df.loc[timestamp] = price
    sma_indicator = SMAIndicator(df['Price'], window=14)
    df['SMA'] = sma_indicator.sma_indicator()
    df.to_csv(csv_file_name)

def handle_allmids_update(allmids, coin, sz):
    mids = allmids["data"]["mids"]
    coin_mid_price_str = mids.get(coin)
    if coin_mid_price_str is not None:
        coin_mid_price = float(coin_mid_price_str)
        print(f"The mid price of {coin} is: {coin_mid_price}")
        current_time = time.strftime("%Y-%m-%d %H:%M:%S")
        update_csv_with_sma(current_time, coin_mid_price, coin)
        trading_strategy(coin_mid_price, coin, sz)
    else:
        print(f"{coin} not found in the AllMids data.")
    time.sleep(6)

def plot_chart(df, coin):
    plt.figure(figsize=(10,6))
    plt.plot(df.index, df['Price'], label='Price')
    plt.plot(df.index, df['SMA'], label='SMA 14', alpha=0.7)
    plt.title(f'Price Chart with SMA for {coin}')
    plt.xlabel('Date')
    plt.ylabel('Price')
    plt.legend()
    plt.grid(True)
    plt.savefig(f'{coin}_price_chart.png')

def trading_strategy(coin_mid_price, coin, sz):
    csv_file_name = f'price_data_{coin}.csv'
    df = pd.read_csv(csv_file_name)
    df['Timestamp'] = pd.to_datetime(df['Timestamp'])
    df.set_index('Timestamp', inplace=True)
    sma_indicator = SMAIndicator(df['Price'], window=14)
    df['SMA'] = sma_indicator.sma_indicator()
    plot_chart(df, coin)
    latest_sma = df['SMA'].iloc[-1]

    print(f"Latest SMA: {latest_sma}, Current Price: {coin_mid_price}")  # Debugging print

    if coin_mid_price > latest_sma:
        print("Condition met for Buying")  # Debugging print
        is_buy = True
        try:
            place_market_order(coin, is_buy, sz)
            check_positions()
        except Exception as e:
            print(f"Error placing buy order: {e}")  # Error handling
    elif coin_mid_price < latest_sma:
        check_positions()
        print("Condition met for Selling")  # Debugging print
        is_buy = False
        try:
            place_market_order(coin, is_buy, sz)
            check_positions()            
        except Exception as e:
            print(f"Error placing sell order: {e}")  # Error handling

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Crypto Trading Bot')
    parser.add_argument('coin', type=str, help='Symbol of the cryptocurrency (e.g., BTC, ETH)')
    parser.add_argument('sz', type=float, help='Size of the order')
    args = parser.parse_args()
    base_url = "wss://api.hyperliquid.xyz/ws"
    websocket_manager = WebsocketManager(base_url)
    websocket_manager.start()
    allmids_subscription = {"type": "allMids"}
    subscription_id = websocket_manager.subscribe(allmids_subscription, lambda allmids: handle_allmids_update(allmids, args.coin, args.sz))
    while True:
        time.sleep(60)

JSONファイル読み込み

ファイルを読み込むためのコードです。

utils.py
import json
import os

def get_config():
    config_path = os.path.join(os.path.dirname(__file__), "config.json")
    with open(config_path) as f:
        return json.load(f)

シークレットキー設定

config.jsonにてシークレットキーをJSON形式で保存します。

シークレットキーの扱いには要注意。

config.json
{
    "secret_key": "このダブルクオーテーションマークの中身をシークレットキーに変更して保存してください"
}

ロジックの説明

価格がSMAより高ければショート、価格がSMAより低ければロング

というシンプルなロジックです。
ただし価格取得頻度は6秒毎にセットしてあるので大量の注文が入ります。

botの使い方

以下のようにDOGEの部分を好きな銘柄、一度の取引で注文したい量を設定してコマンドを打つとbotが動きます。

python main.py DOGE 200

botが正常に起動すると取得した価格を随時保存するcsvファイルと自動で更新され続けるチャートが生成されます。

Screenshot 2024-01-25 at 22.13.30.png

今回使用したロジックは取引量を増幅させますが勝てません。
少しずつ負けます。本記事がインスピレーションの機会になれば幸いです。

28
25
3

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
28
25