はじめに
Snowflakeの外部関数(External Function)を使って、AWS API Gateway + Lambdaと連携するハンズオンを紹介します。
今回は売上データの金額をUSDに変換する外部関数を作成しています。
1. 概要
- 目的: Snowflakeから外部API(AWS Lambda)を呼び出すための設定方法を確認する
- 技術要素: Snowflake External Function, AWS Lambda, API Gateway, IAM
2. 構成
3. AWS側の準備
3.1 Lambda関数の作成
1.AWS Lambdaで新規関数を作成する
2.ランタイムはPythonを選択する
3.以下のコードを設定:
import json
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
"""
Snowflake外部関数用のLambda関数
Snowflakeから送られてきた売上データ(金額と通貨)を、USD(ドル)に換算して返す
"""
try:
logger.info(f"Received event: {json.dumps(event)}")
# イベントからデータを取得
data = event.get('data', [])
logger.info(f"Processing {len(data)} rows of data")
# 為替レート(固定値にしています)
rates = {"JPY": 0.0065, "EUR": 1.1, "USD": 1.0, "CNY": 0.14}
# 処理結果を格納するリスト
results = []
for i, row in enumerate(data):
try:
logger.info(f"Processing row {i}: {row}")
# 各行のデータを処理(インデックス, 金額, 通貨の順)
if len(row) < 3:
logger.warning(f"Row {i}: Insufficient data, expected 3 values, got {len(row)}")
results.append([i, None])
continue
index = row[0] # インデックス
amount = row[1] # 金額
currency = row[2] # 通貨
logger.info(f"Row {i}: index={index}, amount={amount}, currency={currency}")
# データ型の検証
if amount is None or currency is None:
logger.warning(f"Row {i}: Null values detected - amount: {amount}, currency: {currency}")
results.append([index, None])
continue
# 数値変換
try:
amount = float(amount)
except (ValueError, TypeError):
logger.warning(f"Row {i}: Invalid amount value: {amount}")
results.append([index, None])
continue
# USD換算
if currency in rates:
usd_amount = amount * rates[currency]
else:
logger.warning(f"Row {i}: Unknown currency: {currency}, using USD rate")
usd_amount = amount * rates.get("USD", 1.0)
logger.info(f"Row {i}: {amount} {currency} = {usd_amount} USD")
# 結果を配列として追加(Snowflakeが期待する形式: [インデックス, 値])
results.append([index, usd_amount])
except Exception as row_error:
logger.error(f"Row {i}: Error processing row: {str(row_error)}")
results.append([i, None])
# Snowflakeが期待する形式でレスポンスを返す
response = {
'data': results
}
logger.info(f"Final response: {json.dumps(response)}")
return {
'statusCode': 200,
'headers': {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*'
},
'body': json.dumps(response)
}
except Exception as e:
logger.error(f"Critical error: {str(e)}")
import traceback
logger.error(f"Traceback: {traceback.format_exc()}")
return {
'statusCode': 500,
'headers': {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*'
},
'body': json.dumps({'error': str(e)})
}
3.2 IAMロールの設定
1.IAMロールの作成では、信頼されたエンティティタイプでAWSアカウントを選択して作成する
(ポリシーのアタッチは不要)

3.3 API Gatewayの作成
1.REST APIを新規作成する
2./ ルートにPOSTメソッドで認可をAWS_IAMを選択してリソースを作成し、上記Lambdaを統合する

3.(プロキシ統合の有効化を飛ばして進めてしまったため)統合レスポンスのマッピングテンプレートを以下のように設定する
#set($inputRoot = $input.path('$'))
$inputRoot.body
4.デプロイしてエンドポイントURLを取得(例: https://xxxxxx.execute-api-ap-northeast-1.amazonaws.com/dev)


5.APIGatewayのリソースポリシーが以下のように設定されていることを確認する
{
"Version": "2012-10-17",
"Statement":
[
{
"Effect": "Allow",
"Principal":
{
"AWS": "arn:aws:sts::<accountID>:assumed-role/<external_function_role>/snowflake"
},
"Action": "execute-api:Invoke",
"Resource": "<metchod_arn>"
}
]
}
-
<accountID>:AWSのアカウントID -
<external_function_role>:作成したIAMロール名 -
<metchod_arn>:APIGatewayのメソッドARN
4. Snowflake側の準備
4.1 テーブル作成&テストデータ投入
Salesテーブルを作成し、テストデータを投入する
CREATE OR REPLACE TABLE sales (
id INT,
customer_name STRING,
amount FLOAT,
currency STRING,
sales_date DATE
);
INSERT INTO sales (id, customer_name, amount, currency, sales_date) VALUES
(1, '田中商事', 1000, 'JPY', '2024-06-01'),
(2, 'Suzuki Ltd.', 200, 'USD', '2024-06-02'),
(3, 'Müller GmbH', 150, 'EUR', '2024-06-03'),
(4, 'Wang Corp', 800, 'CNY', '2024-06-04'),
(5, '田中商事', 2000, 'JPY', '2024-06-15'),
(6, 'Suzuki Ltd.', 500, 'USD', '2024-06-16'),
(7, 'Müller GmbH', 300, 'EUR', '2024-06-17'),
(8, 'Wang Corp', 1200, 'CNY', '2024-06-18'),
(9, '田中商事', 1500, 'JPY', '2024-07-01'),
(10, 'Suzuki Ltd.', 800, 'USD', '2024-07-02'),
(11, 'Müller GmbH', 400, 'EUR', '2024-07-03'),
(12, 'Wang Corp', 2000, 'CNY', '2024-07-04');
4.2 API INTEGRATION作成
以下コマンドでAPI統合を作成する
CREATE OR REPLACE API INTEGRATION my_api_integration
API_PROVIDER = aws_api_gateway
API_AWS_ROLE_ARN = '<API_GATEWAY用のIAMロールARN>'
ENABLED = TRUE
API_ALLOWED_PREFIXES = ('<API呼び出しURL>');
作成したAPI統合の詳細をdescribe integrationで確認する

- 実行結果の
API_AWS_IAM_USER_ARNとAPI_AWS_EXTERNAL_IDの値をメモする
4.3 外部関数作成
CREATE OR REPLACE EXTERNAL FUNCTION convert_to_usd(amount FLOAT, currency STRING)
RETURNS FLOAT
API_INTEGRATION = my_api_integration
HEADERS = ( 'Content-Type' = 'application/json' )
MAX_BATCH_ROWS = 100
COMPRESSION = NONE
AS 'https://xxxxxx.execute-api-ap-northeast-1.amazonaws.com/dev/';
5. IAMロールの信頼ポリシー修正
API統合の確認でメモしたAPI_AWS_IAM_USER_ARNとAPI_AWS_EXTERNAL_IDで信頼ポリシーを次のように修正する
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": "<API_AWS_IAM_USER_ARN>"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"sts:ExternalId": "<API_AWS_EXTERNAL_ID>"
}
}
}
]
}
6. 実践クエリ例
6.1 売上データをUSD換算で一覧表示
SELECT
id,
customer_name,
amount,
currency,
sales_date,
convert_to_usd(amount, currency) AS usd_amount
FROM sales;
6.2 月別・通貨別の売上合計とUSD換算合計
SELECT
TO_CHAR(sales_date, 'YYYY-MM') AS month,
currency,
SUM(amount) AS total_amount,
SUM(convert_to_usd(amount, currency)) AS total_usd
FROM sales
GROUP BY month, currency
ORDER BY month, currency;
6.3 USD換算で売上上位の顧客ランキング
SELECT
customer_name,
SUM(convert_to_usd(amount, currency)) AS total_usd
FROM sales
GROUP BY customer_name
ORDER BY total_usd DESC
LIMIT 5;
7. やってみた感想
今回はUDFやストアドプロシージャを使えばSnowflake内部で完結できる処理の内容でしたが、外部関数を使うことで使用できる言語の幅が広がることはメリットかなと感じました。また、AWSの他サービスとの連携を必要とするような高度な処理を行う際には必要となってくる機能だと思います。
気になった方は是非試してみてください。
8.追記
プロキシ統合を有効化した場合はLambda側でeventから値の受け取る部分を少し修正する必要があります。
コードを以下のように修正することで動作しました。
import json
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
"""
Snowflake外部関数用のLambda関数 - API Gatewayプロキシ統合対応版
API Gatewayプロキシ統合が有効な場合のイベント構造に対応
"""
try:
logger.info(f"Received event: {json.dumps(event)}")
# プロキシ統合の場合、bodyは文字列として送信される
body = event.get('body', '')
logger.info(f"Body type: {type(body)}")
logger.info(f"Body content: {body}")
# bodyが空でないかチェック
if not body:
logger.error("Body is empty")
return {
'statusCode': 400,
'headers': {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*'
},
'body': json.dumps({'error': 'Empty request body'})
}
# JSON解析
try:
parsed_body = json.loads(body)
logger.info(f"Parsed body: {json.dumps(parsed_body)}")
except json.JSONDecodeError as json_error:
logger.error(f"JSON parsing error: {json_error}")
return {
'statusCode': 400,
'headers': {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*'
},
'body': json.dumps({'error': f'Invalid JSON: {str(json_error)}'})
}
# Snowflakeからのデータを取得
data = parsed_body.get('data', [])
logger.info(f"Processing {len(data)} rows of data")
# 為替レート(固定値)
rates = {"JPY": 0.0065, "EUR": 1.1, "USD": 1.0, "CNY": 0.14}
# 処理結果を格納するリスト
results = []
for i, row in enumerate(data):
try:
logger.info(f"Processing row {i}: {row}")
# 各行のデータを処理(インデックス, 金額, 通貨の順)
if len(row) < 3:
logger.warning(f"Row {i}: Insufficient data, expected 3 values, got {len(row)}")
results.append([i, None])
continue
index = row[0] # インデックス
amount = row[1] # 金額
currency = row[2] # 通貨
logger.info(f"Row {i}: index={index}, amount={amount}, currency={currency}")
# データ型の検証
if amount is None or currency is None:
logger.warning(f"Row {i}: Null values detected - amount: {amount}, currency: {currency}")
results.append([index, None])
continue
# 数値変換
try:
amount = float(amount)
except (ValueError, TypeError):
logger.warning(f"Row {i}: Invalid amount value: {amount}")
results.append([index, None])
continue
# USD換算
if currency in rates:
usd_amount = amount * rates[currency]
else:
logger.warning(f"Row {i}: Unknown currency: {currency}, using USD rate")
usd_amount = amount * rates.get("USD", 1.0)
logger.info(f"Row {i}: {amount} {currency} = {usd_amount} USD")
# 結果を配列として追加(Snowflakeが期待する形式: [インデックス, 値])
results.append([index, usd_amount])
except Exception as row_error:
logger.error(f"Row {i}: Error processing row: {str(row_error)}")
results.append([i, None])
# Snowflakeが期待する形式でレスポンスを返す
response = {
'data': results
}
logger.info(f"Final response: {json.dumps(response)}")
# プロキシ統合用のレスポンス形式
return {
'statusCode': 200,
'headers': {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*'
},
'body': json.dumps(response)
}
except Exception as e:
logger.error(f"Critical error: {str(e)}")
import traceback
logger.error(f"Traceback: {traceback.format_exc()}")
return {
'statusCode': 500,
'headers': {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*'
},
'body': json.dumps({'error': str(e)})
}



