Amazon Inspectorの結果をいい感じにレポートしたい の続き。
概要
Inspectorの実行後、Lambdaファンクションを起動。LambdaファンクションはInspectorの実行結果を取得し、検出された脆弱性のリストをCSVファイルにしてS3へアップロードする。ついでに、S3にアップロードしたらPre-Signed URLを生成して関係者へSNS経由で通知する。通知を受信した関係者は通知本文に含まれるURLリンクをクリックするだけでCSVファイルをダウンロードできる。
トリガー
Inspectorの実行完了イベント→SNSトピック→Lambdaファンクション というふうにして、Inspectorの実行完了をトリガーにしてLambdaファンクションを起動する。このときLambdaファンクションに連携されるSNSのメッセージは以下の通り。
{
"template": "arn:aws:inspector:ap-northeast-1:111111111111:ターゲット名/0-XXXXXXXX/テンプレート名/0-AAAAAAAA",
"findingsCount": "
{
arn:aws:inspector:ap-northeast-1:406045910587:rulespackage/0-7WNjqgGu=999,
arn:aws:inspector:ap-northeast-1:406045910587:rulespackage/0-gHP9oWNT=99,
arn:aws:inspector:ap-northeast-1:406045910587:rulespackage/0-knGBhqEu=9
}",
"run": "arn:aws:inspector:ap-northeast-1:111111111111:ターゲット名/0-XXXXXXXX/テンプレート名/0-AAAAAAAA/run/0-BBBBBBBB",
"time": "2018-01-01T01:00:00.000Z",
"event": "ASSESSMENT_RUN_COMPLETED",
"target": "arn:aws:inspector:ap-northeast-1:111111111111:target/0-XXXXXXXX"
}
メッセージ中の"run"(つまりInspectorのassessmentRunArn
)を使えば実行結果を取得できる。
検出された脆弱性(findings)を取得する
findingArns
のリストを取得
まず、assessmentRunArn
をもとにfinding
(=検出された脆弱性)のArnのリストを取得する。
finding
のArnのリストは、list_findongs
メソッドにassessmentRunArns
を与えることで取得できる。
inspector = boto3.client('inspector')
findingArns = inspector.list_findings(
assessmentRunArns=[
assessmentRunArn
],
maxResults=500
)['findingArns']
ただし、一度に得られるfindingArn
の数は500が上限であるため、それ以上になる場合はループ処理が必要。ここではpaginatorの機能を使って以下のように書き換えてみる。
inspector = boto3.client('inspector')
paginator = inspector.get_paginator('list_findings')
operation_parameters = {
'assessmentRunArns': [
assessmentRunArn
],
'maxResults': 500
}
page_iterator = paginator.paginate(**operation_parameters)
for page in page_iterator:
findingArns.extend(page['findingArns'])
findings
を取得
list_findings
は個々のfinding
のArnしか取得できない。詳細情報を取得するにはdescribe_findings
を使う。一度に与えられるfindingArn
の上限は100個なので、それ以上の場合はループ処理が必要。
#100件(上限値)ずつfindingsを取得
findings=[]
for i in range(0, -(-len(findingArns) // 100 )): #findingArnsの件数を100で割り算して切り上げ
response = inspector.describe_findings(
findingArns=findingArns[ i*100 : min( i*100+100 , len(findingArns) ) ],
locale='EN_US'
)
findings.extend(response['findings'])
さらに、あとでCSVファイルへ出力することを考慮してソートしておく。ここは好みに応じて。
#ソート
findings = sorted(
findings,
key=lambda x:(
x['assetAttributes']['agentId'],
x['serviceAttributes']['rulesPackageArn'],
x['id'],
x['title']
),
reverse=False
)
CSVファイルを作成する
CSVファイルのヘッダ
#レポートのタイトル行
titles=[
'#arn',
'assessmentRunArn',
'agentId',
'rulesPackageName',
'severity',
'numericSeverity',
'confidence',
'id',
'title',
'description',
'recommendation'
]
使わないかもしれないけど、使うかもしれない情報は一通り出力する方向で。
ところで、先程の処理で取得したfindings
にはrulesPackageArn
(ルールパッケージARN)は含まれるものの、rulesPackageName
(ルールパッケージ名)は含まれていない。
rulesPackages
の名称取得
現時点でAWSから提供されているInspectorのルールパッケージは
- CIS Operating System Security Configuration Benchmarks-1.0
- Common Vulnerabilities and Exposures-1.1
- Security Best Practices-1.0
- Runtime Behavior Analysis-1.0
の4つ。(参考:Amazon Inspector のルールパッケージの ARN)
Arnと名称の組み合わせはそうそう変わることはないだろうからハードコーディングでも良いかもしれないが、せっかくなので動的に取得する。
#rulesPackagesのリストを返す
def get_rules_packages():
rulesPackageArns = inspector.list_rules_packages()['rulesPackageArns']
rulesPackages = inspector.describe_rules_packages(
rulesPackageArns=rulesPackageArns,
locale='EN_US'
)['rulesPackages']
return sorted(rulesPackages, key=lambda x:x['name'],reverse=False)
CSVファイルに出力する都合上、rulesPackage
のname
順でソートしておく。
書き込み内容の生成
#CSVファイル用データ
rowList=[]
rowList.append(titles)
#finding1件につき一行としてCSVファイル用のデータを作成
#AthenaがCSVの埋め込み改行に対応していないため、改行は"\\n"にエスケープする
for finding in findings:
rowList.append([
finding['arn'],
finding['serviceAttributes']['assessmentRunArn'],
finding['assetAttributes']['agentId'],
list(filter(lambda x: x['arn'] == finding['serviceAttributes']['rulesPackageArn'], rulesPackages))[0]['name'], #ルールパッケージ名
finding['severity'],
finding['numericSeverity'],
finding['confidence'],
finding['id'].replace("\n","\\n"),
finding['title'].replace("\n","\\n"),
finding['description'].replace("\n","\\n"),
finding['recommendation'].replace("\n","\\n")
])
サンプルコード内のコメントにも記述しているが、CSVのカラム内に改行が含まれているとAthenaの処理に影響があるため、無害っぽいものに変換しておく。(その場しのぎ的な対応なので、最適解ではないかも・・)4カラム目はrulesPackageArn
をキーにルールパッケージ名を取得している。
CSVファイル出力
いったん、Lambda実行環境上の/tmp
配下へファイルを出力する。
#2次元配列を受け取り、csvファイルへ出力する
def write_csv(rowList):
filepath = '/tmp/inspector_report_'+datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')+'.csv'
#CSVファイル書き込み
try:
with open(filepath, 'w+', encoding='cp932' , errors="ignore") as csvfile:
writer = csv.writer(csvfile, lineterminator='\n')
writer.writerows(rowList)
except FileNotFoundError as e:
print(e)
except csv.Error as e:
print(e)
return filepath
S3へアップロードしてPre-signed URLを取得する
S3操作用のクレデンシャル
通常、Lambdaファンクションは自身にアタッチされたIAMロールをに基づいて、S3を始めとする各種AWSリソースへのアクセスが拒否または許可される。なので、適切なS3の操作権限がIAMロールに含まれていれば、S3へのアップロードやPre-signed URLの生成は問題なく実行できる。しかし、ロールによって許可される権限は「一時的なもの」という制約がある。一時的な権限が切れると、その権限をもとに作成されたPre-signed URLは無効になってしまう!1時間~数時間(?)以上の有効期間をもつPre-signed URLを生成したい場合、IAMロールの権限で処理を実行してはいけない、ということになる。
この課題をクリアするため、S3のPre-signed URLを生成する専用の弱い権限を持ったIAMユーザを用意する。Lambdaファンクション内で、そのIAMユーザのクレデンシャルを使ってS3の操作を行う。IAMユーザの権限として、ここでは以下のポリシーをアタッチする。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:GetObject"
],
"Resource": "arn:aws:s3:::hoge-bucket/*"
}
]
}
GetObject
の権限があれば、Pre-signed URLを使ったファイルのダウンロードができるはず。"Resource"はファイル名も含め、可能な範囲で絞ればなお良い。今回作成するファンクションではこのIAMユーザの権限をつかってS3のアップロードも行うためPutObject
の権限も含めている。(ひょっとしたらS3アップロードはLambdaファンクションにアタッチするIAMロールで、Pre-signed URLの生成はGetObject
権限のみをもつIAMユーザで行えばもっとセキュアなのかも・・・未検証ですが。)
パラメータストアへアクセス
IAMユーザのクレデンシャル情報(アクセスキー、シークレットキー)をLambdaファンクション内でどのように利用するか。コード内にべた書きはさすがによろしくない。Lambdaの環境変数+暗号化のはアリだけど、ここではパラメータストアを使ってみる。
事前に、SSMパラメータストアにSecureString形式でIAMユーザのアクセスキーとシークレットキーを登録しておく。Lambdaファンクション内では以下の処理でパラメータストアに登録したアクセスキーとシークレットキーを取得できる。
def get_parameters():
accesskey = ssm.get_parameter(
Name=accesskey_name,
WithDecryption=True
)['Parameter']['Value']
secretkey = ssm.get_parameter(
Name=secretkey_name,
WithDecryption=True
)['Parameter']['Value']
return accesskey,secretkey
accesskey_name
とsecretkey_name
はパラメータストアに登録したアクセスキーとシークレットキーのパラメータ名。
S3アップロード
先程用意したクレデンシャル情報を使ってS3アップロードする。
accesskey,secretkey = get_parameters()
s3 = boto3.resource(
's3',
aws_access_key_id=accesskey,
aws_secret_access_key=secretkey
)
#S3バケット名
bucket = s3.Bucket('hoge-bucket')
#S3ファイル名
s3_filepath = "hoge-path/hoge-filename.csv"
#S3アップロード
try:
bucket.upload_file(filepath, s3_filepath)
except Exception as e:
print(e)
ファイル名は実際には動的に生成するのが良い。これだと毎回上書きしてしまうので。例えばInspectorの実行完了時刻を含めたファイル名など。また、前述のとおりだがアップロードはIAMロールの権限でもいけるかも。(未検証。)
Pre-signed URL生成
先程用意したクレデンシャル情報を使ってS3ダウンロードのためのPre-Signed URLを作成する。
s3c = boto3.client(
's3',
aws_access_key_id=accesskey,
aws_secret_access_key=secretkey
)
#Pre-SignedURLの作成
try:
url = s3c.generate_presigned_url(
ClientMethod = 'get_object',
Params = {
'Bucket' : bucket.name,
'Key' : s3_filepath
},
ExpiresIn = expired_hour*60*60,
HttpMethod = 'GET'
)
except Exception as e:
print(e)
SNSへのパブリッシュ
生成したPre-Signed URLを関係者へ通知する。
def sns_publish(topicarn,url):
#SNS title
title = "Inspector結果レポート"
#SNS本文
message=u"""Inspector実行結果レポートを作成しました。
{}
こちらのURLからダウンロード可能です。
ダウンロード期限は{}時間です。
""".format(
url,
expired_hour
)
#SNS Publish
try:
response = sns.publish(
TopicArn = topicarn,
Message = message,
Subject = title
)
except Exception as e:
print(e)
raise e
なお、ここではメールで関係者へアナウンスすることを想定している。メールの受信者はAWSのアカウントを持っている必要はなく、有効期間内にメール本文に記載されたURLへアクセスしさえすれば、generate_presigned_url
を実行したIAMユーザの権限にもとづいてS3ダウンロードの処理を実行することができる。可能であれば該当S3バケットのバケットポリシーで、IAMユーザのアクセス条件を制限すると良い。(例えば接続元IPアドレスが絞れる場合など)
まとめ
InspectorのAPIを叩いて終わり、と思っていましたが
- paginator の処理
- パラメータストア
- Lambdaからの
generate_presigned_url
と、いろいろ小ネタも盛り込まれる内容になりました。
次回、「ゴールその2 Inspectorの最新の実行結果と前回の実行結果の差分をレポートする」を目指します。