Security Hub のマルチアカウント設定
AWS Security Hub は AWS環境全体のセキュリティとコンプライアンスの状況を確認可能なサービスです。
Security Hubでは、特定アカウントをマスターアカウントとし、他のアカウントを招待して
マルチアカウントの親子関係を組むことができます。
以下のようにマルチアカウント設定を行うと、メンバーアカウントの結果をマスターアカウント側で
確認できるようになります。
この時、マスターとなるアカウントは AWS Organizations のマスターアカウントである必要はありません。
また組織外のアカウントを招待して連結することも可能です。
通常、セキュリティ用のアカウントを用意し、そちらに統合して管理する方がよいかと思います。
連結を行う際、マスターアカウントから招待、メンバーアカウントからはその招待を承認する
という作業が必要なのですが、管理対象のアカウントが増えてくると都度作業するのはなかなか面倒です。
GitHub 上で AWS Security Hub Multiaccount Scripts が公開されており、
IAM ロールおよび Python の実行環境を用意できれば、上記ステップを自動化できます。
アカウント追加時に自動セットアップを行う
Multiaccount Scripts は既存アカウントへの設定が前提であるため、
新規アカウント作成時に自動でセットアップするための構成例を紹介します。
前提
- AWS Organizations でアカウントが管理されている
- AWS Organizations の Master Account から New Account を作成する
- Security Hubのマスターアカウントである Security Account から New Accountを招待する
流れ
マスターアカウントで Lambda 関数を起動し、以下の内容を実行します。
- AWS Organizations で新規アカウントを作成 1
- Assume Role で Security Account から New Account へ招待を送る 2, 3
- 内部的には CreateMembers API と InviteMembers API の実行が必要
- Assume Role で New Account の Security Hub を有効化し4、招待を承認する5, 6
Lambda 関数の例
ランタイムは python3.8 を想定しています。
アカウント作成部分のコードは AWS Landing Zone の Account Vending Machine(AVM)を参考にしています。
AVM は AWS Service Catalog の製品として利用可能で、アカウントの作成と事前定義された
ベースライン設定をセットアップすることができますコードは 以下の GitHub リポジトリで公開されています。
AWS Account Vending Machine
https://github.com/aws-samples/aws-account-vending-machine
以降は簡略化のため、アカウント作成と Security Hub 有効化に絞った形で記載しますが、
上記をカスタマイズして、Security Hub の有効化以外にもアカウント作成時に必要な
サービスの設定を組み込んでしまえば、共通的なセットアップ作業を自動化できます。
clinet の作成
指定されたサービスとリージョンで clinet を作成します。
Assume Role によるクレデンシャルが指定されている場合は、対象アカウントの一時認証情報で作成します。
def get_client(service, region, credentials=None):
"""Returns the client for the specified region"""
if credentials is None:
client = boto3.client(service, region_name=region)
else:
client = boto3.client(
service,
region_name=region,
aws_access_key_id=credentials['AccessKeyId'],
aws_secret_access_key=credentials['SecretAccessKey'],
aws_session_token=credentials['SessionToken']
)
return client
新規アカウントの作成
アカウント名とメールアドレスから AWS Organizations で新規アカウントを作成し、アカウントIDを返します。
def create_account(account_name, email):
"""Create account by AWS Organizations"""
account_id = 'None'
org = get_client('organizations', 'us-east-1')
try:
logger.info("Trying to create the account with %s", email)
create_account_response = org.create_account(
Email=email,
AccountName=account_name
)
sleep(10)
account_status = org.describe_create_account_status(
CreateAccountRequestId=create_account_response['CreateAccountStatus']['Id']
)
except ClientError as err:
logger.error("Create Account Request failed: %s", err.response['Error']['Message'])
else:
logger.info("Account Creation status: %s", account_status['CreateAccountStatus']['State'])
if account_status['CreateAccountStatus']['State'] == 'FAILED':
logger.error(
"Account Creation Failed. Reason : %s",
account_status['CreateAccountStatus']['FailureReason']
)
sys.exit(1)
account_status = org.describe_create_account_status(
CreateAccountRequestId=create_account_response['CreateAccountStatus']['Id']
)
account_id = account_status['CreateAccountStatus']['AccountId']
while account_id is None:
logger.info("Waiting create new account. Retrying...")
sleep(5)
account_status = org.describe_create_account_status(
CreateAccountRequestId=create_account_response['CreateAccountStatus']['Id']
)
account_id = account_status['CreateAccountStatus']['AccountId']
return account_id
Assume Role
指定されたアカウントID に Assume Role して一時クレデンシャルを返します。
def assume_role(account_id):
"""Assume role to account"""
sts = get_client('sts', 'us-east-1')
role_arn = f'arn:aws:iam::{account_id}:role/OrganizationAccountAccessRole'
try:
assumed_role_object = sts.assume_role(
RoleArn=role_arn,
RoleSessionName="NewAccountSetUp"
)
except ClientError as err:
logger.error(
"Assume Role Request failed: %s, %s", err.response['Error']['Message'], account_id
)
sys.exit(1)
return assumed_role_object['Credentials']
招待の作成
Security Hub のマスターアカウトであるセキュリティアカウントから新規アカウントのIDと
メールアドレスを使用してを招待を送ります。
実際には CreateMembers API によりメンバーの関連付けを行ったあとに
InviteMembers API で招待を送っています。
この時点でメンバーアカウント側で Security Hub が有効化されていなくても招待することは可能です。
def create_invitation(email, account_id, available_regions, audit_credentials):
"""Invite Account to Security Hub"""
logger.info("Create Security Hub Invitation Start.")
for region in available_regions:
securityhub = get_client('securityhub', region, audit_credentials)
try:
securityhub.create_members(
AccountDetails=[{
'AccountId': account_id,
'Email': email
}]
)
response = securityhub.invite_members(
AccountIds=[account_id]
)
except ClientError as err:
logger.error(
"Create Invitation Request failed in %s: %s",
region, err.response['Error']['Message']
)
sys.exit(1)
else:
if response['UnprocessedAccounts'] == []:
logger.info("Invited: %s", region)
else:
logger.info("UnprocessedRegion: %s", region)
logger.info("Create Security Hub Invitation Successed.")
招待の承認
新規アカウントで Security Hub を有効化し、招待を承認します。
実際には ListInvitations API により InvitationId を確認し、
AcceptInvitation API で招待を承認します。
def enable_securityhub(audit_account_id, available_regions, credentials):
"""Accept invitation from audit account"""
logger.info("Enable Security Hub All Regions Start.")
for region in available_regions:
securityhub = get_client('securityhub', region, credentials)
# Enalbe Security Hub
try:
securityhub.enable_security_hub()
except ClientError as err:
logger.error(
"Enalbe Security Hub Request failed in %s: %s",
region, err.response['Error']['Message']
)
sys.exit(1)
# Accept Invitation
try:
response = securityhub.list_invitations()
except ClientError as err:
logger.error(
"List Invitaion Request failed in %s: %s", region, err.response['Error']['Message']
)
sys.exit(1)
if response['Invitations'] != []:
try:
securityhub.accept_invitation(
MasterId=audit_account_id,
InvitationId=response['Invitations'][0]['InvitationId']
)
except ClientError as err:
logger.error(
"Accept Invitation Request failed in %s: %s",
region, err.response['Error']['Message']
)
sys.exit(1)
else:
logger.error("No Invitaions in %s", region)
sys.exit(1)
logger.info("Enabled Security Hub in %s", region)
logger.info("Enalbe Security Hub All Regions Succeded.")
全体
クリックで展開します。
import os
from logging import getLogger, INFO
from time import sleep
import sys
import boto3
from botocore.exceptions import ClientError
logger = getLogger()
logger.setLevel(INFO)
def get_client(service, region, credentials=None):
"""Returns the client for the specified region"""
if credentials is None:
client = boto3.client(service, region_name=region)
else:
client = boto3.client(
service,
region_name=region,
aws_access_key_id=credentials['AccessKeyId'],
aws_secret_access_key=credentials['SecretAccessKey'],
aws_session_token=credentials['SessionToken']
)
return client
def create_account(account_name, email):
"""Create account by AWS Organizations"""
account_id = 'None'
org = get_client('organizations', 'us-east-1')
try:
logger.info("Trying to create the account with %s", email)
create_account_response = org.create_account(
Email=email,
AccountName=account_name
)
sleep(10)
account_status = org.describe_create_account_status(
CreateAccountRequestId=create_account_response['CreateAccountStatus']['Id']
)
except ClientError as err:
logger.error("Create Account Request failed: %s", err.response['Error']['Message'])
else:
logger.info("Account Creation status: %s", account_status['CreateAccountStatus']['State'])
if account_status['CreateAccountStatus']['State'] == 'FAILED':
logger.error(
"Account Creation Failed. Reason : %s",
account_status['CreateAccountStatus']['FailureReason']
)
sys.exit(1)
account_status = org.describe_create_account_status(
CreateAccountRequestId=create_account_response['CreateAccountStatus']['Id']
)
account_id = account_status['CreateAccountStatus']['AccountId']
while account_id is None:
logger.info("Waiting create new account. Retrying...")
sleep(5)
account_status = org.describe_create_account_status(
CreateAccountRequestId=create_account_response['CreateAccountStatus']['Id']
)
account_id = account_status['CreateAccountStatus']['AccountId']
return account_id
def assume_role(account_id):
"""Assume role to account"""
sts = get_client('sts', 'us-east-1')
role_arn = f'arn:aws:iam::{account_id}:role/OrganizationAccountAccessRole'
try:
assumed_role_object = sts.assume_role(
RoleArn=role_arn,
RoleSessionName="NewAccountSetUp"
)
except ClientError as err:
logger.error(
"Assume Role Request failed: %s, %s", err.response['Error']['Message'], account_id
)
sys.exit(1)
return assumed_role_object['Credentials']
def get_region_list():
"""Return Available Region List"""
ec2 = get_client('ec2', 'us-east-1')
available_regions = map(lambda x: x['RegionName'], ec2.describe_regions()['Regions'])
return list(available_regions)
def create_invitation(email, account_id, available_regions, audit_credentials):
"""Invite Account to Security Hub"""
logger.info("Create Security Hub Invitation Start.")
for region in available_regions:
securityhub = get_client('securityhub', region, audit_credentials)
try:
securityhub.create_members(
AccountDetails=[{
'AccountId': account_id,
'Email': email
}]
)
response = securityhub.invite_members(
AccountIds=[account_id]
)
except ClientError as err:
logger.error(
"Create Invitation Request failed in %s: %s",
region, err.response['Error']['Message']
)
sys.exit(1)
else:
if response['UnprocessedAccounts'] == []:
logger.info("Invited: %s", region)
else:
logger.info("UnprocessedRegion: %s", region)
logger.info("Create Security Hub Invitation Successed.")
def enable_securityhub(audit_account_id, available_regions, credentials):
"""Accept invitation from audit account"""
logger.info("Enable Security Hub All Regions Start.")
for region in available_regions:
securityhub = get_client('securityhub', region, credentials)
# Enalbe Security Hub
try:
securityhub.enable_security_hub()
except ClientError as err:
logger.error(
"Enalbe Security Hub Request failed in %s: %s",
region, err.response['Error']['Message']
)
sys.exit(1)
# Accept Invitation
try:
response = securityhub.list_invitations()
except ClientError as err:
logger.error(
"List Invitaion Request failed in %s: %s", region, err.response['Error']['Message']
)
sys.exit(1)
if response['Invitations'] != []:
try:
securityhub.accept_invitation(
MasterId=audit_account_id,
InvitationId=response['Invitations'][0]['InvitationId']
)
except ClientError as err:
logger.error(
"Accept Invitation Request failed in %s: %s",
region, err.response['Error']['Message']
)
sys.exit(1)
else:
logger.error("No Invitaions in %s", region)
sys.exit(1)
logger.info("Enabled Security Hub in %s", region)
logger.info("Enalbe Security Hub All Regions Succeded.")
def lambda_handler(event, context):
"""main"""
account_name = os.environ['ACCOUNT_NAME']
email = os.environ['EMAIL']
audit_account_id = "111111111111"
account_id = create_account(account_name, email)
logger.info("Created acount: %s", account_id)
# Assume Role New Account and Audit Account
credentials = assume_role(account_id)
audit_credentials = assume_role(audit_account_id)
# Get Region List
available_regions = get_region_list()
# Secuirty Hub
create_invitation(email, account_id, available_regions, audit_credentials)
enable_securityhub(audit_account_id, available_regions, credentials)
以上です。参考になれば幸いです。