LoginSignup
3
4

More than 1 year has passed since last update.

Databricks(Spark)のPysparkにてsnowflakeをソースとしてSparkデータフレームとSparkテーブルを作成する方法

Last updated at Posted at 2021-07-11

概要

Databirkcs(Spark)からsnowflakeへ接続する方法として、jdbcによる方法とSparkコネクターによる方法があり、この記事では2つの方法にてSparkデータフレームとSparkテーブルを作成する方法を紹介します。

SnowflakeのドキュメントにてSparkコネクターを利用することが推奨されており、Sparkコネクターを優先して利用したほうがよさそうです。

Snowflake JDBC ドライバーと組み合わせたコネクターは2つのシステム間で大量のデータを転送するために最適化されているため、Spark用のSnowflakeコネクターを使用することをお勧めします。また、SparkからSnowflakeへのクエリプッシュダウンをサポートすることにより、パフォーマンスが向上します。

引用元:Sparkコネクターの概要 — Snowflake Documentation

Sparkコネクターとjdbcでは、設定するパラメーターが微妙に異なるので注意してください。
Sparkコネクターで設定するパラメーターについては、Sparkコネクタの使用 — Snowflake Documentationにて説明されております。
image.png

引用元:Sparkコネクタの使用 — Snowflake Documentation

jdbcで設定するパラメーターについては、JDBC ドライバーを構成する — Snowflake Documentationにて説明されております。

image.png

引用元:JDBC ドライバーを構成する — Snowflake Documentation

コードを含むノートブックの実行結果をGithub pagesにて公開しております。

実際に試したい方は、下記のファイルをインポートしてください。

https://github.com/manabian-/databricks_tecks_for_qiita/blob/main/tecks/create_df_table_from_snowflake/create_df_table_from_snowflake.dbc

実施手順

SparkコネクターによりSparkデータフレームを作成する方法

# ドライバーを指定。Databrcksでは、"snowflake"と指定することでも可能
driver = 'net.snowflake.spark.snowflake'
# driver = 'snowflake'

# 接続オプションを、sfOptionsに格納。オプションについては、下記ドキュメントに記載あり。
# https://docs.snowflake.com/ja/user-guide/spark-connector-use.html#required-connection-options
sfOptions = {
 "sfURL" : "AAAA.southeast-asia.azure.snowflakecomputing.com",
 "sfUser" : "user",
 "sfPassword" : "password",
 "sfRole" : "SYSADMIN",
 "sfDatabase" : "SNOWFLAKE_SAMPLE_DATA",
 "sfSchema" : "TPCH_SF1",
 "sfWarehouse" : "COMPUTE_WH",
 "column_mapping" : "name",
 "sfTimezone" : "Asia/Tokyo",
}

# データの取得方法と取得元を指定
### dbtable(テーブルを指定する方法)、もしくは、query(SQLを指定する方法)を指定可能
how_to_read = 'dbtable'
table_or_query = 'NATION'
# how_to_read = 'query'
# table_or_query = 'select * from NATION'

## データフレームを作成
df = (spark.read
          .format(driver)
          .options(**sfOptions)
          .option(how_to_read, table_or_query)
          .load()
     )

## データフレームを表示
df.limit(10).display()

image.png

jdbcによりSparkデータフレームを作成する方法

# ドライバーを指定。
driver = 'net.snowflake.client.jdbc.SnowflakeDriver'

# 接続オプションを指定。指定可能なオプションについては、下記ドキュメントに記載あり。
# https://docs.snowflake.com/ja/user-guide/jdbc-configure.html#connection-parameters

url  = "jdbc:snowflake://AAAA.southeast-asia.azure.snowflakecomputing.com"

user = 'username'
password = 'password'
role = 'SYSADMIN'

warehouse = 'COMPUTE_WH'

db = 'SNOWFLAKE_SAMPLE_DATA'
schema = 'TPCH_SF1'


# データの取得方法と取得元を指定
### dbtable(テーブルを指定する方法)、もしくは、query(SQLを指定する方法)を指定可能
how_to_read = 'dbtable'
table_or_query = "NATION"
# how_to_read = "query"
# table_or_query = "select * from NATION"

## データフレームを作成
df = (spark.read
          .format("jdbc")
          .option("driver", driver)
          .option("url", url)
          .option("user", user)
          .option("password", password)                  
          .option("role", role)
          .option("warehouse", warehouse)
          .option("db", db)
          .option("schema", schema)
          .option(how_to_read, table_or_query)          
          .load()
     )

## データフレームを表示
df.limit(100).display()

image.png

SparkコネクターによりSparkテーブルを作成する方法

# Sparkコネクター経由のSparkテーブルを作成する方法
## ドライバーを指定。Databrcksでは、"snowflake"と指定することでも可能
driver = 'net.snowflake.spark.snowflake'
# driver = 'snowflake'

# 接続オプションを、sfOptionsに格納。オプションについては、下記ドキュメントに記載あり。
# https://docs.snowflake.com/ja/user-guide/spark-connector-use.html#required-connection-options
# オプションの値を、"(ダブルクオーテーション)で囲む必要がある。
url  = '"AAAA.southeast-asia.azure.snowflakecomputing.com"'

warehouse = '"COMPUTE_WH"'

user = '"user"'
password = '"password"'
role = '"SYSADMIN"'

db = '"SNOWFLAKE_SAMPLE_DATA"'
schema = '"TPCH_SF1"'

column_mapping = '"name"'

# データの取得方法と取得元を指定
### dbtable(テーブルを指定する方法)を指定可能
how_to_read = 'dbtable'
table = '"NATION"'

spark_table_name = spark_database_name + '.NATION_from_snowflake_with_sparkconnector'

# テーブルをドロップ
spark.sql(f'drop table if exists {spark_table_name}')

# DDL文を作成
ddl__create_table = f'''
create table {spark_table_name}
using {driver}
options ( 
  sfURL {url}, 
  sfWarehouse {warehouse},
  sfUser {user}, 
  sfPassword {password},
  sfRole {role},
  sfDatabase {db},
  sfSchema {schema},
  {how_to_read} {table},
  column_mapping {column_mapping}
  )
'''

# DDL文の実行
spark.sql(ddl__create_table)

image.png

jdbcによりSparkテーブルを作成する方法

# jdbc経由のSparkテーブルを作成する方法
## ドライバーを指定。Databrcksでは、"snowflake"と指定することでも可能
driver = 'org.apache.spark.sql.jdbc'
# driver = 'snowflake'

## 接続オプションを、sfOptionsに格納。オプションについては、下記ドキュメントに記載あり。
## https://docs.snowflake.com/ja/user-guide/spark-connector-use.html#required-connection-options
## オプションの値を、"(ダブルクオーテーション)で囲む必要がある。
url  = '"jdbc:snowflake://AAAA.southeast-asia.azure.snowflakecomputing.com"'

warehouse = '"COMPUTE_WH"'

user = '"username"'
password = '"password"'
role = '"SYSADMIN"'

db = '"SNOWFLAKE_SAMPLE_DATA"'
schema = '"TPCH_SF1"'
dbtable = '"lineitem"'

# データの取得方法と取得元を指定
### dbtable(テーブルを指定する方法)を指定可能
how_to_read = 'dbtable'
table = '"NATION"'

# Sparkのテーブル名を指定
spark_table_name = spark_database_name + '.NATION_from_snowflake_with_jdbc'

# DDL文を作成
ddl__create_table = f'''
create table if not exists {spark_table_name}
using {driver}
options ( 
  url {url}, 
  warehouse {warehouse},
  user {user}, 
  password {password},
  role {role},
  db {db},
  schema {schema},
  {how_to_read} {table}
  )
'''

# DDL文を実行
spark.sql(ddl__create_table)

image.png

3
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
4