LoginSignup
2
3

More than 5 years have passed since last update.

Inspectorの実行結果をCSVファイルとして出力する

Last updated at Posted at 2018-03-26

Amazon Inspectorの結果をいい感じにレポートしたい の続き。

概要

Inspectorの実行後、Lambdaファンクションを起動。LambdaファンクションはInspectorの実行結果を取得し、検出された脆弱性のリストをCSVファイルにしてS3へアップロードする。ついでに、S3にアップロードしたらPre-Signed URLを生成して関係者へSNS経由で通知する。通知を受信した関係者は通知本文に含まれるURLリンクをクリックするだけでCSVファイルをダウンロードできる。

トリガー

Inspectorの実行完了イベント→SNSトピック→Lambdaファンクション というふうにして、Inspectorの実行完了をトリガーにしてLambdaファンクションを起動する。このときLambdaファンクションに連携されるSNSのメッセージは以下の通り。

message.json
{
    "template": "arn:aws:inspector:ap-northeast-1:111111111111:ターゲット名/0-XXXXXXXX/テンプレート名/0-AAAAAAAA",
    "findingsCount": "
        {
            arn:aws:inspector:ap-northeast-1:406045910587:rulespackage/0-7WNjqgGu=999, 
            arn:aws:inspector:ap-northeast-1:406045910587:rulespackage/0-gHP9oWNT=99, 
            arn:aws:inspector:ap-northeast-1:406045910587:rulespackage/0-knGBhqEu=9
        }",
    "run": "arn:aws:inspector:ap-northeast-1:111111111111:ターゲット名/0-XXXXXXXX/テンプレート名/0-AAAAAAAA/run/0-BBBBBBBB",
    "time": "2018-01-01T01:00:00.000Z",
    "event": "ASSESSMENT_RUN_COMPLETED",
    "target": "arn:aws:inspector:ap-northeast-1:111111111111:target/0-XXXXXXXX"
}

メッセージ中の"run"(つまりInspectorのassessmentRunArn)を使えば実行結果を取得できる。

検出された脆弱性(findings)を取得する

findingArnsのリストを取得

まず、assessmentRunArnをもとにfinding(=検出された脆弱性)のArnのリストを取得する。
findingのArnのリストは、list_findongsメソッドにassessmentRunArnsを与えることで取得できる。

sample.py
    inspector = boto3.client('inspector')
    findingArns = inspector.list_findings(
        assessmentRunArns=[
            assessmentRunArn
        ],
        maxResults=500
    )['findingArns']

ただし、一度に得られるfindingArnの数は500が上限であるため、それ以上になる場合はループ処理が必要。ここではpaginatorの機能を使って以下のように書き換えてみる。

sample-paginator.py
    inspector = boto3.client('inspector')
    paginator = inspector.get_paginator('list_findings')
    operation_parameters = {
        'assessmentRunArns': [
            assessmentRunArn
        ],
        'maxResults': 500
    }
    page_iterator = paginator.paginate(**operation_parameters)
    for page in page_iterator:
        findingArns.extend(page['findingArns'])

findingsを取得

list_findingsは個々のfindingのArnしか取得できない。詳細情報を取得するにはdescribe_findingsを使う。一度に与えられるfindingArnの上限は100個なので、それ以上の場合はループ処理が必要。

sample.py
    #100件(上限値)ずつfindingsを取得
    findings=[]
    for i in range(0, -(-len(findingArns) // 100 )): #findingArnsの件数を100で割り算して切り上げ
        response = inspector.describe_findings(
            findingArns=findingArns[ i*100 : min( i*100+100 , len(findingArns) ) ],
            locale='EN_US'
        )
        findings.extend(response['findings'])

さらに、あとでCSVファイルへ出力することを考慮してソートしておく。ここは好みに応じて。

sample-sort.py
    #ソート
    findings = sorted(
        findings,
        key=lambda x:(
            x['assetAttributes']['agentId'],
            x['serviceAttributes']['rulesPackageArn'],
            x['id'],
            x['title']
        ),
        reverse=False
    )

CSVファイルを作成する

CSVファイルのヘッダ

sample.py
#レポートのタイトル行
titles=[
    '#arn',
    'assessmentRunArn',
    'agentId',
    'rulesPackageName',
    'severity',
    'numericSeverity',
    'confidence',
    'id',
    'title',
    'description',
    'recommendation'
    ]

使わないかもしれないけど、使うかもしれない情報は一通り出力する方向で。
ところで、先程の処理で取得したfindingsにはrulesPackageArn(ルールパッケージARN)は含まれるものの、rulesPackageName(ルールパッケージ名)は含まれていない。

rulesPackagesの名称取得

現時点でAWSから提供されているInspectorのルールパッケージは

  • CIS Operating System Security Configuration Benchmarks-1.0
  • Common Vulnerabilities and Exposures-1.1
  • Security Best Practices-1.0
  • Runtime Behavior Analysis-1.0

の4つ。(参考:Amazon Inspector のルールパッケージの ARN
Arnと名称の組み合わせはそうそう変わることはないだろうからハードコーディングでも良いかもしれないが、せっかくなので動的に取得する。

sample.py
#rulesPackagesのリストを返す
def get_rules_packages():
    rulesPackageArns = inspector.list_rules_packages()['rulesPackageArns']
    rulesPackages = inspector.describe_rules_packages(
        rulesPackageArns=rulesPackageArns,
        locale='EN_US'
    )['rulesPackages']
    return sorted(rulesPackages, key=lambda x:x['name'],reverse=False)

CSVファイルに出力する都合上、rulesPackagename順でソートしておく。

書き込み内容の生成

sample.py
    #CSVファイル用データ
    rowList=[]
    rowList.append(titles)
    #finding1件につき一行としてCSVファイル用のデータを作成
    #AthenaがCSVの埋め込み改行に対応していないため、改行は"\\n"にエスケープする
    for finding in findings:
        rowList.append([
            finding['arn'],
            finding['serviceAttributes']['assessmentRunArn'],
            finding['assetAttributes']['agentId'],
            list(filter(lambda x: x['arn'] == finding['serviceAttributes']['rulesPackageArn'], rulesPackages))[0]['name'], #ルールパッケージ名
            finding['severity'],
            finding['numericSeverity'],
            finding['confidence'],
            finding['id'].replace("\n","\\n"),
            finding['title'].replace("\n","\\n"),
            finding['description'].replace("\n","\\n"),
            finding['recommendation'].replace("\n","\\n")
            ])

サンプルコード内のコメントにも記述しているが、CSVのカラム内に改行が含まれているとAthenaの処理に影響があるため、無害っぽいものに変換しておく。(その場しのぎ的な対応なので、最適解ではないかも・・)4カラム目はrulesPackageArnをキーにルールパッケージ名を取得している。

CSVファイル出力

いったん、Lambda実行環境上の/tmp配下へファイルを出力する。

sample.py
#2次元配列を受け取り、csvファイルへ出力する
def write_csv(rowList):
    filepath = '/tmp/inspector_report_'+datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')+'.csv'
    #CSVファイル書き込み
    try:
        with open(filepath, 'w+', encoding='cp932' , errors="ignore") as csvfile:
            writer = csv.writer(csvfile, lineterminator='\n')
            writer.writerows(rowList) 
    except FileNotFoundError as e:
        print(e)
    except csv.Error as e:
        print(e)
    return filepath

S3へアップロードしてPre-signed URLを取得する

S3操作用のクレデンシャル

通常、Lambdaファンクションは自身にアタッチされたIAMロールをに基づいて、S3を始めとする各種AWSリソースへのアクセスが拒否または許可される。なので、適切なS3の操作権限がIAMロールに含まれていれば、S3へのアップロードやPre-signed URLの生成は問題なく実行できる。しかし、ロールによって許可される権限は「一時的なもの」という制約がある。一時的な権限が切れると、その権限をもとに作成されたPre-signed URLは無効になってしまう!1時間~数時間(?)以上の有効期間をもつPre-signed URLを生成したい場合、IAMロールの権限で処理を実行してはいけない、ということになる。

この課題をクリアするため、S3のPre-signed URLを生成する専用の弱い権限を持ったIAMユーザを用意する。Lambdaファンクション内で、そのIAMユーザのクレデンシャルを使ってS3の操作を行う。IAMユーザの権限として、ここでは以下のポリシーをアタッチする。

iam-s3-policy.json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject"
            ],
            "Resource": "arn:aws:s3:::hoge-bucket/*"
        }
    ]
}

GetObjectの権限があれば、Pre-signed URLを使ったファイルのダウンロードができるはず。"Resource"はファイル名も含め、可能な範囲で絞ればなお良い。今回作成するファンクションではこのIAMユーザの権限をつかってS3のアップロードも行うためPutObjectの権限も含めている。(ひょっとしたらS3アップロードはLambdaファンクションにアタッチするIAMロールで、Pre-signed URLの生成はGetObject権限のみをもつIAMユーザで行えばもっとセキュアなのかも・・・未検証ですが。)

パラメータストアへアクセス

IAMユーザのクレデンシャル情報(アクセスキー、シークレットキー)をLambdaファンクション内でどのように利用するか。コード内にべた書きはさすがによろしくない。Lambdaの環境変数+暗号化のはアリだけど、ここではパラメータストアを使ってみる。

事前に、SSMパラメータストアにSecureString形式でIAMユーザのアクセスキーとシークレットキーを登録しておく。Lambdaファンクション内では以下の処理でパラメータストアに登録したアクセスキーとシークレットキーを取得できる。

sample.py
def get_parameters():
    accesskey = ssm.get_parameter(
        Name=accesskey_name,
        WithDecryption=True
    )['Parameter']['Value']
    secretkey = ssm.get_parameter(
        Name=secretkey_name,
        WithDecryption=True
    )['Parameter']['Value']
    return accesskey,secretkey

accesskey_namesecretkey_nameはパラメータストアに登録したアクセスキーとシークレットキーのパラメータ名。

S3アップロード

先程用意したクレデンシャル情報を使ってS3アップロードする。

sample.py
    accesskey,secretkey = get_parameters()
    s3 = boto3.resource(
        's3',
        aws_access_key_id=accesskey,
        aws_secret_access_key=secretkey
    )
    #S3バケット名
    bucket = s3.Bucket('hoge-bucket')
    #S3ファイル名
    s3_filepath = "hoge-path/hoge-filename.csv"
    #S3アップロード
    try:
        bucket.upload_file(filepath, s3_filepath)
    except Exception as e:
        print(e)

ファイル名は実際には動的に生成するのが良い。これだと毎回上書きしてしまうので。例えばInspectorの実行完了時刻を含めたファイル名など。また、前述のとおりだがアップロードはIAMロールの権限でもいけるかも。(未検証。)

Pre-signed URL生成

先程用意したクレデンシャル情報を使ってS3ダウンロードのためのPre-Signed URLを作成する。

sample.py
    s3c = boto3.client(
        's3',
        aws_access_key_id=accesskey,
        aws_secret_access_key=secretkey
    )
    #Pre-SignedURLの作成
    try:
        url = s3c.generate_presigned_url(
            ClientMethod = 'get_object',
            Params = {
                'Bucket' : bucket.name,
                'Key' : s3_filepath
            },
            ExpiresIn = expired_hour*60*60,
            HttpMethod = 'GET'
        )
    except Exception as e:
        print(e)

SNSへのパブリッシュ

生成したPre-Signed URLを関係者へ通知する。

sample.py
def sns_publish(topicarn,url):
    #SNS title
    title = "Inspector結果レポート"
    #SNS本文
    message=u"""Inspector実行結果レポートを作成しました。
{}
こちらのURLからダウンロード可能です。
ダウンロード期限は{}時間です。

""".format(
        url,
        expired_hour
    )
    #SNS Publish
    try:
        response = sns.publish(
            TopicArn = topicarn,
            Message = message,
            Subject = title
        )
    except Exception as e:
        print(e)
        raise e

なお、ここではメールで関係者へアナウンスすることを想定している。メールの受信者はAWSのアカウントを持っている必要はなく、有効期間内にメール本文に記載されたURLへアクセスしさえすれば、generate_presigned_urlを実行したIAMユーザの権限にもとづいてS3ダウンロードの処理を実行することができる。可能であれば該当S3バケットのバケットポリシーで、IAMユーザのアクセス条件を制限すると良い。(例えば接続元IPアドレスが絞れる場合など)

まとめ

InspectorのAPIを叩いて終わり、と思っていましたが

  • paginator の処理
  • パラメータストア
  • Lambdaからのgenerate_presigned_url

と、いろいろ小ネタも盛り込まれる内容になりました。
次回、「ゴールその2 Inspectorの最新の実行結果と前回の実行結果の差分をレポートする」を目指します。

2
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
3