この記事は AWS Advent Calendar 2021 13日目の記事です。
はじめに
AWS re:Invent 2021 で 脆弱性管理サービス Amazon Inspector のリニューアルが発表され、新たに Amazon ECR に格納されたコンテナイメージに対するスキャンをサポートしました。Inspector によるイメージスキャン機能の提供により、 ECR のスキャン機能は基本スキャン (Basic scanning)、Inspector によるスキャン機能は拡張スキャン (Enhanced scanning) という名称になりました。
2019年に AWS Lambda を使用して 基本スキャンの結果を Slack に通知する方法について投稿したのですが、拡張スキャンの通知には対応していません。
本記事では 2021年12月時点で各スキャン結果を Slack 通知する方法をあらためてまとめます。
基本スキャンと拡張スキャンの違いについて
機能的な違いについては別途記事を書いたのでこちらを参照いただければと思います。
AWS Chatbot による通知
2021年の 4月に Amazon EventBridge によって処理されるすべてのサービスイベントを AWS Chatbot で通知できるようになりました。ただし通知される内容は AWS Chatbot 次第で、目的の情報が削られてしまっているケースもあります。通知内容をカスタマイズしたい場合は、後半の AWS Lambda で通知する方法を確認ください。
AWS Chatbot や EventBridge の設定方法には触れませんので、必要な場合は以下をご参照ください。
基本スキャンの通知例
以前は対象のリポジトリの ARN がだけが通知され、スキャン結果の内容は表示されていませんでしたが、
EventBridge → AWS Chatbot の通知内容は随時改善されていっているようで、2021年12月現在ではスキャン結果のサマリーが以下のように表示されます。基本スキャンの結果を通知するだけなら AWS Chatbot で十分ですね。
拡張スキャンの通知例
Inspector による拡張スキャンは ECR にイメージを Push した際に実行される初期スキャンと、脆弱性情報更新時に自動実行される連続スキャンの2種類があります。連続スキャンの通知結果はまだ確認できていませんが、初期スキャンについては2021年12月時点で以下のように通知されました。
現時点では AWS Chatbot で初期スキャンの結果を通知することはできないようです。
AWS Lambda による通知
基本スキャンと拡張スキャンの両方に対応する
イメージスキャンの実行を EventBridge で検知し、AWS Lambda で情報を整形して Slack に通知します。冒頭の記事を書いた2019年時点ではイメージが Push された際のイベントのみを検知できていたため、Lambda 関数から DescribeImages API でスキャン結果のサマリーを取得していました。現在は基本スキャンの実行結果についても以下のようなイベント情報として配信されます。
{
"version": "0",
"id": "85fc3613-e913-7fc4-a80c-a3753e4aa9ae",
"detail-type": "ECR Image Scan",
"source": "aws.ecr",
"account": "123456789012",
"time": "2019-10-29T02:36:48Z",
"region": "us-east-1",
"resources": [
"arn:aws:ecr:us-east-1:123456789012:repository/my-repository-name"
],
"detail": {
"scan-status": "COMPLETE",
"repository-name": "my-repository-name",
"finding-severity-counts": {
"CRITICAL": 10,
"MEDIUM": 9
},
"image-digest": "sha256:7f5b2640fe6fb4f46592dfd3410c4a79dac4f89e4782432e0378abcd1234",
"image-tags": []
}
}
拡張スキャンの初期スキャンについても同様に以下の形式でイベントが配信されます。
{
"version": "0",
"id": "739c0d3c-4f02-85c7-5a88-94a9EXAMPLE",
"detail-type": "Inspector2 Scan",
"source": "aws.inspector2",
"account": "123456789012",
"time": "2021-12-03T18:03:16Z",
"region": "us-east-2",
"resources": [
"arn:aws:ecr:us-east-2:123456789012:repository/amazon/amazon-ecs-sample"
],
"detail": {
"scan-status": "INITIAL_SCAN_COMPLETE",
"repository-name": "arn:aws:ecr:us-east-2:809632081692:repository/amazon/amazon-ecs-sample",
"finding-severity-counts": {
"CRITICAL": 7,
"HIGH": 61,
"MEDIUM": 62,
"TOTAL": 158
},
"image-digest": "sha256:36c7b282abd0186e01419f2e58743e1bf635808231049bbc9d77e5EXAMPLE",
"image-tags": [
"latest"
]
}
}
イベントの構造は同一です。detail-type
と source
の値が異なるだけで、その他はほぼ同じ情報で構成されていることがわかります。そのため Lambda 関数は以下のような共通のものを用意し、Event Bridge のイベントルール側でどちらのスキャンタイプを検知するかを制御すればよいことになります。
"""
This is a sample function to send ECR Image Scan findings severity counts to slack.
Environment variables:
WEBHOOK_URL: Incoming Webhook URL
"""
from datetime import datetime
from logging import getLogger, INFO
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError
import json
import os
import re
logger = getLogger()
logger.setLevel(INFO)
def get_properties(finding_counts):
"""Returns the color setting of severity"""
if finding_counts['CRITICAL'] != 0:
properties = {'color': 'danger', 'icon': ':red_circle:'}
elif finding_counts['HIGH'] != 0:
properties = {'color': 'warning', 'icon': ':large_orange_diamond:'}
else:
properties = {'color': 'good', 'icon': ':green_heart:'}
return properties
def get_repo_name(repo_arn):
"""Return repository name from ARN"""
result = re.match('.*?(/.+).*', repo_arn)
repo_name = result.group(1)
return repo_name[1:]
def get_params(event):
"""Slack message formatting"""
severity_list = ['CRITICAL', 'HIGH', 'MEDIUM', 'LOW', 'INFORMAL', 'UNDEFINED']
finding_counts = event['detail']['finding-severity-counts']
for severity in severity_list:
finding_counts.setdefault(severity, 0)
text_properties = get_properties(finding_counts)
if event['source'] == "aws.inspector2":
scan_type = "Enhanced scanning"
repo_name = get_repo_name(event['detail']['repository-name'])
else:
scan_type = "Basic scanning"
repo_name = event['detail']['repository-name']
complete_at = datetime.strftime(
datetime.strptime(event['time'], '%Y-%m-%dT%H:%M:%S%z'), '%Y-%m-%d %H:%M:%S %Z')
slack_message = {
'username': 'Amazon ECR',
'icon_emoji': ':ecr:',
'text': f'''*ECR Image Scan findings | {event['region']} | Account:{event['account']}*''',
'attachments': [
{
'fallback': 'AmazonECR Image Scan Findings Description.',
'color': text_properties['color'],
'title': f'''{text_properties['icon']} {repo_name}:{
event['detail']['image-tags'][0]}''',
'title_link': f'''https://console.aws.amazon.com/ecr/repositories/private/{
event['account']}/{repo_name}/image/{
event['detail']['image-digest']}/scan-results/?region={event['region']}''',
'text': f'''*Scan Type:* {scan_type}\n*Scan Status:* {
event['detail']['scan-status']}\n*Timestamp:* {complete_at}''',
'fields': [
{'title': 'Critical', 'value': finding_counts['CRITICAL'], 'short': True},
{'title': 'High', 'value': finding_counts['HIGH'], 'short': True},
{'title': 'Medium', 'value': finding_counts['MEDIUM'], 'short': True},
{'title': 'Low', 'value': finding_counts['LOW'], 'short': True},
{'title': 'Informational', 'value': finding_counts['INFORMAL'], 'short': True},
{'title': 'Undefined', 'value': finding_counts['UNDEFINED'], 'short': True},
]
}
]
}
return slack_message
def get_error_params(event):
"""Slack error message formatting"""
slack_message = {
'username': 'Amazon ECR',
'icon_emoji': ':ecr:',
'text': f'''*ECR Image Scan findings | {event['region']} | Account:{event['account']}*''',
'attachments': [
{
'fallback': 'AmazonECR Image Scan Findings Description.',
'color': 'danger',
'title': f''':red_circle: {event['detail']['repository-name']}:{
event['detail']['image-tags'][0]}''',
'title_link': f'''https://console.aws.amazon.com/ecr/repositories/private/{
event['account']}/{event['detail']['repository-name']}/image/{
event['detail']['image-digest']}/details/?region={event['region']}''',
'text': f'''*Scan Status:* {event['detail']['scan-status']}''',
}
]
}
return slack_message
def lambda_handler(event, context):
"""AWS Lambda Function to send ECR Image Scan findings severity counts to Slack"""
response = 1
if event['detail']['scan-status'] == 'FAILED':
slack_message = get_error_params(event)
else:
slack_message = get_params(event)
req = Request(os.environ['WEBHOOK_URL'], json.dumps(slack_message).encode('utf-8'))
try:
with urlopen(req) as res:
res.read()
logger.info("Message posted.")
except HTTPError as err:
logger.error("Request failed: %d %s", err.code, err.reason)
except URLError as err:
logger.error("Server connection failed: %s", err.reason)
else:
response = 0
return response
基本スキャンの通知例
拡張スキャンの通知例
EventBridge の設定
基本スキャンの結果を検知したい場合、イベントパターンは以下のように設定します。
{
"source": ["aws.ecr"],
"detail-type": ["ECR Image Scan"]
}
拡張スキャンの場合は以下です。
{
"source": ["aws.inspector2"],
"detail-type": ["Inspector2 Scan"]
}
公式ドキュメント
以上です。
参考になれば幸いです。