LoginSignup
1
1

More than 1 year has passed since last update.

AWS上にリグの監視/自動復旧システムを作ってみた(①監視/自動復旧編)

Last updated at Posted at 2022-01-10

#目次

#1.背景
 Windows10×NiceHashのリグが謎のBSODで死ぬことが何度かあり収益が落ち込むことがあったのでどうにかしたかった。OS側が死ぬとquickminer側のErrorHanding(プロセス再起動)ではどうしようもできないので対策が必要だった。

#2.構成
 NiceHash APIで定期的にリグステータスを取得、ステータスに応じてOSハングを検知する。ハングしていた場合には、swichbot経由で物理的にリグをリブートとするというゴリ押し構成。
※ Win10をホストOSとしてHyper-V上でNiceHashを動かすことも考えたが、仮想化するとGPU性能をフルに活用できなそうだったので、泣く泣く物理的に落としにいく構成に...

全体構成図.png

処理の流れ
 ①10分に1度、リグ監視用のLambdaがキックされる
 ②LambdaからDBサーバへアクセスして監視フラグが有効な場合には、
  NiceHash APIによりリグのマイニングステータスを取得する
 ③マイニングステータスに変化があった場合には、DBを更新してLINE通知する
 ④マイニングステータスが異常の場合には、switchbot APIで対象リグを強制リブートする
全体構成図_ピックアップ.png

##2-1.監視ロジック
監視状態(監視フラグ/障害フラグ/異常カウンタ)をDBサーバ上で管理して
フラグと取得マイニングステータスの状態に応じて実行する処理を制御する
フローチャート.png

##2-2.Lambda
###2-2-1.Lambda関数の作成
呼び出されるLambda関数本体を作成する

【Lambda】
 関数名:「nicehash-surveillance」
 ランタイム:「Python 3.6」
 ※DBサーバへアクセスできるようにVPC設定も必要

###2-2-2.プログラムソース
Lambdaにデプロイするプログラム

nicehash-surveillance
nicehash-surveillance/
├ lambda_function.py
├ db_data_deal.py
├ nicehash.py
├ rig_healthcheck.py
├ line_notify.py
├ switchbot.py
├ line_config.py
├ mysql_config.py
├ nicehash_config.py
└ switchbot_config.py

Lambdaメインプログラム

lambda_function.py
import json
import requests
import os
import datetime
import boto3

import db_data_deal
import nicehash
import rig_healthcheck
import line_notify
import switchbot

#Function kicked by AWS Lambda
def lambda_handler(event, context):
    Sqldealer = db_data_deal.sqldealer()
    Sqldealer.get_rig_db_info()

    # 監視対象Rigが少なくとも一つある場合
    if 1 in Sqldealer.db_data_dict['surveillance_FLG']:
        # NiceHach API 情報取得
        Nicehash = nicehash.private_api()
        rig_miner_status_dict = Nicehash.get_miner_status(Sqldealer.db_data_dict)
        # Rigの正常性確認
        hang_up_rig_name_list = rig_healthcheck.get_hang_up_rig_name_list(rig_miner_status_dict)
        # Rigで既に障害が発生していた場合
        if 1 in Sqldealer.db_data_dict['incident_FLG']:
            # Rigが正常の場合
            if len(hang_up_rig_name_list) == 0:
                line_notify.send_restore_msg()
                {Sqldealer.update_incident_flg_to_0(rig_name) for rig_name in Sqldealer.db_data_dict['rig_name']}
                {Sqldealer.update_error_cnt_to_0(rig_name) for rig_name in Sqldealer.db_data_dict['rig_name']}
                print("return3 監視対象Rigの復旧を確認しました...")
                return 3
            # Rigが復旧できなかった場合    
            else:
                # 復旧試行回数が5回以上の場合
                if 5 in Sqldealer.db_data_dict['error_CNT']:
                    line_notify.send_surveillance_stop_msg(hang_up_rig_name_list)
                    {Sqldealer.update_surveillance_flg_to_0(rig_name) for rig_name in Sqldealer.db_data_dict['rig_name']}
                    print("return4 監視対象Rigを復旧できませんでした監視を停止します...")
                    return 4
                # 復旧試行回数が5回未満の場合
                else:
                    line_notify.send_reboot_retly_msg(hang_up_rig_name_list)
                    Sqldealer.db_data_dict = rig_healthcheck.add_reboot_req_flg(Sqldealer.db_data_dict, hang_up_rig_name_list)
                    # Rigの再起動
                    switchbot.hang_up_rig_reboot(Sqldealer.db_data_dict)
                    {Sqldealer.update_error_cnt_increment(rig_name) for rig_name in hang_up_rig_name_list}
                    print("return5 監視対象Rigの復旧を確認できませんでした再度再起動します...")
                    return 5
        else:
            # Rigが正常の場合
            if len(hang_up_rig_name_list) == 0:
                print("return1 監視対象Rigは正常です...")
                return 1
            # Rigで新たに障害を検知した場合
            else:
                line_notify.send_incident_msg(hang_up_rig_name_list)
                Sqldealer.db_data_dict = rig_healthcheck.add_reboot_req_flg(Sqldealer.db_data_dict, hang_up_rig_name_list)
                # Rigの再起動
                switchbot.hang_up_rig_reboot(Sqldealer.db_data_dict)
                {Sqldealer.update_incident_flg_to_1(rig_name) for rig_name in hang_up_rig_name_list}
                print("return2 監視対象Rigで障害を検知復旧を試みました...")
                return 2
    else:
        print("return0 監視対象Rigがないため処理を終了します...")
        return 0

DBからの情報取得/DB更新処理を行うクラス

db_data_deal.py
### Module
### pip install -t ./ mysql-connector-python
import os
import json
from json import JSONEncoder
import mysql.connector
import boto3
import datetime

import mysql_config as SQLconfig

class sqldealer:
    def __init__(self):
        self.connection = mysql.connector.connect(user=SQLconfig.user, 
                          password=SQLconfig.password, host=SQLconfig.host, database=SQLconfig.database)
        self.db_data_dict = dict()
        
    def road_data(self,sql):
        with self.connection.cursor() as cur:
            select_sql = sql
            cur.execute(select_sql)
            row_db_data = cur.fetchall()
        return row_db_data
        
    def commit_data(self,sql):
        with self.connection.cursor() as cur:
            cur.execute(sql)
            cur.execute('commit;')
        
    def get_rig_db_info(self):
        sql = 'SELECT * FROM nicehash_surveillance_info;'
        columns = ["rig_no","rig_id","rig_name","surveillance_FLG","incident_FLG","error_CNT","switchbot_dev_id"]
        self.db_data_dict = {key:[] for key in columns}
        
        row_db_data = self.road_data(sql)
        print("DB-info01:success road_data")
        
        for j, col in enumerate(columns):
            for i in range(len(row_db_data)):
                self.db_data_dict[col].append(row_db_data[i][j])
        print("DB-info02:success get_rig_db_info")
    
    def update_incident_flg_to_0(self,rig_name):
        sql = 'UPDATE nicehash_surveillance_info SET incident_FLG=0 WHERE rig_name="'+rig_name+'";'
        self.commit_data(sql)
        print("DB-info03:success update_incident_flg_to_0")
        
    def update_incident_flg_to_1(self,rig_name):
        sql = 'UPDATE nicehash_surveillance_info SET incident_FLG=1 WHERE rig_name="'+rig_name+'";'
        self.commit_data(sql)
        print("DB-info04:success update_incident_flg_to_1")
        
    def update_surveillance_flg_to_0(self,rig_name):
        sql = 'UPDATE nicehash_surveillance_info SET surveillance_FLG=0 WHERE rig_name="'+rig_name+'";'
        self.commit_data(sql)
        print("DB-info05:success update_surveillance_flg_to_0")
        
    def update_surveillance_flg_to_1(self,rig_name):
        sql = 'UPDATE nicehash_surveillance_info SET surveillance_FLG=1 WHERE rig_name="'+rig_name+'";'
        self.commit_data(sql)
        print("DB-info06:success update_surveillance_flg_to_1")
        
    def update_error_cnt_increment(self,rig_name):
        idx = self.db_data_dict['rig_name'].index(rig_name)
        error_CNT = self.db_data_dict['error_CNT'][idx] + 1
        sql = 'UPDATE nicehash_surveillance_info SET error_CNT='+str(error_CNT)+' WHERE rig_name="'+rig_name+'";'
        self.commit_data(sql)
        print("DB-info07:success update_error_cnt_increment")
        
    def update_error_cnt_to_0(self,rig_name):
        sql = 'UPDATE nicehash_surveillance_info SET error_CNT=0 WHERE rig_name="'+rig_name+'";'
        self.commit_data(sql)
        print("DB-info08:success update_error_cnt_to_0")
        
class DateTimeEncoder(JSONEncoder):
    #Override the default method
    def default(self, obj):
        if isinstance(obj, (datetime.date, datetime.datetime)):
            return obj.isoformat()

NiceHashからリグステータスを取得するためのクラス
NiceHash APIのリクエスト/引数については、公式のdocsを参照。

nicehash.py
from datetime import datetime
from time import mktime
import uuid
import hmac
import requests
import json
from hashlib import sha256
import optparse
import sys

import nicehash_config as NICEHASHconfig

class private_api:
  def __init__(self, verbose=False):
      self.key = NICEHASHconfig.key
      self.secret = NICEHASHconfig.secret
      self.organisation_id = NICEHASHconfig.organisation_id
      self.host = NICEHASHconfig.host
      self.verbose = verbose
      
  def request(self, method, path, query, body):
      xtime = self.get_epoch_ms_from_now()
      xnonce = str(uuid.uuid4())
      message = bytearray(self.key, 'utf-8')
      message += bytearray('\x00', 'utf-8')
      message += bytearray(str(xtime), 'utf-8')
      message += bytearray('\x00', 'utf-8')
      message += bytearray(xnonce, 'utf-8')
      message += bytearray('\x00', 'utf-8')
      message += bytearray('\x00', 'utf-8')
      message += bytearray(self.organisation_id, 'utf-8')
      message += bytearray('\x00', 'utf-8')
      message += bytearray('\x00', 'utf-8')
      message += bytearray(method, 'utf-8')
      message += bytearray('\x00', 'utf-8')
      message += bytearray(path, 'utf-8')
      message += bytearray('\x00', 'utf-8')
      message += bytearray(query, 'utf-8')
      if body:
          body_json = json.dumps(body)
          message += bytearray('\x00', 'utf-8')
          message += bytearray(body_json, 'utf-8')
      digest = hmac.new(bytearray(self.secret, 'utf-8'), message, sha256).hexdigest()
      xauth = self.key + ":" + digest
      headers = {
          'X-Time': str(xtime),
          'X-Nonce': xnonce,
          'X-Auth': xauth,
          'Content-Type': 'application/json',
          'X-Organization-Id': self.organisation_id,
          'X-Request-Id': str(uuid.uuid4())
      }
      s = requests.Session()
      s.headers = headers
      url = self.host + path
      if query:
          url += '?' + query
      if self.verbose:
          print(method, url)
      if body:
          response = s.request(method, url, data=body_json)
      else:
          response = s.request(method, url)
      if response.status_code == 200:
          print("NiceHash-info01:success request")
          return response.json()
      elif response.content:
          raise Exception(str(response.status_code) + ": " + response.reason + ": " + str(response.content))
      else:
          raise Exception(str(response.status_code) + ": " + response.reason)

  def get_epoch_ms_from_now(self):
      now = datetime.now()
      now_ec_since_epoch = mktime(now.timetuple()) + now.microsecond / 1000000.0
      return int(now_ec_since_epoch * 1000)

  def get_miner_status(self, rig_db_info):
    rig_miner_status_dict = dict()
    for rig_id,rig_name in zip(rig_db_info['rig_id'],rig_db_info['rig_name']):
        rig_info = self.request("GET", "/main/api/v2/mining/rig2/" + rig_id, "", None)
        rig_miner_status_dict[rig_name] = rig_info['minerStatus']
    print("NiceHash-info02:success get_miner_status")
    return rig_miner_status_dict

リグステータスからヘルスチェックする関数リスト

rig_healthcheck.py
def get_hang_up_rig_name_list(rig_miner_status_dict):
    # BENCHMARKING/MINING/STOPPEDであれば正常と判定
    ACTIVE_STATUS_LIST = ["BENCHMARKING","MINING","STOPPED"]
    hang_up_rig_name_list = list()
    for rig_name in  rig_miner_status_dict.keys():
        if rig_miner_status_dict[rig_name] not in ACTIVE_STATUS_LIST:
            hang_up_rig_name_list.append(rig_name)
    
    print("HC-info01:success get_hang_up_rig_name_list")
    return hang_up_rig_name_list
    
def add_reboot_req_flg(db_data_dict, hang_up_rig_name_list):
    db_data_dict['reboot_req_flg'] = [0]*len(db_data_dict['rig_name'])
    for err_rig_name in hang_up_rig_name_list:
        idx = db_data_dict['rig_name'].index(err_rig_name)
        db_data_dict['reboot_req_flg'][idx] = 1
    return db_data_dict

switchbot経由でリグを再起動する関数
SwitchBot APIのリクエストや引数については、公式Githubを参照。

switchbot.py
import json
import requests
import os
import datetime
import boto3

import switchbot_config as sb_cnf

def hang_up_rig_reboot(db_data_dict):
    headers = {
        'Content-Type': 'application/json; charset: utf8',
        'Authorization': sb_cnf.access_token
    }
    body = {
        "command":"turnOn",
        "parameter":"default",
        "commandType":"command"
    }
    input_action = json.dumps(body)
    print(input_action)
    
    for i,dev_id in enumerate(db_data_dict['switchbot_dev_id']):
        if db_data_dict['reboot_req_flg'][i] == 1:
            url = sb_cnf.api_url + "/v1.0/devices/" + dev_id + "/commands"
            result = requests.post(url, data=input_action, headers=headers)
            print(result)

リグ監視状況をLINE通知する関数

line_notify.py
import json
import requests

import line_config as LINEconfig

def send_msg(msg):
    headers = {"Authorization": "Bearer %s" % LINEconfig.LINE_NOTIFY_ACCESS_TOKEN}
    url = LINEconfig.NOTIFICATION_URL
    payload = {'message': msg}
    requests.post(url, data=payload, headers=headers)
    
def send_incident_msg(hang_up_rig_name_list):
    err_rigs = ','.join(hang_up_rig_name_list)
    msg = err_rigs+' で障害発生。'+'\n'+'対象リグを再起動します。'
    send_msg(msg)
    
def send_restore_msg():
    msg = '監視対象リグの復旧を確認。'+'\n'+'監視を継続します。'
    send_msg(msg)
    
def send_reboot_retly_msg(hang_up_rig_name_list):
    err_rigs = ','.join(hang_up_rig_name_list)
    msg = err_rigs+' の復旧を確認できません。'+'\n'+'再度リブートします。'
    send_msg(msg)
    
def send_surveillance_stop_msg(hang_up_rig_name_list):
    err_rigs = ','.join(hang_up_rig_name_list)
    msg = err_rigs+' を復旧できませんでした。'+'\n'+'リグの監視を中止します。'
    send_msg(msg)

###2-2-3.コンフィグ
各サービス/外部APIと連携するためにコンフィグに必要な設定値を指定する
下記コンフィグの設定値詳細については、2-2-5項を参照。

line_config.py
NOTIFICATION_URL = "https://notify-api.line.me/api/notify"
LINE_NOTIFY_ACCESS_TOKEN = "[LINEアクセストークン]"
mysql_config.py
user = "[MySQLアクセスユーザ]"
password = "[MySQLアクセスユーザpw]"
host = "[DBサーバの静的IP]"
database = "[MySQLで構築したDatabase名]"
nicehash_config.py
host = "https://api2.nicehash.com"
organisation_id = "[NiceHash組織ID]"
key = "[NiceHash APIアクセスキー]"
secret = "[NiceHash APIシークレットアクセスキー]"
switchbot_config.py
api_url = "https://api.switch-bot.com"
access_token = "[switchbotアクセストークン]"

###2-2-4.モジュールの導入
Lambdaの実行に必要なパッケージを取り込む
・Lambda:nicehash-surveillanceには「mysql-connector-python」が必要なので、AWS Cloud9上でディレクトリを切って、下記コマンドを実行して環境を整備する。LambdaへのデプロイもCloud9上で行うと楽なのでおすすめ。

nicehash-surveillance
ec2-user:~/environment (master) $ mkdir nicehash-surveillance
ec2-user:~/environment (master) $ cd nicehash-surveillance
ec2-user:~/environment/nicehash-surveillance (master) $ pip install -t ./ mysql-connector-python

##2-2-5.外部APIキーの取得
外部APIの認証に必要な鍵情報/トークンを取得する
##2-2-5-1.NiceHash
APIキー(コンフィグに記載)
こちら記載の手順を元にAPIキーを取得する。
リグID(DBに登録)
・NiceHashのダッシュボード or アプリから確認する。
 ※ダッシュボードの場合、以下の黄色部分に記載されている。
image.png

##2-2-5-2.LINE
アクセストークン(コンフィグに記載)
こちら記載の手順を元に取得する。

##2-2-5-3.SwitchBot
アクセストークン(コンフィグに記載)
・アクセストークンの取得方法はこちらの記事を参照。
・外部サービスと連携するために、SwitchBotアプリ側でも「クラウドサービス」をONにしておく。
deviceId(DBに登録)
・下記curlコマンドで各botのdeviceIdを取得する。

/usr/bin/curl -X GET "https://api.switch-bot.com/v1.0/devices"  -H "Authorization:[Access_token]"

##2-2-6.EventBridgeによるトリガー定義
定期ジョブとしてLambdaをキックするためのトリガーを定義する
・下記トリガーを作成して、Lambda:nicehash-surveillanceにアタッチする

ルール:「新規ルールの作成」
ルール名:DailyTrigger
ルールタイプ:スケジュール式
固定速度ごと:10分

##2-3.RDB
リグ情報、監視ステータスを管理するDBを用意する
##2-3-1.DB作成
節約のためRDSは使用せずに、UNIX OSのEC2インスタンスにMySQLを直接インストールしてDBを構築する
※MySQLのインストールはこの辺を参照
##2-3-2.テーブル作成
nicehash_surveillance_infoテーブルをDB上に定義する、最低限以下定義があればOK。
※以下はリグ3個で運用している場合の例

mysql> SHOW COLUMNS FROM nicehash_surveillance_info;
+------------------+--------------+------+-----+---------+-------+
| Field            | Type         | Null | Key | Default | Extra |
+------------------+--------------+------+-----+---------+-------+
| rig_no           | int(11)      | YES  |     | NULL    |       |
| rig_id           | varchar(100) | YES  |     | NULL    |       |
| rig_name         | varchar(100) | YES  |     | NULL    |       |
| surveillance_FLG | int(11)      | YES  |     | NULL    |       |
| incident_FLG     | int(11)      | YES  |     | NULL    |       |
| error_CNT        | int(11)      | YES  |     | NULL    |       |
| switchbot_dev_id | varchar(100) | YES  |     | NULL    |       |
+------------------+--------------+------+-----+---------+-------+
7 rows in set (0.00 sec)

mysql> select * from nicehash_surveillance_info;
+--------+--------------------------+--------------+------------------+--------------+-----------+-------------------------------+
| rig_no | rig_id                   | rig_name     | surveillance_FLG | incident_FLG | error_CNT | switchbot_dev_id             |
+--------+--------------------------+--------------+------------------+--------------+-----------+-------------------------------+
|      1 | [myExcavator1のリグID]   | myExcavator1 |                1 |            0 |         0 | [myExcavator1BOTdeviceId] |
|      2 | [myExcavator2のリグID]   | myExcavator2 |                1 |            0 |         0 | [myExcavator2BOTdeviceId] |
|      3 | [MainPCのリグID]         | MainPC       |                1 |            0 |         0 | [MainPCBOTdeviceId]       |
+--------+--------------------------+--------------+------------------+--------------+-----------+-------------------------------+
3 rows in set (0.00 sec)

rig_no:リグ番号
rig_id:NiceHash上のリグID
rig_name:NiceHash上のリグ名称
surveillance_FLG:監視状態を制御するフラグ(1:監視有効/0:監視無効)
incident_FLG:障害発生状態管理するフラグ(1:障害発生中/0:正常稼働中)
error_CNT:再起動処理のエラーカウンタ
switchbot_dev_id:SwitchBot上のdeviceId

##2-4.電源スイッチとBOTの設置
SwitchBotでリグを再起動できるように電源スイッチにBOTを設置する

でかでか電源ボタンをリグにつないで、BOTがResetボタンを押せるよう固定する。
リグが複数ある場合には、それぞれでかでか電源ボタンとBOTのセットを設置する

BOT×電源スイッチ.png

#3. 実行結果
リグがBSODで死にっぱなしになることがなくなった...!
LINEで通知されるメッセージは以下の通り。
結果.png

#4. 終わりに
HiveOSなら問題にならないかもですが、NiceHashではBSODで悩まされされることがあるので、物理的に無理やりOS立ち上げるゴリ押し運用でどうにかできるようにしました...(笑)
(それにしても、BSODの原因は何なのだろうか。)
(マザボ/ライザー/電源/複数GPU間のコンパチが良くないのだろうか...)
(そもそも、1万程度のマザボに品質を求めるのはおかしいのか...)

また、常時リグが監視がされている状態だとメンテなどで止める際も、障害と誤検知されてしまい不便なので、LINEから監視を制御できるようにもしてます。詳細は次回の「AWS上にリグの監視/自動復旧システムを作ってみた(②監視抑止操作編)」にまとめます。

#5. 更新履歴
ver. 1.0 初版投稿 2022/01/10

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1