Azure
DocumentDB
CosmosDB

Azure Cosmos DB Bulk Executor Libraryを理解する


はじめに

Bulk Executor使ってますか?

触ってみて、中身見てみて、

どこが良いのか、なぜ普通のSDKではできないのか を理解したので書きます。


Bulk Executor Libraryとは


  • CosmosDBの一括挿入と一括更新のライブラリ


    • ドキュメント(SQL API)に対応

    • グラフ (Gremlin API)に対応(アナウンスまだ?ver1.0.2以降?)

    • .NET版(非Core)/Java版



  • 従来はスロットリング,タイムアウト,その他エラーの対処を自前で実装する必要があったが、
    組み込みの輻輳制御で、スロットリング、ネットワークエラーに対応


    • 1VM上のExecutor1つで50万 RU/s以上を消費可能

    • Executor&RU/sのスケールアウトで1時間以内に1TB超のデータを一括インポート可能




パーティション分割のざっくりおさらい

つい最近(7/28) Docs(日本語)の Azure Cosmos DB でのパーティション分割とスケーリング が更新されたので、既に読んでる人も再読してみると良いとおもう。


コレクション


  • 単一パーティションコレクション


    • パーティションキーを省略可能

    • 最小 400~最大10,000 RU/s

    • ストレージ容量は最大10GB(固定)



  • パーティション分割コレクション


    • 最小 1,000RU/s~最大50,000RU/s(それ以上はサポートへ連絡)

    • ストレージ容量は無制限




パーティション分割


  • 論理パーティション


    • パーティションキーで一意に特定されるパーティション

    • パーティションキーは内部でハッシュ化され、物理パーティションに割り当てられる(コンシステントハッシュ法)

    • 10 GB の論理パーティションの上限



  • 物理パーティション


    • 固定容量のストレージ、可変容量コンピューティング リソース (CPU およびメモリ) の組み合わせ

    • 容量の上限到達や、スループットの超過が発生すると、内部で物理パーティションをシームレ
      スに分割する




パーティションキー選択


  • ホットパーティションができないよう、いっぱい値があるものをキーにする


    • 日付単体ダメゼッタイ



  • トランザクションは論理パーティション単位なのでそこも考慮する

  • (あと、内部実装では、Stringは100文字で切り捨てぽいので、長けりゃ良いもんでもない(ぽい))


Bulk Import


使い方(ざっくり)


  • 初期化(InitializeAsync)して

  • インポート(BulkImportAsync)


サンプルコード


BulkImport

private async Task SampleCode()

{
//Step1.接続ポリシー
var connectionPolicy = new ConnectionPolicy()
{
ConnectionMode = ConnectionMode.Direct,//直接接続
ConnectionProtocol = Protocol.Tcp,//TCP接続
};
//Step2.Documentクライアントの生成
var client = new DocumentClient(new Uri(EndpointUrl), AuthorizationKey, connectionPolicy);
//Step3.Collectionの取得
var collection = client.CreateDocumentCollectionQuery(UriFactory.CreateDatabaseUri(DatabaseName))
.Where(c => c.Id == CollectionName).AsEnumerable().FirstOrDefault(); ;
//Step4.BulkExecutorの生成
IBulkExecutor bulkExecutor = new BulkExecutor(client, collection);
//Step5.BulkExecutorの初期化
await bulkExecutor.InitializeAsync();
//Step6.接続ポリシーのリトライオプションをなしに設定(輻輳制御をBulkExecutorに委譲)
client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
//Step7.BulkImport
var bulkImportResponse = await bulkExecutor.BulkImportAsync(
new string[] {
"{\"sample\":\"pkey-1\",\"id\":\"id-1\"}",
"{\"sample\":\"pkey-2\",\"id\":\"id-1\"}",
}
);
}


注意点


  • インポート前に接続ポリシーのリトライを0にすること(BulkExecutorにリトライを委譲するため)

  • disableAutomaticIdGenerationオプション(既定値:true)
    IDを指定しない場合、自動生成(GUID)

  • EnableUpsertオプションもあるよ


Bulk Update


使い方(ざっくり)


  • 初期化(InitializeAsync)して

  • パッチ内容生成して

  • アップデート(BulkUpdateAsync)


パッチ内容


  • ドキュメントのIDとパーティションキー毎にオペレーションを作成する

  • MongoDBのUpdate Operatorっぽい

  • 定義されているオペレーションは以下の通り


    • インクリメント:指定したフィールドを加減算 (3)

    • セット:指定したフィールドを任意の値に設定 (14)

    • アンセット:指定したフィールドを削除 (16)

    • 配列プッシュ:指定した配列フィールドに要素を追加 (10)

    • 配列リムーブ:指定した配列フィールドから要素を削除 (17)

    • (+アンドキュメンテッドなオペレーションがいっぱいあり? UpdateOperationType




サンプルコード


BulkUpdate

private async Task SampleCode()

{
//Step1-6.はBulkInsertと同じ
//Step7.BulkImport
var bulkImportResponse = await bulkExecutor.BulkImportAsync(
new string[] {
"{\"sample\":\"pkey-1\",\"id\":\"id-1\",\"temp\":\"1\",\"prop-unset\":\"1\",\"prop-increment\":1,\"prop-remove\":[1,2]}",
"{\"sample\":\"pkey-2\",\"id\":\"id-1\",\"temp\":\"2\",\"prop-unset\":\"1\",\"prop-increment\":1,\"prop-remove\":[1,2]}",
}, enableUpsert: true
);
//Step8.オペレーションの作成
var operationList = new List<UpdateOperation>
{
new SetUpdateOperation<string>("prop-set", "hello"),
new UnsetUpdateOperation("prop-unset"),
new IncUpdateOperation<int>("prop-increment",1),
new PushUpdateOperation("prop-push",new object[]{1,2}),
new RemoveUpdateOperation<int>("prop-remove",1),
new UpdateOperation<object>(UpdateOperationType.CurrentDate,"prop-currentDate",null),
};
var items = new List<UpdateItem>();
items.Add(new UpdateItem("id-1", "pkey-1", operationList));
items.Add(new UpdateItem("id-1", "pkey-2", operationList));
//Step9.BulkUpdate
var bulkUpdateResponse = await bulkExecutor.BulkUpdateAsync(items);
}


Update前

{

"sample": "pkey-2",
"id": "id-1",
"temp": "2",
"prop-unset": "1",
"prop-increment": 1,
"prop-remove": [
1,
2
],
"_rid": "1WFNAJbvWdIEAAAAAAAAAA==",
"_self": "dbs/1WFNAA==/colls/1WFNAJbvWdI=/docs/1WFNAJbvWdIEAAAAAAAAAA==/",
"_etag": "\"00004417-0000-0000-0000-5b8aa8bb0000\"",
"_attachments": "attachments/",
"_ts": 1535813819
}


Update後

{

"sample": "pkey-1",
"id": "id-1",
"temp": "1",
"prop-increment": 1,
"prop-remove": [
2
],
"_rid": "1WFNAJbvWdIDAAAAAAAAAA==",
"_self": "dbs/1WFNAA==/colls/1WFNAJbvWdI=/docs/1WFNAJbvWdIDAAAAAAAAAA==/",
"_etag": "\"00004517-0000-0000-0000-5b8aa8c90000\"",
"_attachments": "attachments/",
"prop-set": "hello",
"prop-push": [
1,
2
],
"prop-currentDate": {
"$timestamp": {
"t": 1535813833,
"i": 1
}
},
"_ts": 1535813833
}


Bulk Executor Libraryの内部


動作原理


初期化(InitializeAsync)


  • パーティションマップを取得

HTTPS接続でReq/Resを確認した結果


Request

GET https://xxxxx.documents.azure.com/dbs/sJ1lAA==/colls/sJ1lAOQ7qy0=/pkranges HTTP/1.1

x-ms-max-item-count: -1
A-IM: Incremental Feed
x-ms-date: Sat, 01 Sep 2018 15:13:24 GMT
authorization: xxxxx
Cache-Control: no-cache
x-ms-consistency-level: Session
User-Agent: documentdb-dotnet-sdk/2.0.0 Host/32-bit MicrosoftWindowsNT/6.2.9200.0
x-ms-version: 2017-11-15
Accept: application/json
Host: xxxxx.documents.azure.com



ResponseHeader

HTTP/1.1 200 Ok

Cache-Control: no-store, no-cache
Pragma: no-cache
Content-Type: application/json
Content-Location: https://xxxxx.documents.azure.com/dbs/sJ1lAA==/colls/sJ1lAOQ7qy0=/pkranges
Server: Microsoft-HTTPAPI/2.0
Strict-Transport-Security: max-age=31536000
x-ms-last-state-change-utc: Tue, 28 Aug 2018 07:09:44.280 GMT
etag: "30"
lsn: 30
x-ms-item-count: 5
x-ms-schemaversion: 1.6
x-ms-alt-content-path: dbs/xxxxx/colls/xxxxx
x-ms-xp-role: 1
x-ms-global-Committed-lsn: 30
x-ms-number-of-read-regions: 0
x-ms-transport-request-id: 34959
x-ms-cosmos-llsn: 30
x-ms-serviceversion: version=2.0.0.0
x-ms-activity-id: 5245d708-4747-4696-b343-9318866b3043
x-ms-session-token: 0:30
x-ms-gatewayversion: version=2.0.0.0
Date: Sat, 01 Sep 2018 15:13:32 GMT
Content-Length: 1731


ResponseBody

{

"_rid": "sJ1lAOQ7qy0=",
"PartitionKeyRanges": [
{
"_rid": "sJ1lAOQ7qy0CAAAAAAAAUA==",
"id": "0",
"_etag": "\"00003600-0000-0000-0000-5b817fcf0000\"",
"minInclusive": "",
"maxExclusive": "05C1C9CD673398",
"ridPrefix": 0,
"_self": "dbs\/sJ1lAA==\/colls\/sJ1lAOQ7qy0=\/pkranges\/sJ1lAOQ7qy0CAAAAAAAAUA==\/",
"throughputFraction": 0.2,
"status": "online",
"parents": [],
"_ts": 1535213519,
"_lsn": 23
},
~~~略~~~
"minInclusive": "05C1C9CD673398",
"maxExclusive": "05C1D9CD673398",
~~~略~~~
{
"_rid": "sJ1lAOQ7qy0GAAAAAAAAUA==",
"id": "4",
"_etag": "\"00003a00-0000-0000-0000-5b817fcf0000\"",
"minInclusive": "05C1E9CD673398",
"maxExclusive": "FF",
"ridPrefix": 4,
"_self": "dbs\/sJ1lAA==\/colls\/sJ1lAOQ7qy0=\/pkranges\/sJ1lAOQ7qy0GAAAAAAAAUA==\/",
"throughputFraction": 0.2,
"status": "online",
"parents": [],
"_ts": 1535213519,
"_lsn": 23
}
],
"_count": 5
}

物理パーティションが、minInclusivemaxExclusiveの範囲で表現されている。


BulkImportAsync/BulkUpdateAsync


  • 各ドキュメントのパーティションキーを元にどの物理パーティションか特定

  • 各ドキュメントを物理パーティション毎に仕分けする(バケット化)

  • バケットをさらに分割する(ミニバッチ化)


  • 物理パーティションごとに並列で、ミニバッチ単位でシステムストアドプロシージャを呼び出し


    • Import: __.sys.commonBulkInsert

    • Update:__.sys.bulkPatch



  • スロットリング(HttpStatusCode 429)が発生しても、成功するまでリトライ


    • AIMDStyleの輻輳制御(増やすときは加算的に、減らすときは乗算的に)

    • 内部的には物理パーティション毎にRU割り当てがあり、スロットリングもその単位で行われる




Bulk Delete


  • なぜかメソッド(BulkDeleteAsync)はinternalとなっているが Reflectionで呼び出せる

  • 条件付きのDELETEを全物理パーティションに効率よく投げられる


BulkDelete

private async Task SampleCode()

{
//Step1-7.はBulkUpdateと同じ
//Step8.BulkDelete (ver1.0.1 リトライが未実装)
//var querySpec = new BulkDeleteQuerySpec("s", "s.temp='1'", null, null);
//var method = typeof(BulkExecutor).GetMethod("BulkDeleteAsync", BindingFlags.NonPublic | BindingFlags.Instance);
//await ((Task)method.Invoke(bulkExecutor, new object[] { querySpec }));
//Step8.BulkDelete (ver1.0.2)
var method = typeof(BulkExecutor).GetMethod("BulkDeleteAsync", BindingFlags.NonPublic | BindingFlags.Instance);
await ((Task)method.Invoke(bulkExecutor
, new object[] { "select * from a where a.temp='1'", null, null }));
}


疑問と実験



  • BulkExecutor実行中にパーティション分割されたら?


    • 物理パーティションの範囲と異なる操作をするとHttpStatusCode Gone(410)が返される
      →再度パーティションキーマップを取得する実装となっている(ぽい)




  • 通常のストアドプロシージャは、パーティションキーを指定して実行する(論理パーティション単位で実行)


    • スロットリングは物理パーティション単位

    • 効率的なRU消費の為には、物理パーティション毎にスロットリング制御する必要がある


      1. パーティションキーマップを元に、物理パーティションを特定

      2. 物理パーティション毎に並列化して、論理パーティションキー毎に並列でストアドを実行
        ⇒それってBulkExecutorの劣化実装…






  • 通常のストアドプロシージャは、物理パーティション単位で実行することは出来ないか?


    • BulkExecutorはどうやってる?



      • x-ms-documentdb-partitionkeyrangeidヘッダーを指定している



    • ユーザー定義のストアドをinternalなRequestOption.PartitionKeyRangeId指定して呼んでみた




PartitionKeyRangeId指定

var pkRanges = await client.ReadPartitionKeyRangeFeedAsync(UriFactory.CreateDocumentCollectionUri(DatabaseName, CollectionName));

foreach (var pkRange in pkRanges)
{
var requestOptions = new RequestOptions()
{
//PartitionKey = new PartitionKey(partitonKey),
};
typeof(RequestOptions).GetProperty("PartitionKeyRangeId", BindingFlags.NonPublic | BindingFlags.Instance).SetValue(requestOptions, pkRange.Id);

var response = await client.ExecuteStoredProcedureAsync<int>(
UriFactory.CreateStoredProcedureUri(DatabaseName, CollectionName, "CountDocs"),
requestOptions);
}


結果⇒Current request not allowed to set only partition key range id without partition key in header.


まとめ


  • BulkExecutor


    • スループットを使い切る、シンプルな一括操作

    • 一括インポート、一括更新(削除もあるよ!)



  • なぜ良いのか


    • SDKでは論理パーティション単位での操作となるが、

      BulkExecutor(システムストアド)は物理パーティション単位での操作ができる

      なのでスロットリングを極力避けてRUを最大限消費できる

      インピーダンスミスマッチというかなんというか…




希望/期待/不満


  • 普通のストアドをBulkExecuteしたい

  • UPDATEもPK/ID指定じゃなくて条件+オペレーションで更新したい


    • mongo db のfindAndModifyみたいに



  • UPDATEのオペレーションのドキュメント拡充



    • getting-started に書かれていないオペレーションが
      Microsoft.Azure.CosmosDB.BulkExecutor.BulkUpdate.UpdateOperationTypeに列挙されている。
      CurrentDateとか。Popとか



  • BulkDeleteAsyncの公開


    • 論理キー不明で消したいシーンってあんまりないはずだし、使い方によっては危険だけど
      ID/PK指定ではない、条件付き削除を効率よく実行したいとか
      定義済みのストアド消さずにコレクションを消したいとか




参考資料

[Docs]Azure Cosmos DB Bulk Executor ライブラリの概要

[Docs]Bulk Executor .NET ライブラリを使用して Azure Cosmos DB で一括操作を実行する

[GitHub] Bulk Executor Utility for Azure Cosmos DB .NET SQL API

[GitHub/Issue]No BulkDeleteAsync

[Blog]How the CosmosDB Bulk Executor works under the hood