

Implementing password authentication for Transfer Family SFTP with EFS


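Introduction

I implemented password authentication for SFTP connections to AWS Transfer Family.
There were few reference articles and I struggled quite a bit, so I am sharing the procedure on Qiita.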

Configuration

  • Transfer Family
    • Protocol: SFTP
    • Endpoint type: VPC endpoint
    • Access: Internet-facing
    • Domain: Amazon EFS

AWS architecture diagram

Image source: "AWS Transfer Family のカスタムIDプロバイダーで AWS Secrets Manager を使用して パスワード認証を有効にしてみる" (Enabling password authentication with AWS Secrets Manager in an AWS Transfer Family custom identity provider), DevelopersIO

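Let's try it

Prerequisites

  • The SAM CLI is available
  • The AWS CLI is available
  • The VPC, EC2 instance, EFS file system, and security group have already been created
  • The target EFS file system is already mounted at the directory "/fs-XXXXXX"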

① Edit the CloudFormation template

  • Download the template from here
  • Rewrite the template as described below

Rewrite template.yml

  • Comment out lines 26-33
    # TransferEndpointType:
    #   AllowedValues:
    #     - 'PUBLIC'
    #     - 'VPC'
    #   Type: String
    #   Default: 'PUBLIC'
    #   Description: Select PUBLIC if you want a public facing AWS Transfer endpoint or VPC if you want a VPC
    #     based endpoint. Note that only SFTP and FTPS are supported on public endpoints.
  • Add the following below line 43
  SecurityGroupIds:
    Type: String
    Default: ''
    Description: Required. ID of the security group to attach to the VPC endpoint.
  • Comment out lines 61-64
    # TransferVPCEndpoint:
    #   Fn::Equals:
    #     - Ref: TransferEndpointType
    #     - 'VPC'
  • Add the following below line 90
# ------------------------------------------------------------#
# EIP
# ------------------------------------------------------------#
  ElasticIP1:
    Type: AWS::EC2::EIP
  ElasticIP2:
    Type: AWS::EC2::EIP
  • Rewrite the Transfer Family server definition, from line 99 onward, as follows
  TransferServer:
    Type: AWS::Transfer::Server
    Condition: CreateServer
    Properties:
      Domain: EFS
      EndpointType: VPC
      # EndpointDetails:
      #   Fn::If:
      #     - TransferVPCEndpoint
      #     - SubnetIds:
      #         Fn::Split: [',', Ref: TransferSubnetIDs]
      #       VpcId:
      #         Ref: TransferVPCID
      #     - Ref: AWS::NoValue
      EndpointDetails:
        AddressAllocationIds:
          - !GetAtt ElasticIP1.AllocationId
          - !GetAtt ElasticIP2.AllocationId
        SecurityGroupIds:
          - Ref: SecurityGroupIds
        SubnetIds:
          Fn::Split: [',', Ref: TransferSubnetIDs]
        VpcId: 
          Ref: TransferVPCID
      IdentityProviderDetails:
        InvocationRole:
          Fn::GetAtt: TransferIdentityProviderRole.Arn
        Url:
          Fn::Join:
            - ''
            - - https://
              - Ref: CustomIdentityProviderApi
              - .execute-api.
              - Ref: AWS::Region
              - .amazonaws.com/
              - Ref: ApiStage
      IdentityProviderType: API_GATEWAY
      LoggingRole:
        Fn::GetAtt: TransferCWLoggingRole.Arn
  • Full template.yml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: A basic template for creating a Lambda-backed API Gateway for use as
  a custom identity provider in AWS Transfer. It authenticates against an
  entry in AWS Secrets Manager of the format server-id/username. Additionally, the secret
  must hold the key-value pairs for all user properties returned to AWS Transfer.
  If you choose to also launch the AWS Transfer endpoint it will be provisioned with the SFTP protocol.
  This can be changed after launch. Note that FTP is only supported on VPC endpoints
  You can modify the Lambda function code to do something different after deployment.
Parameters:
  CreateServer:
    AllowedValues:
      - 'true'
      - 'false'
    Type: String
    Description: Whether this stack creates an AWS Transfer endpoint or not. If the endpoint is created as
      part of the stack, the custom identity provider is automatically associated with it.
    Default: 'true'
  SecretsManagerRegion:
    Type: String
    Description: (Optional) The region the secrets are stored in. If this value is not provided, the
      region this stack is deployed in will be used. Use this field if you are deploying this stack in
      a region where Secrets Manager is not available.
    Default: ''
  # TransferEndpointType:
  #   AllowedValues:
  #     - 'PUBLIC'
  #     - 'VPC'
  #   Type: String
  #   Default: 'PUBLIC'
  #   Description: Select PUBLIC if you want a public facing AWS Transfer endpoint or VPC if you want a VPC
  #     based endpoint. Note that only SFTP and FTPS are supported on public endpoints.
  TransferSubnetIDs:
    Type: String
    Default: ''
    Description: Required if launching a VPC endpoint. Comma-separated list of subnets that you would like
      the AWS Transfer endpoint to be provisioned into.
  TransferVPCID:
    Type: String
    Default: ''
    Description: Required if launching a VPC endpoint. The VPC ID that you would like the AWS Transfer endpoint
      to be provisioned into.
  SecurityGroupIds:
    Type: String
    Default: ''
    Description: Required. ID of the security group to attach to the VPC endpoint.
Conditions:
  CreateServer:
    Fn::Equals:
      - Ref: CreateServer
      - 'true'
  NotCreateServer:
    Fn::Not:
      - Condition: CreateServer
  SecretsManagerRegionProvided:
    Fn::Not:
      - Fn::Equals:
          - Ref: SecretsManagerRegion
          - ''
  # TransferVPCEndpoint:
  #   Fn::Equals:
  #     - Ref: TransferEndpointType
  #     - 'VPC'
Outputs:
  ServerId:
    Value:
      Fn::GetAtt: TransferServer.ServerId
    Condition: CreateServer
  StackArn:
    Value:
      Ref: AWS::StackId
  TransferIdentityProviderUrl:
    Description: URL to pass to AWS Transfer CreateServer call as part of optional IdentityProviderDetails
    Value:
      Fn::Join:
      - ''
      - - https://
        - Ref: CustomIdentityProviderApi
        - .execute-api.
        - Ref: AWS::Region
        - .amazonaws.com/
        - Ref: ApiStage
    Condition: NotCreateServer
  TransferIdentityProviderInvocationRole:
    Description: IAM Role to pass to AWS Transfer CreateServer call as part of optional IdentityProviderDetails
    Value:
      Fn::GetAtt: TransferIdentityProviderRole.Arn
    Condition: NotCreateServer
Resources:
# ------------------------------------------------------------#
# EIP
# ------------------------------------------------------------#
  ElasticIP1:
    Type: AWS::EC2::EIP
  ElasticIP2:
    Type: AWS::EC2::EIP

# ------------------------------------------------------------#
# SFTP Server
# ------------------------------------------------------------#
  TransferServer:
    Type: AWS::Transfer::Server
    Condition: CreateServer
    Properties:
      Domain: EFS
      EndpointType: VPC
      # EndpointDetails:
      #   Fn::If:
      #     - TransferVPCEndpoint
      #     - SubnetIds:
      #         Fn::Split: [',', Ref: TransferSubnetIDs]
      #       VpcId:
      #         Ref: TransferVPCID
      #     - Ref: AWS::NoValue
      EndpointDetails:
        AddressAllocationIds:
          - !GetAtt ElasticIP1.AllocationId
          - !GetAtt ElasticIP2.AllocationId
        SecurityGroupIds:
          - Ref: SecurityGroupIds
        SubnetIds:
          Fn::Split: [',', Ref: TransferSubnetIDs]
        VpcId: 
          Ref: TransferVPCID
      IdentityProviderDetails:
        InvocationRole:
          Fn::GetAtt: TransferIdentityProviderRole.Arn
        Url:
          Fn::Join:
            - ''
            - - https://
              - Ref: CustomIdentityProviderApi
              - .execute-api.
              - Ref: AWS::Region
              - .amazonaws.com/
              - Ref: ApiStage
      IdentityProviderType: API_GATEWAY
      LoggingRole:
        Fn::GetAtt: TransferCWLoggingRole.Arn
  TransferCWLoggingRole:
    Description: IAM role used by Transfer to log API requests to CloudWatch
    Type: AWS::IAM::Role
    Condition: CreateServer
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - transfer.amazonaws.com
            Action:
              - sts:AssumeRole
      ManagedPolicyArns:
        - Fn::Sub: arn:${AWS::Partition}:iam::aws:policy/service-role/AWSTransferLoggingAccess
  CustomIdentityProviderApi:
    Type: AWS::ApiGateway::RestApi
    Properties:
      Name: Transfer Family Secrets Manager Integration API
      Description: API used for Transfer Family to access user information in Secrets Manager
      FailOnWarnings: true
      EndpointConfiguration:
        Types:
        - REGIONAL
  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
        - Effect: Allow
          Principal:
            Service:
            - lambda.amazonaws.com
          Action:
          - sts:AssumeRole
      ManagedPolicyArns:
      - Fn::Sub: arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
      - PolicyName: LambdaSecretsPolicy
        PolicyDocument:
          Version: '2012-10-17'
          Statement:
          - Effect: Allow
            Action:
            - secretsmanager:GetSecretValue
            Resource:
              Fn::Sub:
                - arn:${AWS::Partition}:secretsmanager:${SecretsRegion}:${AWS::AccountId}:secret:s-*
                - SecretsRegion:
                    Fn::If:
                      - SecretsManagerRegionProvided
                      - Ref: SecretsManagerRegion
                      - Ref: AWS::Region
  ApiCloudWatchLogsRole:
    Description: IAM role used by API Gateway to log API requests to CloudWatch
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
        - Effect: Allow
          Principal:
            Service:
            - apigateway.amazonaws.com
          Action:
          - sts:AssumeRole
      Policies:
      - PolicyName: ApiGatewayLogsPolicy
        PolicyDocument:
          Version: '2012-10-17'
          Statement:
          - Effect: Allow
            Action:
            - logs:CreateLogGroup
            - logs:CreateLogStream
            - logs:DescribeLogGroups
            - logs:DescribeLogStreams
            - logs:PutLogEvents
            - logs:GetLogEvents
            - logs:FilterLogEvents
            Resource: "*"
  ApiLoggingAccount:
    Type: AWS::ApiGateway::Account
    DependsOn:
    - CustomIdentityProviderApi
    Properties:
      CloudWatchRoleArn:
        Fn::GetAtt: ApiCloudWatchLogsRole.Arn
  ApiStage:
    Type: AWS::ApiGateway::Stage
    Properties:
      DeploymentId:
        Ref: ApiDeployment202008
      MethodSettings:
      - DataTraceEnabled: false
        HttpMethod: "*"
        LoggingLevel: INFO
        ResourcePath: "/*"
      RestApiId:
        Ref: CustomIdentityProviderApi
      StageName: prod
  ApiDeployment202008:
    DependsOn:
    - GetUserConfigRequest
    Type: AWS::ApiGateway::Deployment
    Properties:
      RestApiId:
        Ref: CustomIdentityProviderApi
  TransferIdentityProviderRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
        - Effect: Allow
          Principal:
            Service: transfer.amazonaws.com
          Action:
          - sts:AssumeRole
      Policies:
      - PolicyName: TransferCanInvokeThisApi
        PolicyDocument:
          Version: '2012-10-17'
          Statement:
          - Effect: Allow
            Action:
            - execute-api:Invoke
            Resource:
              Fn::Sub: arn:${AWS::Partition}:execute-api:${AWS::Region}:${AWS::AccountId}:${CustomIdentityProviderApi}/prod/GET/*
      - PolicyName: TransferCanReadThisApi
        PolicyDocument:
          Version: '2012-10-17'
          Statement:
          - Effect: Allow
            Action:
            - apigateway:GET
            Resource: "*"
  GetUserConfigLambda:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: src/
      Description: A function to lookup and return user data from AWS Secrets Manager.
      Handler: index.lambda_handler
      Role:
        Fn::GetAtt: LambdaExecutionRole.Arn
      Runtime: python3.12  # python3.7 has reached end of support on AWS Lambda
      Environment:
        Variables:
          SecretsManagerRegion:
            Fn::If:
              - SecretsManagerRegionProvided
              - Ref: SecretsManagerRegion
              - Ref: AWS::Region
  GetUserConfigLambdaPermission:
    Type: AWS::Lambda::Permission
    Properties:
      Action: lambda:invokeFunction
      FunctionName:
        Fn::GetAtt: GetUserConfigLambda.Arn
      Principal: apigateway.amazonaws.com
      SourceArn:
        Fn::Sub: arn:${AWS::Partition}:execute-api:${AWS::Region}:${AWS::AccountId}:${CustomIdentityProviderApi}/*
  ServersResource:
    Type: AWS::ApiGateway::Resource
    Properties:
      RestApiId:
        Ref: CustomIdentityProviderApi
      ParentId:
        Fn::GetAtt:
        - CustomIdentityProviderApi
        - RootResourceId
      PathPart: servers
  ServerIdResource:
    Type: AWS::ApiGateway::Resource
    Properties:
      RestApiId:
        Ref: CustomIdentityProviderApi
      ParentId:
        Ref: ServersResource
      PathPart: "{serverId}"
  UsersResource:
    Type: AWS::ApiGateway::Resource
    Properties:
      RestApiId:
        Ref: CustomIdentityProviderApi
      ParentId:
        Ref: ServerIdResource
      PathPart: users
  UserNameResource:
    Type: AWS::ApiGateway::Resource
    Properties:
      RestApiId:
        Ref: CustomIdentityProviderApi
      ParentId:
        Ref: UsersResource
      PathPart: "{username}"
  GetUserConfigResource:
    Type: AWS::ApiGateway::Resource
    Properties:
      RestApiId:
        Ref: CustomIdentityProviderApi
      ParentId:
        Ref: UserNameResource
      PathPart: config
  GetUserConfigRequest:
    Type: AWS::ApiGateway::Method
    DependsOn: GetUserConfigResponseModel
    Properties:
      AuthorizationType: AWS_IAM
      HttpMethod: GET
      Integration:
        Type: AWS
        IntegrationHttpMethod: POST
        Uri:
          Fn::Join:
            - ""
            - - "arn:"
              - Ref: AWS::Partition
              - ":apigateway:"
              - Ref: AWS::Region
              - ":lambda:path/2015-03-31/functions/"
              - Fn::GetAtt:
                - GetUserConfigLambda
                - Arn
              - "/invocations"
        IntegrationResponses:
        - StatusCode: 200
        RequestTemplates:
          application/json: |
            {
              "username": "$util.urlDecode($input.params('username'))",
              "password": "$util.escapeJavaScript($input.params('Password')).replaceAll("\\'","'")",
              "protocol": "$input.params('protocol')",
              "serverId": "$input.params('serverId')",
              "sourceIp": "$input.params('sourceIp')"
            }
      RequestParameters:
        method.request.header.Password: false
      ResourceId:
        Ref: GetUserConfigResource
      RestApiId:
        Ref: CustomIdentityProviderApi
      MethodResponses:
      - StatusCode: 200
        ResponseModels:
          application/json: UserConfigResponseModel
  GetUserConfigResponseModel:
    Type: AWS::ApiGateway::Model
    Properties:
      RestApiId:
        Ref: CustomIdentityProviderApi
      ContentType: application/json
      Description: API response for GetUserConfig
      Name: UserConfigResponseModel
      Schema:
        "$schema": http://json-schema.org/draft-04/schema#
        title: UserUserConfig
        type: object
        properties:
          HomeDirectory:
            type: string
          Role:
            type: string
          Policy:
            type: string
          PublicKeys:
            type: array
            items:
              type: string
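
The API Gateway resources in the template chain together into a single GET path, servers/{serverId}/users/{username}/config, served under the prod stage. As a rough illustration of how the identity provider URL from the Fn::Join and that resource path fit together, here is a minimal Python sketch. The API ID, region, server ID, and user name are hypothetical placeholders, and the host name assumes the standard aws partition.

```python
def identity_provider_url(api_id: str, region: str, stage: str = "prod") -> str:
    """Mirror the Fn::Join in the template: https://<api-id>.execute-api.<region>.amazonaws.com/<stage>."""
    return f"https://{api_id}.execute-api.{region}.amazonaws.com/{stage}"


def user_config_path(server_id: str, username: str) -> str:
    """Path formed by ServersResource -> ServerIdResource -> UsersResource -> UserNameResource -> GetUserConfigResource."""
    return f"/servers/{server_id}/users/{username}/config"


# Hypothetical values, for illustration only
base_url = identity_provider_url("abc123", "ap-northeast-1")
print(base_url + user_config_path("s-0123456789abcdef0", "testuser"))
```

The password itself does not appear in the URL: the request template reads it from the Password header.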

Rewrite index.py

import os
import json
import boto3
import base64
from ipaddress import ip_network, ip_address
from botocore.exceptions import ClientError


def lambda_handler(event, context):
    # Get the required parameters
    required_param_list = ["serverId", "username", "protocol", "sourceIp"]
    for parameter in required_param_list:
        if parameter not in event:
            print("Incoming " + parameter + " missing - Unexpected")
            return {}

    input_serverId = event["serverId"]
    input_username = event["username"]
    input_protocol = event["protocol"]
    input_sourceIp = event["sourceIp"]
    input_password = event.get("password", "")

    print("ServerId: {}, Username: {}, Protocol: {}, SourceIp: {}"
          .format(input_serverId, input_username, input_protocol, input_sourceIp))

    # Check for password and set authentication type appropriately. No password means SSH auth
    print("Start User Authentication Flow")
    if input_password != "":
        print("Using PASSWORD authentication")
        authentication_type = "PASSWORD"
    else:
        if input_protocol == 'FTP' or input_protocol == 'FTPS':
            print("Empty password not allowed for FTP/S")
            return {}
        print("Using SSH authentication")
        authentication_type = "SSH"

    # Retrieve our user details from the secret. For all key-value pairs stored in SecretManager,
    # checking the protocol-specified secret first, then use generic ones.
    # e.g. If SFTPPassword and Password both exists, will be using SFTPPassword for authentication
    secret = get_secret(input_serverId + "/" + input_username)

    if secret is not None:
        secret_dict = json.loads(secret)
        # Run our password checks
        user_authenticated = authenticate_user(authentication_type, secret_dict, input_password, input_protocol)
        # Run sourceIp checks
        ip_match = check_ipaddress(secret_dict, input_sourceIp, input_protocol)

        if user_authenticated and ip_match:
            print("User authenticated, calling build_response with: " + authentication_type)
            return build_response(secret_dict, authentication_type, input_protocol)
        else:
            print("User failed authentication return empty response")
            return {}
    else:
        # Otherwise something went wrong. Most likely the object name is not there
        print("Secrets Manager exception thrown - Returning empty response")
        # Return an empty data response meaning the user was not authenticated
        return {}


def lookup(secret_dict, key, input_protocol):
    if input_protocol + key in secret_dict:
        print("Found protocol-specified {}".format(key))
        return secret_dict[input_protocol + key]
    else:
        return secret_dict.get(key, None)


def check_ipaddress(secret_dict, input_sourceIp, input_protocol):
    # accepted_ip_network = lookup(secret_dict, "AcceptedIpNetwork", input_protocol)
    accepted_ip_network = lookup(secret_dict, "AcceptedIPNetwork", input_protocol)
    if not accepted_ip_network:
        # No IP provided so skip checks
        print("No IP range provided - Skip IP check")
        return True

    net = ip_network(accepted_ip_network)
    if ip_address(input_sourceIp) in net:
        print("Source IP address match")
        return True
    else:
        print("Source IP address not in range")
        return False


def authenticate_user(auth_type, secret_dict, input_password, input_protocol):
    # Function returns True if: auth_type is password and passwords match or auth_type is SSH. Otherwise returns False
    if auth_type == "SSH":
        # Place for additional checks in future
        print("Skip password check as SSH login request")
        return True
    # auth_type could only be SSH or PASSWORD
    else:
        # Retrieve the password from the secret if exists
        password = lookup(secret_dict, "Password", input_protocol)
        if not password:
            print("Unable to authenticate user - No field match in Secret for password")
            return False

        if input_password == password:
            return True
        else:
            print("Unable to authenticate user - Incoming password does not match stored")
            return False


# Build out our response data for an authenticated response
def build_response(secret_dict, auth_type, input_protocol):
    response_data = {}
    # Check for each key value pair. These are required so set to empty string if missing
    role = lookup(secret_dict, "Role", input_protocol)
    if role:
        response_data["Role"] = role
    else:
        print("No field match for role - Set empty string in response")
        response_data["Role"] = ""

    # These are optional so ignore if not present
    policy = lookup(secret_dict, "Policy", input_protocol)
    if policy:
        response_data["Policy"] = policy

    # External Auth providers support chroot and virtual folder assignments so we'll check for that
    home_directory_details = lookup(secret_dict, "HomeDirectoryDetails", input_protocol)
    if home_directory_details:
        print("HomeDirectoryDetails found - Applying setting for virtual folders - "
              "Note: Cannot be used in conjunction with key: HomeDirectory")
        response_data["HomeDirectoryDetails"] = home_directory_details
        # If we have a virtual folder setup then we also need to set HomeDirectoryType to "Logical"
        print("Setting HomeDirectoryType to LOGICAL")
        response_data["HomeDirectoryType"] = "LOGICAL"

    # Note that HomeDirectory and HomeDirectoryDetails / Logical mode
    # can't be used together but we're not checking for this
    home_directory = lookup(secret_dict, "HomeDirectory", input_protocol)
    if home_directory:
        print("HomeDirectory found - Note: Cannot be used in conjunction with key: HomeDirectoryDetails")
        response_data["HomeDirectory"] = home_directory

    if auth_type == "SSH":
        public_key = lookup(secret_dict, "PublicKey", input_protocol)
        if public_key:
            response_data["PublicKeys"] = [public_key]
        else:
            # SSH Auth Flow - We don't have keys so we can't help
            print("Unable to authenticate user - No public keys found")
            return {}

    posix_profile = lookup(secret_dict, "PosixProfile", input_protocol)
    if posix_profile:
        print("PosixProfile found")
        response_data["PosixProfile"] = [posix_profile]

    return response_data


def get_secret(id):
    region = os.environ["SecretsManagerRegion"]
    print("Secrets Manager Region: " + region)
    print("Secret Name: " + id)

    # Create a Secrets Manager client
    client = boto3.session.Session().client(service_name="secretsmanager", region_name=region)

    try:
        resp = client.get_secret_value(SecretId=id)
        # Decrypts secret using the associated KMS CMK.
        # Depending on whether the secret is a string or binary, one of these fields will be populated.
        if "SecretString" in resp:
            print("Found Secret String")
            return resp["SecretString"]
        else:
            print("Found Binary Secret")
            return base64.b64decode(resp["SecretBinary"])
    except ClientError as err:
        print("Error Talking to SecretsManager: " + err.response["Error"]["Code"] + ", Message: " +
              err.response["Error"]["Message"])
        return None
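
The lookup helper in index.py is what makes protocol-specific keys such as SFTPAcceptedIPNetwork work: a key prefixed with the protocol name takes precedence over the plain key. The following standalone sketch reproduces only that rule so it can be tried without AWS. The secret contents and the role ARN are made-up sample values.

```python
def lookup(secret_dict, key, input_protocol):
    # Same rule as index.py: "<PROTOCOL><Key>" takes precedence over plain "<Key>"
    if input_protocol + key in secret_dict:
        return secret_dict[input_protocol + key]
    return secret_dict.get(key, None)


# Sample secret (hypothetical values)
secret = {
    "Password": "generic-password",
    "SFTPPassword": "sftp-only-password",
    "Role": "arn:aws:iam::123456789012:role/example-role",
}

print(lookup(secret, "Password", "SFTP"))   # protocol-specific key is used
print(lookup(secret, "Password", "FTPS"))   # falls back to the generic key
print(lookup(secret, "Policy", "SFTP"))     # missing key yields None
```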

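Build

Create the stack

Create a Secret in Secrets Manager

Name the secret in the format server-id/username, because the Lambda function looks it up by that name.

{
  "Password": "test",
  "Role": "arn:aws:iam::588439015833:role/sftp-role",
  "SFTPAcceptedIPNetwork": "0.0.0.0/0",
  "PosixProfile": {
    "Uid": "<UserID>",
    "Gid": "<GroupID>"
  }
}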
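
For reference, the secret can also be prepared from a script. The sketch below builds the secret name in the server-id/username format that get_secret expects and serializes the same keys as the JSON above. The helper names, server ID, user name, and role ARN are hypothetical. The create_secret function uses the boto3 Secrets Manager client and is only defined here, not called, since it needs AWS credentials.

```python
import json


def secret_name(server_id: str, username: str) -> str:
    # index.py calls get_secret(serverId + "/" + username)
    return f"{server_id}/{username}"


def secret_payload(password, role_arn, uid, gid, accepted_ip_network=None):
    # Keys follow the JSON shown in this article
    payload = {
        "Password": password,
        "Role": role_arn,
        "PosixProfile": {"Uid": str(uid), "Gid": str(gid)},
    }
    if accepted_ip_network is not None:
        payload["SFTPAcceptedIPNetwork"] = accepted_ip_network
    return json.dumps(payload)


def create_secret(name: str, payload: str, region: str):
    # Imported lazily so the helpers above run without boto3 installed
    import boto3

    client = boto3.client("secretsmanager", region_name=region)
    return client.create_secret(Name=name, SecretString=payload)


# Hypothetical values, for illustration only
name = secret_name("s-0123456789abcdef0", "testuser")
body = secret_payload("test", "arn:aws:iam::123456789012:role/sftp-role", 3101, 3100, "0.0.0.0/0")
print(name)
print(body)
```

Note that the Lambda execution role in the template only allows reading secrets whose names start with "s-", which matches Transfer Family server IDs.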

Connect

  • With the steps above, you should now be able to connect
  • Try connecting with an SFTP client such as WinSCP

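If you cannot connect

  • Check the security group of the VPC endpoint
  • Check the Lambda logs
  • Check the permissions of the Role specified in the Secrets Manager JSON (the following policies are required)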
    • AmazonElasticFileSystemFullAccess
    • AmazonElasticFileSystemClientFullAccess
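
When checking the Lambda logs, it can help to invoke the function directly with a test event instead of going through an SFTP client each time. The sketch below builds an event with the same five keys that the API Gateway request template passes to the Lambda (username, password, protocol, serverId, sourceIp). The function name build_test_event and all values are hypothetical, and the resulting JSON is meant to be pasted into a Lambda test event.

```python
import json


def build_test_event(server_id, username, password, source_ip, protocol="SFTP"):
    # Same keys as the RequestTemplates mapping in template.yml
    return {
        "username": username,
        "password": password,
        "protocol": protocol,
        "serverId": server_id,
        "sourceIp": source_ip,
    }


# Hypothetical values, for illustration only
event = build_test_event("s-0123456789abcdef0", "testuser", "test", "203.0.113.10")
print(json.dumps(event, indent=2))
```

An empty response ({}) from the function means authentication failed, and the printed log lines in index.py indicate which check rejected the request.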

Bonus: access control with Secrets Manager

  • The "SFTPAcceptedIPNetwork" key in the Secrets Manager JSON specifies an IP range, so change the setting to allow connections only from specific IP addresses
  • Rewrite the Secrets Manager JSON as follows
{
  "Password": "test",
  "Role": "arn:aws:iam::588439015833:role/sftp-role",
  "SFTPAcceptedIPNetwork": [
    "<specific IP address>",
    "<specific IP address>"
  ],
  "PosixProfile": {
    "Uid": "3101",
    "Gid": "3100"
  }
}
  • Comment out lines 71-85 of the Lambda function and add the following code at line 88
  # def check_ipaddress(secret_dict, input_sourceIp, input_protocol):
  #     # accepted_ip_network = lookup(secret_dict, "AcceptedIpNetwork", input_protocol)
  #     accepted_ip_network = lookup(secret_dict, "AcceptedIPNetwork", input_protocol)
  #     if not accepted_ip_network:
  #         # No IP provided so skip checks
  #         print("No IP range provided - Skip IP check")
  #         return True

  #     net = ip_network(accepted_ip_network)
  #     if ip_address(input_sourceIp) in net:
  #         print("Source IP address match")
  #         return True
  #     else:
  #         print("Source IP address not in range")
  #         return False


  def check_ipaddress(secret_dict, input_sourceIp, input_protocol):
      accepted_ip_network = lookup(secret_dict, "AcceptedIPNetwork", input_protocol)
      if not accepted_ip_network:
          # No IP provided so skip checks
          print("No IP range provided - Skip IP check")
          return True

      # Accept a single string as well as a list of allowed addresses
      if isinstance(accepted_ip_network, str):
          accepted_ip_network = [accepted_ip_network]
      print(accepted_ip_network)
      print(input_sourceIp)

      # Compare against every entry; reject only after the whole list has been checked
      for allowed_ip in accepted_ip_network:
          if input_sourceIp == allowed_ip:
              print("Source IP address match")
              return True

      print("Source IP address does not match")
      return False
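
The allow-list check above compares strings exactly, so an entry written as a CIDR range (as the original template supported) would no longer match. If both single addresses and ranges are needed in the same list, one possible variant is sketched below using the standard ipaddress module. The function name is_source_ip_allowed is hypothetical and this is not part of the original Lambda code.

```python
from ipaddress import ip_address, ip_network


def is_source_ip_allowed(source_ip, allowed_entries):
    # Accept a single string as well as a list of entries
    if isinstance(allowed_entries, str):
        allowed_entries = [allowed_entries]
    address = ip_address(source_ip)
    # A bare address such as "203.0.113.10" is parsed as a /32 network,
    # so single IPs and CIDR ranges can be mixed in one list
    return any(address in ip_network(entry, strict=False) for entry in allowed_entries)


# Documentation-range addresses, for illustration only
allow_list = ["198.51.100.0/24", "203.0.113.10"]
print(is_source_ip_allowed("203.0.113.10", allow_list))
print(is_source_ip_allowed("203.0.113.11", allow_list))
```

An invalid entry in the list raises ValueError, so in a Lambda function it may be worth catching that and treating it as a rejected login.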

That's all
