勉強がてらハイリキのbotを作ってみたので忘れないように以下に記録します。
コードはご自由にお使いください。
環境設定(コピペでOK)
環境設定が一番難しいです。
以下のコードを実行する場合は全て自己責任でお願い致します。
本記事は教育目的での共有であり、投資を勧誘するものではありません。
Macでの設定
仮想環境を用意します。
仮想環境を用意する方法は以下の通りです。
ターミナルを開いて以下のコマンドをコピペします。
pip install virtualenv
virtualenv myenv
source myenv/bin/activate
次に必要なライブラリをインストールします。
pip install --upgrade pip
pip install hyperliquid-python-sdk
pip install --upgrade eth_abi parsimonious
pip install matplotlib
pip install ta
pip install pandas
コードを保存するディレクトリを作成します。
mkdir hyperliquid
cd hyperliquid
touch websocket_manager.py
touch market_order.py
touch check_positions.py
touch main.py
touch config.json
touch utils.py
Metamaskでの設定
Arbitramネットワークの追加方法はコインゲッコーを参照。
https://www.coingecko.com/learn/add-arbitrum-to-metamask
Token contract addressの部分にUSDCのコントラクト情報を記入してNextを押します。
USDCのコントラクト情報
0xaf88d065e77c8cC2239327C5EDb3A432268e5831
botのコード(コピペでOK)
file_nameの箇所をそれぞれのPythonファイルの名前に置き換えてコマンドを打ちます。
nano file_name
以下のような画面が出てくるので該当のコードをコピペします。
その後にCtrl + x
を入力すると保存するかどうか聞かれるのでy
を入力して保存します。
Websocket用コード
Websocketは初回に一度だけでクライアントとサーバーを接続して常時繋がった状態にする機能です。これを使ってハイリキから直接価格情報を常に取得し続けるようにセットします。
import json
import logging
import threading
import time
from collections import defaultdict
import websocket
from hyperliquid.utils.types import Any, Callable, Dict, List, NamedTuple, Optional, Subscription, Tuple, WsMsg
ActiveSubscription = NamedTuple("ActiveSubscription", [("callback", Callable[[Any], None]), ("subscription_id", int)])
def subscription_to_identifier(subscription: Subscription) -> str:
if subscription["type"] == "allMids":
return "allMids"
elif subscription["type"] == "l2Book":
return f'l2Book:{subscription["coin"].lower()}'
elif subscription["type"] == "trades":
return f'trades:{subscription["coin"].lower()}'
elif subscription["type"] == "userEvents":
return "userEvents"
def ws_msg_to_identifier(ws_msg: WsMsg) -> Optional[str]:
if ws_msg["channel"] == "pong":
return "pong"
elif ws_msg["channel"] == "allMids":
return "allMids"
elif ws_msg["channel"] == "l2Book":
return f'l2Book:{ws_msg["data"]["coin"].lower()}'
elif ws_msg["channel"] == "trades":
trades = ws_msg["data"]
if len(trades) == 0:
return None
else:
return f'trades:{trades[0]["coin"].lower()}'
elif ws_msg["channel"] == "user":
return "userEvents"
class WebsocketManager(threading.Thread):
def __init__(self, base_url):
super().__init__()
self.subscription_id_counter = 0
self.ws_ready = False
self.queued_subscriptions: List[Tuple[Subscription, ActiveSubscription]] = []
self.active_subscriptions: Dict[str, List[ActiveSubscription]] = defaultdict(list)
ws_url = base_url # Use the provided WebSocket URL directly
self.ws = websocket.WebSocketApp(ws_url, on_message=self.on_message, on_open=self.on_open)
self.ping_sender = threading.Thread(target=self.send_ping)
def run(self):
self.ping_sender.start()
self.ws.run_forever()
def send_ping(self):
while True:
time.sleep(50)
logging.debug("Websocket sending ping")
self.ws.send(json.dumps({"method": "ping"}))
def on_message(self, _ws, message):
if message == "Websocket connection established.":
logging.debug(message)
return
logging.debug(f"on_message {message}")
ws_msg: WsMsg = json.loads(message)
identifier = ws_msg_to_identifier(ws_msg)
if identifier == "pong":
logging.debug("Websocket received pong")
return
if identifier is None:
logging.debug("Websocket not handling empty message")
return
active_subscriptions = self.active_subscriptions[identifier]
if len(active_subscriptions) == 0:
print("Websocket message from an unexpected subscription:", message, identifier)
else:
for active_subscription in active_subscriptions:
active_subscription.callback(ws_msg)
def on_open(self, _ws):
logging.debug("on_open")
self.ws_ready = True
for (subscription, active_subscription) in self.queued_subscriptions:
self.subscribe(subscription, active_subscription.callback, active_subscription.subscription_id)
def subscribe(
self, subscription: Subscription, callback: Callable[[Any], None], subscription_id: Optional[int] = None
) -> int:
if subscription_id is None:
self.subscription_id_counter += 1
subscription_id = self.subscription_id_counter
if not self.ws_ready:
logging.debug("enqueueing subscription")
self.queued_subscriptions.append((subscription, ActiveSubscription(callback, subscription_id)))
else:
logging.debug("subscribing")
identifier = subscription_to_identifier(subscription)
if subscription["type"] == "userEvents":
# TODO: ideally the userEvent messages would include the user so that we can support multiplexing them
if len(self.active_subscriptions[identifier]) != 0:
raise NotImplementedError("Cannot subscribe to UserEvents multiple times")
self.active_subscriptions[identifier].append(ActiveSubscription(callback, subscription_id))
self.ws.send(json.dumps({"method": "subscribe", "subscription": subscription}))
return subscription_id
def unsubscribe(self, subscription: Subscription, subscription_id: int) -> bool:
if not self.ws_ready:
raise NotImplementedError("Can't unsubscribe before websocket connected")
identifier = subscription_to_identifier(subscription)
active_subscriptions = self.active_subscriptions[identifier]
new_active_subscriptions = [x for x in active_subscriptions if x.subscription_id != subscription_id]
if len(new_active_subscriptions) == 0:
self.ws.send(json.dumps({"method": "unsubscribe", "subscription": subscription}))
self.active_subscriptions[identifier] = new_active_subscriptions
return len(active_subscriptions) != len(active_subscriptions)
if __name__ == "__main__":
# Use the provided WebSocket URL
base_url = "wss://api.hyperliquid.xyz/ws"
# Create a WebSocket manager
websocket_manager = WebsocketManager(base_url)
# Start the WebSocket manager thread
websocket_manager.start()
# Subscribe to AllMids updates
allmids_subscription = {
"type": "allMids"
}
subscription_id = websocket_manager.subscribe(allmids_subscription)
# Keep the main thread alive to continue receiving updates
while True:
pass
成行注文
指定された銘柄とポジションサイズで成行注文を行うコードです。
import argparse
import eth_account
import utils
from eth_account.signers.local import LocalAccount
from hyperliquid.exchange import Exchange
from hyperliquid.utils import constants
def place_market_order(coin, is_buy, sz):
config = utils.get_config()
account: LocalAccount = eth_account.Account.from_key(config["secret_key"])
exchange = Exchange(account, constants.MAINNET_API_URL)
print(f"We try to Market {'Buy' if is_buy else 'Sell'} {sz} {coin}.")
order_result = exchange.market_open(coin, is_buy, sz)
if order_result["status"] == "ok":
for status in order_result["response"]["data"]["statuses"]:
try:
filled = status["filled"]
print(f'Order #{filled["oid"]} filled {filled["totalSz"]} @{filled["avgPx"]}')
except KeyError:
print(f'Error: {status["error"]}')
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Market Order Script')
parser.add_argument('coin', type=str, help='Cryptocurrency for the order (e.g., DOGE)')
parser.add_argument('order_type')
parser.add_argument('sz', type=float, help='Size of the order')
args = parser.parse_args()
place_market_order(args.coin, is_buy, args.sz)
ポジション確認
都度セットしたアカウントのポジション情報を表示するためのコードです。
import json
import eth_account
import utils
from eth_account.signers.local import LocalAccount
from hyperliquid.exchange import Exchange
from hyperliquid.info import Info
from hyperliquid.utils import constants
def check_positions():
config = utils.get_config()
account: LocalAccount = eth_account.Account.from_key(config["secret_key"])
info = Info(constants.MAINNET_API_URL, skip_ws=True)
user_state = info.user_state(account.address)
positions = []
for position in user_state["assetPositions"]:
if float(position["position"]["szi"]) != 0:
positions.append(position["position"])
if len(positions) > 0:
print("position data:")
for position in positions:
print(json.dumps(position, indent=2))
coin = position["coin"]
quantity = float(position["szi"])
else:
print("No open positions")
メインアプリ
botを起動する際にメインとなるコードです。csvへの価格情報の保存、価格情報の取得、チャートの形成、トレードストラテジーのロジックなどが記載されています。
import time
import csv
import pandas as pd
import os
import argparse
from ta.trend import SMAIndicator
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from websocket_manager import WebsocketManager
from market_order import place_market_order
from check_positions import check_positions
def update_csv_with_sma(timestamp, price, coin):
csv_file_name = f'price_data_{coin}.csv'
if not os.path.exists(csv_file_name):
with open(csv_file_name, 'w', newline='') as file:
writer = csv.writer(file)
writer.writerow(["Timestamp", "Price"])
df = pd.read_csv(csv_file_name)
df['Timestamp'] = pd.to_datetime(df['Timestamp'])
df.set_index('Timestamp', inplace=True)
df.loc[timestamp] = price
sma_indicator = SMAIndicator(df['Price'], window=14)
df['SMA'] = sma_indicator.sma_indicator()
df.to_csv(csv_file_name)
def handle_allmids_update(allmids, coin, sz):
mids = allmids["data"]["mids"]
coin_mid_price_str = mids.get(coin)
if coin_mid_price_str is not None:
coin_mid_price = float(coin_mid_price_str)
print(f"The mid price of {coin} is: {coin_mid_price}")
current_time = time.strftime("%Y-%m-%d %H:%M:%S")
update_csv_with_sma(current_time, coin_mid_price, coin)
trading_strategy(coin_mid_price, coin, sz)
else:
print(f"{coin} not found in the AllMids data.")
time.sleep(6)
def plot_chart(df, coin):
plt.figure(figsize=(10,6))
plt.plot(df.index, df['Price'], label='Price')
plt.plot(df.index, df['SMA'], label='SMA 14', alpha=0.7)
plt.title(f'Price Chart with SMA for {coin}')
plt.xlabel('Date')
plt.ylabel('Price')
plt.legend()
plt.grid(True)
plt.savefig(f'{coin}_price_chart.png')
def trading_strategy(coin_mid_price, coin, sz):
csv_file_name = f'price_data_{coin}.csv'
df = pd.read_csv(csv_file_name)
df['Timestamp'] = pd.to_datetime(df['Timestamp'])
df.set_index('Timestamp', inplace=True)
sma_indicator = SMAIndicator(df['Price'], window=14)
df['SMA'] = sma_indicator.sma_indicator()
plot_chart(df, coin)
latest_sma = df['SMA'].iloc[-1]
print(f"Latest SMA: {latest_sma}, Current Price: {coin_mid_price}") # Debugging print
if coin_mid_price > latest_sma:
print("Condition met for Buying") # Debugging print
is_buy = True
try:
place_market_order(coin, is_buy, sz)
check_positions()
except Exception as e:
print(f"Error placing buy order: {e}") # Error handling
elif coin_mid_price < latest_sma:
check_positions()
print("Condition met for Selling") # Debugging print
is_buy = False
try:
place_market_order(coin, is_buy, sz)
check_positions()
except Exception as e:
print(f"Error placing sell order: {e}") # Error handling
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Crypto Trading Bot')
parser.add_argument('coin', type=str, help='Symbol of the cryptocurrency (e.g., BTC, ETH)')
parser.add_argument('sz', type=float, help='Size of the order')
args = parser.parse_args()
base_url = "wss://api.hyperliquid.xyz/ws"
websocket_manager = WebsocketManager(base_url)
websocket_manager.start()
allmids_subscription = {"type": "allMids"}
subscription_id = websocket_manager.subscribe(allmids_subscription, lambda allmids: handle_allmids_update(allmids, args.coin, args.sz))
while True:
time.sleep(60)
JSONファイル読み込み
ファイルを読み込むためのコードです。
import json
import os
def get_config():
config_path = os.path.join(os.path.dirname(__file__), "config.json")
with open(config_path) as f:
return json.load(f)
シークレットキー設定
config.jsonにてシークレットキーをJSON形式で保存します。
シークレットキーの扱いには要注意。
{
"secret_key": "このダブルクオーテーションマークの中身をシークレットキーに変更して保存してください"
}
ロジックの説明
価格がSMAより高ければショート、価格がSMAより低ければロング
というシンプルなロジックです。
ただし価格取得頻度は6秒毎にセットしてあるので大量の注文が入ります。
botの使い方
以下のようにDOGEの部分を好きな銘柄、一度の取引で注文したい量を設定してコマンドを打つとbotが動きます。
python main.py DOGE 200
botが正常に起動すると取得した価格を随時保存するcsvファイルと自動で更新され続けるチャートが生成されます。
今回使用したロジックは取引量を増幅させますが勝てません。
少しずつ負けます。本記事がインスピレーションの機会になれば幸いです。