このチュートリアルでは、Alibaba Cloud Function ComputeとDirectMailを使用して、WebサイトのSSL証明書を自動的に監視する方法を学びます。
Alibaba Cloud Tech Share執筆者、ジョン・ハンリー。Tech Shareは、技術的な知識やベストプラクティスをクラウドコミュニティ内で共有することを奨励するAlibaba Cloudのインセンティブプログラムです。
Alibaba Cloudのサービスを利用していると、SSL証明書に依存しているサービスが何十、何百もあることに気づくかもしれません。これには、Elastic Compute Service(ECS)インスタンス、Webサイト、APIゲートウェイサービス、Function Compute機能、CDNエンドポイントなどが含まれます。この記事では、SSL証明書を監視し、SSL証明書のステータスをメールで送信する方法について説明します。自動化、監視、レポート作成にはFunction ComputeとDirectMailを使用します。
この記事は、Alibaba CloudでLet's Encrypt SSL証明書を使うシリーズの追加記事です。これらの証明書は90日で有効期限が切れるため、証明書の有効期限を追跡することは非常に重要です。しかし、SSL証明書を利用しているサービスを追跡するのは面倒なので、忘れてしまうことが多いです。別の記事では、Let's Encrypt SSL証明書を自動更新できるソフトウェアを開発を書いています。
この記事の目的は、その方法を紹介することです。コードは本番品質ではなく、むしろ教育品質です。本番用にデプロイする予定のソフトウェアはすべて、品質と要件への適合性を確認し、テストする必要があります。
この記事は、Alibaba Cloud Function ComputeとDirectMailの基本的な理解があることを前提としています。そうでない場合は、これらのサービスを理解するのに役立つように、この記事を書きました。このチュートリアルの最後の部分では、Alibaba Cloud FCLIコマンドラインプログラムを使用して、テストとアップデートを高速化する方法を紹介します。
SSL監視、Pythonコードのダウンロード
このリンクをクリックして、このチュートリアルで使用するPythonコードをダウンロードしてください:SSLチェック - Python 3 (Zip - 10 KB)
最終更新日:2018年6月24日
必要な要件:Python 3.6以降(Python 2には対応していません)
プラットフォーム:Windows 10およびファンクション・コンピュートでテスト済み
SSL証明書ステータスレポート
まずは、このプログラムが生成するものを確認してみましょう。以下の表は、このコードによって生成され、指定したメールアドレスに電子メールで送信されます。このレポートには5つの列の情報があります。各行には、1つのホスト名のステータスが記載されています。最後の行は黄色で表示されていることに注目してください。この行には、そのホストが到達不可能であるというエラーメッセージが含まれています。わざとこのエラーを発生させるために、"bad.neoprime.xyz "というホスト名を入れました。
NeoPrime SSL 証明書ステータスレポート
Sat, 23 Jun 2018 18:24:41 GMT
キーとなるカラムは「Status」と「Expires」です。ステータスが「OK」と表示されていれば問題ありません。そうでない場合は、"Expired "や "Time to Renew "などのメッセージが表示されます。
プログラム構成パラメータ
初期テスト中に、コマンドラインからプログラムを実行します。このソフトウェアは、資格情報ファイルからアリババの資格情報を抽出します。資格情報に DirectMail を呼び出す権限があることを確認してください。Function Computeに切り替える準備ができたら、g_program_mode = PROGRAM_MODE_CMDLINE
の行をg_program_mode = PROGRAM_MODE_ACS_FUNC
に変更します。
PROGRAM_MODE_CMDLINE = 0 # The program operates from the command line
PROGRAM_MODE_ACS_FUNC = 1 # The program operates as a Alibaba Cloud Function Compute function
g_program_mode = PROGRAM_MODE_CMDLINE
g_days_left = 14 # Warn if a certificate will expire in less than this number of days
g_no_send = False # if set, don't actually send an email. This is used for debugging
g_only_send_notices = False # If set, only send emails if a certificate will expire soon or on error
g_email_required = False # This is set during processing if a warning or error was detected
監視するホスト名を設定します。この例では、4つのホスト名を監視しています。
g_hostnames = [
"neoprime.xyz",
"api.neoprime.xyz",
"cdn.neoprime.xyz",
"www.neoprime.xyz",
]
レポートの件名を設定し、メールアドレスに送信します。
email_params['Subject'] = 'NeoPrime SSL Cerificate Status Report'
email_params['To'] = 'someone@example.com’
Alibaba Cloud DirectMailアカウントのパラメータを設定します。この例では、リージョンにシンガポールを使用しています。
# From the DirectMail Console
dm_account['Debug'] = 0
dm_account['Account'] = 'sender_address_from_directmail_console'
dm_account['Alias'] = 'my_alias_name_such_as_NeoPrime'
dm_account['host'] = "dm.ap-southeast-1.aliyuncs.com"
dm_account['url'] = "https://dm.ap-southeast-1.aliyuncs.com/"
Retrieving an SSL Certificate from an Internet Host
インターネットホストからSSL証明書を取得する
インターネットホストからSSL証明書を返すfunctionです。このSSL証明書には、ドメイン名や有効期限などの証明書に関する情報が含まれています。
def ssl_get_cert(hostname):
""" This function returns an SSL certificate from a host """
context = ssl.create_default_context()
conn = context.wrap_socket(
socket.socket(socket.AF_INET),
server_hostname=hostname)
# 3 second timeout because Function Compute has runtime limitations
conn.settimeout(3.0)
try:
conn.connect((hostname, 443))
except Exception as ex:
print("{}: Exception: {}".format(hostname, ex), file=sys.stderr)
return False, str(ex)
host_ssl_info = conn.getpeercert()
return host_ssl_info, ''
SSL証明書の確認
このfunctionは各ホスト名をループし、SSL 証明書の有効期限 (notAfter) をチェックします。このfunctionはまた、SSL証明書から発行者名やサブジェクト名(SAN)などの他の情報も抽出します。各ホスト名に対して、「add_row()」でHTMLテーブルに行を追加します。このfunctionは、構築したHTMLボディを返します。このHTMLボディは、送信されるメールメッセージの一部となります。
def process_hostnames(msg_body, hostnames):
""" Process the SSL certificate for each hostname """
# pylint: disable=global-statement
global g_email_required
ssl_date_fmt = r'%b %d %H:%M:%S %Y %Z'
for host in hostnames:
f_expired = False
print('Processing host:', host)
ssl_info, err = get_ssl_info(host)
if ssl_info is False:
msg_body = add_row(msg_body, host, err, '', '', '', True)
g_email_required = True
continue
#print(ssl_info)
issuerName = get_ssl_issuer_name(ssl_info)
altNames = get_ssl_subject_alt_names(ssl_info)
l_expires = datetime.datetime.strptime(ssl_info['notAfter'], ssl_date_fmt)
remaining = l_expires - datetime.datetime.utcnow()
if remaining < datetime.timedelta(days=0):
# cert has already expired - uhoh!
cert_status = "Expired"
f_expired = True
g_email_required = True
elif remaining < datetime.timedelta(days=g_days_left):
# expires sooner than the buffer
cert_status = "Time to Renew"
f_expired = True
g_email_required = True
else:
# everything is fine
cert_status = "OK"
f_expired = False
msg_body = add_row(msg_body, host, cert_status, str(l_expires), issuerName, altNames, f_expired)
return msg_body
例文
############################################################
# Version 0.90
# Date Created: 2018-06-11
# Last Update: 2018-06-23
# https://www.neoprime.io
# Copyright (c) 2018, NeoPrime, LLC
# Author: John Hanley
############################################################
""" Alibaba Cloud Function Compute Example """
import sys
import datetime
import socket
import json
import ssl
import time
import myemail
import myhtml
PROGRAM_MODE_CMDLINE = 0 # The program operates from the command line
PROGRAM_MODE_ACS_FUNC = 1 # The program operates as a Alibaba Cloud Function Compute function
g_program_mode = PROGRAM_MODE_ACS_FUNC
# g_program_mode = PROGRAM_MODE_CMDLINE
g_days_left = 14 # Warn if a certificate will expire in less than this number of days
g_no_send = False # if set, don't actually send an email. This is used for debugging
g_only_send_notices = False # If set, only send emails if a certificate will expire soon or on error
g_email_required = False # This is set during processing if a warning or error was detected
g_hostnames = [
"neoprime.xyz",
"api.neoprime.xyz",
"cdn.neoprime.xyz",
"www.neoprime.xyz",
]
email_params = {
'To': '',
'Subject': '',
'Body': '',
'BodyText': ''
}
email_params['Subject'] = 'NeoPrime SSL Certificate Status Report'
email_params['To'] = 'someone@example.com'
dm_account = {
'Debug': 0, # Debug flag
'Account': '', # DirectMail account
'Alias': '', # DirectMail alias
'host': '', # HTTP Host header
'url': '' # URL for POST
}
# From the DirectMail Console
dm_account['Debug'] = 0
dm_account['Account'] = ''
dm_account['Alias'] = ''
dm_account['host'] = "dm.ap-southeast-1.aliyuncs.com"
dm_account['url'] = "https://dm.ap-southeast-1.aliyuncs.com/"
def ssl_get_cert(hostname):
""" This function returns an SSL certificate from a host """
context = ssl.create_default_context()
conn = context.wrap_socket(
socket.socket(socket.AF_INET),
server_hostname=hostname)
# 3 second timeout because Function Compute has runtime limitations
conn.settimeout(3.0)
try:
conn.connect((hostname, 443))
except Exception as ex:
print("{}: Exception: {}".format(hostname, ex), file=sys.stderr)
return False, str(ex)
host_ssl_info = conn.getpeercert()
return host_ssl_info, ''
def add_row(body, domain, status, expires, issuerName, names, flag_hl):
""" Add a row to the HTML table """
#build the url
url = '<a href="https://' + domain + '">' + domain + '</a>'
# begin a new table row
if flag_hl is False:
body += '<tr>\n'
else:
body += '<tr bgcolor="#FFFF00">\n' # yellow
body += '<td>' + url + '</td>\n'
body += '<td>' + status + '</td>\n'
body += '<td>' + expires + '</td>\n'
body += '<td>' + issuerName + '</td>\n'
body += '<td>' + names + '</td>\n'
return body + '</tr>\n'
# Email specific
def send(account, credentials, params):
""" email send function """
# pylint: disable=global-statement
global g_only_send_notices
global g_email_required
# If set, only send emails if a certificate will expire soon or on error
if g_only_send_notices is True:
if g_email_required is False:
print('')
print('All hosts have valid certificates')
print('Sending an email is not required')
return
myemail.sendEmail(credentials, account, params, g_no_send)
def get_ssl_info(host):
""" This function retrieves the SSL certificate for host """
# If we receive an error, retry up to three times waiting 10 seconds each time.
retry = 0
err = ''
while retry < 3:
ssl_info, err = ssl_get_cert(host)
if ssl_info is not False:
return ssl_info, ''
retry += 1
print(' retrying ...')
time.sleep(10)
return False, err
def get_ssl_issuer_name(ssl_info):
""" Return the IssuerName from the SSL certificate """
issuerName = ''
issuer = ssl_info['issuer']
# pylint: disable=line-too-long
# issuer looks like this:
# This is a set of a set of a set of key / value pairs.
# ((('countryName', 'US'),), (('organizationName', "Let's Encrypt"),), (('commonName', "Let's Encrypt Authority X3"),))
for item in issuer:
# item will look like this as it goes thru the issuer set
# Note that this is a set of a set
#
# (('countryName', 'US'),)
# (('organizationName', "Let's Encrypt"),)
# (('commonName', "Let's Encrypt Authority X3"),)
s = item[0]
# s will look like this as it goes thru the isser set
# Note that this is now a set
#
# ('countryName', 'US')
# ('organizationName', "Let's Encrypt")
# ('commonName', "Let's Encrypt Authority X3")
# break the set into "key" and "value" pairs
k = s[0]
v = s[1]
if k == 'organizationName':
if v != '':
issuerName = v
continue
if k == 'commonName':
if v != '':
issuerName = v
return issuerName
def get_ssl_subject_alt_names(ssl_info):
""" Return the Subject Alt Names """
altNames = ''
subjectAltNames = ssl_info['subjectAltName']
index = 0
for item in subjectAltNames:
altNames += item[1]
index += 1
if index < len(subjectAltNames):
altNames += ', '
return altNames
def process_hostnames(msg_body, hostnames):
""" Process the SSL certificate for each hostname """
# pylint: disable=global-statement
global g_email_required
ssl_date_fmt = r'%b %d %H:%M:%S %Y %Z'
for host in hostnames:
f_expired = False
print('Processing host:', host)
ssl_info, err = get_ssl_info(host)
if ssl_info is False:
msg_body = add_row(msg_body, host, err, '', '', '', True)
g_email_required = True
continue
#print(ssl_info)
issuerName = get_ssl_issuer_name(ssl_info)
altNames = get_ssl_subject_alt_names(ssl_info)
l_expires = datetime.datetime.strptime(ssl_info['notAfter'], ssl_date_fmt)
remaining = l_expires - datetime.datetime.utcnow()
if remaining < datetime.timedelta(days=0):
# cert has already expired - uhoh!
cert_status = "Expired"
f_expired = True
g_email_required = True
elif remaining < datetime.timedelta(days=g_days_left):
# expires sooner than the buffer
cert_status = "Time to Renew"
f_expired = True
g_email_required = True
else:
# everything is fine
cert_status = "OK"
f_expired = False
msg_body = add_row(msg_body, host, cert_status, str(l_expires), issuerName, altNames, f_expired)
return msg_body
def main_cmdline():
""" This is the main function """
# My library for processing Alibaba Cloud Services (ACS) credentials
# This library is only used when running from the desktop and not from the cloud
import mycred_acs
# Load the Alibaba Cloud Credentials (AccessKey)
cred = mycred_acs.LoadCredentials()
if cred is False:
print('Error: Cannot load credentials', file=sys.stderr)
sys.exit(1)
now = datetime.datetime.utcnow()
date = now.strftime("%a, %d %b %Y %H:%M:%S GMT")
msg_body = ''
msg_body = myhtml.build_body_top()
msg_body += '<h4>NeoPrime SSL Cerificate Status Report</h4>'
msg_body += date + '<br />'
msg_body += '<br />'
msg_body = myhtml.build_table_top(msg_body)
#
# This is where the SSL processing happens
#
msg_body = process_hostnames(msg_body, g_hostnames)
msg_body = myhtml.build_table_bottom(msg_body)
msg_body = myhtml.build_body_bottom(msg_body)
email_params['Body'] = msg_body
email_params['BodyText'] = ''
#print(msg_body)
send(dm_account, cred, email_params)
def main_acs_func(event, context):
""" This is the main function """
cred = {
'accessKeyId': '',
'accessKeySecret': '',
'securityToken': '',
'Region': ''
}
cred['accessKeyId'] = context.credentials.accessKeyId
cred['accessKeySecret'] = context.credentials.accessKeySecret
cred['securityToken'] = context.credentials.securityToken
now = datetime.datetime.utcnow()
date = now.strftime("%a, %d %b %Y %H:%M:%S GMT")
msg_body = ''
msg_body = myhtml.build_body_top()
msg_body += '<h4>NeoPrime SSL Cerificate Status Report</h4>'
msg_body += date + '<br />'
msg_body += '<br />'
msg_body = myhtml.build_table_top(msg_body)
#
# This is where the SSL processing happens
#
msg_body = process_hostnames(msg_body, g_hostnames)
msg_body = myhtml.build_table_bottom(msg_body)
msg_body = myhtml.build_body_bottom(msg_body)
email_params['Body'] = msg_body
email_params['BodyText'] = ''
#print(msg_body)
send(dm_account, cred, email_params)
return msg_body
def handler(event, context):
""" This is the Function Compute entry point """
body = ""
body = main_acs_func(event, context)
res = {
'isBase64Encoded': False,
'statusCode': 200,
'headers': {
'content-type' : 'text/html'
},
'body': body
}
return json.dumps(res)
# Main Program
if g_program_mode == PROGRAM_MODE_CMDLINE:
main_cmdline()
機能の作成
次のスクリーンショットは、functionに設定するパラメータを示しています。2番目のスクリーンショットは、このfunctionは定期的に呼び出されるので、「タイムトリガー」に設定するパラメータを示しています。ここでは、タイムトリガーを1日1回、午前8時PST(16時GMT)に設定しています。
Function Computeの権限
Function Computeは、DirectMailを使用してメールを送信するための権限を必要とします。これには2つの方法があります。ソースコードに資格情報をハードコーディングする方法(非常に悪い考えです)と、RAM(Resource Access Manager)を使って、Function Computeサービスに割り当てる "ロール "を作成する方法(非常に良い考えです)です。
Function Compute用のRAMロールを作成する手順
1、Alibabaコンソールにログイン
2、リソースアクセスマネージャに移動
3、ロールをクリック
4、ロール作成ボタンをクリック
5、サービスの役割を選択
6、FC Function Computeを選択
7、ロールの名前と説明を入力
8、作成をクリック
このロールは作成されていますが、ロールに権限が付与されていません。
ロールに権限(オーソライズ)を付与する手順です。
1、承認ボタンをクリック
2、「認証ポリシーの編集」ボタンをクリック
3、検索キーワードボックスに「ダイレクト」と入力
4、AliyunDirectMailFullAccessを選択
5、右矢印ボタンをクリックして、ポリシーを右側にコピー
6、OKをクリックします。
Function ComputeとRAMロールの重要な概念は、ロールがFunction Computeサービスに割り当てられているということです。サービスの下にあるすべてのFunctionは、このロールを継承します。これは、複数のFunctionを持つFunction Computeサービスがある場合、RAMロールは各Functionに必要なパーミッションの合計を必要とすることを意味します。より厳しいセキュリティが必要な場合は、ロールのパーミッションに基づいて個別のサービスを作成します。
RAMポリシーは、付与されたパーミッションを記述したJSONドキュメントを作成します。この場合、ロールは、すべてのリソース(Resource: )に対して、dmで始まるすべてのアクション(Action: dm: )を許可しています。
{
"Version": "1",
"Statement": [
{
"Action": "dm:*",
"Resource": "*",
"Effect": "Allow"
}
]
}
RAM ロールをサービスに割り当てる際に見落とされることが多いのは、サービスがそのロールを想定するためのパーミッションを必要とすることです。RAM Roleには、ロールを引き受けるためのSTS(Security Token Service)パーミッションとロールパーミッションの2つのコンポーネントがあります。
このJSONは、AssumeRoleアクションを介して、Function Computeサービス自身がロールを引き受けるために必要なパーミッションを記述しています。サービス名 "fc.aliyncs.com "とアクション "sts:AssumeRole "に注目してください。
{
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": [
"fc.aliyuncs.com"
]
}
}
],
"Version": "1"
}
Function Computeのデバッグ
Alibaba Function Compute Consoleでfunctionを手動で呼び出します。以下のタイプのエラーが表示された場合は、サービスにDirectMail権限を持つRAM Roleを割り当てるのを忘れている可能性があります。
Error Code: 404
Error: {
"Recommend":"https://error-center.aliyun.com/status/search?Keyword=InvalidAccessKeyId.NotFound&source=PopGw",
"Message":"Specified access key is not found.",
"RequestId":"31BEFC34-DD4F-4916-927A-773A7C4F26C5",
"HostId":"dm.ap-southeast-1.aliyuncs.com",
"Code":"InvalidAccessKeyId.NotFound"
}
FCLIによる自動更新
それでは、Alibaba Cloud FCLIコマンドラインプログラムを使用して、テストとアップデートを高速化する方法を見てみましょう。
Function Computeの例は、いくつかのファイルで構成されています。Alibaba Consoleに行ってコードの変更をアップロードするのではなく、FCLIコマンドラインプログラムを使ってコマンドラインからFunction Computeを更新するのが好きです。以下は、私が使用しているWindows Cmd Promptバッチスクリプトです。
このコマンドは、index.zipという新しいパッケージを作成し、ソースファイルを追加します。そして、fcli.exeを使って、パッケージをFunction Computeにアップロードします。非常に簡単です。
DevOpsのもう一つの例:
del index.zip
pkzipc -add index.zip index.py myemail.py myhtml.py
fcli function update --code-file index.zip -s service_name -f function_name
FCLIを使ったfunctionの作成
このコマンドは、1ステップでコードを作成してアップロードします。アリババコンソールでfunctionのタイムトリガーを手動で作成する必要があります。この例では、サービス名「ssl」とfunction名「ssl_check」を使用しています。
fcli function create --code-file index.zip -t python3 -h index.handler -s ssl -f ssl_check
FCLIによるリモート実行
このコマンドは、リモートでfunctionを「起動」します。テストに便利な方法です。
fcli function invoke -s service_name -f function_name
FCLIでタイムトリガーを作成
このコマンドは、FCLI を使用したFunction Computeのタイムトリガーを作成します。コマンドと yaml 設定ファイルの 2 つのコンポーネントがあります。
triggerConfig:
payload: ""
cronExpression: "0 0/60 * * * *"
enable: true
fcli trigger create -t OncePerHour -s ssl -f ssl_check -c TimeTrigger.yaml --type timer
その他のアイデア
タイムトリガーを変更して、このfunctionを15分ごとなど、より頻繁に呼び出すようにすることができます。そして、ソースコードのパラメータ g_only_send_notices = True
を変更して、問題が発生した場合にのみ電子メールを受信するようにします。これは、HTTPS サービスのいずれかに障害が発生した場合に報告するサービスチェック機能になります。
もう一つのアイデアは、世界中の異なる地域に複数のfunctionを作成して、地域の顧客が経験する可能性のある問題を検出することです。
応答しない ECS インスタンスを再起動するコードを追加することもできます。
チェックするホスト名をあまり多く指定しないでください。function Compute には 300 秒の最大時間制限があります。これにより、functionは約10個のホスト名に制限され、30秒の失敗タイムアウトが可能になります。失敗のタイムアウトを減らすことができれば、各functionでより多くのホスト名を処理することができます。また、多くのホスト名を処理するために Function Compute で複数のfunctionを作成することもできます。失敗をリトライしない場合は、1つのfunctionにつき100個程度のホスト名が上限となります。Alibaba Consoleには、手動でfunctionを呼び出すための「Invoke」ボタンがあります。コンソールウィンドウの下部近くには、functionが実行された時間の統計情報が表示されます。これは、functionごとのホスト数を調整するのに役立ちます。
本ブログは英語版からの翻訳です。オリジナルはこちらからご確認いただけます。一部機械翻訳を使用しております。翻訳の間違いがありましたら、ご指摘いただけると幸いです。
アリババクラウドは日本に2つのデータセンターを有し、世界で60を超えるアベラビリティーゾーンを有するアジア太平洋地域No.1(2019ガートナー)のクラウドインフラ事業者です。
アリババクラウドの詳細は、こちらからご覧ください。
アリババクラウドジャパン公式ページ