はじめに
AWS LambdaでWAFのIP setsを更新します。
ランタイムはpython
です。
WAFはCLOUDFRONT
のものとなるため、AWSリージョンはバージニア北部のus-east-1
となります。
WAFのIP Setsの更新は、渡したIPアドレスのリストで上書きされるので、登録・更新・削除が同じメソッドとなります。
リストip_adresses
は重複していても問題なく、WAFに登録されます。
Lambdaのスクリプト
Lambda
import boto3
waf = boto3.client('wafv2', region_name='us-east-1')
def handler(event, context):
ip_set_name = 'IP Sets名'
ip_adresses = ['IPアドレス']
response = update_ip_addresses(ip_set_name, ip_adresses)
return response
def update_ip_addresses(ip_set_name, ip_adresses, scope='CLOUDFRONT'):
"""[指定したIPアドレスでWAFを更新する]
Args:
ip_set_name ([str]): [IP sets名]
ip_adresses ([list]): [IPアドレス]
scope ([str], optional): [スコープ]. Defaults to CLOUDFRONT.
Returns:
[dict]: []
"""
try:
# IP setsの更新情報を取得する
ip_set_info = get_ip_set_info(ip_set_name)
response = waf.update_ip_set(
Name=ip_set_name,
Scope=scope,
Id=ip_set_info['IPSet']['Id'],
Addresses=ip_adresses,
LockToken=ip_set_info['LockToken']
)
except waf.exceptions.WAFOptimisticLockException:
update_ip_addresses(ip_set_name, ip_adresses)
return response
def get_ip_set_info(ip_set_name, scope='CLOUDFRONT'):
"""[指定したIP setsの詳細情報を取得する]
Args:
ip_set_name ([str]): [IP sets名]
scope ([str], optional): [スコープ]. Defaults to CLOUDFRONT.
Returns:
[dict]: [指定したIP setsの詳細情報]
"""
response = waf.get_ip_set(
Name=ip_set_name,
Scope=scope,
Id=get_filtering_ip_sets()[ip_set_name]['id']
)
return response
def get_filtering_ip_sets():
"""[取得したIP setsの情報をフィルタリングする]
Returns:
[dict]: [フィルタリングしたIP setsの情報]
"""
ip_sets_dict= {}
for ip_set in get_all_ip_sets():
ip_sets_dict[ip_set['Name']] = dict(
id=ip_set['Id'],
lock_token=ip_set['LockToken']
)
return ip_sets_dict
def get_all_ip_sets(scope='CLOUDFRONT', limit=100, next_marker=None):
"""[全てのIP setsの情報を取得する]
Args:
scope ([str], optional): [スコープ]. Defaults to CLOUDFRONT.
limit ([int]): [取得上限値]
next_marker ([list]): [次の取得マーカー]. Defaults to None.
Returns:
[list]: [IP setsの情報]
"""
all_ip_sets = []
# 初回実行の場合
if next_marker is None:
ret = waf.list_ip_sets(Scope=scope, Limit=limit)
# 次のデータが存在する場合
else:
ret = waf.list_ip_sets(Scope=scope, Limit=limit, NextMarker=next_marker)
if 'NextMarker' in ret:
all_ip_sets = ret['IPSets'] + get_all_ip_sets(next_marker=ret['NextMarker'])
else:
all_ip_sets = ret['IPSets']
return all_ip_sets