
A Temporary Workaround for INSERT Errors on Snowflake Open Catalog External Tables Built on Azure Storage

Posted at 2025-03-04

Overview

This article shares a workaround for an INSERT error that occurred when running Use Case 1 of the Snowflake Open Catalog tutorial, Getting started with Snowflake Open Catalog, against Azure Storage. The OSS Polaris Catalog documentation states the following, so it appears this simply does not work from Spark. The verification in this article was done in a Spark environment on Google Colab.

image.png

Apache Spark can’t use credential vending

Source: Overview | Apache Polaris

Basics of Snowflake Open Catalog

The work in this article uses Spark to write data through Snowflake Open Catalog to Azure Storage (an Azure Storage account this time, rather than an S3 bucket).

image.png

Source: Getting started with Snowflake Open Catalog | Snowflake Documentation

The official documentation presents the following code, but running it as-is produced an error.

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('iceberg_lab') \
    .config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,org.apache.iceberg:iceberg-azure-bundle:1.5.2') \
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
    .config('spark.sql.defaultCatalog', 'opencatalog') \
    .config('spark.sql.catalog.opencatalog', 'org.apache.iceberg.spark.SparkCatalog') \
    .config('spark.sql.catalog.opencatalog.type', 'rest') \
    .config('spark.sql.catalog.opencatalog.uri','https://ab12345.snowflakecomputing.com/polaris/api/catalog') \
    .config('spark.sql.catalog.opencatalog.header.X-Iceberg-Access-Delegation','vended-credentials') \
    .config('spark.sql.catalog.opencatalog.credential','000000000000000000000000000=:1111111111111111111111111111111111111111111=') \
    .config('spark.sql.catalog.opencatalog.warehouse','Catalog1') \
    .config('spark.sql.catalog.opencatalog.scope','PRINCIPAL_ROLE:data_engineer') \
    .getOrCreate()

image.png

Source: Register a service connection | Snowflake Documentation

The official documentation allows either a Blob or an ADLS endpoint to be specified, but I have not fully confirmed how to make the ADLS endpoint work. Also, while the documentation writes the Blob URI with the `abfss` scheme, the correct scheme is actually `wasbs`.
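For reference, a sketch of what the two base-location URI styles look like. The container and account names below are placeholders, not values from any real environment:

```python
# Placeholder names; substitute your own container and storage account.
container = "snowflake"
account = "snowflakeiceberg0123456"

# Blob endpoint: the scheme that actually worked in this verification.
blob_uri = f"wasbs://{container}@{account}.blob.core.windows.net/blob_test"

# ADLS endpoint: the scheme shown in the documentation (not fully verified here).
adls_uri = f"abfss://{container}@{account}.dfs.core.windows.net/adls_test"

print(blob_uri)
print(adls_uri)
```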

image.png

Source: Create a catalog | Snowflake Documentation

Prerequisites

Create a notebook on Google Colab and confirm that Spark runs

!pyspark --version

image.png

Create a Snowflake Open Catalog catalog with a Blob endpoint

image.png

The method that worked

INSERT into the catalog registered with the Blob endpoint

# Set Snowflake Open Catalog connection information
open_catalog_account_identifier = "lampvrn-test123"
client_id = "8bq6hVXHOxV5ZKJQfb="
client_secret = "52/mD7sDxUQWjgZgxGCx37lO2x02fV="
catalog_name = "manabian_10"
principal_role_name = "spark_all"

image.png

# Set Azure Storage connection information
azure_storage_account_name = "snowflakeiceberg0123456"
azure_storage_account_key = "kKRn6nruDHEhUF0AQzM1PFlXtXs9V0BJQSrn6Z8GvJOsb0JHflV9Fn=="

image.png

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('iceberg_lab') \
    .config('spark.jars.packages',
            'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,'
            'org.apache.iceberg:iceberg-azure-bundle:1.5.2,'
            'org.apache.hadoop:hadoop-azure:3.4.1') \
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
    .config('spark.sql.defaultCatalog', 'opencatalog') \
    .config('spark.sql.catalog.opencatalog', 'org.apache.iceberg.spark.SparkCatalog') \
    .config('spark.sql.catalog.opencatalog.type', 'rest') \
    .config('spark.sql.catalog.opencatalog.header.X-Iceberg-Access-Delegation','vended-credentials') \
    .config('spark.sql.catalog.opencatalog.uri',f'https://{open_catalog_account_identifier}.snowflakecomputing.com/polaris/api/catalog') \
    .config('spark.sql.catalog.opencatalog.credential',f'{client_id}:{client_secret}') \
    .config('spark.sql.catalog.opencatalog.warehouse',catalog_name) \
    .config('spark.sql.catalog.opencatalog.scope',f'PRINCIPAL_ROLE:{principal_role_name}') \
    .config(f'spark.hadoop.fs.azure.account.key.{azure_storage_account_name}.blob.core.windows.net', azure_storage_account_key) \
    .getOrCreate()

image.png

spark.sql("create namespace if not exists spark_demo")
spark.sql("create or replace table spark_demo.test_table (col1 int) using iceberg")

image.png

# Insert a record into the table
spark.sql("insert into spark_demo.test_table values (1)");

spark.table("spark_demo.test_table").show()

image.png

Methods that resulted in errors

Running the documented code as-is

# Set Snowflake Open Catalog connection information
open_catalog_account_identifier = "lampvrn-test123"
client_id = "8bq6hVXHOxV5ZKJQfb="
client_secret = "52/mD7sDxUQWjgZgxGCx37lO2x02fV="
catalog_name = "manabian_10"
principal_role_name = "spark_all"

image.png

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('iceberg_lab') \
    .config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,com.azure:azure-storage-blob:12.14.2') \
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
    .config('spark.sql.defaultCatalog', 'opencatalog') \
    .config('spark.sql.catalog.opencatalog', 'org.apache.iceberg.spark.SparkCatalog') \
    .config('spark.sql.catalog.opencatalog.type', 'rest') \
    .config('spark.sql.catalog.opencatalog.header.X-Iceberg-Access-Delegation','vended-credentials') \
    .config('spark.sql.catalog.opencatalog.uri',f'https://{open_catalog_account_identifier}.snowflakecomputing.com/polaris/api/catalog') \
    .config('spark.sql.catalog.opencatalog.credential',f'{client_id}:{client_secret}') \
    .config('spark.sql.catalog.opencatalog.warehouse',catalog_name) \
    .config('spark.sql.catalog.opencatalog.scope',f'PRINCIPAL_ROLE:{principal_role_name}') \
    .getOrCreate()
# Insert a record into the table
spark.sql("insert into spark_demo.test_table values (1)");

Py4JJavaError: An error occurred while calling o47.sql.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (3754e32b0da4 executor driver): java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.azure.NativeAzureFileSystem$Secure not found

image.png
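The class named in this stack trace, `NativeAzureFileSystem$Secure`, is the Hadoop implementation behind the `wasbs://` scheme, and it ships in the `org.apache.hadoop:hadoop-azure` artifact, which is not among the packages requested above. The next section adds it. As a quick reference for the names involved (the artifact version matches the one used later in this article):

```python
# Class reported missing in the stack trace; it implements the wasbs:// scheme.
missing_class = "org.apache.hadoop.fs.azure.NativeAzureFileSystem$Secure"

# Maven coordinates of the artifact that provides it (version as used below).
providing_artifact = "org.apache.hadoop:hadoop-azure:3.4.1"

print(missing_class, "->", providing_artifact)
```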

Adding org.apache.hadoop:hadoop-azure

# Set Snowflake Open Catalog connection information
open_catalog_account_identifier = "lampvrn-test123"
client_id = "8bq6hVXHOxV5ZKJQfb="
client_secret = "52/mD7sDxUQWjgZgxGCx37lO2x02fV="
catalog_name = "manabian_10"
principal_role_name = "spark_all"

image.png

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('iceberg_lab') \
    .config('spark.jars.packages',
            'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,'
            'org.apache.iceberg:iceberg-azure-bundle:1.5.2,'
            'org.apache.hadoop:hadoop-azure:3.4.1') \
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
    .config('spark.sql.defaultCatalog', 'opencatalog') \
    .config('spark.sql.catalog.opencatalog', 'org.apache.iceberg.spark.SparkCatalog') \
    .config('spark.sql.catalog.opencatalog.type', 'rest') \
    .config('spark.sql.catalog.opencatalog.header.X-Iceberg-Access-Delegation','vended-credentials') \
    .config('spark.sql.catalog.opencatalog.uri',f'https://{open_catalog_account_identifier}.snowflakecomputing.com/polaris/api/catalog') \
    .config('spark.sql.catalog.opencatalog.credential',f'{client_id}:{client_secret}') \
    .config('spark.sql.catalog.opencatalog.warehouse',catalog_name) \
    .config('spark.sql.catalog.opencatalog.scope',f'PRINCIPAL_ROLE:{principal_role_name}') \
    .getOrCreate()
# Insert a record into the table
spark.sql("insert into spark_demo.test_table values (1)");

Py4JJavaError: An error occurred while calling o47.sql.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1) (2a8859124b60 executor driver): org.apache.iceberg.exceptions.RuntimeIOException: Failed to get file system for path: wasbs://snowflake@snowflake.blob.core.windows.net/blob_test/spark_demo/test_table/data/00000-1-0261d99b-77be-4b63-b670-1d71b31f93c2-00001.parquet

image.png
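This error points at the remaining gap: with `hadoop-azure` on the classpath the `wasbs://` filesystem class now loads, but since Spark cannot use credential vending, the filesystem has no credentials for the storage account. The working method above closes the gap by passing the storage account key as a Hadoop configuration. A minimal sketch of that configuration key (the account name is a placeholder):

```python
# Placeholder; use your own storage account name.
azure_storage_account_name = "snowflakeiceberg0123456"

# Hadoop configuration key that authenticates the Blob endpoint directly,
# prefixed with "spark.hadoop." so Spark forwards it to Hadoop.
account_key_conf = (
    "spark.hadoop.fs.azure.account.key."
    f"{azure_storage_account_name}.blob.core.windows.net"
)
print(account_key_conf)
```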

INSERT into a catalog registered with the ADLS endpoint

image.png

# Set Snowflake Open Catalog connection information
open_catalog_account_identifier = "lampvrn-test123"
client_id = "8bq6hVXHOxV5ZKJQfb="
client_secret = "52/mD7sDxUQWjgZgxGCx37lO2x02fV="
catalog_name = "manabian_10__abfs"
principal_role_name = "spark_all"
# Set Azure Storage connection information
azure_storage_account_name = "snowflakeiceberg0123456"
azure_storage_account_key = "kKRn6nruDHEhUF0AQzM1PFlXtXs9V0BJQSrn6Z8GvJOsb0JHflV9Fn=="

image.png

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('iceberg_lab') \
    .config('spark.jars.packages',
            'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1,'
            'org.apache.iceberg:iceberg-azure-bundle:1.5.2,'
            'org.apache.hadoop:hadoop-azure:3.4.1') \
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
    .config('spark.sql.defaultCatalog', 'opencatalog') \
    .config('spark.sql.catalog.opencatalog', 'org.apache.iceberg.spark.SparkCatalog') \
    .config('spark.sql.catalog.opencatalog.type', 'rest') \
    .config('spark.sql.catalog.opencatalog.header.X-Iceberg-Access-Delegation','vended-credentials') \
    .config('spark.sql.catalog.opencatalog.uri',f'https://{open_catalog_account_identifier}.snowflakecomputing.com/polaris/api/catalog') \
    .config('spark.sql.catalog.opencatalog.credential',f'{client_id}:{client_secret}') \
    .config('spark.sql.catalog.opencatalog.warehouse',catalog_name) \
    .config('spark.sql.catalog.opencatalog.scope',f'PRINCIPAL_ROLE:{principal_role_name}') \
    .config(f'spark.hadoop.fs.azure.account.key.{azure_storage_account_name}.dfs.core.windows.net', azure_storage_account_key) \
    .getOrCreate()
spark.sql("create namespace if not exists spark_demo")
spark.sql("create or replace table spark_demo.test_table12 (col1 int) using iceberg")

Py4JJavaError: An error occurred while calling o51.sql.
: org.apache.iceberg.exceptions.BadRequestException: Malformed request: Cannot create table on external catalogs.

image.png
