今回の多部構成の記事では、Pythonを使ったLet's Encrypt ACME version 2 APIをSSL証明書に利用する方法を学びます。
本ブログは英語版からの翻訳です。オリジナルはこちらからご確認いただけます。一部機械翻訳を使用しております。翻訳の間違いがありましたら、ご指摘いただけると幸いです。
#APIエンドポイントを暗号化しよう
Let's Encrypt ACMEは、異なるエンドポイントを使用して2つのモードをサポートしています。本物の証明書を発行し、レートが制限される本番モードと、テスト用の証明書を発行するテスト用のステージングモードです。本番用のエンドポイントでは、1日にできるリクエストの数を制限します。Let's Encrypt 用のソフトウェアを開発している間は、ステージングエンドポイントを使用していることを確認してください。
Staging Endpoint:https://acme-staging-v02.api.letsencrypt.org/directory
Production Endpoint:https://acme-v02.api.letsencrypt.org/directory
#ACMEディレクトリ
最初のAPIコールでは、ACMEディレクトリを取得する必要があります。ディレクトリは、様々なコマンドのために呼び出すURLのリストです。getディレクトリからのレスポンスは、以下のようなJSON構造になっています。
{
"LPTIN-Jj4u0": "https://community.letsencrypt.org/t/adding-random-entries-to-the-directory/33417",
"keyChange": "https://acme-staging-v02.api.letsencrypt.org/acme/key-change",
"meta": {
"caaIdentities": [
"letsencrypt.org"
],
"termsOfService": "https://letsencrypt.org/documents/LE-SA-v1.2-November-15-2017.pdf",
"website": "https://letsencrypt.org/docs/staging-environment/"
},
"newAccount": "https://acme-staging-v02.api.letsencrypt.org/acme/new-acct",
"newNonce": "https://acme-staging-v02.api.letsencrypt.org/acme/new-nonce",
"newOrder": "https://acme-staging-v02.api.letsencrypt.org/acme/new-order",
"revokeCert": "https://acme-staging-v02.api.letsencrypt.org/acme/revoke-cert"
}
最初の行は無視してください。ACMEはランダムなキーと値を生成し、JSONの期待値をコードにハードコーディングしないようにしています。
返されるデータ部分をそれぞれ見てみましょう。
keyChange
このURLは、アカウントに関連付けられた公開鍵を変更するために使用されます。これは、鍵の危殆化から回復するために使用されます。
meta.caaIdentities
CAAレコードの検証のために、ACMEサーバが自分自身を参照していると認識するホスト名の配列。サンプルコードではこれらのレコードは使用していません。
meta.termsOfService
現在の利用規約を示すURLです。https://letsencrypt.org/documents/LE-SA-v1.2-November-15-2017.pdf の利用規約に関する参照文書を時間をかけて読んでください。サンプルコードでは、利用規約を受け入れるフィールドをデフォルトで設定しています。
meta.website
ACMEサーバーに関する詳細な情報を提供するウェブサイトを検索するURL。このレコードはサンプルコードでは使用していません。このレコードについての詳細はこちらをご覧ください https://letsencrypt.org/docs/staging-environment/
newAccount
これは重要なAPIであり、ACME APIを呼び出す際には2回目の呼び出しを行う必要があります。nonce は、リプレイ攻撃から保護する一意のランダムな値です。(ディレクトリを除く) 各APIコールは、一意のnonce値を必要とします。
newNonce
このエンドポイントは、新しいアカウントを作成するために使用されます。
newOrder
このエンドポイントは、このSSL証明書の発行を要求するために使用されます。
revokeCert
このエンドポイントは、同じアカウントで発行された既存の証明書を失効させるために使用されます。
#ACMEディレクトリを取得するためのコード例
これは、examples パッケージの中で最もシンプルな ACME API の例です。この例では、ACMEメインエンドポイントを呼び出すだけです。返されるデータは、上記で説明したようにAPIエンドポイントを定義するJSON構造体です。このプログラムからの出力を確認し、以下の例のほとんどで使用される様々な URL に精通してください。
Source: get_directory.py
""" Let's Encrypt ACME Version 2 Examples - Get Directory"""
# This example will call the ACME API directory and display the returned data
# Reference: https://ietf-wg-acme.github.io/acme/draft-ietf-acme-acme.html#rfc.section.7.1.1
import sys
import requests
import helper
path = 'https://acme-staging-v02.api.letsencrypt.org/directory'
headers = {
'User-Agent': 'neoprime.io-acme-client/1.0',
'Accept-Language': 'en'
}
try:
print('Calling endpoint:', path)
directory = requests.get(path, headers=headers)
except requests.exceptions.RequestException as error:
print(error)
sys.exit(1)
if directory.status_code < 200 or directory.status_code >= 300:
print('Error calling ACME endpoint:', directory.reason)
sys.exit(1)
# The output should be json. If not something is wrong
try:
acme_config = directory.json()
except Exception as ex:
print("Error: Cannot load returned data:", ex)
sys.exit(1)
print('')
print('Returned Data:')
print('****************************************')
print(directory.text)
acme_config = directory.json()
print('')
print('Formatted JSON:')
print('****************************************')
helper.print_dict(acme_config, 0)
#新規アカウントを作成するためのコード例
次のステップは、ACMEサーバーに新しいアカウントを作成することです。これには、パート2で作成したaccount.keyを使用します。ACMEサーバーは、アカウントデータベースに会社名などの情報を追跡しません。
この例では、EmailAddress パラメータを自分のメールアドレスに変更します。この例では、複数のメールアドレスを含める方法を示しています。ACME サーバでは、電子メール・アドレスの入力は任意であるため、必須ではありません。ACMEサーバはメールアドレスを確認しません。
このコードについて、いくつかの重要なポイントを確認してみましょう。
1. ACMEディレクトリを取得する
acme_config = get_directory()
2. 「newAccount」のURLを取得する
url = acme_config["newAccount"]
3. 最初の ACME API 呼び出しに対して nonce を要求
最初の ACME API 呼び出しの後、各 ACME API 呼び出しの後、ヘッダ "Replay-Nonce" に新しい nonce が返されます。
nonce = requests.head(acme_config['newNonce']).headers['Replay-Nonce']
4. HTMLヘッダを組み立てる
重要な項目は Content-Type: application/jose+json
headers = {
'User-Agent': 'neoprime.io-acme-client/1.0',
'Accept-Language': 'en',
'Content-Type': 'application/jose+json'
}
5. HTMLボディをACME APIパラメータで組み立てる
HTTPボディの作成については、その4で詳しく解説します。
payload = {}
payload["termsOfServiceAgreed"] = True
payload["contact"] = EmailAddresses
body_top = {
"alg": "RS256",
"jwk": myhelper.get_jwk(AccountKeyFile),
"url": url,
"nonce": nonce
}
6. HTML本体「jose」のデータ構造を組み立てる
すべてがbase64でエンコードされていることに注目してください。
body_top_b64 = myhelper.b64(json.dumps(body_top).encode("utf8"))
payload_b64 = myhelper.b64(json.dumps(payload).encode("utf8"))
jose = {
"protected": body_top_b64,
"payload": payload_b64,
"signature": myhelper.b64(signature)
}
7. 最後に、ACME API を呼び出します。
これは、JSONボディを持つHTTP POSTで行われます。
resp = requests.post(url, json=jose, headers=headers)
8. ACME APIの後、HTTPレスポンスヘッダには2つの項目が返されます。
LocationはアカウントのURLです。
Replay-Nonceは、次のACME APIコールの "nonce "値です。
resp.headers['Location']
resp.headers['Replay-Nonce']
ほとんどのACME APIコールでは、HTTPヘッダが含まれていることが必要です。
Content-Type: application/jose+json
ソース: new_account.py
""" Let's Encrypt ACME Version 2 Examples - New Account"""
# This example will call the ACME API directory and create a new account
# Reference: https://ietf-wg-acme.github.io/acme/draft-ietf-acme-acme.html#rfc.section.7.3.2
import os
import sys
import json
import requests
import myhelper
# Staging URL
path = 'https://acme-staging-v02.api.letsencrypt.org/directory'
# Production URL
# path = 'https://acme-v02.api.letsencrypt.org/directory'
AccountKeyFile = 'account.key'
EmailAddresses = ['mailto:someone@eexample.com', 'mailto:someone2@eexample.com']
def check_account_key_file():
""" Verify that the Account Key File exists and prompt to create if it does not exist """
if os.path.exists(AccountKeyFile) is not False:
return True
print('Error: File does not exist: {0}'.format(AccountKeyFile))
if myhelper.Confirm('Create new account private key (y/n): ') is False:
print('Cancelled')
return False
myhelper.create_rsa_private_key(AccountKeyFile)
if os.path.exists(AccountKeyFile) is False:
print('Error: File does not exist: {0}'.format(AccountKeyFile))
return False
return True
def get_directory():
""" Get the ACME Directory """
headers = {
'User-Agent': 'neoprime.io-acme-client/1.0',
'Accept-Language': 'en',
}
try:
print('Calling endpoint:', path)
directory = requests.get(path, headers=headers)
except requests.exceptions.RequestException as error:
print(error)
return False
if directory.status_code < 200 or directory.status_code >= 300:
print('Error calling ACME endpoint:', directory.reason)
print(directory.text)
return False
# The following statements are to understand the output
acme_config = directory.json()
return acme_config
def main():
""" Main Program Function """
headers = {
'User-Agent': 'neoprime.io-acme-client/1.0',
'Accept-Language': 'en',
'Content-Type': 'application/jose+json'
}
if check_account_key_file() is False:
sys.exit(1)
acme_config = get_directory()
if acme_config is False:
sys.exit(1)
url = acme_config["newAccount"]
# Get the URL for the terms of service
terms_service = acme_config.get("meta", {}).get("termsOfService", "")
print('Terms of Service:', terms_service)
nonce = requests.head(acme_config['newNonce']).headers['Replay-Nonce']
print('Nonce:', nonce)
print("")
# Create the account request
payload = {}
if terms_service != "":
payload["termsOfServiceAgreed"] = True
payload["contact"] = EmailAddresses
payload_b64 = myhelper.b64(json.dumps(payload).encode("utf8"))
body_top = {
"alg": "RS256",
"jwk": myhelper.get_jwk(AccountKeyFile),
"url": url,
"nonce": nonce
}
body_top_b64 = myhelper.b64(json.dumps(body_top).encode("utf8"))
data = "{0}.{1}".format(body_top_b64, payload_b64).encode("utf8")
signature = myhelper.sign(data, AccountKeyFile)
#
# Create the HTML request body
#
jose = {
"protected": body_top_b64,
"payload": payload_b64,
"signature": myhelper.b64(signature)
}
try:
print('Calling endpoint:', url)
resp = requests.post(url, json=jose, headers=headers)
except requests.exceptions.RequestException as error:
resp = error.response
print(resp)
except Exception as ex:
print(ex)
except BaseException as ex:
print(ex)
if resp.status_code < 200 or resp.status_code >= 300:
print('Error calling ACME endpoint:', resp.reason)
print('Status Code:', resp.status_code)
myhelper.process_error_message(resp.text)
sys.exit(1)
print('')
if 'Location' in resp.headers:
print('Account URL:', resp.headers['Location'])
else:
print('Error: Response headers did not contain the header "Location"')
main()
sys.exit(0)
#アカウント情報を取得するためのコード例
さて、account.keyを使ってアカウントを作成したので、ACMEサーバと通信して、サーバにどのような情報が保存されているかを確認してみましょう。この例では、設定パラメータをソースコードにハードコーディングする代わりに、設定ファイル「acme.ini」を導入しています。
acme.iniを修正して、メールアドレスなどの特定の情報を含めるようにします。
ソース: acme.ini
[acme-neoprime]
UserAgent = neoprime.io-acme-client/1.0
# [Required] ACME account key
AccountKeyFile = account.key
# Certifcate Signing Request (CSR)
CSRFile = example.com.csr
ChainFile = example.com.chain.pem
# ACME URL
# Staging URL
# https://acme-staging-v02.api.letsencrypt.org/directory
# Production URL
# https://acme-v02.api.letsencrypt.org/directory
ACMEDirectory = https://acme-staging-v02.api.letsencrypt.org/directory
# Email Addresses so that LetsEncrypt can notify about SSL renewals
Contacts = mailto:example.com;mailto:someone2@example.com
# Preferred Language
Language = en
ソース: get_acount_info.py
""" Let's Encrypt ACME Version 2 Examples - Get Account Information """
############################################################
# This example will call the ACME API directory and get the account information
# Reference: https://ietf-wg-acme.github.io/acme/draft-ietf-acme-acme.html#rfc.section.7.3.3
#
# This program uses the AccountKeyFile set in acme.ini to return information about the ACME account.
############################################################
import sys
import json
import requests
import helper
import myhelper
############################################################
# Start - Global Variables
g_debug = 0
acme_path = ''
AccountKeyFile = ''
EmailAddresses = []
headers = {}
# End - Global Variables
############################################################
############################################################
# Load the configuration from acme.ini
############################################################
def load_acme_parameters(debug=0):
""" Load the configuration from acme.ini """
global acme_path
global AccountKeyFile
global EmailAddresses
global headers
config = myhelper.load_acme_config(filename='acme.ini')
if debug is not 0:
print(config.get('acme-neoprime', 'accountkeyfile'))
print(config.get('acme-neoprime', 'csrfile'))
print(config.get('acme-neoprime', 'chainfile'))
print(config.get('acme-neoprime', 'acmedirectory'))
print(config.get('acme-neoprime', 'contacts'))
print(config.get('acme-neoprime', 'language'))
acme_path = config.get('acme-neoprime', 'acmedirectory')
AccountKeyFile = config.get('acme-neoprime', 'accountkeyfile')
EmailAddresses = config.get('acme-neoprime', 'contacts').split(';')
headers['User-Agent'] = config.get('acme-neoprime', 'UserAgent')
headers['Accept-Language'] = config.get('acme-neoprime', 'language')
headers['Content-Type'] = 'application/jose+json'
return config
############################################################
#
############################################################
def get_account_url(url, nonce):
""" Get the Account URL based upon the account key """
# Create the account request
payload = {}
payload["termsOfServiceAgreed"] = True
payload["contact"] = EmailAddresses
payload["onlyReturnExisting"] = True
payload_b64 = myhelper.b64(json.dumps(payload).encode("utf8"))
body_top = {
"alg": "RS256",
"jwk": myhelper.get_jwk(AccountKeyFile),
"url": url,
"nonce": nonce
}
body_top_b64 = myhelper.b64(json.dumps(body_top).encode("utf8"))
#
# Create the message digest
#
data = "{0}.{1}".format(body_top_b64, payload_b64).encode("utf8")
signature = myhelper.sign(data, AccountKeyFile)
#
# Create the HTML request body
#
jose = {
"protected": body_top_b64,
"payload": payload_b64,
"signature": myhelper.b64(signature)
}
#
# Make the ACME request
#
try:
print('Calling endpoint:', url)
resp = requests.post(url, json=jose, headers=headers)
except requests.exceptions.RequestException as error:
resp = error.response
print(resp)
except Exception as error:
print(error)
if resp.status_code < 200 or resp.status_code >= 300:
print('Error calling ACME endpoint:', resp.reason)
print('Status Code:', resp.status_code)
myhelper.process_error_message(resp.text)
sys.exit(1)
if 'Location' in resp.headers:
print('Account URL:', resp.headers['Location'])
else:
print('Error: Response headers did not contain the header "Location"')
# Get the nonce for the next command request
nonce = resp.headers['Replay-Nonce']
account_url = resp.headers['Location']
return nonce, account_url
############################################################
#
############################################################
def get_account_info(nonce, url, location):
""" Get the Account Information """
# Create the account request
payload = {}
payload_b64 = myhelper.b64(json.dumps(payload).encode("utf8"))
body_top = {
"alg": "RS256",
"kid": location,
"nonce": nonce,
"url": location
}
body_top_b64 = myhelper.b64(json.dumps(body_top).encode("utf8"))
#
# Create the message digest
#
data = "{0}.{1}".format(body_top_b64, payload_b64).encode("utf8")
signature = myhelper.sign(data, AccountKeyFile)
#
# Create the HTML request body
#
jose = {
"protected": body_top_b64,
"payload": payload_b64,
"signature": myhelper.b64(signature)
}
#
# Make the ACME request
#
try:
print('Calling endpoint:', url)
resp = requests.post(url, json=jose, headers=headers)
except requests.exceptions.RequestException as error:
resp = error.response
print(resp)
except Exception as error:
print(error)
if resp.status_code < 200 or resp.status_code >= 300:
print('Error calling ACME endpoint:', resp.reason)
print('Status Code:', resp.status_code)
myhelper.process_error_message(resp.text)
sys.exit(1)
nonce = resp.headers['Replay-Nonce']
# resp.text is the returned JSON data describing the account
return nonce, resp.text
############################################################
#
############################################################
def load_acme_urls(path):
""" Load the ACME Directory of URLS """
try:
print('Calling endpoint:', path)
resp = requests.get(acme_path, headers=headers)
except requests.exceptions.RequestException as error:
print(error)
sys.exit(1)
if resp.status_code < 200 or resp.status_code >= 300:
print('Error calling ACME endpoint:', resp.reason)
print(resp.text)
sys.exit(1)
return resp.json()
############################################################
#
############################################################
def acme_get_nonce(urls):
""" Get the ACME Nonce that is used for the first request """
global headers
path = urls['newNonce']
try:
print('Calling endpoint:', path)
resp = requests.head(path, headers=headers)
except requests.exceptions.RequestException as error:
print(error)
return False
if resp.status_code < 200 or resp.status_code >= 300:
print('Error calling ACME endpoint:', resp.reason)
print(resp.text)
return False
return resp.headers['Replay-Nonce']
############################################################
# Main Program Function
############################################################
def main(debug=0):
""" Main Program Function """
acme_urls = load_acme_urls(acme_path)
url = acme_urls["newAccount"]
nonce = acme_get_nonce(acme_urls)
if nonce is False:
sys.exit(1)
nonce, account_url = get_account_url(url, nonce)
# resp is the returned JSON data describing the account
nonce, resp = get_account_info(nonce, account_url, account_url)
info = json.loads(resp)
if debug is not 0:
print('')
print('Returned Data:')
print('##################################################')
#print(info)
helper.print_dict(info)
print('##################################################')
print('')
print('ID: ', info['id'])
print('Contact: ', info['contact'])
print('Initial IP:', info['initialIp'])
print('Created At:', info['createdAt'])
print('Status: ', info['status'])
def is_json(data):
try:
json.loads(data)
except ValueError as e:
return False
return True
acme_config = load_acme_parameters(g_debug)
main(g_debug)
#概要
パート4では、ACME APIをさらに深く掘り下げて、JSONボディの各部分を構築し、ペイロードに署名し、結果を処理する方法を学びます。
アリババクラウドは日本に2つのデータセンターを有し、世界で60を超えるアベラビリティーゾーンを有するアジア太平洋地域No.1(2019ガートナー)のクラウドインフラ事業者です。
アリババクラウドの詳細は、こちらからご覧ください。
アリババクラウドジャパン公式ページ