LoginSignup
1
1

More than 1 year has passed since last update.

③NiceHashマイニング収益をLambda×LINE Notifyでグラフ化してLINE通知する

Last updated at Posted at 2021-07-24

目次

1.背景

 「②NiceHashマイニング収益をAWS Lambda×LINE NotifyでLINE通知する」で日々のマイニング収益を定期的に通知するシステムを構築してから数カ月経って、統計データがたまってきていたので収益と円相場の変遷をグラフ化したいと思った。

2.構成

システム構成は、AWS Lambdaベースのアーキテクチャ。

処理の流れ
 ①. EventBridgeの日次実行cronがトリガーとなり、
  [Lambda①]nicehash-balance-notification-LINEがキックされる
 ②. 外部APIから当日のマイニング収益情報を取得
 ③. MySQL on EC2に当日の収益情報をレコード追加
 ④. MySQL on EC2から収益情報の統計を取得
 ⑤. S3へ収益の統計情報をアップロード
 ⑥. [Lambda②]nicehash-draw-figureのinvoke
 ⑦. 残高/円相場の統計情報をグラフ描画し、S3へアップロード
 ⑧. S3から統計情報グラフをダウンロード
 ⑨. POSTで通知メッセージ/統計情報グラフをLINE Notifyへ渡して
   スマホへLINE上で通知
nicehash通知システム図.png

2-1.Lambdaの構築

2-1-1.Lambda関数の作成

呼び出されるLambda関数本体を作成する
・Lambdaデプロイ上限250MB回避のため、Lambda関数を2つに分けて作成

【Lambda①】
 関数名:「nicehash-balance-notification-LINE」
 ランタイム:「Python 3.6」
 ※DBへアクセスするためVPC指定も必要

【Lambda②】
 関数名:「nicehash-draw-figure」
 ランタイム:「Python 3.7」

2-1-2.IAMロールの権限付与

サービス間アクセスに必要となる権限をLambdaへ付与する

  • Lambda①:nicehash-balance-notification-LINEに対して、下記の権限を付与する。
    • S3に対するread/write権限
    • EC2に対するアクセス権限
    • Lambdaに対するinvoke権限
  • Lambda②:nicehash-draw-figureに対して、下記の権限を付与する。
    • S3に対するwrite権限

2-1-3.ソースコード

2-1-3-1.Lambda①:nicehash-balance-notification-LINE のソース

nicehash-balance-notification-LINE
nicehash-balance-notification-LINE/
├ lambda_function.py
├ db_data_deal.py
├ nicehash.py
├ marketrate.py
├ s3_deal.py
├ create_message.py
├ line_config.py
├ mysql_config.py
├ nicehash_config.py
└ s3_config.py

Lambda①メインプログラム

lambda_function.py
import json
import requests
import os
import datetime
import boto3

import db_data_deal
import s3_deal
import create_message

import mysql_config as MYSQLconfig
import s3_config as S3config
import line_config as LINEconfig

### AWS Lambda handler method
def lambda_handler(event, context):
    Messenger = create_message.create_messenger()
    (today_balance,market_price) = Messenger.get_balance_and_rate()

    Sqldealer = db_data_deal.sqldealer()
    db_data_dict = Sqldealer.road_data()
    Sqldealer.insert_data(today_balance,market_price)
    db_data_dict = Sqldealer.road_data()
    Sqldealer.save_dict_to_json(df_dict = db_data_dict, out_path = S3config.dict_file_path)

    S3dealer = s3_deal.s3_dealer(bucket=S3config.bucket)
    S3dealer.save_dict_to_s3(local_file_path=S3config.dict_file_path,file_name_base=S3config.dict_file_name_base)

    ### Call Lambda②(nicehash-draw-figure)
    response = boto3.client('lambda').invoke(
        FunctionName='nicehash-draw-figure',
        InvocationType='RequestResponse',
        Payload=json.dumps(db_data_dict, cls = db_data_deal.DateTimeEncoder)
    )
    get_file_local_path = S3dealer.get_s3_file_item(file_name_base=S3config.figure_file_name_base)

    msg = Messenger.create_notification_msg(db_data_dict)    
    notify(msg,get_file_local_path)
    print(msg)

def notify(msg, *args):
    headers = {"Authorization": "Bearer %s" % LINEconfig.LINE_NOTIFY_ACCESS_TOKEN}
    url = "https://notify-api.line.me/api/notify"
    payload = {'message': msg}

    if len(args) == 0:
        requests.post(url, data=payload, headers=headers)
    else:
        files = {"imageFile": open(args[0], "rb")}
        requests.post(url, data=payload, headers=headers,files=files)
        os.remove(args[0])
    return 0

DBからの情報取得/DB更新処理を行うクラス

db_data_deal.py
import os
import json
from json import JSONEncoder
import mysql.connector
import boto3
import datetime

import mysql_config as SQLconfig

class sqldealer:
    def __init__(self):
        self.connection = mysql.connector.connect(user=SQLconfig.user, 
                          password=SQLconfig.password, host=SQLconfig.host, database=SQLconfig.database)
        self.columns = ['db-id','date','jpy_balance','jpy_diff','market','btc_balance','btc_diff']
        self.db_data_dict = dict()

    ### Get statistics information from MySQL
    def road_data(self):
        try:
            with self.connection.cursor() as cur:
                select_sql = 'SELECT * FROM nicehash_info;'
                cur.execute(select_sql)
                row_db_data = cur.fetchall()
        except:
           err_msg = 'Error001:DB-data取得に失敗'
           print(err_msg)
           return err_msg
        return self.datashaping_tuple_to_dict(row_db_data)

    ### Insert today's balance record into MySQL       
    def insert_data(self,today_balance,market_price):
        try:
            db_id = self.db_data_dict['db-id'][-1] + 1
            date = datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=9))).strftime('%Y-%m-%d')
            jpy_balance = today_balance
            jpy_diff = today_balance - self.db_data_dict['jpy_balance'][-1]
            btc_balance = today_balance/market_price
            btc_diff = btc_balance - self.db_data_dict['btc_balance'][-1]

            insert_info = str(db_id)+','+'"'+date+'"'+','+str(jpy_balance)+','+str(jpy_diff)+','+str(market_price)+','+str(btc_balance)+','+str(btc_diff)
            insert_sql = 'INSERT INTO nicehash_info VALUES('+insert_info+');'
            print(insert_sql)

            with self.connection.cursor() as cur:
                cur.execute(insert_sql)
                cur.execute('commit;')                
        except:
           err_msg = 'Error002:DB更新に失敗'
           print(err_msg)
           return err_msg

    ### Cast DB row data to dict type
    def datashaping_tuple_to_dict(self,tupple_data):
        try:
            db_data_list = [[],[],[],[],[],[],[]]
            db_data_dict = dict()
            for i in range(len(tupple_data)):
                for j in range(len(tupple_data[i])):
                    db_data_list[j].append(tupple_data[i][j])

            self.db_data_dict = dict(zip(self.columns, db_data_list))
        except:
           err_msg = 'Error003:DBデータの型変換に失敗'
           print(err_msg)
           return err_msg
        return self.db_data_dict

    ### Save dict object in json format
    def save_dict_to_json(self,df_dict,out_path):
        if os.path.exists(out_path):
            with open(out_path,'w') as json_obj:
                json_obj.write("")
                print("jsonファイル作成")
        with open(out_path, 'w') as json_file:
            json.dump(df_dict, json_file, cls = DateTimeEncoder)

class DateTimeEncoder(JSONEncoder):
        ### Override the default method
        def default(self, obj):
            if isinstance(obj, (datetime.date, datetime.datetime)):
                return obj.isoformat()

NiceHashから残高情報を取得するためのクラス

nicehash.py
from datetime import datetime
from time import mktime
import uuid
import hmac
import requests
import json
from hashlib import sha256
import optparse
import sys

class private_api:
  def __init__(self, host, organisation_id, key, secret, verbose=False):
      self.key = key
      self.secret = secret
      self.organisation_id = organisation_id
      self.host = host
      self.verbose = verbose
  def request(self, method, path, query, body):
      xtime = self.get_epoch_ms_from_now()
      xnonce = str(uuid.uuid4())
      message = bytearray(self.key, 'utf-8')
      message += bytearray('\x00', 'utf-8')
      message += bytearray(str(xtime), 'utf-8')
      message += bytearray('\x00', 'utf-8')
      message += bytearray(xnonce, 'utf-8')
      message += bytearray('\x00', 'utf-8')
      message += bytearray('\x00', 'utf-8')
      message += bytearray(self.organisation_id, 'utf-8')
      message += bytearray('\x00', 'utf-8')
      message += bytearray('\x00', 'utf-8')
      message += bytearray(method, 'utf-8')
      message += bytearray('\x00', 'utf-8')
      message += bytearray(path, 'utf-8')
      message += bytearray('\x00', 'utf-8')
      message += bytearray(query, 'utf-8')
      if body:
          body_json = json.dumps(body)
          message += bytearray('\x00', 'utf-8')
          message += bytearray(body_json, 'utf-8')
      digest = hmac.new(bytearray(self.secret, 'utf-8'), message, sha256).hexdigest()
      xauth = self.key + ":" + digest
      headers = {
          'X-Time': str(xtime),
          'X-Nonce': xnonce,
          'X-Auth': xauth,
          'Content-Type': 'application/json',
          'X-Organization-Id': self.organisation_id,
          'X-Request-Id': str(uuid.uuid4())
      }
      s = requests.Session()
      s.headers = headers
      url = self.host + path
      if query:
          url += '?' + query
      if self.verbose:
          print(method, url)
      if body:
          response = s.request(method, url, data=body_json)
      else:
          response = s.request(method, url)
      if response.status_code == 200:
          return response.json()
      elif response.content:
          raise Exception(str(response.status_code) + ": " + response.reason + ": " + str(response.content))
      else:
          raise Exception(str(response.status_code) + ": " + response.reason)

  def get_epoch_ms_from_now(self):
      now = datetime.now()
      now_ec_since_epoch = mktime(now.timetuple()) + now.microsecond / 1000000.0
      return int(now_ec_since_epoch * 1000)

CoinGeckoを利用して仮装通貨相場をリアルタイムで取得するクラス

marketrate.py
import requests
import json

class trade_table:
    def __init__(self, market="BTC"):
        ### currency-name conversion table
        self.currency_rename_table = {'BTC':'Bitcoin','ETH':'Ethereum','LTC':'Litecoin',
                                      'XRP':'XRP','RVN':'Ravencoin','MATIC':'Polygon',
                                      'BCH':'Bitcoin Cash','XLM':'Stellar','XMR':'Monero','DASH':'Dash'}
        self.market = self.currency_rename_table[market]

    def get_rate(self):
        body = requests.get('https://api.coingecko.com/api/v3/coins/markets?vs_currency=jpy')
        coingecko = json.loads(body.text)
        idx = 0
        while coingecko[idx]['name'] != self.market:
            idx += 1
            if idx > 100:
                return "trade_table_err"
        else:
            return int(coingecko[idx]['current_price'])

残高の統計情報及び統計グラフをS3バケットへread/writeするクラス

s3_deal.py
import boto3
import json
import datetime

class s3_dealer:
  def __init__(self, bucket = 'nice-hash-graph-backet'):
    self.datestamp = str(datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=9))).strftime('%Y-%m-%d'))
    self.s3 = boto3.resource('s3')
    self.bucket = self.s3.Bucket(bucket)

  def save_dict_to_s3(self, local_file_path, file_name_base = 'balance_stat_data'):
    file_name = file_name_base + '_' + self.datestamp + '.json'
    self.bucket.upload_file(Filename=local_file_path,Key=file_name)
    print("Completed json object upload to s3...")

  def save_figure_to_s3(self, local_file_path, file_name_base = 'balance_stat_graph'):
    file_name = file_name_base + '_' + self.datestamp + '.png'
    self.bucket.upload_file(Filename=local_file_path,Key=file_name)
    print("Completed figure upload to s3...")

  def get_s3_file_item(self, file_name_base = 'balance_stat_graph'):
    file_name = file_name_base + '_' + self.datestamp + '.png'
    local_file_path = '/tmp/'+file_name
    self.bucket.download_file(Filename=local_file_path,Key=file_name)
    print("Data download from s3 is completed...")
    return local_file_path

LINE通知メッセージの作成するクラス

create_message.py
import datetime

import nicehash
import marketrate

import nicehash_config as NICEHASHconfig

class create_messenger:
    def __init__(self):
        self.host = 'https://api2.nicehash.com'
        self.organisation_id = NICEHASHconfig.organisation_id
        self.key = NICEHASHconfig.key
        self.secret = NICEHASHconfig.secret
        self.market='BTC'

    def get_balance_and_rate(self):
        host = 'https://api2.nicehash.com'
        #Get mining information from NiceHash API
        PrivateApi = nicehash.private_api(self.host, self.organisation_id, self.key, self.secret)
        accounts_info = PrivateApi.get_accounts_for_currency(self.market)
        balance_row = float(accounts_info['totalBalance'])
        #Get currency_to_JPY_rate from CoinGecko API
        TradeTable = marketrate.trade_table(self.market)
        rate = TradeTable.get_rate()
        balance_jpy = int(balance_row*rate)
        return (balance_jpy,rate)

    def create_notification_msg(self, df_dict):
        rate = df_dict['market'][-1]
        jpy_diff = df_dict['jpy_diff'][-1]
        jpy_balance = df_dict['jpy_balance'][-1]
        jpy_pre_balance = df_dict['jpy_balance'][-2]
        jpy_symbol = "+" if jpy_diff > 0 else ""
        btc_diff = df_dict['btc_diff'][-1]
        btc_balance = df_dict['btc_balance'][-1]
        btc_pre_balance = df_dict['btc_balance'][-2]
        btc_symbol = "+" if btc_diff > 0 else ""
        #Nortification message
        time_text = "時刻: " + str(datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=9))))[:19]
        market_text = "仮想通貨: " + self.market
        rate_text = self.market + "-円相場: " + str(rate) + "円"
        balance_text = "現在の残高: " + str(jpy_balance) + "円"
        pre_balance_text = "前日の残高: " + str(jpy_pre_balance) + "円"
        diff_label_text = "前日との差分:"
        diff_text = "【"+str(jpy_symbol) + str(jpy_diff) + "円, "+ str(btc_symbol) + str(btc_diff) + self.market +"】"
        msg = '\n'.join(["",time_text,market_text,rate_text,balance_text,pre_balance_text,diff_label_text,diff_text])
        return msg

2-1-3-2.Lambda②:nicehash-draw-figure のソース

nicehash-draw-figure
nicehash-draw-figure/
├ lambda_function.py
├ plot_stat.py
├ s3_deal.py
└ s3_config.py

nicehash-balance-notification-LINEによって呼び出されるメインプログラム

lambda_function.py
import json

import plot_stat
import s3_deal

import s3_config as S3config

### AWS Lambda handler method
def lambda_handler(event, context):
    df_dict = event
    ### Graph drawing of statistical data
    Statdrawer = plot_stat.statdrawer()
    Statdrawer.drawfig(df_dict)

    S3dealer = s3_deal.s3_dealer(bucket=S3config.bucket)
    S3dealer.save_figure_to_s3(local_file_path=S3config.figure_file_path,file_name_base=S3config.figure_file_name_base)
    return event

統計情報をグラフ描画するクラス

plot_stat.py
import matplotlib.pyplot as plt
import datetime

import s3_config as S3config

class statdrawer:
    def __init__(self):
        self.fig = plt.figure()

    def drawfig(self,df_dict):
        ax1 = self.fig.add_subplot(111)

        #JPY-balance
        x_date = [datetime.datetime.strptime(str(s),'%Y-%m-%d') for s in df_dict['date']]
        ax1.bar(x_date,df_dict['jpy_balance'],
                    width=0.5,linewidth=0.5,label='Jpy-Balance')
        h1, l1 = ax1.get_legend_handles_labels()
        ax1.set_xlabel('Date')
        ax1.set_ylabel('Jpy-Balance [yen]')
        ax1.grid(True)
        plt.xticks(rotation=45,fontsize=6)

        #BTC-market
        ax2 = ax1.twinx()
        ax2.plot(x_date,df_dict['market'],
                     color="red", linestyle="solid", markersize=8, label='Btc-Rate')
        ax2.set_ylabel('BTC-rate [million]')
        h2, l2 = ax2.get_legend_handles_labels()

        #BTC-balance
        ax3 = ax1.twinx()
        ax3.plot(x_date,df_dict['btc_balance'],
                     color="limegreen", linestyle="solid", markersize=8, label='Btc-Balance')
        ax3.set_ylabel('Btc-Balance [btc]')
        ax3.spines["right"].set_position(("axes", 1.15))
        h3, l3 = ax3.get_legend_handles_labels()
        ax3.legend(h1+h2+h3, l1+l2+l3, loc='lower left')

        #Range adjustment
        ax1_min,ax1_max = min(df_dict['jpy_balance']),max(df_dict['jpy_balance'])
        ax2_min,ax2_max = min(df_dict['market']),max(df_dict['market'])
        ax3_min,ax3_max = min(df_dict['btc_balance']),max(df_dict['btc_balance'])
        ax1.set_ylim(ax1_min, ax1_max*1.01)
        ax2.set_ylim(ax2_min*0.9, ax2_max*1.01)
        ax3.set_ylim(ax3_min*0.9, ax3_max*1.1)

        ### Output of statistical graph
        self.fig.subplots_adjust(bottom=0.1)
        self.fig.savefig(S3config.figure_file_path,bbox_inches='tight')

2-1-3-3.コンフィグ

各サービス/外部APIと連携するためにコンフィグに必要な設定値を指定する
下記コンフィグの設定値詳細については、②NiceHashマイニング収益をAWS Lambda×LINE NotifyでLINE通知するを参照。

line_config.py
LINE_NOTIFY_ACCESS_TOKEN = '[LINEアクセストークン]'
### ※設定値は2-5節参照
mysql_config.py
user='[MySQLアクセスユーザ]'
password='[MySQLアクセスユーザpw]'
host='[EC2インスタンスの静的IP]'
database='[MySQLに構築したDatabase名]'
nicehash_config.py
organisation_id = '[NiceHash組織ID]'
key = '[NiceHash APIアクセスキー]'
secret = '[NiceHash APIシークレットアクセスキー]'
### ※設定値は2-2節参照
s3_config.py
bucket = '[S3バケット名]'
dict_file_name_base = 'balance_stat_data'
dict_file_path = '/tmp/balance_stat_data.json'
figure_file_name_base = 'balance_stat_graph'
figure_file_path = '/tmp/balance_stat_graph.png'
### ※設定値は2-4節参照

2-1-4.Module組み込み

実行に必要なパッケージを取り込む
・Lambda①:nicehash-balance-notification-LINEには「mysql-connector-python」が必要なので、AWS Cloud9上でディレクトリを切って、下記コマンドを実行して環境を整備する。LambdaへのデプロイもCloud9上で行う。

nicehash-balance-notification-LINE
ec2-user:~/environment (master) $ mkdir nicehash-balance-notification-LINE
ec2-user:~/environment (master) $ cd nicehash-balance-notification-LINE
ec2-user:~/environment/nicehash-balance-notification-LINE (master) $ pip install -t ./ mysql-connector-python

・Lambda②:nicehash-draw-figureについても「matplotlib」が必要なので、同様にCloud9上にディレクトリを切って環境を整備しLambdaへデプロイする。

nicehash-draw-figure
ec2-user:~/environment (master) $ mkdir nicehash-draw-figure
ec2-user:~/environment (master) $ cd nnicehash-draw-figure
ec2-user:~/environment/nicehash-draw-figure (master) $ pip install -t ./ matplotlib

2-1-5.基本設定の編集

メモリ/タイムアウトエラーを回避するために基本設定値を変更する
・Lambdaはデフォルトだと、メモリ:128MB、タイムアウト:3秒になっているため、実行状況の様子をみてメモリ「128MB200MB」、タイムアウト「3秒15秒」程度へ変更しておく。

2-2.NiceHash APIによる収益情報取得

外部APIからマイニング収益情報を取得できるようKEYを取得する
・API Keys取得手順はこちらを参照。

2-3.EventBridgeによるトリガー定義

日次ジョブとしてLambdaをキックするためのトリガーを定義する
・下記トリガーを作成して、Lambda①:nicehash-balance-notification-LINEにアタッチする

ルール:「新規ルールの作成」
ルール名:DailyTrigger
ルールタイプ:スケジュール式
スケジュール式:cron(0 15 * * ? *) # 毎日0:00に実行するcron

2-4.S3バケットの作成

ファイルの受け渡しを行うS3バケットを用意する
s3_config.pyに指定したバケットをあらかじめS3上に作成しておく

2-5.LINE NotifyによるLINE通知

AWSへLINE Nortifyを連携するために必要なトークンを発行する
・LINE連携、Access tokenの取得方法については、こちら を参照。

2-6.RDBの作成

収益の統計情報を管理するDBを用意する
・RDBでNiceHash収益の統計情報を管理するために、EC2上にMySQLを導入する
 ※Amazon RDSを使うべきだが、料金的な都合からMySQL on EC2で代用
・Lambda①:nicehash-balance-notification-LINEを配置したVPC上の同サブネットにEC2インスタンスを作成
・作成したEC2インスタンスにMySQLをインストール
 ※MySQLのインストールはこの辺を参照
・シンプルなテーブル一つで事足りるので、とりあえず下記のようにnicehash_infoテーブルを定義

nicehash_infoテーブル構造
mysql> SHOW COLUMNS FROM nicehash_info;
+-------------+-------------+------+-----+---------+----------------+
| Field       | Type        | Null | Key | Default | Extra          |
+-------------+-------------+------+-----+---------+----------------+
| id          | int         | NO   | PRI | NULL    | auto_increment |
| date        | date        | YES  |     | NULL    |                |
| jpy_balance | int         | NO   |     | NULL    |                |
| jpy_diff    | int         | NO   |     | NULL    |                |
| market      | int         | NO   |     | NULL    |                |
| btc_balance | double(5,4) | NO   |     | NULL    |                |
| btc_diff    | double(5,4) | NO   |     | NULL    |                |
+-------------+-------------+------+-----+---------+----------------+
7 rows in set (0.00 sec)

id:レコードID
date:日付
jpy_balance/btc_balance:残高
jpy_diff/btc_diff:前日残高との差分
market:BTCの円相場

3. 実行結果

・毎日0:00になると、日次収益と残高/円相場の変遷グラフがLINE通知されるようになりました。
・仮装通貨の換金タイミングを把握するため、リグの稼働状況をヘルスチェックするためにBtc-Balanceも描画するようにしました。
 ※ BTC残高の減少⇒別通貨への換金
 ※ BTC残高の増加⇒リグの正常稼働

S__35209345.jpg
・5月の暴落が顕著すぎて、分かってたけど悲しくなった。。。

4. 終わりに

・pandas.DataFrameなどは使わずlistやdict等の組み込み関数のみで実装したが、パッケージの都合上、250MB以内には収められなかったのでLambdaのデプロイ上限250MB(圧縮50MB)は結構ボトルネックになると痛感した。。。
・そもそもLambdaは、機能毎に分割した最小単位で定義してシステムはLambdaを組み合わせて構築するものという前提があるのかと思った。

5. 更新履歴

ver. 1.0 初版投稿 2021/07/24
ver. 2.0 BTC単位の残高も描画するよう改修 2021/07/25

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1