3
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Telegramで、すべての仮想通貨取引所を一括で操作できるトレードインターフェースを実装する方法

Posted at

趣旨

Telegramをトレード用のアプリとして使う方法を紹介します。Telegramを常時利用する人や、複数の取引所を常時利用する人にとっては、少しだけ便利かもしれません。僕としては、トレードインターフェースそのものよりも、Telegramでbotを作る方法を学べたことの価値の方が大きかったので共有します。

コード

.envファイルでAPIキーやトークンを管理します(他の方法でも構いません)。下記の EXCHANGE の部分は、利用する取引所のIDを大文字にしたもの(例: BINANCE_API_KEY、BINANCE_SECRET)に置き換え、取引所ごとに記述してください。Telegramのトークンの取得方法などは別途調べてください。ここでは紹介しません。

.env
EXCHANGE_API_KEY=
EXCHANGE_SECRET=
EXCHANGE_PASSWORD=

TELEGRAM_TOKEN=

Telegramをトレードインターフェースにするコードです。常時起動させておく必要があります。価格情報の取得、取引所ごとの残高とUSD建ての資産総額、全取引所の残高一覧、注文、注文の確認とキャンセル、チャートの生成など、トレードに必要な機能は一通り網羅しています。

python telegram_bot.py
import ccxt
import os
from dotenv import dotenv_values
from telegram import Update
from telegram.ext import Application, CommandHandler, ContextTypes
import ccxt.async_support as ccxt
import matplotlib.pyplot as plt
import pandas as pd
import logging

# Resolve the directory that contains this script (symlinks resolved by
# realpath), so the .env file is found regardless of the working directory.
current_directory = os.path.dirname(os.path.realpath(__file__))

# The .env file is expected to sit next to this script.
env_path = os.path.join(current_directory, '.env')
# Parse the .env file into a mapping; the handlers below look up
# '<EXCHANGE_ID>_API_KEY', '<EXCHANGE_ID>_SECRET', '<EXCHANGE_ID>_PASSWORD'
# and '<EXCHANGE_ID>_UID' in it.
config = dotenv_values(env_path)

# Emit INFO-level (and above) log records from the root logger.
logging.basicConfig(level=logging.INFO)

# Telegram bot token handed to the Application builder in main().
# NOTE(review): presumably None when TELEGRAM_TOKEN is missing from .env — confirm.
token = config.get('TELEGRAM_TOKEN')


async def price(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply with the last traded price of a symbol on one exchange.

    Command form: ``/price <exchange_id> <symbol>``.

    The exchange client is created without credentials. Any failure
    (unknown exchange, unknown symbol, network error) is reported back to
    the chat instead of being raised.
    """
    params = context.args
    if len(params) != 2:
        await update.message.reply_text('Usage: /price <exchange_id> <symbol>')
        return

    exchange_id, symbol = params
    client = None
    try:
        # The exchange class is looked up by its lower-cased id on the ccxt module.
        exchange_factory = getattr(ccxt, exchange_id.lower())
        client = exchange_factory()
        ticker = await client.fetch_ticker(symbol)
        last_price = ticker['last']
        await update.message.reply_text(f"Last price for {symbol} on {exchange_id}: {last_price}")
    except Exception as e:
        await update.message.reply_text(f"Error processing fetch command: {str(e)}")
    finally:
        # Always close the client, including when the request failed.
        if client:
            await client.close()

async def balance(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply with every positive total balance held on one exchange.

    Command form: ``/balance <exchange_id>``.

    Credentials are read from ``config`` under the keys
    ``<EXCHANGE_ID>_API_KEY``, ``<EXCHANGE_ID>_SECRET``,
    ``<EXCHANGE_ID>_PASSWORD`` and ``<EXCHANGE_ID>_UID``. Any failure is
    reported back to the chat instead of being raised.
    """
    args = context.args
    if len(args) != 1:
        await update.message.reply_text('Usage: /balance <exchange_id>')
        return

    exchange_id = args[0]
    env_prefix = exchange_id.upper()
    exchange_class = None
    try:
        exchange_class = getattr(ccxt, exchange_id.lower())({
            'apiKey': config.get(f'{env_prefix}_API_KEY'),
            'secret': config.get(f'{env_prefix}_SECRET'),
            'password': config.get(f'{env_prefix}_PASSWORD'),
            'uid': config.get(f'{env_prefix}_UID'),
            'enableRateLimit': True,
        })

        account = await exchange_class.fetch_balance()
        lines = ["Balances:"]
        # A total can be None when the exchange does not report it; comparing
        # None with 0 would raise TypeError and abort the whole reply.
        lines.extend(
            f"{asset}: {total}"
            for asset, total in account.get('total', {}).items()
            if total is not None and total > 0
        )
        await update.message.reply_text("\n".join(lines))
    except Exception as e:
        await update.message.reply_text(f"Error fetching balance: {str(e)}")
    finally:
        # Always close the client, including when the request failed.
        if exchange_class:
            await exchange_class.close()

async def trade(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Place a market or limit order on an exchange and confirm it in chat.

    Command form:
    ``/trade <exchange_id> <symbol> <market_type> <order_type> <side> <amount> [<price>]``

    * ``market_type`` is parsed for command compatibility but is not used
      when the order is built.
    * ``order_type`` must be ``market`` or ``limit`` (case-insensitive); a
      limit order requires ``price``.
    * ``side`` must be ``buy`` or ``sell`` (case-insensitive).
    * ``amount`` and, for limit orders, ``price`` must be positive numbers.

    Invalid input is rejected with a chat message before any exchange
    client is created. Errors raised while placing the order are reported
    back to the chat instead of being raised.
    """
    args = context.args

    # Six positional arguments are mandatory: the unpacking below needs all
    # of them, so anything shorter must be rejected here.
    if len(args) < 6:
        await update.message.reply_text('Usage: /trade <exchange_id> <symbol> <market_type> <order_type> <side> <amount> [<price>]')
        return

    exchange_id, symbol, market_type, order_type, side, amount, *optional = args
    price = optional[0] if optional else None

    order_kind = order_type.lower()
    order_side = side.lower()

    # Without this check an unknown side would skip order placement and still
    # fall through to the "Order placed" confirmation.
    if order_side not in ('buy', 'sell'):
        await update.message.reply_text(f"Unsupported side '{side}'. Use buy or sell.")
        return

    if order_kind not in ('market', 'limit') or (order_kind == 'limit' and price is None):
        await update.message.reply_text(f"Unsupported order type with {order_side}.")
        return

    # Telegram delivers arguments as strings; convert them to numbers before
    # handing them to the exchange. Market orders are sent without a price.
    try:
        amount_value = float(amount)
        price_value = float(price) if order_kind == 'limit' else None
    except ValueError:
        await update.message.reply_text("Amount and price must be numbers.")
        return

    # The chained comparison also rejects NaN, which fails every comparison.
    if not 0 < amount_value < float('inf'):
        await update.message.reply_text("Amount must be a positive number.")
        return
    if price_value is not None and not 0 < price_value < float('inf'):
        await update.message.reply_text("Price must be a positive number.")
        return

    exchange_class = None
    try:
        # Initialize the exchange with API keys
        env_prefix = exchange_id.upper()
        exchange_class = getattr(ccxt, exchange_id.lower())({
            'apiKey': config.get(f'{env_prefix}_API_KEY'),
            'secret': config.get(f'{env_prefix}_SECRET'),
            'password': config.get(f'{env_prefix}_PASSWORD'),
            'uid': config.get(f'{env_prefix}_UID'),
            'enableRateLimit': True,
        })

        exchange_class.options['createMarketBuyOrderRequiresPrice'] = False

        await exchange_class.create_order(symbol, order_kind, order_side, amount_value, price_value)

        # Prepare and send the trade confirmation message
        trade_message = "\n".join([
            f"Order placed on {exchange_id} for {symbol}:",
            f"Type: {order_type}",
            f"Side: {side}",
            f"Amount: {amount}" + (f"\nPrice: {price}" if price else "")
        ])
        await update.message.reply_text(trade_message)

    except Exception as e:
        # Handle any exceptions that occur during the trade operation
        await update.message.reply_text(f"Error processing trade command: {str(e)}")
    finally:
        # Always close the client, including when the order failed.
        if exchange_class:
            await exchange_class.close()

async def balances(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply with the positive total balances of every configured exchange.

    Command form: ``/balances`` (no arguments are read).

    Each exchange in the hard-coded list is queried in turn with the
    credentials stored in ``config`` under ``<EXCHANGE_ID>_API_KEY`` etc.
    A failure on one exchange is recorded in the reply and does not stop
    the remaining exchanges from being queried.
    """
    exchanges = ['binance', 'bitget', 'gate', 'bybit', 'bitmart', 'bitrue', 'coinex', 'mexc']  # Add or remove exchanges as needed
    messages = []

    for exchange_id in exchanges:
        # Reset per iteration: if the constructor below raises, the finally
        # clause must neither hit an unbound name nor re-close the client
        # left over from the previous exchange.
        exchange_class = None
        env_prefix = exchange_id.upper()
        try:
            exchange_class = getattr(ccxt, exchange_id.lower())({
                'apiKey': config.get(f'{env_prefix}_API_KEY'),
                'secret': config.get(f'{env_prefix}_SECRET'),
                'password': config.get(f'{env_prefix}_PASSWORD'),
                'uid': config.get(f'{env_prefix}_UID'),
                'enableRateLimit': True,
            })

            account = await exchange_class.fetch_balance()
            lines = [f"Balances for {exchange_id}:"]
            # Skip totals reported as None; comparing None with 0 would raise.
            lines.extend(
                f"{coin}: {value}"
                for coin, value in account.get('total', {}).items()
                if value is not None and value > 0
            )
            messages.append("\n".join(lines))
        except Exception as e:
            messages.append(f"Error fetching balances for {exchange_id}: {str(e)}")
        finally:
            if exchange_class:
                await exchange_class.close()

    await update.message.reply_text("\n\n".join(messages))

async def fetch_orders(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply with a one-line summary of each order for a symbol.

    Command form: ``/fetch_orders <exchange_id> <symbol>``.

    Credentials are read from ``config`` under ``<EXCHANGE_ID>_API_KEY``
    and the sibling keys. Any failure is reported back to the chat instead
    of being raised.
    """
    params = context.args
    if len(params) != 2:
        await update.message.reply_text('Usage: /fetch_orders <exchange_id> <symbol>')
        return

    exchange_id, symbol = params
    client = None
    try:
        exchange_factory = getattr(ccxt, exchange_id.lower())
        env_prefix = exchange_id.upper()
        credentials = {
            'apiKey': config.get(f'{env_prefix}_API_KEY'),
            'secret': config.get(f'{env_prefix}_SECRET'),
            'password': config.get(f'{env_prefix}_PASSWORD'),
            'uid': config.get(f'{env_prefix}_UID'),
            'enableRateLimit': True,
        }
        client = exchange_factory(credentials)

        orders = await client.fetch_orders(symbol)
        if not orders:
            reply = f"No orders found for {symbol} on {exchange_id}."
        else:
            rows = []
            for order in orders:
                rows.append(f"Order ID: {order['id']}, Status: {order['status']}, Type: {order['type']}, Side: {order['side']}, Price: {order['price']}, Amount: {order['amount']}")
            reply = f"All orders for {symbol} on {exchange_id}:\n" + "\n".join(rows)

        await update.message.reply_text(reply)
    except Exception as e:
        await update.message.reply_text(f"Error fetching orders: {str(e)}")
    finally:
        # Always close the client, including when the request failed.
        if client:
            await client.close()

async def cancel_orders(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Cancel all orders for a symbol on one exchange and report the result.

    Command form: ``/cancel_orders <exchange_id> <symbol>``.

    Credentials are read from ``config`` under ``<EXCHANGE_ID>_API_KEY``
    and the sibling keys. The raw result returned by the exchange client is
    echoed to the chat; any failure is reported there too instead of being
    raised.
    """
    params = context.args
    if len(params) != 2:
        await update.message.reply_text('Usage: /cancel_orders <exchange_id> <symbol>')
        return

    exchange_id, symbol = params
    client = None
    try:
        exchange_factory = getattr(ccxt, exchange_id.lower())
        env_prefix = exchange_id.upper()
        credentials = {
            'apiKey': config.get(f'{env_prefix}_API_KEY'),
            'secret': config.get(f'{env_prefix}_SECRET'),
            'password': config.get(f'{env_prefix}_PASSWORD'),
            'uid': config.get(f'{env_prefix}_UID'),
            'enableRateLimit': True,
        }
        client = exchange_factory(credentials)

        outcome = await client.cancel_all_orders(symbol)
        # Tell the user what the exchange answered.
        await update.message.reply_text(
            f"All orders for {symbol} on {exchange_id} have been canceled:\n{outcome}"
        )
    except Exception as e:
        await update.message.reply_text(f"Error canceling orders: {str(e)}")
    finally:
        # Always close the client, including when the request failed.
        if client:
            await client.close()

async def usd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply with the USD value of each holding on one exchange and the total.

    Command form: ``/usd <exchange_id>``.

    ``USD`` and ``USDT`` holdings are counted at face value. Every other
    coin is priced with the last price of its ``<coin>/USD`` ticker, falling
    back to ``<coin>/USDT``. A coin that cannot be priced is reported in a
    separate error message and counted as 0. All per-coin values are
    rounded to two decimals.
    """
    args = context.args
    if len(args) != 1:
        await update.message.reply_text('Usage: /usd <exchange_id>')
        return

    exchange_id = args[0]
    env_prefix = exchange_id.upper()
    exchange_class = None
    try:
        # Initialize the exchange with API keys
        exchange_class = getattr(ccxt, exchange_id.lower())({
            'apiKey': config.get(f'{env_prefix}_API_KEY'),
            'secret': config.get(f'{env_prefix}_SECRET'),
            'password': config.get(f'{env_prefix}_PASSWORD'),
            'uid': config.get(f'{env_prefix}_UID'),
            'enableRateLimit': True,
        })

        account = await exchange_class.fetch_balance()
        total_usd_value = 0
        lines = ["Balance in USD:"]

        for coin, value in account.get('total', {}).items():
            # Skip empty holdings and totals reported as None (comparing
            # None with 0 would raise and abort the whole report).
            if value is None or value <= 0:
                continue

            if coin in ('USD', 'USDT'):
                # Already denominated in (or pegged to) USD: no ticker needed.
                usd_value = round(value, 2)
            else:
                usd_value = None
                last_error = None
                # Try the direct USD pair first, then USDT as a bridge currency.
                for quote in ('USD', 'USDT'):
                    try:
                        ticker = await exchange_class.fetch_ticker(f'{coin}/{quote}')
                        usd_value = round(value * ticker['last'], 2)
                        break
                    except Exception as e:
                        last_error = e
                if usd_value is None:
                    await update.message.reply_text(f"Error fetching balance in USD: {str(last_error)}")
                    usd_value = 0  # If conversion fails, set USD value to 0

            lines.append(f"{coin}: {usd_value} USD")
            total_usd_value += usd_value

        # The leading newline leaves a blank line before the total.
        lines.append(f"\nTotal value: {round(total_usd_value,2)} USD")
        await update.message.reply_text("\n".join(lines))
    except Exception as e:
        await update.message.reply_text(f"Error fetching balance in USD: {str(e)}")
    finally:
        # Always close the client, including when the request failed.
        if exchange_class:
            await exchange_class.close()

async def chart(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Render a closing-price chart with moving averages and send it as a photo.

    Command form: ``/chart <exchange_id> <symbol> <timeframe>``.

    Up to 1000 OHLCV candles are requested without credentials. The chart
    shows the close price, the 21/50/100/200-period simple moving averages
    and the volume on a secondary axis. It is written to a PNG file in the
    current working directory and then sent to the chat. Any failure,
    including an unknown exchange id, is reported back to the chat instead
    of being raised.
    """
    args = context.args
    if len(args) != 3:
        await update.message.reply_text('Usage: /chart <exchange_id> <symbol> <timeframe>')
        return

    exchange_id, symbol, timeframe = args
    exchange_class = None

    try:
        # Created inside the try block so that an unknown exchange id is
        # reported to the user like any other error.
        exchange_class = getattr(ccxt, exchange_id.lower())()

        # Fetch OHLC data
        ohlc = await exchange_class.fetch_ohlcv(symbol, timeframe=timeframe, limit=1000)
        df_ohlc = pd.DataFrame(ohlc, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
        df_ohlc['timestamp'] = pd.to_datetime(df_ohlc['timestamp'], unit='ms')

        # Moving-average window -> line colour
        ma_colors = {21: 'magenta', 50: 'orange', 100: 'green', 200: 'red'}
        for ma in ma_colors:
            df_ohlc[f'MA{ma}'] = df_ohlc['close'].rolling(window=ma).mean()

        # Plotting
        plt.style.use('fivethirtyeight')  # A style that somewhat resembles TradingView's theme
        chart_filename = 'closing_price_chart_with_MA_and_volume.png'
        fig, ax1 = plt.subplots(figsize=(14, 7))
        try:
            # Plot closing price and MAs
            ax1.plot(df_ohlc['timestamp'], df_ohlc['close'], label='Close Price', color='blue', linewidth=2)
            for ma, color in ma_colors.items():
                ax1.plot(df_ohlc['timestamp'], df_ohlc[f'MA{ma}'], label=f'MA{ma}', color=color, linewidth=1, linestyle='--')

            # Set chart title and labels
            ax1.set_title(f'Closing Price Chart for {symbol} with Moving Averages')
            ax1.set_xlabel('Date')
            ax1.set_ylabel('Price (USDT)')
            ax1.legend()

            # Plot volume on a secondary y-axis
            ax2 = ax1.twinx()
            ax2.bar(df_ohlc['timestamp'], df_ohlc['volume'], color='grey', alpha=0.3, width=1.0)  # Adjusted width
            ax2.set_ylabel('Volume')

            plt.grid(True)
            plt.xticks(rotation=45)
            plt.tight_layout()

            fig.savefig(chart_filename)
        finally:
            # Close the figure even when plotting or saving fails; otherwise
            # pyplot keeps it registered and it is never released.
            plt.close(fig)

        # Send chart
        with open(chart_filename, 'rb') as chart_image:
            await context.bot.send_photo(chat_id=update.effective_chat.id, photo=chart_image)

    except Exception as e:
        await update.message.reply_text(f"Error generating chart: {e}")
    finally:
        # Always close the client, including when the request failed.
        if exchange_class:
            await exchange_class.close()

def main() -> None:
    """Build the Telegram application, register every command, and poll."""
    application = Application.builder().token(token).build()

    # (command name, handler coroutine) pairs, registered in this order.
    command_table = (
        ("price", price),
        ("balance", balance),
        ("trade", trade),
        ("balances", balances),
        ("fetch_orders", fetch_orders),
        ("cancel_orders", cancel_orders),
        ("usd", usd),
        ("chart", chart),
    )
    for command_name, callback in command_table:
        application.add_handler(CommandHandler(command_name, callback))

    application.run_polling()

if __name__ == '__main__':
    main()
3
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?