0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

このチュートリアルでは、Alibaba Cloud Function ComputeDirectMailを使用して、WebサイトのSSL証明書を自動的に監視する方法を学びます。

Alibaba Cloud Tech Share執筆者、ジョン・ハンリー。Tech Shareは、技術的な知識やベストプラクティスをクラウドコミュニティ内で共有することを奨励するAlibaba Cloudのインセンティブプログラムです。

Alibaba Cloudのサービスを利用していると、SSL証明書に依存しているサービスが何十、何百もあることに気づくかもしれません。これには、Elastic Compute Service(ECS)インスタンス、WebサイトAPIゲートウェイサービス、Function Compute機能、CDNエンドポイントなどが含まれます。この記事では、SSL証明書を監視し、SSL証明書のステータスをメールで送信する方法について説明します。自動化、監視、レポート作成にはFunction ComputeDirectMailを使用します。

この記事は、Alibaba CloudでLet's Encrypt SSL証明書を使うシリーズの追加記事です。これらの証明書は90日で有効期限が切れるため、証明書の有効期限を追跡することは非常に重要です。しかし、SSL証明書を利用しているサービスを追跡するのは面倒なので、忘れてしまうことが多いです。別の記事では、Let's Encrypt SSL証明書を自動更新できるソフトウェアを開発を書いています。
この記事の目的は、その方法を紹介することです。コードは本番品質ではなく、むしろ教育品質です。本番用にデプロイする予定のソフトウェアはすべて、品質と要件への適合性を確認し、テストする必要があります。
この記事は、Alibaba Cloud Function ComputeDirectMailの基本的な理解があることを前提としています。そうでない場合は、これらのサービスを理解するのに役立つように、この記事を書きました。このチュートリアルの最後の部分では、Alibaba Cloud FCLIコマンドラインプログラムを使用して、テストとアップデートを高速化する方法を紹介します。

SSL監視、Pythonコードのダウンロード

このリンクをクリックして、このチュートリアルで使用するPythonコードをダウンロードしてください:SSLチェック - Python 3 (Zip - 10 KB)
最終更新日:2018年6月24日
必要な要件:Python 3.6以降(Python 2には対応していません)
プラットフォーム:Windows 10およびファンクション・コンピュートでテスト済み

SSL証明書ステータスレポート

まずは、このプログラムが生成するものを確認してみましょう。以下の表は、このコードによって生成され、指定したメールアドレスに電子メールで送信されます。このレポートには5つの列の情報があります。各行には、1つのホスト名のステータスが記載されています。最後の行は黄色で表示されていることに注目してください。この行には、そのホストが到達不可能であるというエラーメッセージが含まれています。わざとこのエラーを発生させるために、"bad.neoprime.xyz "というホスト名を入れました。
NeoPrime SSL 証明書ステータスレポート
Sat, 23 Jun 2018 18:24:41 GMT

image.png

キーとなるカラムは「Status」と「Expires」です。ステータスが「OK」と表示されていれば問題ありません。そうでない場合は、"Expired "や "Time to Renew "などのメッセージが表示されます。

プログラム構成パラメータ

初期テスト中に、コマンドラインからプログラムを実行します。このソフトウェアは、資格情報ファイルからアリババの資格情報を抽出します。資格情報に DirectMail を呼び出す権限があることを確認してください。Function Computeに切り替える準備ができたら、g_program_mode = PROGRAM_MODE_CMDLINEの行をg_program_mode = PROGRAM_MODE_ACS_FUNCに変更します。

PROGRAM_MODE_CMDLINE = 0    # The program operates from the command line
PROGRAM_MODE_ACS_FUNC = 1    # The program operates as a Alibaba Cloud Function Compute function

g_program_mode = PROGRAM_MODE_CMDLINE

g_days_left = 14        # Warn if a certificate will expire in less than this number of days

g_no_send = False        # if set, don't actually send an email. This is used for debugging

g_only_send_notices = False    # If set, only send emails if a certificate will expire soon or on error

g_email_required = False    # This is set during processing if a warning or error was detected

監視するホスト名を設定します。この例では、4つのホスト名を監視しています。

g_hostnames = [
    "neoprime.xyz",
    "api.neoprime.xyz",
    "cdn.neoprime.xyz",
    "www.neoprime.xyz",
    ]

レポートの件名を設定し、メールアドレスに送信します。

email_params['Subject'] = 'NeoPrime SSL Cerificate Status Report'
email_params['To'] = 'someone@example.com’

Alibaba Cloud DirectMailアカウントのパラメータを設定します。この例では、リージョンにシンガポールを使用しています。

# From the DirectMail Console
dm_account['Debug'] = 0
dm_account['Account'] = 'sender_address_from_directmail_console'
dm_account['Alias'] = 'my_alias_name_such_as_NeoPrime'
dm_account['host'] = "dm.ap-southeast-1.aliyuncs.com"
dm_account['url'] = "https://dm.ap-southeast-1.aliyuncs.com/"
Retrieving an SSL Certificate from an Internet Host

インターネットホストからSSL証明書を取得する

インターネットホストからSSL証明書を返すfunctionです。このSSL証明書には、ドメイン名や有効期限などの証明書に関する情報が含まれています。

def ssl_get_cert(hostname):
    """ This function returns an SSL certificate from a host """

    context = ssl.create_default_context()

    conn = context.wrap_socket(
        socket.socket(socket.AF_INET),
        server_hostname=hostname)

    # 3 second timeout because Function Compute has runtime limitations
    conn.settimeout(3.0)

    try:
        conn.connect((hostname, 443))
    except Exception as ex:
        print("{}: Exception: {}".format(hostname, ex), file=sys.stderr)
        return False, str(ex)

    host_ssl_info = conn.getpeercert()

    return host_ssl_info, ''

SSL証明書の確認

このfunctionは各ホスト名をループし、SSL 証明書の有効期限 (notAfter) をチェックします。このfunctionはまた、SSL証明書から発行者名やサブジェクト名(SAN)などの他の情報も抽出します。各ホスト名に対して、「add_row()」でHTMLテーブルに行を追加します。このfunctionは、構築したHTMLボディを返します。このHTMLボディは、送信されるメールメッセージの一部となります。

def process_hostnames(msg_body, hostnames):
    """ Process the SSL certificate for each hostname """

    # pylint: disable=global-statement
    global g_email_required

    ssl_date_fmt = r'%b %d %H:%M:%S %Y %Z'

    for host in hostnames:
        f_expired = False

        print('Processing host:', host)

        ssl_info, err = get_ssl_info(host)

        if ssl_info is False:
            msg_body = add_row(msg_body, host, err, '', '', '', True)
            g_email_required = True
            continue

        #print(ssl_info)

        issuerName = get_ssl_issuer_name(ssl_info)

        altNames = get_ssl_subject_alt_names(ssl_info)

        l_expires = datetime.datetime.strptime(ssl_info['notAfter'], ssl_date_fmt)

        remaining = l_expires - datetime.datetime.utcnow()

        if remaining < datetime.timedelta(days=0):
            # cert has already expired - uhoh!
            cert_status = "Expired"
            f_expired = True
            g_email_required = True
        elif remaining < datetime.timedelta(days=g_days_left):
            # expires sooner than the buffer
            cert_status = "Time to Renew"
            f_expired = True
            g_email_required = True
        else:
            # everything is fine
            cert_status = "OK"
            f_expired = False

        msg_body = add_row(msg_body, host, cert_status, str(l_expires), issuerName, altNames, f_expired)

    return msg_body

例文

############################################################
# Version 0.90
# Date Created: 2018-06-11
# Last Update:  2018-06-23
# https://www.neoprime.io
# Copyright (c) 2018, NeoPrime, LLC
# Author: John Hanley
############################################################

""" Alibaba Cloud Function Compute Example """

import    sys
import    datetime
import    socket
import    json
import    ssl
import    time
import    myemail
import    myhtml

PROGRAM_MODE_CMDLINE = 0    # The program operates from the command line
PROGRAM_MODE_ACS_FUNC = 1    # The program operates as a Alibaba Cloud Function Compute function

g_program_mode = PROGRAM_MODE_ACS_FUNC
# g_program_mode = PROGRAM_MODE_CMDLINE

g_days_left = 14        # Warn if a certificate will expire in less than this number of days

g_no_send = False        # if set, don't actually send an email. This is used for debugging

g_only_send_notices = False    # If set, only send emails if a certificate will expire soon or on error

g_email_required = False    # This is set during processing if a warning or error was detected

g_hostnames = [
    "neoprime.xyz",
    "api.neoprime.xyz",
    "cdn.neoprime.xyz",
    "www.neoprime.xyz",
    ]

email_params = {
    'To': '',
    'Subject': '',
    'Body': '',
    'BodyText': ''
}

email_params['Subject'] = 'NeoPrime SSL Certificate Status Report'
email_params['To'] = 'someone@example.com'

dm_account = {
    'Debug': 0,    # Debug flag
    'Account': '',    # DirectMail account
    'Alias': '',    # DirectMail alias
    'host': '',    # HTTP Host header
    'url': ''    # URL for POST
    }

# From the DirectMail Console
dm_account['Debug'] = 0
dm_account['Account'] = ''
dm_account['Alias'] = ''
dm_account['host'] = "dm.ap-southeast-1.aliyuncs.com"
dm_account['url'] = "https://dm.ap-southeast-1.aliyuncs.com/"

def ssl_get_cert(hostname):
    """ This function returns an SSL certificate from a host """

    context = ssl.create_default_context()

    conn = context.wrap_socket(
        socket.socket(socket.AF_INET),
        server_hostname=hostname)

    # 3 second timeout because Function Compute has runtime limitations
    conn.settimeout(3.0)

    try:
        conn.connect((hostname, 443))
    except Exception as ex:
        print("{}: Exception: {}".format(hostname, ex), file=sys.stderr)
        return False, str(ex)

    host_ssl_info = conn.getpeercert()

    return host_ssl_info, ''

def add_row(body, domain, status, expires, issuerName, names, flag_hl):
    """ Add a row to the HTML table """

    #build the url
    url = '<a href="https://' + domain + '">' + domain + '</a>'

    # begin a new table row
    if flag_hl is False:
        body += '<tr>\n'
    else:
        body += '<tr bgcolor="#FFFF00">\n'    # yellow

    body += '<td>' + url + '</td>\n'
    body += '<td>' + status + '</td>\n'
    body += '<td>' + expires + '</td>\n'
    body += '<td>' + issuerName + '</td>\n'
    body += '<td>' + names + '</td>\n'

    return body + '</tr>\n'

# Email specific

def send(account, credentials, params):
    """ email send function """

    # pylint: disable=global-statement
    global g_only_send_notices
    global g_email_required

    # If set, only send emails if a certificate will expire soon or on error
    if g_only_send_notices is True:
        if g_email_required is False:
            print('')
            print('All hosts have valid certificates')
            print('Sending an email is not required')
            return

    myemail.sendEmail(credentials, account, params, g_no_send)

def get_ssl_info(host):
    """ This function retrieves the SSL certificate for host """
    # If we receive an error, retry up to three times waiting 10 seconds each time.

    retry = 0
    err = ''

    while retry < 3:
        ssl_info, err = ssl_get_cert(host)

        if ssl_info is not False:
            return ssl_info, ''

        retry += 1
        print('    retrying ...')
        time.sleep(10)

    return False, err

def get_ssl_issuer_name(ssl_info):
    """ Return the IssuerName from the SSL certificate """

    issuerName = ''

    issuer = ssl_info['issuer']

    # pylint: disable=line-too-long
    # issuer looks like this:
    # This is a set of a set of a set of key / value pairs.
    # ((('countryName', 'US'),), (('organizationName', "Let's Encrypt"),), (('commonName', "Let's Encrypt Authority X3"),))

    for item in issuer:
        # item will look like this as it goes thru the issuer set
        # Note that this is a set of a set
        #
        # (('countryName', 'US'),)
        # (('organizationName', "Let's Encrypt"),)
        # (('commonName', "Let's Encrypt Authority X3"),)

        s = item[0]

        # s will look like this as it goes thru the isser set
        # Note that this is now a set
        #
        # ('countryName', 'US')
        # ('organizationName', "Let's Encrypt")
        # ('commonName', "Let's Encrypt Authority X3")

        # break the set into "key" and "value" pairs
        k = s[0]
        v = s[1]

        if k == 'organizationName':
            if v != '':
                issuerName = v
                continue

        if k == 'commonName':
            if v != '':
                issuerName = v

    return issuerName

def get_ssl_subject_alt_names(ssl_info):
    """ Return the Subject Alt Names """

    altNames = ''

    subjectAltNames = ssl_info['subjectAltName']

    index = 0
    for item in subjectAltNames:
        altNames += item[1]
        index += 1

        if index < len(subjectAltNames):
            altNames += ', '

    return altNames

def process_hostnames(msg_body, hostnames):
    """ Process the SSL certificate for each hostname """

    # pylint: disable=global-statement
    global g_email_required

    ssl_date_fmt = r'%b %d %H:%M:%S %Y %Z'

    for host in hostnames:
        f_expired = False

        print('Processing host:', host)

        ssl_info, err = get_ssl_info(host)

        if ssl_info is False:
            msg_body = add_row(msg_body, host, err, '', '', '', True)
            g_email_required = True
            continue

        #print(ssl_info)

        issuerName = get_ssl_issuer_name(ssl_info)

        altNames = get_ssl_subject_alt_names(ssl_info)

        l_expires = datetime.datetime.strptime(ssl_info['notAfter'], ssl_date_fmt)

        remaining = l_expires - datetime.datetime.utcnow()

        if remaining < datetime.timedelta(days=0):
            # cert has already expired - uhoh!
            cert_status = "Expired"
            f_expired = True
            g_email_required = True
        elif remaining < datetime.timedelta(days=g_days_left):
            # expires sooner than the buffer
            cert_status = "Time to Renew"
            f_expired = True
            g_email_required = True
        else:
            # everything is fine
            cert_status = "OK"
            f_expired = False

        msg_body = add_row(msg_body, host, cert_status, str(l_expires), issuerName, altNames, f_expired)

    return msg_body

def main_cmdline():
    """ This is the main function """

    # My library for processing Alibaba Cloud Services (ACS) credentials
    # This library is only used when running from the desktop and not from the cloud
    import mycred_acs

    # Load the Alibaba Cloud Credentials (AccessKey)
    cred = mycred_acs.LoadCredentials()

    if cred is False:
        print('Error: Cannot load credentials', file=sys.stderr)
        sys.exit(1)

    now = datetime.datetime.utcnow()
    date = now.strftime("%a, %d %b %Y %H:%M:%S GMT")

    msg_body = ''

    msg_body = myhtml.build_body_top()
    msg_body += '<h4>NeoPrime SSL Cerificate Status Report</h4>'
    msg_body += date + '<br />'
    msg_body += '<br />'
    msg_body = myhtml.build_table_top(msg_body)

    #
    # This is where the SSL processing happens
    #
    msg_body = process_hostnames(msg_body, g_hostnames)

    msg_body = myhtml.build_table_bottom(msg_body)
    msg_body = myhtml.build_body_bottom(msg_body)

    email_params['Body'] = msg_body
    email_params['BodyText'] = ''

    #print(msg_body)

    send(dm_account, cred, email_params)

def main_acs_func(event, context):
    """ This is the main function """

    cred = {
        'accessKeyId': '',
        'accessKeySecret': '',
        'securityToken': '',
        'Region': ''
    }

    cred['accessKeyId'] = context.credentials.accessKeyId
    cred['accessKeySecret'] = context.credentials.accessKeySecret
    cred['securityToken'] = context.credentials.securityToken

    now = datetime.datetime.utcnow()
    date = now.strftime("%a, %d %b %Y %H:%M:%S GMT")

    msg_body = ''

    msg_body = myhtml.build_body_top()
    msg_body += '<h4>NeoPrime SSL Cerificate Status Report</h4>'
    msg_body += date + '<br />'
    msg_body += '<br />'
    msg_body = myhtml.build_table_top(msg_body)

    #
    # This is where the SSL processing happens
    #
    msg_body = process_hostnames(msg_body, g_hostnames)

    msg_body = myhtml.build_table_bottom(msg_body)
    msg_body = myhtml.build_body_bottom(msg_body)

    email_params['Body'] = msg_body
    email_params['BodyText'] = ''

    #print(msg_body)

    send(dm_account, cred, email_params)

    return msg_body

def handler(event, context):
    """ This is the Function Compute entry point """

    body = ""

    body = main_acs_func(event, context)

    res = {
        'isBase64Encoded': False,
        'statusCode': 200,
        'headers': {
            'content-type' : 'text/html'
        },
        'body': body
    }

    return json.dumps(res)

# Main Program
if g_program_mode == PROGRAM_MODE_CMDLINE:
    main_cmdline()

機能の作成

次のスクリーンショットは、functionに設定するパラメータを示しています。2番目のスクリーンショットは、このfunctionは定期的に呼び出されるので、「タイムトリガー」に設定するパラメータを示しています。ここでは、タイムトリガーを1日1回、午前8時PST(16時GMT)に設定しています。

image.png

image.png

Function Computeの権限

Function Computeは、DirectMailを使用してメールを送信するための権限を必要とします。これには2つの方法があります。ソースコードに資格情報をハードコーディングする方法(非常に悪い考えです)と、RAM(Resource Access Manager)を使って、Function Computeサービスに割り当てる "ロール "を作成する方法(非常に良い考えです)です。

Function Compute用のRAMロールを作成する手順
1、Alibabaコンソールにログイン
2、リソースアクセスマネージャに移動
3、ロールをクリック
4、ロール作成ボタンをクリック
5、サービスの役割を選択
6、FC Function Computeを選択
7、ロールの名前と説明を入力
8、作成をクリック
このロールは作成されていますが、ロールに権限が付与されていません。
ロールに権限(オーソライズ)を付与する手順です。

1、承認ボタンをクリック
2、「認証ポリシーの編集」ボタンをクリック
3、検索キーワードボックスに「ダイレクト」と入力
4、AliyunDirectMailFullAccessを選択
5、右矢印ボタンをクリックして、ポリシーを右側にコピー
6、OKをクリックします。

Function ComputeとRAMロールの重要な概念は、ロールがFunction Computeサービスに割り当てられているということです。サービスの下にあるすべてのFunctionは、このロールを継承します。これは、複数のFunctionを持つFunction Computeサービスがある場合、RAMロールは各Functionに必要なパーミッションの合計を必要とすることを意味します。より厳しいセキュリティが必要な場合は、ロールのパーミッションに基づいて個別のサービスを作成します。
RAMポリシーは、付与されたパーミッションを記述したJSONドキュメントを作成します。この場合、ロールは、すべてのリソース(Resource: )に対して、dmで始まるすべてのアクション(Action: dm: )を許可しています。

{
  "Version": "1",
  "Statement": [
    {
      "Action": "dm:*",
      "Resource": "*",
      "Effect": "Allow"
    }
  ]
}

RAM ロールをサービスに割り当てる際に見落とされることが多いのは、サービスがそのロールを想定するためのパーミッションを必要とすることです。RAM Roleには、ロールを引き受けるためのSTS(Security Token Service)パーミッションとロールパーミッションの2つのコンポーネントがあります。
このJSONは、AssumeRoleアクションを介して、Function Computeサービス自身がロールを引き受けるために必要なパーミッションを記述しています。サービス名 "fc.aliyncs.com "とアクション "sts:AssumeRole "に注目してください。

{
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Effect": "Allow",
      "Principal": {
        "Service": [
          "fc.aliyuncs.com"
        ]
      }
    }
  ],
  "Version": "1"
}

Function Computeのデバッグ

Alibaba Function Compute Consoleでfunctionを手動で呼び出します。以下のタイプのエラーが表示された場合は、サービスにDirectMail権限を持つRAM Roleを割り当てるのを忘れている可能性があります。

Error Code: 404
Error: {
    "Recommend":"https://error-center.aliyun.com/status/search?Keyword=InvalidAccessKeyId.NotFound&source=PopGw",
    "Message":"Specified access key is not found.",
    "RequestId":"31BEFC34-DD4F-4916-927A-773A7C4F26C5",
    "HostId":"dm.ap-southeast-1.aliyuncs.com",
    "Code":"InvalidAccessKeyId.NotFound"
}

FCLIによる自動更新

それでは、Alibaba Cloud FCLIコマンドラインプログラムを使用して、テストとアップデートを高速化する方法を見てみましょう。
Function Computeの例は、いくつかのファイルで構成されています。Alibaba Consoleに行ってコードの変更をアップロードするのではなく、FCLIコマンドラインプログラムを使ってコマンドラインからFunction Computeを更新するのが好きです。以下は、私が使用しているWindows Cmd Promptバッチスクリプトです。
このコマンドは、index.zipという新しいパッケージを作成し、ソースファイルを追加します。そして、fcli.exeを使って、パッケージをFunction Computeにアップロードします。非常に簡単です。
DevOpsのもう一つの例:

del index.zip
pkzipc -add index.zip index.py myemail.py myhtml.py

fcli function update --code-file index.zip -s service_name -f function_name

FCLIを使ったfunctionの作成

このコマンドは、1ステップでコードを作成してアップロードします。アリババコンソールでfunctionのタイムトリガーを手動で作成する必要があります。この例では、サービス名「ssl」とfunction名「ssl_check」を使用しています。

fcli function create --code-file index.zip -t python3 -h index.handler -s ssl -f ssl_check

FCLIによるリモート実行

このコマンドは、リモートでfunctionを「起動」します。テストに便利な方法です。

fcli function invoke -s service_name -f function_name

FCLIでタイムトリガーを作成

このコマンドは、FCLI を使用したFunction Computeのタイムトリガーを作成します。コマンドと yaml 設定ファイルの 2 つのコンポーネントがあります。

triggerConfig:
    payload: ""
    cronExpression: "0 0/60 * * * *"
    enable: true

fcli trigger create -t OncePerHour -s ssl -f ssl_check -c TimeTrigger.yaml --type timer

その他のアイデア

タイムトリガーを変更して、このfunctionを15分ごとなど、より頻繁に呼び出すようにすることができます。そして、ソースコードのパラメータ g_only_send_notices = True を変更して、問題が発生した場合にのみ電子メールを受信するようにします。これは、HTTPS サービスのいずれかに障害が発生した場合に報告するサービスチェック機能になります。
もう一つのアイデアは、世界中の異なる地域に複数のfunctionを作成して、地域の顧客が経験する可能性のある問題を検出することです。
応答しない ECS インスタンスを再起動するコードを追加することもできます。
チェックするホスト名をあまり多く指定しないでください。function Compute には 300 秒の最大時間制限があります。これにより、functionは約10個のホスト名に制限され、30秒の失敗タイムアウトが可能になります。失敗のタイムアウトを減らすことができれば、各functionでより多くのホスト名を処理することができます。また、多くのホスト名を処理するために Function Compute で複数のfunctionを作成することもできます。失敗をリトライしない場合は、1つのfunctionにつき100個程度のホスト名が上限となります。Alibaba Consoleには、手動でfunctionを呼び出すための「Invoke」ボタンがあります。コンソールウィンドウの下部近くには、functionが実行された時間の統計情報が表示されます。これは、functionごとのホスト数を調整するのに役立ちます。

本ブログは英語版からの翻訳です。オリジナルはこちらからご確認いただけます。一部機械翻訳を使用しております。翻訳の間違いがありましたら、ご指摘いただけると幸いです。

アリババクラウドは日本に2つのデータセンターを有し、世界で60を超えるアベラビリティーゾーンを有するアジア太平洋地域No.1(2019ガートナー)のクラウドインフラ事業者です。
アリババクラウドの詳細は、こちらからご覧ください。
アリババクラウドジャパン公式ページ

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?