概要
Snowflake Open CatalogのチュートリアルであるGetting started with Snowflake Open Catalog
のユースケース1をAzure Storageで実施した際、INSERT処理がエラーとなった際の対応方法を共有します。OSS の Polaris Catalog のドキュメントにて、下記のように記述されているため、Spark では動作しないようです。なお、本記事での検証はGoogle ColabのSpark環境を利用しています。
Apache Spark can’t use credential vending
Snowflake Open Catalog に関する基本的な情報
本記事の作業内容は、Sparkを使ってSnowflake Open Catalogを通じ、Azure Storage(今回はS3 bucketではなくAzure Storage)にデータを書き込む処理です。
出所:Getting started with Snowflake Open Catalog | Snowflake Documentation
公式ドキュメントでは以下のコードが提示されていますが、このままではエラーが発生しました。
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('iceberg_lab') \
.config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,org.apache.iceberg:iceberg-azure-bundle:1.5.2') \
.config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
.config('spark.sql.defaultCatalog', 'opencatalog') \
.config('spark.sql.catalog.opencatalog', 'org.apache.iceberg.spark.SparkCatalog') \
.config('spark.sql.catalog.opencatalog.type', 'rest') \
.config('spark.sql.catalog.opencatalog.uri','https://ab12345.snowflakecomputing.com/polaris/api/catalog') \
.config('spark.sql.catalog.opencatalog.header.X-Iceberg-Access-Delegation','vended-credentials') \
.config('spark.sql.catalog.opencatalog.credential','000000000000000000000000000=:1111111111111111111111111111111111111111111=') \
.config('spark.sql.catalog.opencatalog.warehouse','Catalog1') \
.config('spark.sql.catalog.opencatalog.scope','PRINCIPAL_ROLE:data_engineer') \
.getOrCreate()
出所:サービス接続の登録 | Snowflake Documentation
公式ドキュメントでは、BlobとADLSの両方が指定可能となっていますが、ADLSでの動作方法については十分に確認できていません。また、BlobのURIについて、公式ドキュメントでは「abfss」と記載されていますが、実際は「wasbs」が正しい表記となります。
出所:Create a catalog | Snowflake Documentation
事前準備
Google Colab 上でノートブックを作成して Spark が動作することを確認
!pyspark --version
Blob エンドポイントの Snowflake Open Catalog カタログを作成
正常に動作した方法
Blob エンドポイントを登録したカタログに対する INSERT 処理
# Snowflake Open Catalog に関する情報をセット
open_catalog_account_identifier = "lampvrn-test123"
client_id = "8bq6hVXHOxV5ZKJQfb="
client_secret = "52/mD7sDxUQWjgZgxGCx37lO2x02fV="
catalog_name = "manabian_10"
principal_role_name = "spark_all"
# Azure Storage に関る情報をセット
azure_storage_account_name = "snowflakeiceberg0123456"
azure_storage_account_key = "kKRn6nruDHEhUF0AQzM1PFlXtXs9V0BJQSrn6Z8GvJOsb0JHflV9Fn=="
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('iceberg_lab') \
.config('spark.jars.packages',
'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,'
'org.apache.iceberg:iceberg-azure-bundle:1.5.2,'
'org.apache.hadoop:hadoop-azure:3.4.1') \
.config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
.config('spark.sql.defaultCatalog', 'opencatalog') \
.config('spark.sql.catalog.opencatalog', 'org.apache.iceberg.spark.SparkCatalog') \
.config('spark.sql.catalog.opencatalog.type', 'rest') \
.config('spark.sql.catalog.opencatalog.header.X-Iceberg-Access-Delegation','vended-credentials') \
.config('spark.sql.catalog.opencatalog.uri',f'https://{open_catalog_account_identifier}.snowflakecomputing.com/polaris/api/catalog') \
.config('spark.sql.catalog.opencatalog.credential',f'{client_id}:{client_secret}') \
.config('spark.sql.catalog.opencatalog.warehouse',catalog_name) \
.config('spark.sql.catalog.opencatalog.scope',f'PRINCIPAL_ROLE:{principal_role_name}') \
.config(f'spark.hadoop.fs.azure.account.key.{azure_storage_account_name}.blob.core.windows.net', azure_storage_account_key) \
.getOrCreate()
spark.sql("create namespace if not exists spark_demo")
spark.sql("create or replace table spark_demo.test_table (col1 int) using iceberg")
# insert a record in the table
spark.sql("insert into spark_demo.test_table values (1)");
spark.table("spark_demo.test_table").show()
エラーとなった方法
ドキュメントのまま実行する方法
# Snowflake Open Catalog に関する情報をセット
open_catalog_account_identifier = "lampvrn-test123"
client_id = "8bq6hVXHOxV5ZKJQfb="
client_secret = "52/mD7sDxUQWjgZgxGCx37lO2x02fV="
catalog_name = "manabian_10"
principal_role_name = "spark_all"
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('iceberg_lab') \
.config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,com.azure:azure-storage-blob:12.14.2') \
.config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
.config('spark.sql.defaultCatalog', 'opencatalog') \
.config('spark.sql.catalog.opencatalog', 'org.apache.iceberg.spark.SparkCatalog') \
.config('spark.sql.catalog.opencatalog.type', 'rest') \
.config('spark.sql.catalog.opencatalog.header.X-Iceberg-Access-Delegation','vended-credentials') \
.config('spark.sql.catalog.opencatalog.uri',f'https://{open_catalog_account_identifier}.snowflakecomputing.com/polaris/api/catalog') \
.config('spark.sql.catalog.opencatalog.credential',f'{client_id}:{client_secret}') \
.config('spark.sql.catalog.opencatalog.warehouse',catalog_name) \
.config('spark.sql.catalog.opencatalog.scope',f'PRINCIPAL_ROLE:{principal_role_name}') \
.getOrCreate()
#insert a record in the table
spark.sql("insert into spark_demo.test_table values (1)");
Py4JJavaError: An error occurred while calling o47.sql.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (3754e32b0da4 executor driver): java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.azure.NativeAzureFileSystem$Secure not found
org.apache.hadoop:hadoop-azure を追加
# Snowflake Open Catalog に関する情報をセット
open_catalog_account_identifier = "lampvrn-test123"
client_id = "8bq6hVXHOxV5ZKJQfb="
client_secret = "52/mD7sDxUQWjgZgxGCx37lO2x02fV="
catalog_name = "manabian_10"
principal_role_name = "spark_all"
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('iceberg_lab') \
.config('spark.jars.packages',
'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,'
'org.apache.iceberg:iceberg-azure-bundle:1.5.2,'
'org.apache.hadoop:hadoop-azure:3.4.1') \
.config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
.config('spark.sql.defaultCatalog', 'opencatalog') \
.config('spark.sql.catalog.opencatalog', 'org.apache.iceberg.spark.SparkCatalog') \
.config('spark.sql.catalog.opencatalog.type', 'rest') \
.config('spark.sql.catalog.opencatalog.header.X-Iceberg-Access-Delegation','vended-credentials') \
.config('spark.sql.catalog.opencatalog.uri',f'https://{open_catalog_account_identifier}.snowflakecomputing.com/polaris/api/catalog') \
.config('spark.sql.catalog.opencatalog.credential',f'{client_id}:{client_secret}') \
.config('spark.sql.catalog.opencatalog.warehouse',catalog_name) \
.config('spark.sql.catalog.opencatalog.scope',f'PRINCIPAL_ROLE:{principal_role_name}') \
.getOrCreate()
# insert a record in the table
spark.sql("insert into spark_demo.test_table values (1)");
Py4JJavaError: An error occurred while calling o47.sql.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1) (2a8859124b60 executor driver): org.apache.iceberg.exceptions.RuntimeIOException: Failed to get file system for path: wasbs://snowflake@snowflake.blob.core.windows.net/blob_test/spark_demo/test_table/data/00000-1-0261d99b-77be-4b63-b670-1d71b31f93c2-00001.parquet
ADLS エンドポイントを登録したカタログに対する INSERT 処理
# Snowflake Open Catalog に関する情報をセット
open_catalog_account_identifier = "lampvrn-test123"
client_id = "8bq6hVXHOxV5ZKJQfb="
client_secret = "52/mD7sDxUQWjgZgxGCx37lO2x02fV="
catalog_name = "manabian_10__abfs"
principal_role_name = "spark_all"
# Azure Storage に関る情報をセット
azure_storage_account_name = "snowflakeiceberg0123456"
azure_storage_account_key = "kKRn6nruDHEhUF0AQzM1PFlXtXs9V0BJQSrn6Z8GvJOsb0JHflV9Fn=="
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('iceberg_lab') \
.config('spark.jars.packages',
'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,'
'org.apache.iceberg:iceberg-azure-bundle:1.5.2,'
'org.apache.hadoop:hadoop-azure:3.4.1') \
.config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
.config('spark.sql.defaultCatalog', 'opencatalog') \
.config('spark.sql.catalog.opencatalog', 'org.apache.iceberg.spark.SparkCatalog') \
.config('spark.sql.catalog.opencatalog.type', 'rest') \
.config('spark.sql.catalog.opencatalog.header.X-Iceberg-Access-Delegation','vended-credentials') \
.config('spark.sql.catalog.opencatalog.uri',f'https://{open_catalog_account_identifier}.snowflakecomputing.com/polaris/api/catalog') \
.config('spark.sql.catalog.opencatalog.credential',f'{client_id}:{client_secret}') \
.config('spark.sql.catalog.opencatalog.warehouse',catalog_name) \
.config('spark.sql.catalog.opencatalog.scope',f'PRINCIPAL_ROLE:{principal_role_name}') \
.config(f'spark.hadoop.fs.azure.account.key.{azure_storage_account_name}.dfs.core.windows.net', azure_storage_account_key) \
.getOrCreate()
spark.sql("create namespace if not exists spark_demo")
spark.sql("create or replace table spark_demo.test_table12 (col1 int) using iceberg")
Py4JJavaError: An error occurred while calling o51.sql.
: org.apache.iceberg.exceptions.BadRequestException: Malformed request: Cannot create table on external catalogs.