2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

AWSの利用をZscalerからに制限する

Last updated at Posted at 2024-05-28

概要

AWSのセキュリティを強化する機能としては、MFA(多要素認証)が一般的です。

しかしMFAを利用しても、自宅からアクセスできてしまう問題があります。

また複数のAWSアカウントを管理し、IAMユーザが数百人に達すると、全ユーザの動向を把握するのは難しくなります。
メンバ管理をしている責任者が適切に運用してくれないと、退職したユーザにアクセスされてしまう可能性もあります。

アクセスキーにMFAを強制するとロジックの作り込みが必要になりますし、人間がアクセスキーを使用する際には手順が増え、ユーザに説明するのも、自分が利用するのも手間がかかります。

このような問題は、IP制限を設定することで多くが解決できます。アクセスキーが漏洩した場合のリスクも大幅に抑えられます。

会社からのアクセスに限定するには、自社拠点のIPアドレスに制限すれば良いのですが、弊社では全クライアントにZscalerがインストールされているためこの方法が使えません。

本記事では、Zscaler環境下でAWSにIP制限をかける方法、特にその運用の自動化を解説します。

なお、これを読みに来た人はZscalerの利用者だと思いますので、Zscalerが何かとかは割愛します。

Zscalerに制限する方法

Zscalerを端末にインストールすると、ブラウザ等にプロキシを設定しなくても、全ての通信がZscalerのプロキシを通過するようになります。

そのため、AWSから見た送信元IPアドレスが、Zscalerのデータセンターになります。

なのでAWS側で、このZscalerのデータセンターIPレンジからに送信元を限定すれば、AWSの利用をZscalerからに制限することが可能になります。

今回具体的に制限を施すのはIAMポリシーとセキュリティグループになります。

IAMポリシーはIPアドレスで許可/拒否を定義することが可能です。
IAMユーザの権限をZscalerのIPレンジ以外からアクセスしたときは拒否するよう構成すれば、マネジメントコンソールやアクセスキーを使ったAPIの利用をZscalerからに制限できます。

AWS上に作成したリソースへのアクセスは、セキュリティグループで制限します。

制限する範囲をZscalerとするリスク

Zscalerはクラウドサービスですので、データセンターは他社のZscalerユーザとも共有しています。

Zscalerからに制限しても、それは自社からのアクセスに制限したことにはなりません。

流石に悪意あるユーザがわざわざZscalerを購入するとは考えにくいですが、Zscalerの利用者がボットに感染して…といった状況はあるかも知れません。

とは言えこれ以上の制限もできないので、ここは経営判断でリスクを許容してもらうしかありません。

筆者が管理するAWSでは要求されるセキュリティレベルに応じて、MFAと組み合わせたり、クライアント証明書を要求したり、環境へのアクセスをSSHトンネル経由に限定することで鍵認証させるなどの対策を採っています。

プレフィックスリストとは

唐突ですがプレフィックスリストについて説明させていただきます。
プレフィックスリストはIPをグループ化する機能です。
セキュリティグループからも参照できます。

セキュリティグループはVPCを跨いで共有することができません。
セキュリティグループでZscalerのIPレンジを直接設定すると、更新があるたびに各VPCのセキュリティグループを全て更新する必要があります。

そこで、プレフィックスリストを用いてZscalerのIPレンジをグループ化しておき、セキュリティグループのIPブロックにはプレフィックスリストを指定しておきます。

こうすることで、プレフィックスリストだけ更新すれば、全セキュリティグループのルールを更新することが可能になります。

先ほど「セキュリティグループで制限する」と言いましたが、運用上はプレフィックスリストをメンテナンスすることになります。

運用自動化の必要性

IAMポリシーとセキュリティグループでAWSの利用をZscalerに制限できると説明しましたが、Zscalerはユーザの増加に合わせてデータセンターを拡張しており、それに伴いIPレンジも追加されていきます。

そのため、IAMポリシーとセキュリティグループを、ZscalerのIPレンジ追加に追従するようメンテナンスし続ける必要があります。

日本のデータセンターに限定してもIP追加は年3回程度あるため、複数AWSを運用していると非常に面倒です。

なのでIAMポリシーとプレフィックスリストをLambdaで更新するようにしました。

ZscalerのIPレンジ

自動化のためにはZscalerのIPレンジを取得する必要がありますが、親切なことに全データセンターのIPレンジはJSON形式でWebから取得可能です。

  1. 「zscaler ip range」等でWeb検索
  2. Cloud Enforcement Node Ranges
  3. Current Data Centers
  4. Download: all required and optional destinations in one JSON formatted list.

IAMポリシー自動更新

IAMポリシー作成

下記のような内容で、任意の名前のポリシーを作成してください。
ZscalerのIPは後でLambdaで自動的に追加するので書かなくて大丈夫です。

Zscaler以外にも許可したいIPがあれば、手動で追加してください。
後で紹介するLambdaは、手動で追加したIPを変更したり削除したりはしません。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Deny",
            "Action": "*",
            "Resource": "*",
            "Condition": {
                "NotIpAddress": {
                    "aws:SourceIp": [
                        "Zscaler以外の許可したいIP",
                        "111.222.333.444/23",
                        "555.666.777.888/32"
                    ]
                }
            }
        }
    ]
}

IAMポリシーの使い方

IP制限を掛けたいIAMユーザやIAMグループの権限に、作成したIAMポリシーを追加してください。

この操作をしているあなたが締め出されてしまわないように注意してください。

マネジメントコンソールを操作しているときは、基本的にはブラウザからAPIコールするのですが、たまにブラウザでなくAWS内からAPIコールされることがあります。(最近はほとんどなくなりましたが)
その場合、AWSから見た送信元IPはZscalerでなくAWSのIPになります。一方で実行者の権限はマネジメントコンソールにログインしているユーザのため、IP制限に引っかかって拒否されてしまいます。

このような場合に備え、同じ権限を持ちつつ、IP制限だけ掛かっていないロールを用意しましょう。
IP制限のせいで実行できない操作があるときは、そのロールにAssumeRoleしてから実行してもらいます。

ユーザ/グループ権限 ロール権限
割り当てたいポリシーたち 割り当てたいポリシーたち
AssumeRoleするためのポリシー
IP制限のポリシー

Lambda関数作成

コード

ランタイム:Python 3.12

import os
import json
import boto3
from urllib import request

iam = boto3.client('iam')

# 環境変数からIAMポリシー名とIPリストのURLを取得
IAM_POLICY_NAME = os.getenv("IAM_POLICY")
IP_LIST_URL = os.getenv("IP_LIST_JSON")

# IAMポリシー名からARNを取得
def get_policy_arn(policy_name):
    try:
        # AWSアカウントIDを取得
        aws_account_id = boto3.client('sts').get_caller_identity().get('Account')

        # ポリシーのARNを構成
        policy_arn = f"arn:aws:iam::{aws_account_id}:policy/{policy_name}"
        return policy_arn
    except Exception as e:
        print(f"Error: {e}")
        return None

# IAMポリシーを取得
def get_iam_policy(policy_arn):
    try:
        response = iam.get_policy(PolicyArn=policy_arn)  # 最新バージョンを取得
        latest_version = response['Policy']['DefaultVersionId']
        response = iam.get_policy_version(PolicyArn=policy_arn, VersionId=latest_version)
        return response['PolicyVersion']['Document']
    except Exception as e:
        print(f"Error: {e}")
        return None

# IAMポリシーの最も古い非デフォルトバージョンを削除
def delete_oldest_non_default_version(policy_arn):
    try:
        # ポリシーのバージョン一覧を取得
        response = iam.list_policy_versions(PolicyArn=policy_arn)
        versions = response['Versions']

        # デフォルトバージョン以外のバージョンを取得
        non_default_versions = [v for v in versions if not v['IsDefaultVersion']]

        # 最も古い非デフォルトバージョンを特定
        oldest_version = min(non_default_versions, key=lambda x: x['CreateDate'])

        # 最も古い非デフォルトバージョンを削除
        iam.delete_policy_version(PolicyArn=policy_arn, VersionId=oldest_version['VersionId'])

        print(f"IAMポリシー {policy_arn} の最も古い非デフォルトバージョン {oldest_version['VersionId']} を削除しました。")
    except Exception as e:
        print(f"Error: {e}")

# IAMポリシーを更新する関数
def update_iam_policy(policy_arn, new_policy):
    try:
        # 更新前にポリシーを表示
        print("更新前のIAMポリシー:")
        print(json.dumps(new_policy, indent=2))

        response = iam.create_policy_version(
            PolicyArn=policy_arn,
            PolicyDocument=json.dumps(new_policy),
            SetAsDefault=True
        )

        # デフォルトバージョンを設定
        version_id = response['PolicyVersion']['VersionId']
        iam.set_default_policy_version(PolicyArn=policy_arn, VersionId=version_id)

        print(f"IAMポリシー {policy_arn} を更新し、バージョン {version_id} をデフォルトに設定しました。")
    except Exception as e:
        print(f"Error: {e}")

# Lambda関数のハンドラ
def lambda_handler(event, context):
    # 現在のIAMポリシーのARNを取得
    IAM_POLICY_ARN = get_policy_arn(IAM_POLICY_NAME)
    print(f"IAMポリシー名: {IAM_POLICY_NAME}")
    print(f"IAMポリシーのARN: {IAM_POLICY_ARN}")

    if IAM_POLICY_ARN:
        # 現在のIAMポリシーを取得
        IAM_POLICY = get_iam_policy(IAM_POLICY_ARN)

        if IAM_POLICY:
            # 現在のIAMポリシーから許可されたIPアドレスを取得
            allowed_ips = IAM_POLICY["Statement"][0]["Condition"]["NotIpAddress"]["aws:SourceIp"]

            # JSONからIPリストを取得
            try:
                with request.urlopen(IP_LIST_URL) as response:
                    ip_list = json.loads(response.read().decode())
            except Exception as e:
                print(f"IPリストの取得に失敗しました: {e}")
                return

            # JSONからIPを抽出
            new_allowed_ips  = [ip_range["range"] for ip_range in ip_list.get("zscaler.net", {}).get("continent : APAC", {}).get("city : Osaka I", [])]
            new_allowed_ips += [ip_range["range"] for ip_range in ip_list.get("zscaler.net", {}).get("continent : APAC", {}).get("city : Tokyo IV", [])]
            new_allowed_ips += [ip_range["range"] for ip_range in ip_list.get("zscaler.net", {}).get("continent : APAC", {}).get("city : Tokyo V", [])]

            new_allowed_ips.sort()
            
            print("新しいIPリスト:")
            print(new_allowed_ips)

            new_allowed_ips = sorted(list(set(allowed_ips + new_allowed_ips)))

            # 新しいIAMポリシーの定義を作成
            new_policy = IAM_POLICY.copy()
            new_policy["Statement"][0]["Condition"]["NotIpAddress"]["aws:SourceIp"] = new_allowed_ips

            # 新旧のIAMポリシーを比較し、変更があれば更新
            if set(new_allowed_ips) != set(allowed_ips):
                delete_oldest_non_default_version(IAM_POLICY_ARN)
                update_iam_policy(IAM_POLICY_ARN, new_policy)
            else:
                print("IAMポリシーは更新の必要がありません。")
        else:
            print("IAMポリシーの取得に失敗しました。")
    else:
        print("IAMポリシーのARNの取得に失敗しました。")

# 手元のpythonでデバッグするとき用
if __name__ == "__main__":
    lambda_handler(None, None)

設定

一般設定

  • タイムアウト:5分0秒

トリガー

  • ソース:EventBridge
  • Rule:Create a new rule
  • Rule name:UpdateZscalerIP
  • Rule description:任意の備考
  • Rule type:Schedule expression
  • Schedule expression:cron(5 13 ? * * *)
    • 時間はUTC
    • 日本時間だと毎日22:05

アクセス権限
この関数用のロールに下記権限を追加してください。

  • IAMFullAccess

環境変数

キー
IAM_POLICY 更新対象のIAMポリシー名
IP_LIST_JSON https://config.zscaler.com/api/zscaler.net/cenr/json

IAM_POLICYはポリシー名を指定してください。
ポリシー名はユニークなので、ARNはコードの中で自動的に特定しています。

IP_LIST_JSONはZscalerが公開しているJSON形式のデータセンターIPレンジのURLを指定してください。

プレフィックスリスト自動更新

プレフィックスリスト作成

プレフィックスリストはIPv4とIPv6を混在させることができないため、それぞれのプレフィックスリストを作成してください。

IPv4のプレフィックスリストを作成

  1. VPC
  2. マネージドプレフィックスリスト
  3. プレフィックスリストを作成
  4. 各種入力
    1. プレフィックスリスト名:任意の名前
    2. 最大エントリ:30 ※注意
    3. アドレスファミリー:IPv4
    4. プレフィックスリストのエントリ:空で良い

IPv6のプレフィックスリストを作成

  1. VPC
  2. マネージドプレフィックスリスト
  3. プレフィックスリストを作成
  4. 各種入力
    1. プレフィックスリスト名:任意の名前
    2. 最大エントリ:10 ※注意
    3. アドレスファミリー:IPv6
    4. プレフィックスリストのエントリ:空で良い

最大エントリに関する注意点

セキュリティグループにプレフィックスリストを適用すると、ルール数は最大エントリで計上されます。

例えばプレフィックスリストの現在エントリ数が20でも、最大エントリ数が30だと、セキュリティグループには30ルールが追加されたものと見做されます。

1つのセキュリティグループには60ルール、1つのENIには5セキュリティグループまで、という上限がデフォルトになっていますので、これを超えないように最大エントリを設定する必要があります。

どうしても上限に収まらない場合はクォータの引き上げをリクエストしてください。

Lambda関数作成

コード

ランタイム:Python 3.12

import os
import json
import boto3
from urllib import request

ec2 = boto3.client('ec2')

# 環境変数からプレフィックスリストIDとIPリストのURLを取得
PREFIX_LIST_ID4 = os.getenv("PREFIX_LIST_ID4")
PREFIX_LIST_ID6 = os.getenv("PREFIX_LIST_ID6")
IP_LIST_URL = os.getenv("IP_LIST_JSON")

# プレフィックスリストのエントリを取得
def get_prefix_list(prefix_list_id):
    try:
        response = ec2.get_managed_prefix_list_entries(PrefixListId=prefix_list_id)
        return [entry['Cidr'] for entry in response['Entries']]
    except Exception as e:
        print(f"Error: {e}")
        return []

# プレフィックスリストの現在のバージョンを取得
def get_prefix_list_version(prefix_list_id):
    try:
        response = ec2.describe_managed_prefix_lists(PrefixListIds=[prefix_list_id])
        version = response['PrefixLists'][0]['Version']
        return version
    except Exception as e:
        print(f"Error: {e}")
        return None

# プレフィックスリストを更新
def update_prefix_list(prefix_list_id, new_entries):
    try:
        # 現在のプレフィックスリストのエントリを取得
        current_entries = get_prefix_list(prefix_list_id)

        # 追加するエントリを決定(現在のエントリに含まれていないもの)
        entries_to_add = [entry for entry in new_entries if entry not in current_entries]

        if not entries_to_add:
            print(f"新しいエントリはありません (Prefix List ID: {prefix_list_id})。")
            return

        # 現在のプレフィックスリストバージョンを取得
        current_version = get_prefix_list_version(prefix_list_id)
        if current_version is None:
            print(f"プレフィックスリストのバージョン取得に失敗しました (Prefix List ID: {prefix_list_id})。")
            return

        # 新しいエントリを追加
        ec2.modify_managed_prefix_list(
            PrefixListId=prefix_list_id,
            CurrentVersion=current_version,
            AddEntries=[{'Cidr': cidr} for cidr in entries_to_add]
        )

        print(f"プレフィックスリスト {prefix_list_id} に新しいエントリを追加しました: {entries_to_add}")
    except Exception as e:
        print(f"Error: {e}")

# IPv4かどうかをチェック
def is_ipv4_cidr(cidr):
    return ':' not in cidr

# IPv6かどうかをチェック
def is_ipv6_cidr(cidr):
    return ':' in cidr

# Lambda関数のハンドラ
def lambda_handler(event, context):
    print(f"IPv4プレフィックスリストID: {PREFIX_LIST_ID4}")
    print(f"IPv6プレフィックスリストID: {PREFIX_LIST_ID6}")

    if not PREFIX_LIST_ID4 or not PREFIX_LIST_ID6:
        print("プレフィックスリストIDの取得に失敗しました。")
        return

    # JSONからIPリストを取得
    try:
        with request.urlopen(IP_LIST_URL) as response:
            ip_list = json.loads(response.read().decode())
    except Exception as e:
        print(f"IPリストの取得に失敗しました: {e}")
        return

    # JSONからIPを抽出し、IPv4とIPv6に分ける
    new_entries_ipv4  = [ip_range["range"] for ip_range in ip_list.get("zscaler.net", {}).get("continent : APAC", {}).get("city : Osaka I", []) if is_ipv4_cidr(ip_range["range"])]
    new_entries_ipv4 += [ip_range["range"] for ip_range in ip_list.get("zscaler.net", {}).get("continent : APAC", {}).get("city : Tokyo IV", []) if is_ipv4_cidr(ip_range["range"])]
    new_entries_ipv4 += [ip_range["range"] for ip_range in ip_list.get("zscaler.net", {}).get("continent : APAC", {}).get("city : Tokyo V", []) if is_ipv4_cidr(ip_range["range"])]

    new_entries_ipv6  = [ip_range["range"] for ip_range in ip_list.get("zscaler.net", {}).get("continent : APAC", {}).get("city : Osaka I", []) if is_ipv6_cidr(ip_range["range"])]
    new_entries_ipv6 += [ip_range["range"] for ip_range in ip_list.get("zscaler.net", {}).get("continent : APAC", {}).get("city : Tokyo IV", []) if is_ipv6_cidr(ip_range["range"])]
    new_entries_ipv6 += [ip_range["range"] for ip_range in ip_list.get("zscaler.net", {}).get("continent : APAC", {}).get("city : Tokyo V", []) if is_ipv6_cidr(ip_range["range"])]

    new_entries_ipv4 = sorted(list(set(new_entries_ipv4)))
    new_entries_ipv6 = sorted(list(set(new_entries_ipv6)))

    print("新しいIPv4 IPリスト:")
    print(new_entries_ipv4)

    print("新しいIPv6 IPリスト:")
    print(new_entries_ipv6)

    # 新しいエントリのみを追加
    update_prefix_list(PREFIX_LIST_ID4, new_entries_ipv4)
    update_prefix_list(PREFIX_LIST_ID6, new_entries_ipv6)

# 手元のpythonでデバッグするとき用
if __name__ == "__main__":
    lambda_handler(None, None)

設定

一般設定

  • タイムアウト:5分0秒

トリガー

  • ソース:EventBridge
  • ルール:既存のルール
  • 既存のルール:UpdateZscalerIP
    • IAMポリシー自動更新で作成したもの
    • 新しく作って別スケジュールにしても可

アクセス権限
この関数用のロールに下記権限を追加してください。

  • AmazonEC2FullAccess

環境変数

キー
IP_LIST_JSON https://config.zscaler.com/api/zscaler.net/cenr/json
PREFIX_LIST_ID4 IPv4タイプのプレフィックスリストID
PREFIX_LIST_ID6 IPv6タイプのプレフィックスリストID

プレフィックスリスト名は重複が許容されており、名前では一意に特定できないので、こちらはpl-から始まるIDを指定してください。

IP_LIST_JSONはZscalerが公開しているJSON形式のデータセンターIPレンジのURLを指定してください。

Zscaler環境におけるLambda関数テスト方法

ZscalerからマネジメントコンソールでLambdaのテストを実行すると、下記のようなエラーが出て実行できません。

API アクションの呼び出しに失敗しました。エラーメッセージ: Unexpected token '<', "<!--# Id: "... is not valid JSON
Deserialization error: to see the raw response, inspect the hidden field {error}.$response on this object.

仕方ないのでAWS CLIで実行してください。

$ aws lambda invoke --function-name <Lambda関数名> output.txt

実行ログはCloudWatch Logsで確認してください。

コードのメンテナンス

今回作成したコードは、Tokyo IV、Tokyo V、Osaka Iのデータセンターに限定しています。
プロジェクトの性質によっては国内からのアクセスに限定する必要があるのと、IAMポリシーとセキュリティグループにはそれぞれ文字数とルール数の上限があるため、ある程度絞り込みたかったからです。

なので国外のデータセンターも許可したいときや、国内のデータセンターが増設されたときは、コードの下記箇所を適宜修正してください。

new_allowed_ips  = [ip_range["range"] for ip_range in ip_list.get("zscaler.net", {}).get("continent : APAC", {}).get("city : Osaka I", [])]
new_allowed_ips += [ip_range["range"] for ip_range in ip_list.get("zscaler.net", {}).get("continent : APAC", {}).get("city : Tokyo IV", [])]
new_allowed_ips += [ip_range["range"] for ip_range in ip_list.get("zscaler.net", {}).get("continent : APAC", {}).get("city : Tokyo V", [])]

クライアント側でデータセンターを変更するには

日本でZscalerを利用していれば、ほとんど東京か大阪のデータセンターが利用されます。
ただし確実ではなく、たまに国外のデータセンターに振り分けられてしまうこともあります。

自分が今、どのデータセンターを利用しているかは下記で確認できます。

上記ページにアクセスすると、下記のようなメッセージが表示されます。

You are accessing the Internet via Zscaler Cloud: Tokyo IV in the zscaler.net cloud.

これが日本のデータセンターでなかったときは、Zscalerを再起動して再振り分けしてもらいましょう。

Zscaler再起動.png

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?