趣旨
Telegramをトレード用のインターフェース(アプリ)として使う方法を紹介します。Telegramを常時利用する人、あるいは複数の取引所を常時利用する人にとっては、少しだけ便利かも知れません。僕としては、トレードインターフェースそのものよりも、Telegramでbotを作る方法を学べたことの価値の方が大きかったので共有します。
コード
.envファイルでAPIキーやトークンを管理します(他の方法でも構いません)。下記のキー名のうち EXCHANGE の部分は、利用する取引所のIDを大文字にしたものに置き換えてください(例: binance なら BINANCE_API_KEY)。Telegramのトークンの取得方法などはここでは紹介しないので、別途調べてください。
.env
EXCHANGE_API_KEY=
EXCHANGE_SECRET=
EXCHANGE_PASSWORD=
TELEGRAM_TOKEN=
Telegramをトレードインターフェースにするコードです。常時起動させておく必要があります。価格情報の取得、残高の確認(取引所ごと、および登録した全取引所の一覧)、取引所ごとのUSD建ての資産総額、注文、注文一覧の取得、注文のキャンセル、チャートの生成など、トレードに必要な機能は一通り網羅しています。
python telegram_bot.py
import io
import logging
import os

import ccxt
# Keep this AFTER the plain `import ccxt`: it rebinds the name `ccxt` to the
# asyncio-enabled package, which is what the handlers below await on.
import ccxt.async_support as ccxt
import matplotlib.pyplot as plt
import pandas as pd
from dotenv import dotenv_values
from telegram import Update
from telegram.ext import Application, CommandHandler, ContextTypes, filters
# Directory that contains this script (symlinks resolved), so the .env lookup
# does not depend on the working directory the bot is started from
current_directory = os.path.dirname(os.path.realpath(__file__))
# Path of the .env file expected to sit next to the script
env_path = os.path.join(current_directory, '.env')
# Values parsed from the .env file; the handlers look up exchange credentials
# in it under <EXCHANGE_ID>_API_KEY / _SECRET / _PASSWORD / _UID
config = dotenv_values(env_path)
# Show log records of level INFO and above
logging.basicConfig(level=logging.INFO)
# Telegram bot token passed to the Application builder in main();
# None when TELEGRAM_TOKEN is not present in the .env file
token = config.get('TELEGRAM_TOKEN')
async def price(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply with the last traded price of a symbol on one exchange.

    Expects exactly two command arguments, ``<exchange_id> <symbol>``; any
    other count is answered with the usage hint. The exchange class is
    looked up on the ccxt module by the lower-cased id and instantiated
    without credentials. Any error is reported to the chat, not raised.
    """
    params = context.args
    if len(params) != 2:
        await update.message.reply_text('Usage: /price <exchange_id> <symbol>')
        return

    exchange_id, symbol = params
    client = None
    try:
        exchange_factory = getattr(ccxt, exchange_id.lower())
        client = exchange_factory()
        ticker = await client.fetch_ticker(symbol)
        last_price = ticker['last']
        await update.message.reply_text(f"Last price for {symbol} on {exchange_id}: {last_price}")
    except Exception as e:
        await update.message.reply_text(f"Error processing fetch command: {str(e)}")
    finally:
        # Only a client that was actually created needs closing.
        if client:
            await client.close()
async def balance(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply with every positive total balance held on one exchange.

    Usage: ``/balance <exchange_id>``. Credentials are read from ``config``
    under ``<EXCHANGE_ID>_API_KEY``, ``_SECRET``, ``_PASSWORD`` and ``_UID``
    (exchange id upper-cased). Any error is reported to the chat rather
    than raised.
    """
    args = context.args
    if len(args) != 1:
        await update.message.reply_text('Usage: /balance <exchange_id>')
        return

    exchange_id = args[0]
    env_prefix = exchange_id.upper()
    exchange_class = None
    try:
        exchange_class = getattr(ccxt, exchange_id.lower())({
            'apiKey': config.get(f'{env_prefix}_API_KEY'),
            'secret': config.get(f'{env_prefix}_SECRET'),
            'password': config.get(f'{env_prefix}_PASSWORD'),
            'uid': config.get(f'{env_prefix}_UID'),
            'enableRateLimit': True,
        })
        account = await exchange_class.fetch_balance()
        # A total can be None when the exchange reports no figure for a
        # currency; comparing None > 0 would abort the whole listing.
        lines = ["Balances:"]
        lines.extend(
            f"{coin}: {total}"
            for coin, total in account['total'].items()
            if total is not None and total > 0
        )
        await update.message.reply_text("\n".join(lines))
    except Exception as e:
        await update.message.reply_text(f"Error fetching balance: {str(e)}")
    finally:
        # Only a client that was actually created needs closing.
        if exchange_class:
            await exchange_class.close()
async def trade(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Place a market or limit order on one exchange and confirm it in chat.

    Usage: ``/trade <exchange_id> <symbol> <market_type> <order_type> <side>
    <amount> [<price>]``. The first six arguments are mandatory; ``price`` is
    only required for limit orders. ``order_type`` (market/limit) and ``side``
    (buy/sell) are matched case-insensitively and sent lower-cased. ``amount``
    and ``price`` are converted to positive floats before being sent.

    All input is validated before any exchange connection is opened, and a
    confirmation is only sent after ``create_order`` returned without error.
    Any error is reported to the chat rather than raised.
    """
    usage = 'Usage: /trade <exchange_id> <symbol> <market_type> <order_type> <side> <amount> [<price>]'
    args = context.args
    # The unpacking below needs six values; only the trailing price is optional.
    if len(args) < 6:
        await update.message.reply_text(usage)
        return

    # NOTE(review): market_type is part of the command syntax but is not
    # forwarded to the exchange — confirm whether it should select spot/swap.
    exchange_id, symbol, market_type, order_type, side, amount, *optional = args
    price = optional[0] if optional else None
    order_type = order_type.lower()
    side = side.lower()

    # Reject anything that is not an explicit buy or sell, so that no
    # confirmation can be sent for an order that was never placed.
    if side not in ('buy', 'sell'):
        await update.message.reply_text(f"Unsupported side: {side}. Use buy or sell.")
        return
    if order_type not in ('market', 'limit') or (order_type == 'limit' and price is None):
        await update.message.reply_text(f"Unsupported order type with {side}.")
        return

    try:
        order_amount = float(amount)
        # Market orders are always sent without a price, even if one was typed.
        order_price = float(price) if order_type == 'limit' else None
        # `not x > 0` rejects zero, negative values and NaN alike.
        if not order_amount > 0 or (order_price is not None and not order_price > 0):
            raise ValueError("non-positive number")
    except ValueError:
        await update.message.reply_text("Amount and price must be positive numbers.")
        return

    env_prefix = exchange_id.upper()
    exchange_class = None
    try:
        # Initialize the exchange with API keys
        exchange_class = getattr(ccxt, exchange_id.lower())({
            'apiKey': config.get(f'{env_prefix}_API_KEY'),
            'secret': config.get(f'{env_prefix}_SECRET'),
            'password': config.get(f'{env_prefix}_PASSWORD'),
            'uid': config.get(f'{env_prefix}_UID'),
            'enableRateLimit': True,
        })
        # Let market buys be submitted with an amount only (no price).
        exchange_class.options['createMarketBuyOrderRequiresPrice'] = False

        await exchange_class.create_order(symbol, order_type, side, order_amount, order_price)

        # Echo the amount/price as typed so the user sees their own input.
        confirmation = [
            f"Order placed on {exchange_id} for {symbol}:",
            f"Type: {order_type}",
            f"Side: {side}",
            f"Amount: {amount}",
        ]
        if order_price is not None:
            confirmation.append(f"Price: {price}")
        await update.message.reply_text("\n".join(confirmation))
    except Exception as e:
        # Report any failure of the trade operation to the chat
        await update.message.reply_text(f"Error processing trade command: {str(e)}")
    finally:
        # Only a client that was actually created needs closing.
        if exchange_class:
            await exchange_class.close()
async def balances(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply with the positive total balances of every configured exchange.

    Takes no command arguments. Each exchange in the hard-coded list is
    queried in turn with credentials read from ``config``
    (``<EXCHANGE_ID>_API_KEY`` / ``_SECRET`` / ``_PASSWORD`` / ``_UID``).
    A failure on one exchange is recorded in the reply and does not stop
    the remaining exchanges from being queried.
    """
    exchanges = ['binance', 'bitget', 'gate', 'bybit', 'bitmart', 'bitrue', 'coinex', 'mexc']  # Add or remove exchanges as needed
    messages = []
    for exchange_id in exchanges:
        # Reset per iteration: if the constructor fails, `finally` must see
        # None instead of an unbound name or the previous exchange's client.
        exchange_class = None
        env_prefix = exchange_id.upper()
        try:
            exchange_class = getattr(ccxt, exchange_id.lower())({
                'apiKey': config.get(f'{env_prefix}_API_KEY'),
                'secret': config.get(f'{env_prefix}_SECRET'),
                'password': config.get(f'{env_prefix}_PASSWORD'),
                'uid': config.get(f'{env_prefix}_UID'),
                'enableRateLimit': True,
            })
            account = await exchange_class.fetch_balance()
            # A total can be None when the exchange reports no figure for a
            # currency; skip those instead of failing on `None > 0`.
            lines = [f"Balances for {exchange_id}:"]
            lines.extend(
                f"{coin}: {total}"
                for coin, total in account['total'].items()
                if total is not None and total > 0
            )
            messages.append("\n".join(lines))
        except Exception as e:
            messages.append(f"Error fetching balances for {exchange_id}: {str(e)}")
        finally:
            if exchange_class:
                await exchange_class.close()
    await update.message.reply_text("\n\n".join(messages))
async def fetch_orders(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply with the orders the exchange returns for one symbol.

    Expects exactly two command arguments, ``<exchange_id> <symbol>``; any
    other count is answered with the usage hint. Credentials come from
    ``config`` keyed by the upper-cased exchange id. Each order is rendered
    on one line (id, status, type, side, price, amount). Any error is
    reported to the chat rather than raised.
    """
    params = context.args
    if len(params) != 2:
        await update.message.reply_text('Usage: /fetch_orders <exchange_id> <symbol>')
        return

    exchange_id, symbol = params
    client = None
    try:
        exchange_factory = getattr(ccxt, exchange_id.lower())
        key_prefix = exchange_id.upper()
        client = exchange_factory({
            'apiKey': config.get(f'{key_prefix}_API_KEY'),
            'secret': config.get(f'{key_prefix}_SECRET'),
            'password': config.get(f'{key_prefix}_PASSWORD'),
            'uid': config.get(f'{key_prefix}_UID'),
            'enableRateLimit': True,
        })
        orders = await client.fetch_orders(symbol)
        if not orders:
            reply = f"No orders found for {symbol} on {exchange_id}."
        else:
            rows = []
            for order in orders:
                rows.append(
                    f"Order ID: {order['id']}, Status: {order['status']}, Type: {order['type']}, Side: {order['side']}, Price: {order['price']}, Amount: {order['amount']}"
                )
            reply = f"All orders for {symbol} on {exchange_id}:\n" + "\n".join(rows)
        await update.message.reply_text(reply)
    except Exception as e:
        await update.message.reply_text(f"Error fetching orders: {str(e)}")
    finally:
        # Only a client that was actually created needs closing.
        if client:
            await client.close()
async def cancel_orders(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Cancel all orders for one symbol and reply with the exchange's result.

    Expects exactly two command arguments, ``<exchange_id> <symbol>``; any
    other count is answered with the usage hint. Credentials come from
    ``config`` keyed by the upper-cased exchange id. Any error is reported
    to the chat rather than raised.
    """
    params = context.args
    if len(params) != 2:
        await update.message.reply_text('Usage: /cancel_orders <exchange_id> <symbol>')
        return

    exchange_id, symbol = params
    client = None
    try:
        exchange_factory = getattr(ccxt, exchange_id.lower())
        key_prefix = exchange_id.upper()
        client = exchange_factory({
            'apiKey': config.get(f'{key_prefix}_API_KEY'),
            'secret': config.get(f'{key_prefix}_SECRET'),
            'password': config.get(f'{key_prefix}_PASSWORD'),
            'uid': config.get(f'{key_prefix}_UID'),
            'enableRateLimit': True,
        })
        outcome = await client.cancel_all_orders(symbol)
        # The raw result returned by the exchange is echoed back to the user.
        await update.message.reply_text(
            f"All orders for {symbol} on {exchange_id} have been canceled:\n{outcome}"
        )
    except Exception as e:
        await update.message.reply_text(f"Error canceling orders: {str(e)}")
    finally:
        # Only a client that was actually created needs closing.
        if client:
            await client.close()
async def usd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply with the USD value of each holding on one exchange, plus a total.

    Usage: ``/usd <exchange_id>``. Credentials come from ``config`` keyed by
    the upper-cased exchange id. Holdings in USD or USDT are counted 1:1.
    Every other coin is priced with the last price of its ``<coin>/USD``
    ticker, falling back to ``<coin>/USDT``. A coin that cannot be priced
    triggers an error message and is counted as 0. All per-coin values are
    rounded to two decimals.
    """
    args = context.args
    if len(args) != 1:
        await update.message.reply_text('Usage: /usd <exchange_id>')
        return

    exchange_id = args[0]
    env_prefix = exchange_id.upper()
    exchange_class = None
    try:
        # Initialize the exchange with API keys
        exchange_class = getattr(ccxt, exchange_id.lower())({
            'apiKey': config.get(f'{env_prefix}_API_KEY'),
            'secret': config.get(f'{env_prefix}_SECRET'),
            'password': config.get(f'{env_prefix}_PASSWORD'),
            'uid': config.get(f'{env_prefix}_UID'),
            'enableRateLimit': True,
        })
        account = await exchange_class.fetch_balance()
        total_usd_value = 0
        lines = ["Balance in USD:"]
        for coin, value in account['total'].items():
            # A total can be None when the exchange reports no figure.
            if value is None or value <= 0:
                continue
            if coin in ('USD', 'USDT'):
                # Already dollar-denominated (USDT is treated as 1:1); there
                # is no USD/USD ticker to look up.
                usd_value = round(value, 2)
            else:
                usd_value = 0  # Used when no quote currency yields a price
                last_error = None
                # Prefer a direct USD pair, then fall back to USDT.
                for quote in ('USD', 'USDT'):
                    try:
                        ticker = await exchange_class.fetch_ticker(f'{coin}/{quote}')
                        usd_value = round(value * ticker['last'], 2)
                        break
                    except Exception as e:
                        last_error = e
                else:
                    # Neither pair produced a price: tell the user, count as 0.
                    await update.message.reply_text(f"Error fetching balance in USD: {str(last_error)}")
            lines.append(f"{coin}: {usd_value} USD")
            total_usd_value += usd_value
        lines.append(f"\nTotal value: {round(total_usd_value,2)} USD")
        await update.message.reply_text("\n".join(lines))
    except Exception as e:
        await update.message.reply_text(f"Error fetching balance in USD: {str(e)}")
    finally:
        # Only a client that was actually created needs closing.
        if exchange_class:
            await exchange_class.close()
async def chart(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Send a closing-price chart with moving averages and volume.

    Usage: ``/chart <exchange_id> <symbol> <timeframe>``. Up to 1000 candles
    are requested from the exchange without credentials. The chart shows the
    close price, the 21/50/100/200-period simple moving averages and the
    volume on a secondary axis. The PNG is rendered into memory and sent to
    the chat the command came from; no file is written to disk. Any error is
    reported to the chat rather than raised.
    """
    args = context.args
    if len(args) != 3:
        await update.message.reply_text('Usage: /chart <exchange_id> <symbol> <timeframe>')
        return

    exchange_id, symbol, timeframe = args
    ma_colors = {21: 'magenta', 50: 'orange', 100: 'green', 200: 'red'}
    exchange_class = None
    fig = None
    try:
        # Created inside the try so an unknown exchange id is reported to the
        # user like every other failure.
        exchange_class = getattr(ccxt, exchange_id.lower())()

        # Fetch OHLC data
        ohlc = await exchange_class.fetch_ohlcv(symbol, timeframe=timeframe, limit=1000)
        if not ohlc:
            await update.message.reply_text(f"No OHLCV data returned for {symbol} on {exchange_id}.")
            return
        df_ohlc = pd.DataFrame(ohlc, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
        df_ohlc['timestamp'] = pd.to_datetime(df_ohlc['timestamp'], unit='ms')

        # Calculate moving averages
        for window in ma_colors:
            df_ohlc[f'MA{window}'] = df_ohlc['close'].rolling(window=window).mean()

        # Plotting
        plt.style.use('fivethirtyeight')  # A style that somewhat resembles TradingView's theme
        fig, ax1 = plt.subplots(figsize=(14, 7))

        # Plot closing price and MAs
        ax1.plot(df_ohlc['timestamp'], df_ohlc['close'], label='Close Price', color='blue', linewidth=2)
        for window, color in ma_colors.items():
            ax1.plot(df_ohlc['timestamp'], df_ohlc[f'MA{window}'], label=f'MA{window}', color=color, linewidth=1, linestyle='--')

        # Set chart title and labels
        ax1.set_title(f'Closing Price Chart for {symbol} with Moving Averages')
        ax1.set_xlabel('Date')
        ax1.set_ylabel('Price (USDT)')
        ax1.legend()

        # Plot volume on a secondary y-axis
        ax2 = ax1.twinx()
        ax2.bar(df_ohlc['timestamp'], df_ohlc['volume'], color='grey', alpha=0.3, width=1.0)
        ax2.set_ylabel('Volume')
        plt.grid(True)
        plt.xticks(rotation=45)
        plt.tight_layout()

        # Render into memory: a fixed filename in the working directory would
        # be shared (and overwritten) by concurrent /chart requests.
        chart_image = io.BytesIO()
        fig.savefig(chart_image, format='png')
        chart_image.seek(0)

        # Send chart
        await context.bot.send_photo(chat_id=update.effective_chat.id, photo=chart_image)
    except Exception as e:
        await update.message.reply_text(f"Error generating chart: {e}")
    finally:
        # Release the figure even when plotting or sending failed part-way.
        if fig is not None:
            plt.close(fig)
        if exchange_class:
            await exchange_class.close()
def main() -> None:
    """Start the bot and poll Telegram for commands.

    Registers one command handler per bot command and then blocks in
    ``run_polling()``.

    Commands are ignored for edited messages, so editing an old ``/trade``
    message cannot place the order a second time. If the optional .env entry
    ``TELEGRAM_ALLOWED_USER_IDS`` (comma-separated numeric Telegram user
    ids) is set, only those users may run commands; otherwise every user who
    can message the bot is accepted and a warning is logged.

    Raises:
        RuntimeError: if ``TELEGRAM_TOKEN`` is missing, or if
            ``TELEGRAM_ALLOWED_USER_IDS`` contains a non-numeric entry.
    """
    if not token:
        raise RuntimeError("TELEGRAM_TOKEN is missing from the .env file")

    # Command handlers also fire on edited messages unless told otherwise;
    # the handlers reply through update.message, which is unset for edits.
    command_filter = ~filters.UpdateType.EDITED_MESSAGE

    raw_allowed = config.get('TELEGRAM_ALLOWED_USER_IDS') or ''
    try:
        allowed_user_ids = [int(part) for part in raw_allowed.split(',') if part.strip()]
    except ValueError as exc:
        raise RuntimeError(
            "TELEGRAM_ALLOWED_USER_IDS must be a comma-separated list of numeric user ids"
        ) from exc

    if allowed_user_ids:
        command_filter = command_filter & filters.User(user_id=allowed_user_ids)
    else:
        # The bot trades with the owner's API keys, so an open bot is risky.
        logging.warning(
            "TELEGRAM_ALLOWED_USER_IDS is not set: any Telegram user who can "
            "message this bot can run its commands."
        )

    application = Application.builder().token(token).build()

    command_callbacks = {
        "price": price,
        "balance": balance,
        "trade": trade,
        "balances": balances,
        "fetch_orders": fetch_orders,
        "cancel_orders": cancel_orders,
        "usd": usd,
        "chart": chart,
    }
    for command, callback in command_callbacks.items():
        application.add_handler(CommandHandler(command, callback, filters=command_filter))

    application.run_polling()


if __name__ == '__main__':
    main()