目次
- 1.背景
- 2.構成
- 2-1.Lambdaの構築
- 2-2.NiceHash APIによる収益情報取得
- 2-3.EventBridgeによるトリガー定義
- 2-4.S3バケットの作成
- 2-5.LINE NotifyによるLINE通知
- 2-6.RDBの作成
- 3.実行結果
- 4.終わりに
- 5.更新履歴
1.背景
「②NiceHashマイニング収益をAWS Lambda×LINE NotifyでLINE通知する」で日々のマイニング収益を定期的に通知するシステムを構築してから数カ月経って、統計データがたまってきていたので収益と円相場の変遷をグラフ化したいと思った。
2.構成
システム構成は、AWS Lambdaベースのアーキテクチャ。
処理の流れ
①. EventBridgeの日次実行cronがトリガーとなり、
[Lambda①]nicehash-balance-notification-LINE
がキックされる
②. 外部APIから当日のマイニング収益情報を取得
③. MySQL on EC2に当日の収益情報をレコード追加
④. MySQL on EC2から収益情報の統計を取得
⑤. S3へ収益の統計情報をアップロード
⑥. [Lambda②]nicehash-draw-figure
のinvoke
⑦. 残高/円相場の統計情報をグラフ描画し、S3へアップロード
⑧. S3から統計情報グラフをダウンロード
⑨. POSTで通知メッセージ/統計情報グラフをLINE Notifyへ渡して
スマホへLINE上で通知
2-1.Lambdaの構築
2-1-1.Lambda関数の作成
呼び出されるLambda関数本体を作成する
・Lambdaデプロイ上限250MB回避のため、Lambda関数を2つに分けて作成
【Lambda①】
関数名:「nicehash-balance-notification-LINE」
ランタイム:「Python 3.6」
※DBへアクセスするためVPC指定も必要
【Lambda②】
関数名:「nicehash-draw-figure」
ランタイム:「Python 3.7」
2-1-2.IAMロールの権限付与
サービス間アクセスに必要となる権限をLambdaへ付与する
- Lambda①:
nicehash-balance-notification-LINE
に対して、下記の権限を付与する。- S3に対するread/write権限
- EC2に対するアクセス権限
- Lambdaに対するinvoke権限
- Lambda②:
nicehash-draw-figure
に対して、下記の権限を付与する。- S3に対するwrite権限
2-1-3.ソースコード
2-1-3-1.Lambda①:nicehash-balance-notification-LINE のソース
nicehash-balance-notification-LINE/
├ lambda_function.py
├ db_data_deal.py
├ nicehash.py
├ marketrate.py
├ s3_deal.py
├ create_message.py
├ line_config.py
├ mysql_config.py
├ nicehash_config.py
└ s3_config.py
Lambda①メインプログラム
import json
import requests
import os
import datetime
import boto3
import db_data_deal
import s3_deal
import create_message
import mysql_config as MYSQLconfig
import s3_config as S3config
import line_config as LINEconfig
### AWS Lambda handler method
def lambda_handler(event, context):
Messenger = create_message.create_messenger()
(today_balance,market_price) = Messenger.get_balance_and_rate()
Sqldealer = db_data_deal.sqldealer()
db_data_dict = Sqldealer.road_data()
Sqldealer.insert_data(today_balance,market_price)
db_data_dict = Sqldealer.road_data()
Sqldealer.save_dict_to_json(df_dict = db_data_dict, out_path = S3config.dict_file_path)
S3dealer = s3_deal.s3_dealer(bucket=S3config.bucket)
S3dealer.save_dict_to_s3(local_file_path=S3config.dict_file_path,file_name_base=S3config.dict_file_name_base)
### Call Lambda②(nicehash-draw-figure)
response = boto3.client('lambda').invoke(
FunctionName='nicehash-draw-figure',
InvocationType='RequestResponse',
Payload=json.dumps(db_data_dict, cls = db_data_deal.DateTimeEncoder)
)
get_file_local_path = S3dealer.get_s3_file_item(file_name_base=S3config.figure_file_name_base)
msg = Messenger.create_notification_msg(db_data_dict)
notify(msg,get_file_local_path)
print(msg)
def notify(msg, *args):
headers = {"Authorization": "Bearer %s" % LINEconfig.LINE_NOTIFY_ACCESS_TOKEN}
url = "https://notify-api.line.me/api/notify"
payload = {'message': msg}
if len(args) == 0:
requests.post(url, data=payload, headers=headers)
else:
files = {"imageFile": open(args[0], "rb")}
requests.post(url, data=payload, headers=headers,files=files)
os.remove(args[0])
return 0
DBからの情報取得/DB更新処理を行うクラス
import os
import json
from json import JSONEncoder
import mysql.connector
import boto3
import datetime
import mysql_config as SQLconfig
class sqldealer:
def __init__(self):
self.connection = mysql.connector.connect(user=SQLconfig.user,
password=SQLconfig.password, host=SQLconfig.host, database=SQLconfig.database)
self.columns = ['db-id','date','jpy_balance','jpy_diff','market','btc_balance','btc_diff']
self.db_data_dict = dict()
### Get statistics information from MySQL
def road_data(self):
try:
with self.connection.cursor() as cur:
select_sql = 'SELECT * FROM nicehash_info;'
cur.execute(select_sql)
row_db_data = cur.fetchall()
except:
err_msg = 'Error001:DB-data取得に失敗'
print(err_msg)
return err_msg
return self.datashaping_tuple_to_dict(row_db_data)
### Insert today's balance record into MySQL
def insert_data(self,today_balance,market_price):
try:
db_id = self.db_data_dict['db-id'][-1] + 1
date = datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=9))).strftime('%Y-%m-%d')
jpy_balance = today_balance
jpy_diff = today_balance - self.db_data_dict['jpy_balance'][-1]
btc_balance = today_balance/market_price
btc_diff = btc_balance - self.db_data_dict['btc_balance'][-1]
insert_info = str(db_id)+','+'"'+date+'"'+','+str(jpy_balance)+','+str(jpy_diff)+','+str(market_price)+','+str(btc_balance)+','+str(btc_diff)
insert_sql = 'INSERT INTO nicehash_info VALUES('+insert_info+');'
print(insert_sql)
with self.connection.cursor() as cur:
cur.execute(insert_sql)
cur.execute('commit;')
except:
err_msg = 'Error002:DB更新に失敗'
print(err_msg)
return err_msg
### Cast DB row data to dict type
def datashaping_tuple_to_dict(self,tupple_data):
try:
db_data_list = [[],[],[],[],[],[],[]]
db_data_dict = dict()
for i in range(len(tupple_data)):
for j in range(len(tupple_data[i])):
db_data_list[j].append(tupple_data[i][j])
self.db_data_dict = dict(zip(self.columns, db_data_list))
except:
err_msg = 'Error003:DBデータの型変換に失敗'
print(err_msg)
return err_msg
return self.db_data_dict
### Save dict object in json format
def save_dict_to_json(self,df_dict,out_path):
if os.path.exists(out_path):
with open(out_path,'w') as json_obj:
json_obj.write("")
print("jsonファイル作成")
with open(out_path, 'w') as json_file:
json.dump(df_dict, json_file, cls = DateTimeEncoder)
class DateTimeEncoder(JSONEncoder):
### Override the default method
def default(self, obj):
if isinstance(obj, (datetime.date, datetime.datetime)):
return obj.isoformat()
NiceHashから残高情報を取得するためのクラス
from datetime import datetime
from time import mktime
import uuid
import hmac
import requests
import json
from hashlib import sha256
import optparse
import sys
class private_api:
def __init__(self, host, organisation_id, key, secret, verbose=False):
self.key = key
self.secret = secret
self.organisation_id = organisation_id
self.host = host
self.verbose = verbose
def request(self, method, path, query, body):
xtime = self.get_epoch_ms_from_now()
xnonce = str(uuid.uuid4())
message = bytearray(self.key, 'utf-8')
message += bytearray('\x00', 'utf-8')
message += bytearray(str(xtime), 'utf-8')
message += bytearray('\x00', 'utf-8')
message += bytearray(xnonce, 'utf-8')
message += bytearray('\x00', 'utf-8')
message += bytearray('\x00', 'utf-8')
message += bytearray(self.organisation_id, 'utf-8')
message += bytearray('\x00', 'utf-8')
message += bytearray('\x00', 'utf-8')
message += bytearray(method, 'utf-8')
message += bytearray('\x00', 'utf-8')
message += bytearray(path, 'utf-8')
message += bytearray('\x00', 'utf-8')
message += bytearray(query, 'utf-8')
if body:
body_json = json.dumps(body)
message += bytearray('\x00', 'utf-8')
message += bytearray(body_json, 'utf-8')
digest = hmac.new(bytearray(self.secret, 'utf-8'), message, sha256).hexdigest()
xauth = self.key + ":" + digest
headers = {
'X-Time': str(xtime),
'X-Nonce': xnonce,
'X-Auth': xauth,
'Content-Type': 'application/json',
'X-Organization-Id': self.organisation_id,
'X-Request-Id': str(uuid.uuid4())
}
s = requests.Session()
s.headers = headers
url = self.host + path
if query:
url += '?' + query
if self.verbose:
print(method, url)
if body:
response = s.request(method, url, data=body_json)
else:
response = s.request(method, url)
if response.status_code == 200:
return response.json()
elif response.content:
raise Exception(str(response.status_code) + ": " + response.reason + ": " + str(response.content))
else:
raise Exception(str(response.status_code) + ": " + response.reason)
def get_epoch_ms_from_now(self):
now = datetime.now()
now_ec_since_epoch = mktime(now.timetuple()) + now.microsecond / 1000000.0
return int(now_ec_since_epoch * 1000)
CoinGeckoを利用して仮装通貨相場をリアルタイムで取得するクラス
import requests
import json
class trade_table:
def __init__(self, market="BTC"):
### currency-name conversion table
self.currency_rename_table = {'BTC':'Bitcoin','ETH':'Ethereum','LTC':'Litecoin',
'XRP':'XRP','RVN':'Ravencoin','MATIC':'Polygon',
'BCH':'Bitcoin Cash','XLM':'Stellar','XMR':'Monero','DASH':'Dash'}
self.market = self.currency_rename_table[market]
def get_rate(self):
body = requests.get('https://api.coingecko.com/api/v3/coins/markets?vs_currency=jpy')
coingecko = json.loads(body.text)
idx = 0
while coingecko[idx]['name'] != self.market:
idx += 1
if idx > 100:
return "trade_table_err"
else:
return int(coingecko[idx]['current_price'])
残高の統計情報及び統計グラフをS3バケットへread/writeするクラス
import boto3
import json
import datetime
class s3_dealer:
def __init__(self, bucket = 'nice-hash-graph-backet'):
self.datestamp = str(datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=9))).strftime('%Y-%m-%d'))
self.s3 = boto3.resource('s3')
self.bucket = self.s3.Bucket(bucket)
def save_dict_to_s3(self, local_file_path, file_name_base = 'balance_stat_data'):
file_name = file_name_base + '_' + self.datestamp + '.json'
self.bucket.upload_file(Filename=local_file_path,Key=file_name)
print("Completed json object upload to s3...")
def save_figure_to_s3(self, local_file_path, file_name_base = 'balance_stat_graph'):
file_name = file_name_base + '_' + self.datestamp + '.png'
self.bucket.upload_file(Filename=local_file_path,Key=file_name)
print("Completed figure upload to s3...")
def get_s3_file_item(self, file_name_base = 'balance_stat_graph'):
file_name = file_name_base + '_' + self.datestamp + '.png'
local_file_path = '/tmp/'+file_name
self.bucket.download_file(Filename=local_file_path,Key=file_name)
print("Data download from s3 is completed...")
return local_file_path
LINE通知メッセージの作成するクラス
import datetime
import nicehash
import marketrate
import nicehash_config as NICEHASHconfig
class create_messenger:
def __init__(self):
self.host = 'https://api2.nicehash.com'
self.organisation_id = NICEHASHconfig.organisation_id
self.key = NICEHASHconfig.key
self.secret = NICEHASHconfig.secret
self.market='BTC'
def get_balance_and_rate(self):
host = 'https://api2.nicehash.com'
#Get mining information from NiceHash API
PrivateApi = nicehash.private_api(self.host, self.organisation_id, self.key, self.secret)
accounts_info = PrivateApi.get_accounts_for_currency(self.market)
balance_row = float(accounts_info['totalBalance'])
#Get currency_to_JPY_rate from CoinGecko API
TradeTable = marketrate.trade_table(self.market)
rate = TradeTable.get_rate()
balance_jpy = int(balance_row*rate)
return (balance_jpy,rate)
def create_notification_msg(self, df_dict):
rate = df_dict['market'][-1]
jpy_diff = df_dict['jpy_diff'][-1]
jpy_balance = df_dict['jpy_balance'][-1]
jpy_pre_balance = df_dict['jpy_balance'][-2]
jpy_symbol = "+" if jpy_diff > 0 else ""
btc_diff = df_dict['btc_diff'][-1]
btc_balance = df_dict['btc_balance'][-1]
btc_pre_balance = df_dict['btc_balance'][-2]
btc_symbol = "+" if btc_diff > 0 else ""
#Nortification message
time_text = "時刻: " + str(datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=9))))[:19]
market_text = "仮想通貨: " + self.market
rate_text = self.market + "-円相場: " + str(rate) + "円"
balance_text = "現在の残高: " + str(jpy_balance) + "円"
pre_balance_text = "前日の残高: " + str(jpy_pre_balance) + "円"
diff_label_text = "前日との差分:"
diff_text = "【"+str(jpy_symbol) + str(jpy_diff) + "円, "+ str(btc_symbol) + str(btc_diff) + self.market +"】"
msg = '\n'.join(["",time_text,market_text,rate_text,balance_text,pre_balance_text,diff_label_text,diff_text])
return msg
2-1-3-2.Lambda②:nicehash-draw-figure のソース
nicehash-draw-figure/
├ lambda_function.py
├ plot_stat.py
├ s3_deal.py
└ s3_config.py
nicehash-balance-notification-LINEによって呼び出されるメインプログラム
import json
import plot_stat
import s3_deal
import s3_config as S3config
### AWS Lambda handler method
def lambda_handler(event, context):
df_dict = event
### Graph drawing of statistical data
Statdrawer = plot_stat.statdrawer()
Statdrawer.drawfig(df_dict)
S3dealer = s3_deal.s3_dealer(bucket=S3config.bucket)
S3dealer.save_figure_to_s3(local_file_path=S3config.figure_file_path,file_name_base=S3config.figure_file_name_base)
return event
統計情報をグラフ描画するクラス
import matplotlib.pyplot as plt
import datetime
import s3_config as S3config
class statdrawer:
def __init__(self):
self.fig = plt.figure()
def drawfig(self,df_dict):
ax1 = self.fig.add_subplot(111)
#JPY-balance
x_date = [datetime.datetime.strptime(str(s),'%Y-%m-%d') for s in df_dict['date']]
ax1.bar(x_date,df_dict['jpy_balance'],
width=0.5,linewidth=0.5,label='Jpy-Balance')
h1, l1 = ax1.get_legend_handles_labels()
ax1.set_xlabel('Date')
ax1.set_ylabel('Jpy-Balance [yen]')
ax1.grid(True)
plt.xticks(rotation=45,fontsize=6)
#BTC-market
ax2 = ax1.twinx()
ax2.plot(x_date,df_dict['market'],
color="red", linestyle="solid", markersize=8, label='Btc-Rate')
ax2.set_ylabel('BTC-rate [million]')
h2, l2 = ax2.get_legend_handles_labels()
#BTC-balance
ax3 = ax1.twinx()
ax3.plot(x_date,df_dict['btc_balance'],
color="limegreen", linestyle="solid", markersize=8, label='Btc-Balance')
ax3.set_ylabel('Btc-Balance [btc]')
ax3.spines["right"].set_position(("axes", 1.15))
h3, l3 = ax3.get_legend_handles_labels()
ax3.legend(h1+h2+h3, l1+l2+l3, loc='lower left')
#Range adjustment
ax1_min,ax1_max = min(df_dict['jpy_balance']),max(df_dict['jpy_balance'])
ax2_min,ax2_max = min(df_dict['market']),max(df_dict['market'])
ax3_min,ax3_max = min(df_dict['btc_balance']),max(df_dict['btc_balance'])
ax1.set_ylim(ax1_min, ax1_max*1.01)
ax2.set_ylim(ax2_min*0.9, ax2_max*1.01)
ax3.set_ylim(ax3_min*0.9, ax3_max*1.1)
### Output of statistical graph
self.fig.subplots_adjust(bottom=0.1)
self.fig.savefig(S3config.figure_file_path,bbox_inches='tight')
2-1-3-3.コンフィグ
各サービス/外部APIと連携するためにコンフィグに必要な設定値を指定する
下記コンフィグの設定値詳細については、②NiceHashマイニング収益をAWS Lambda×LINE NotifyでLINE通知するを参照。
LINE_NOTIFY_ACCESS_TOKEN = '[LINEアクセストークン]'
### ※設定値は2-5節参照
user='[MySQLアクセスユーザ]'
password='[MySQLアクセスユーザpw]'
host='[EC2インスタンスの静的IP]'
database='[MySQLに構築したDatabase名]'
organisation_id = '[NiceHash組織ID]'
key = '[NiceHash APIアクセスキー]'
secret = '[NiceHash APIシークレットアクセスキー]'
### ※設定値は2-2節参照
bucket = '[S3バケット名]'
dict_file_name_base = 'balance_stat_data'
dict_file_path = '/tmp/balance_stat_data.json'
figure_file_name_base = 'balance_stat_graph'
figure_file_path = '/tmp/balance_stat_graph.png'
### ※設定値は2-4節参照
2-1-4.Module組み込み
実行に必要なパッケージを取り込む
・Lambda①:nicehash-balance-notification-LINE
には「mysql-connector-python」が必要なので、AWS Cloud9上でディレクトリを切って、下記コマンドを実行して環境を整備する。LambdaへのデプロイもCloud9上で行う。
ec2-user:~/environment (master) $ mkdir nicehash-balance-notification-LINE
ec2-user:~/environment (master) $ cd nicehash-balance-notification-LINE
ec2-user:~/environment/nicehash-balance-notification-LINE (master) $ pip install -t ./ mysql-connector-python
・Lambda②:nicehash-draw-figure
についても「matplotlib」が必要なので、同様にCloud9上にディレクトリを切って環境を整備しLambdaへデプロイする。
ec2-user:~/environment (master) $ mkdir nicehash-draw-figure
ec2-user:~/environment (master) $ cd nnicehash-draw-figure
ec2-user:~/environment/nicehash-draw-figure (master) $ pip install -t ./ matplotlib
2-1-5.基本設定の編集
メモリ/タイムアウトエラーを回避するために基本設定値を変更する
・Lambdaはデフォルトだと、メモリ:128MB
、タイムアウト:3秒
になっているため、実行状況の様子をみてメモリ「128MB ⇒ 200MB」、タイムアウト「3秒 ⇒ 15秒」程度へ変更しておく。
2-2.NiceHash APIによる収益情報取得
外部APIからマイニング収益情報を取得できるようKEYを取得する
・API Keys取得手順はこちらを参照。
2-3.EventBridgeによるトリガー定義
日次ジョブとしてLambdaをキックするためのトリガーを定義する
・下記トリガーを作成して、Lambda①:nicehash-balance-notification-LINE
にアタッチする
ルール:「新規ルールの作成」
ルール名:DailyTrigger
ルールタイプ:スケジュール式
スケジュール式:cron(0 15 * * ? *) # 毎日0:00に実行するcron
2-4.S3バケットの作成
ファイルの受け渡しを行うS3バケットを用意する
・s3_config.py
に指定したバケットをあらかじめS3上に作成しておく
2-5.LINE NotifyによるLINE通知
AWSへLINE Nortifyを連携するために必要なトークンを発行する
・LINE連携、Access tokenの取得方法については、こちら を参照。
2-6.RDBの作成
収益の統計情報を管理するDBを用意する
・RDBでNiceHash収益の統計情報を管理するために、EC2上にMySQLを導入する
※Amazon RDSを使うべきだが、料金的な都合からMySQL on EC2で代用
・Lambda①:nicehash-balance-notification-LINE
を配置したVPC上の同サブネットにEC2インスタンスを作成
・作成したEC2インスタンスにMySQLをインストール
※MySQLのインストールはこの辺を参照
・シンプルなテーブル一つで事足りるので、とりあえず下記のようにnicehash_infoテーブル
を定義
mysql> SHOW COLUMNS FROM nicehash_info;
+-------------+-------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+-------------+-------------+------+-----+---------+----------------+
| id | int | NO | PRI | NULL | auto_increment |
| date | date | YES | | NULL | |
| jpy_balance | int | NO | | NULL | |
| jpy_diff | int | NO | | NULL | |
| market | int | NO | | NULL | |
| btc_balance | double(5,4) | NO | | NULL | |
| btc_diff | double(5,4) | NO | | NULL | |
+-------------+-------------+------+-----+---------+----------------+
7 rows in set (0.00 sec)
id:レコードID
date:日付
jpy_balance/btc_balance:残高
jpy_diff/btc_diff:前日残高との差分
market:BTCの円相場
3. 実行結果
・毎日0:00になると、日次収益と残高/円相場の変遷グラフがLINE通知されるようになりました。
・仮装通貨の換金タイミングを把握するため、リグの稼働状況をヘルスチェックするためにBtc-Balance
も描画するようにしました。
※ BTC残高の減少⇒別通貨への換金
※ BTC残高の増加⇒リグの正常稼働
4. 終わりに
・pandas.DataFrameなどは使わずlistやdict等の組み込み関数のみで実装したが、パッケージの都合上、250MB以内には収められなかったのでLambdaのデプロイ上限250MB(圧縮50MB)は結構ボトルネックになると痛感した。。。
・そもそもLambdaは、機能毎に分割した最小単位で定義してシステムはLambdaを組み合わせて構築するものという前提があるのかと思った。
5. 更新履歴
ver. 1.0 初版投稿 2021/07/24
ver. 2.0 BTC単位の残高も描画するよう改修 2021/07/25