0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Cosmos DB Spark connectorでAzure Databricksからデータを書き込む

Posted at

はじめに

Azure Data Lake Storageに保存された大量のデータを変換しつつ、アプリで利用するためにCosmos DBにデータをロードする方法を模索していたところ、Cosmos DB Spark connectorを発見したので、これを試してみた。

Cosmos DB Spark connectorとは

SparkからCosmos DBのデータを簡単に並列で読み書きするためのライブラリーである。バッチとストリーム処理の両方に対して使用可能なツールである。AzureのPaaSとしてはDatabricksやHDInsightと一緒に使用可能。
https://docs.microsoft.com/ja-jp/azure/cosmos-db/spark-connector
https://docs.microsoft.com/ja-jp/azure/databricks/data/data-sources/azure/cosmosdb-connector

コンポーネント バージョン
Apache Spark 2.4.x, 2.3.x, 2.2.x, 2.1.x
Scala 2.11

使用にあたって以下に注意する必要がある。

  • このコネクタはCosmos DB Core (SQL) APIのみをサポートしている。その他コネクタとしては MongoDB Connector for SparkSpark Cassandra Connector がある。
  • 現在のところ利用できる最新版がSpark2.4.xのため、Databricks 7.0以降のSpark 3.0.xではまだ動作させることはできない。

使用方法

ライブラリのインストール

以下からSpark/Scalaバージョンに合わせたコネクタをダウンロードしてDatabricksにインストールする。
https://docs.microsoft.com/ja-jp/azure/cosmos-db/spark-connector#bk_working_with_connector

クラスターの Library タブからJARファイルをアップロードしてインストールする。インストール後は以下のようにライブラリーを確認することができる。
image.png

データの書き込み方法

Storageアクセス

今回はData Lake Storageに保存されたデータをCosmos DBに書き込むことを想定しているため、まずはStorageへのアクセスを行う。
またジョブクラスターで実行することも考慮し、パススルー認証ではなくService PrincipalによるOAuth 2.0認証を使用する。OAuth 2.0認証を使用する方法は過去の [この記事[(https://qiita.com/whata/items/47827586fc45b247a0d1) にまとめてある。

spark.conf.set("fs.azure.account.auth.type.<storage-account-name>.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.<storage-account-name>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.<storage-account-name>.dfs.core.windows.net", "<application-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret.<storage-account-name>.dfs.core.windows.net", dbutils.secrets.get(scope="<scope-name>",key="<service-credential-key-name>"))
spark.conf.set("fs.azure.account.oauth2.client.endpoint.<storage-account-name>.dfs.core.windows.net", "https://login.microsoftonline.com/<directory-id>/oauth2/token")

データをADLSから読み取ってDataFrameを作成する

ここではParquetファイルが格納されていることとしてデータをロードしている。

val load_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/path/to/*.parquet"
val df = spark.read.format("parquet").option("header", "true").option("inferSchema", "true").load(load_path)

Cosmos DBへの書き込み

書き込み方法は非常にシンプルで、Cosmos DBへの接続情報と書き込み先のDatabase、Collectionを指定して、そのConfigとともに書き込みを実行する。

// Write configuration
val writeConfig = Config(Map(
  "Endpoint" -> "https://<cosmos-db-account-name>.documents.azure.com:443/",
  "Masterkey" -> "<comsmos-db-master-key>",
  "Database" -> "<database-name>",
  "Collection" -> "<collection-name>",
  "Upsert" -> "true"
))

// Write to Cosmos DB from the DataFrame
import org.apache.spark.sql.SaveMode
df.write.mode(SaveMode.Overwrite).cosmosDB(writeConfig)

まとめ

非常に簡単にDatabricksとCosmos DBを接続することができた。数千万件のレコードを8 workerノードくらいで400 RUに設定したCosmos DBに書き込みしたら、一瞬でRUの上限に達して動作しなくなってしまった。非常に高速に実行できるが、書き込みリクエストの加減を調整することができないので、Cosmos DB側のRUを見てどのような単位で (小分けにするなどして) 実行するか等を考える必要がある。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?