2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Snowflake外部関数 × AWS API Gateway + Lambda連携ハンズオン

Last updated at Posted at 2025-07-07

はじめに

Snowflakeの外部関数(External Function)を使って、AWS API Gateway + Lambdaと連携するハンズオンを紹介します。
今回は売上データの金額をUSDに変換する外部関数を作成しています。

1. 概要

  • 目的: Snowflakeから外部API(AWS Lambda)を呼び出すための設定方法を確認する
  • 技術要素: Snowflake External Function, AWS Lambda, API Gateway, IAM

2. 構成

3. AWS側の準備

3.1 Lambda関数の作成

1.AWS Lambdaで新規関数を作成する
2.ランタイムはPythonを選択する
3.以下のコードを設定:

import json
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    """
    Snowflake外部関数用のLambda関数
    Snowflakeから送られてきた売上データ(金額と通貨)を、USD(ドル)に換算して返す
    """
    try:
        logger.info(f"Received event: {json.dumps(event)}")
        
        # イベントからデータを取得
        data = event.get('data', [])
        logger.info(f"Processing {len(data)} rows of data")
        
        # 為替レート(固定値にしています)
        rates = {"JPY": 0.0065, "EUR": 1.1, "USD": 1.0, "CNY": 0.14}
        
        # 処理結果を格納するリスト
        results = []
        
        for i, row in enumerate(data):
            try:
                logger.info(f"Processing row {i}: {row}")
                
                # 各行のデータを処理(インデックス, 金額, 通貨の順)
                if len(row) < 3:
                    logger.warning(f"Row {i}: Insufficient data, expected 3 values, got {len(row)}")
                    results.append([i, None])
                    continue
                
                index = row[0]  # インデックス
                amount = row[1]  # 金額
                currency = row[2]  # 通貨
                
                logger.info(f"Row {i}: index={index}, amount={amount}, currency={currency}")
                
                # データ型の検証
                if amount is None or currency is None:
                    logger.warning(f"Row {i}: Null values detected - amount: {amount}, currency: {currency}")
                    results.append([index, None])
                    continue
                
                # 数値変換
                try:
                    amount = float(amount)
                except (ValueError, TypeError):
                    logger.warning(f"Row {i}: Invalid amount value: {amount}")
                    results.append([index, None])
                    continue
                
                # USD換算
                if currency in rates:
                    usd_amount = amount * rates[currency]
                else:
                    logger.warning(f"Row {i}: Unknown currency: {currency}, using USD rate")
                    usd_amount = amount * rates.get("USD", 1.0)
                
                logger.info(f"Row {i}: {amount} {currency} = {usd_amount} USD")
                
                # 結果を配列として追加(Snowflakeが期待する形式: [インデックス, 値])
                results.append([index, usd_amount])
                
            except Exception as row_error:
                logger.error(f"Row {i}: Error processing row: {str(row_error)}")
                results.append([i, None])
        
        # Snowflakeが期待する形式でレスポンスを返す
        response = {
            'data': results
        }
        
        logger.info(f"Final response: {json.dumps(response)}")
        
        return {
            'statusCode': 200,
            'headers': {
                'Content-Type': 'application/json',
                'Access-Control-Allow-Origin': '*'
            },
            'body': json.dumps(response)
        }
        
    except Exception as e:
        logger.error(f"Critical error: {str(e)}")
        import traceback
        logger.error(f"Traceback: {traceback.format_exc()}")
        return {
            'statusCode': 500,
            'headers': {
                'Content-Type': 'application/json',
                'Access-Control-Allow-Origin': '*'
            },
            'body': json.dumps({'error': str(e)})
        }

3.2 IAMロールの設定

1.IAMロールの作成では、信頼されたエンティティタイプでAWSアカウントを選択して作成する
(ポリシーのアタッチは不要)
タイトルなし.png

3.3 API Gatewayの作成

1.REST APIを新規作成する
2./ ルートにPOSTメソッドで認可をAWS_IAMを選択してリソースを作成し、上記Lambdaを統合する
タイトルなし.png

3.(プロキシ統合の有効化を飛ばして進めてしまったため)統合レスポンスのマッピングテンプレートを以下のように設定する

#set($inputRoot = $input.path('$'))
$inputRoot.body

タイトルなし.png

4.デプロイしてエンドポイントURLを取得(例: https://xxxxxx.execute-api-ap-northeast-1.amazonaws.com/dev
image.png
タイトルなし.png

5.APIGatewayのリソースポリシーが以下のように設定されていることを確認する

{
    "Version": "2012-10-17",
    "Statement":
    [
        {
        "Effect": "Allow",
        "Principal":
            {
            "AWS": "arn:aws:sts::<accountID>:assumed-role/<external_function_role>/snowflake"
            },
        "Action": "execute-api:Invoke",
        "Resource": "<metchod_arn>"
        }
    ]
}
  • <accountID>:AWSのアカウントID
  • <external_function_role>:作成したIAMロール名
  • <metchod_arn>:APIGatewayのメソッドARN

4. Snowflake側の準備

4.1 テーブル作成&テストデータ投入

Salesテーブルを作成し、テストデータを投入する

CREATE OR REPLACE TABLE sales (
  id INT,
  customer_name STRING,
  amount FLOAT,
  currency STRING,
  sales_date DATE
);

INSERT INTO sales (id, customer_name, amount, currency, sales_date) VALUES
  (1, '田中商事', 1000, 'JPY', '2024-06-01'),
  (2, 'Suzuki Ltd.', 200, 'USD', '2024-06-02'),
  (3, 'Müller GmbH', 150, 'EUR', '2024-06-03'),
  (4, 'Wang Corp', 800, 'CNY', '2024-06-04'),
  (5, '田中商事', 2000, 'JPY', '2024-06-15'),
  (6, 'Suzuki Ltd.', 500, 'USD', '2024-06-16'),
  (7, 'Müller GmbH', 300, 'EUR', '2024-06-17'),
  (8, 'Wang Corp', 1200, 'CNY', '2024-06-18'),
  (9, '田中商事', 1500, 'JPY', '2024-07-01'),
  (10, 'Suzuki Ltd.', 800, 'USD', '2024-07-02'),
  (11, 'Müller GmbH', 400, 'EUR', '2024-07-03'),
  (12, 'Wang Corp', 2000, 'CNY', '2024-07-04');

4.2 API INTEGRATION作成

以下コマンドでAPI統合を作成する

CREATE OR REPLACE API INTEGRATION my_api_integration
  API_PROVIDER = aws_api_gateway
  API_AWS_ROLE_ARN = '<API_GATEWAY用のIAMロールARN>'
  ENABLED = TRUE
  API_ALLOWED_PREFIXES = ('<API呼び出しURL>');

作成したAPI統合の詳細をdescribe integrationで確認する
タイトルなし.png

  • 実行結果のAPI_AWS_IAM_USER_ARNAPI_AWS_EXTERNAL_IDの値をメモする

4.3 外部関数作成

CREATE OR REPLACE EXTERNAL FUNCTION convert_to_usd(amount FLOAT, currency STRING)
  RETURNS FLOAT
  API_INTEGRATION = my_api_integration
  HEADERS = ( 'Content-Type' = 'application/json' )
  MAX_BATCH_ROWS = 100
  COMPRESSION = NONE
  AS 'https://xxxxxx.execute-api-ap-northeast-1.amazonaws.com/dev/';

5. IAMロールの信頼ポリシー修正

API統合の確認でメモしたAPI_AWS_IAM_USER_ARNAPI_AWS_EXTERNAL_IDで信頼ポリシーを次のように修正する

{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Effect": "Allow",
			"Principal": {
				"AWS": "<API_AWS_IAM_USER_ARN>"
			},
			"Action": "sts:AssumeRole",
			"Condition": {
			    "StringEquals": {
			        "sts:ExternalId": "<API_AWS_EXTERNAL_ID>" 
			}
			}
		}
	]
}

6. 実践クエリ例

6.1 売上データをUSD換算で一覧表示

SELECT
  id,
  customer_name,
  amount,
  currency,
  sales_date,
  convert_to_usd(amount, currency) AS usd_amount
FROM sales;

image.png

6.2 月別・通貨別の売上合計とUSD換算合計

SELECT
  TO_CHAR(sales_date, 'YYYY-MM') AS month,
  currency,
  SUM(amount) AS total_amount,
  SUM(convert_to_usd(amount, currency)) AS total_usd
FROM sales
GROUP BY month, currency
ORDER BY month, currency;

image.png

6.3 USD換算で売上上位の顧客ランキング

SELECT
  customer_name,
  SUM(convert_to_usd(amount, currency)) AS total_usd
FROM sales
GROUP BY customer_name
ORDER BY total_usd DESC
LIMIT 5;

image.png

7. やってみた感想

今回はUDFやストアドプロシージャを使えばSnowflake内部で完結できる処理の内容でしたが、外部関数を使うことで使用できる言語の幅が広がることはメリットかなと感じました。また、AWSの他サービスとの連携を必要とするような高度な処理を行う際には必要となってくる機能だと思います。

気になった方は是非試してみてください。

8.追記

プロキシ統合を有効化した場合はLambda側でeventから値の受け取る部分を少し修正する必要があります。
コードを以下のように修正することで動作しました。

import json
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    """
    Snowflake外部関数用のLambda関数 - API Gatewayプロキシ統合対応版
    API Gatewayプロキシ統合が有効な場合のイベント構造に対応
    """
    try:
        logger.info(f"Received event: {json.dumps(event)}")
        
        # プロキシ統合の場合、bodyは文字列として送信される
        body = event.get('body', '')
        logger.info(f"Body type: {type(body)}")
        logger.info(f"Body content: {body}")
        
        # bodyが空でないかチェック
        if not body:
            logger.error("Body is empty")
            return {
                'statusCode': 400,
                'headers': {
                    'Content-Type': 'application/json',
                    'Access-Control-Allow-Origin': '*'
                },
                'body': json.dumps({'error': 'Empty request body'})
            }
        
        # JSON解析
        try:
            parsed_body = json.loads(body)
            logger.info(f"Parsed body: {json.dumps(parsed_body)}")
        except json.JSONDecodeError as json_error:
            logger.error(f"JSON parsing error: {json_error}")
            return {
                'statusCode': 400,
                'headers': {
                    'Content-Type': 'application/json',
                    'Access-Control-Allow-Origin': '*'
                },
                'body': json.dumps({'error': f'Invalid JSON: {str(json_error)}'})
            }
        
        # Snowflakeからのデータを取得
        data = parsed_body.get('data', [])
        logger.info(f"Processing {len(data)} rows of data")
        
        # 為替レート(固定値)
        rates = {"JPY": 0.0065, "EUR": 1.1, "USD": 1.0, "CNY": 0.14}
        
        # 処理結果を格納するリスト
        results = []
        
        for i, row in enumerate(data):
            try:
                logger.info(f"Processing row {i}: {row}")
                
                # 各行のデータを処理(インデックス, 金額, 通貨の順)
                if len(row) < 3:
                    logger.warning(f"Row {i}: Insufficient data, expected 3 values, got {len(row)}")
                    results.append([i, None])
                    continue
                
                index = row[0]  # インデックス
                amount = row[1]  # 金額
                currency = row[2]  # 通貨
                
                logger.info(f"Row {i}: index={index}, amount={amount}, currency={currency}")
                
                # データ型の検証
                if amount is None or currency is None:
                    logger.warning(f"Row {i}: Null values detected - amount: {amount}, currency: {currency}")
                    results.append([index, None])
                    continue
                
                # 数値変換
                try:
                    amount = float(amount)
                except (ValueError, TypeError):
                    logger.warning(f"Row {i}: Invalid amount value: {amount}")
                    results.append([index, None])
                    continue
                
                # USD換算
                if currency in rates:
                    usd_amount = amount * rates[currency]
                else:
                    logger.warning(f"Row {i}: Unknown currency: {currency}, using USD rate")
                    usd_amount = amount * rates.get("USD", 1.0)
                
                logger.info(f"Row {i}: {amount} {currency} = {usd_amount} USD")
                
                # 結果を配列として追加(Snowflakeが期待する形式: [インデックス, 値])
                results.append([index, usd_amount])
                
            except Exception as row_error:
                logger.error(f"Row {i}: Error processing row: {str(row_error)}")
                results.append([i, None])
        
        # Snowflakeが期待する形式でレスポンスを返す
        response = {
            'data': results
        }
        
        logger.info(f"Final response: {json.dumps(response)}")
        
        # プロキシ統合用のレスポンス形式
        return {
            'statusCode': 200,
            'headers': {
                'Content-Type': 'application/json',
                'Access-Control-Allow-Origin': '*'
            },
            'body': json.dumps(response)
        }
        
    except Exception as e:
        logger.error(f"Critical error: {str(e)}")
        import traceback
        logger.error(f"Traceback: {traceback.format_exc()}")
        return {
            'statusCode': 500,
            'headers': {
                'Content-Type': 'application/json',
                'Access-Control-Allow-Origin': '*'
            },
            'body': json.dumps({'error': str(e)})
        }

参考

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?