はじめに
Bulk Executor使ってますか?
触ってみて、中身見てみて、
どこが良いのか、なぜ普通のSDKではできないのか を理解したので書きます。
Bulk Executor Libraryとは
- CosmosDBの一括挿入と一括更新のライブラリ
- ドキュメント(SQL API)に対応
- グラフ (Gremlin API)に対応(アナウンスまだ?ver1.0.2以降?)
- .NET版(非Core)/Java版
- 従来はスロットリング,タイムアウト,その他エラーの対処を自前で実装する必要があったが、
組み込みの輻輳制御で、スロットリング、ネットワークエラーに対応- 1VM上のExecutor1つで50万 RU/s以上を消費可能
- Executor&RU/sのスケールアウトで1時間以内に1TB超のデータを一括インポート可能
パーティション分割のざっくりおさらい
つい最近(7/28) Docs(日本語)の Azure Cosmos DB でのパーティション分割とスケーリング が更新されたので、既に読んでる人も再読してみると良いとおもう。
コレクション
- 単一パーティションコレクション
- パーティションキーを省略可能
- 最小 400~最大10,000 RU/s
- ストレージ容量は最大10GB(固定)
- パーティション分割コレクション
- 最小 1,000RU/s~最大50,000RU/s(それ以上はサポートへ連絡)
- ストレージ容量は無制限
パーティション分割
- 論理パーティション
- パーティションキーで一意に特定されるパーティション
- パーティションキーは内部でハッシュ化され、物理パーティションに割り当てられる(コンシステントハッシュ法)
- 10 GB の論理パーティションの上限
- 物理パーティション
- 固定容量のストレージ、可変容量コンピューティング リソース (CPU およびメモリ) の組み合わせ
- 容量の上限到達や、スループットの超過が発生すると、内部で物理パーティションをシームレ
スに分割する
パーティションキー選択
- ホットパーティションができないよう、いっぱい値があるものをキーにする
- 日付単体ダメゼッタイ
- トランザクションは論理パーティション単位なのでそこも考慮する
- (あと、内部実装では、Stringは100文字で切り捨てぽいので、長けりゃ良いもんでもない(ぽい))
Bulk Import
使い方(ざっくり)
- 初期化(InitializeAsync)して
- インポート(BulkImportAsync)
サンプルコード
private async Task SampleCode()
{
//Step1.接続ポリシー
var connectionPolicy = new ConnectionPolicy()
{
ConnectionMode = ConnectionMode.Direct,//直接接続
ConnectionProtocol = Protocol.Tcp,//TCP接続
};
//Step2.Documentクライアントの生成
var client = new DocumentClient(new Uri(EndpointUrl), AuthorizationKey, connectionPolicy);
//Step3.Collectionの取得
var collection = client.CreateDocumentCollectionQuery(UriFactory.CreateDatabaseUri(DatabaseName))
.Where(c => c.Id == CollectionName).AsEnumerable().FirstOrDefault(); ;
//Step4.BulkExecutorの生成
IBulkExecutor bulkExecutor = new BulkExecutor(client, collection);
//Step5.BulkExecutorの初期化
await bulkExecutor.InitializeAsync();
//Step6.接続ポリシーのリトライオプションをなしに設定(輻輳制御をBulkExecutorに委譲)
client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
//Step7.BulkImport
var bulkImportResponse = await bulkExecutor.BulkImportAsync(
new string[] {
"{\"sample\":\"pkey-1\",\"id\":\"id-1\"}",
"{\"sample\":\"pkey-2\",\"id\":\"id-1\"}",
}
);
}
注意点
- インポート前に接続ポリシーのリトライを0にすること(BulkExecutorにリトライを委譲するため)
- disableAutomaticIdGenerationオプション(既定値:true)
IDを指定しない場合、自動生成(GUID) - EnableUpsertオプションもあるよ
Bulk Update
使い方(ざっくり)
- 初期化(InitializeAsync)して
- パッチ内容生成して
- アップデート(BulkUpdateAsync)
パッチ内容
- ドキュメントのIDとパーティションキー毎にオペレーションを作成する
- MongoDBのUpdate Operatorっぽい
- 定義されているオペレーションは以下の通り
- インクリメント:指定したフィールドを加減算 (3)
- セット:指定したフィールドを任意の値に設定 (14)
- アンセット:指定したフィールドを削除 (16)
- 配列プッシュ:指定した配列フィールドに要素を追加 (10)
- 配列リムーブ:指定した配列フィールドから要素を削除 (17)
- (+アンドキュメンテッドなオペレーションがいっぱいあり?
UpdateOperationType
)
サンプルコード
private async Task SampleCode()
{
//Step1-6.はBulkInsertと同じ
//Step7.BulkImport
var bulkImportResponse = await bulkExecutor.BulkImportAsync(
new string[] {
"{\"sample\":\"pkey-1\",\"id\":\"id-1\",\"temp\":\"1\",\"prop-unset\":\"1\",\"prop-increment\":1,\"prop-remove\":[1,2]}",
"{\"sample\":\"pkey-2\",\"id\":\"id-1\",\"temp\":\"2\",\"prop-unset\":\"1\",\"prop-increment\":1,\"prop-remove\":[1,2]}",
}, enableUpsert: true
);
//Step8.オペレーションの作成
var operationList = new List<UpdateOperation>
{
new SetUpdateOperation<string>("prop-set", "hello"),
new UnsetUpdateOperation("prop-unset"),
new IncUpdateOperation<int>("prop-increment",1),
new PushUpdateOperation("prop-push",new object[]{1,2}),
new RemoveUpdateOperation<int>("prop-remove",1),
new UpdateOperation<object>(UpdateOperationType.CurrentDate,"prop-currentDate",null),
};
var items = new List<UpdateItem>();
items.Add(new UpdateItem("id-1", "pkey-1", operationList));
items.Add(new UpdateItem("id-1", "pkey-2", operationList));
//Step9.BulkUpdate
var bulkUpdateResponse = await bulkExecutor.BulkUpdateAsync(items);
}
{
"sample": "pkey-2",
"id": "id-1",
"temp": "2",
"prop-unset": "1",
"prop-increment": 1,
"prop-remove": [
1,
2
],
"_rid": "1WFNAJbvWdIEAAAAAAAAAA==",
"_self": "dbs/1WFNAA==/colls/1WFNAJbvWdI=/docs/1WFNAJbvWdIEAAAAAAAAAA==/",
"_etag": "\"00004417-0000-0000-0000-5b8aa8bb0000\"",
"_attachments": "attachments/",
"_ts": 1535813819
}
{
"sample": "pkey-1",
"id": "id-1",
"temp": "1",
"prop-increment": 1,
"prop-remove": [
2
],
"_rid": "1WFNAJbvWdIDAAAAAAAAAA==",
"_self": "dbs/1WFNAA==/colls/1WFNAJbvWdI=/docs/1WFNAJbvWdIDAAAAAAAAAA==/",
"_etag": "\"00004517-0000-0000-0000-5b8aa8c90000\"",
"_attachments": "attachments/",
"prop-set": "hello",
"prop-push": [
1,
2
],
"prop-currentDate": {
"$timestamp": {
"t": 1535813833,
"i": 1
}
},
"_ts": 1535813833
}
Bulk Executor Libraryの内部
動作原理
初期化(InitializeAsync)
- パーティションマップを取得
HTTPS接続でReq/Resを確認した結果
GET https://xxxxx.documents.azure.com/dbs/sJ1lAA==/colls/sJ1lAOQ7qy0=/pkranges HTTP/1.1
x-ms-max-item-count: -1
A-IM: Incremental Feed
x-ms-date: Sat, 01 Sep 2018 15:13:24 GMT
authorization: xxxxx
Cache-Control: no-cache
x-ms-consistency-level: Session
User-Agent: documentdb-dotnet-sdk/2.0.0 Host/32-bit MicrosoftWindowsNT/6.2.9200.0
x-ms-version: 2017-11-15
Accept: application/json
Host: xxxxx.documents.azure.com
HTTP/1.1 200 Ok
Cache-Control: no-store, no-cache
Pragma: no-cache
Content-Type: application/json
Content-Location: https://xxxxx.documents.azure.com/dbs/sJ1lAA==/colls/sJ1lAOQ7qy0=/pkranges
Server: Microsoft-HTTPAPI/2.0
Strict-Transport-Security: max-age=31536000
x-ms-last-state-change-utc: Tue, 28 Aug 2018 07:09:44.280 GMT
etag: "30"
lsn: 30
x-ms-item-count: 5
x-ms-schemaversion: 1.6
x-ms-alt-content-path: dbs/xxxxx/colls/xxxxx
x-ms-xp-role: 1
x-ms-global-Committed-lsn: 30
x-ms-number-of-read-regions: 0
x-ms-transport-request-id: 34959
x-ms-cosmos-llsn: 30
x-ms-serviceversion: version=2.0.0.0
x-ms-activity-id: 5245d708-4747-4696-b343-9318866b3043
x-ms-session-token: 0:30
x-ms-gatewayversion: version=2.0.0.0
Date: Sat, 01 Sep 2018 15:13:32 GMT
Content-Length: 1731
{
"_rid": "sJ1lAOQ7qy0=",
"PartitionKeyRanges": [
{
"_rid": "sJ1lAOQ7qy0CAAAAAAAAUA==",
"id": "0",
"_etag": "\"00003600-0000-0000-0000-5b817fcf0000\"",
"minInclusive": "",
"maxExclusive": "05C1C9CD673398",
"ridPrefix": 0,
"_self": "dbs\/sJ1lAA==\/colls\/sJ1lAOQ7qy0=\/pkranges\/sJ1lAOQ7qy0CAAAAAAAAUA==\/",
"throughputFraction": 0.2,
"status": "online",
"parents": [],
"_ts": 1535213519,
"_lsn": 23
},
~~~略~~~
"minInclusive": "05C1C9CD673398",
"maxExclusive": "05C1D9CD673398",
~~~略~~~
{
"_rid": "sJ1lAOQ7qy0GAAAAAAAAUA==",
"id": "4",
"_etag": "\"00003a00-0000-0000-0000-5b817fcf0000\"",
"minInclusive": "05C1E9CD673398",
"maxExclusive": "FF",
"ridPrefix": 4,
"_self": "dbs\/sJ1lAA==\/colls\/sJ1lAOQ7qy0=\/pkranges\/sJ1lAOQ7qy0GAAAAAAAAUA==\/",
"throughputFraction": 0.2,
"status": "online",
"parents": [],
"_ts": 1535213519,
"_lsn": 23
}
],
"_count": 5
}
物理パーティションが、minInclusive
とmaxExclusive
の範囲で表現されている。
BulkImportAsync/BulkUpdateAsync
- 各ドキュメントのパーティションキーを元にどの物理パーティションか特定
- 各ドキュメントを物理パーティション毎に仕分けする(バケット化)
- バケットをさらに分割する(ミニバッチ化)
-
物理パーティションごとに並列で、ミニバッチ単位でシステムストアドプロシージャを呼び出し
- Import:
__.sys.commonBulkInsert
- Update:
__.sys.bulkPatch
- Import:
- スロットリング(
HttpStatusCode 429
)が発生しても、成功するまでリトライ- AIMDStyleの輻輳制御(増やすときは加算的に、減らすときは乗算的に)
- 内部的には物理パーティション毎にRU割り当てがあり、スロットリングもその単位で行われる
Bulk Delete
- なぜかメソッド(
BulkDeleteAsync
)はinternalとなっているが Reflectionで呼び出せる - 条件付きのDELETEを全物理パーティションに効率よく投げられる
private async Task SampleCode()
{
//Step1-7.はBulkUpdateと同じ
//Step8.BulkDelete (ver1.0.1 リトライが未実装)
//var querySpec = new BulkDeleteQuerySpec("s", "s.temp='1'", null, null);
//var method = typeof(BulkExecutor).GetMethod("BulkDeleteAsync", BindingFlags.NonPublic | BindingFlags.Instance);
//await ((Task)method.Invoke(bulkExecutor, new object[] { querySpec }));
//Step8.BulkDelete (ver1.0.2)
var method = typeof(BulkExecutor).GetMethod("BulkDeleteAsync", BindingFlags.NonPublic | BindingFlags.Instance);
await ((Task)method.Invoke(bulkExecutor
, new object[] { "select * from a where a.temp='1'", null, null }));
}
疑問と実験
-
BulkExecutor実行中にパーティション分割されたら?
- 物理パーティションの範囲と異なる操作をすると
HttpStatusCode Gone(410)
が返される
→再度パーティションキーマップを取得する実装となっている(ぽい)
- 物理パーティションの範囲と異なる操作をすると
-
通常のストアドプロシージャは、パーティションキーを指定して実行する(論理パーティション単位で実行)
- スロットリングは物理パーティション単位
- 効率的なRU消費の為には、物理パーティション毎にスロットリング制御する必要がある
- パーティションキーマップを元に、物理パーティションを特定
- 物理パーティション毎に並列化して、論理パーティションキー毎に並列でストアドを実行
⇒それってBulkExecutorの劣化実装…
-
通常のストアドプロシージャは、物理パーティション単位で実行することは出来ないか?
- BulkExecutorはどうやってる?
-
x-ms-documentdb-partitionkeyrangeid
ヘッダーを指定している
-
- ユーザー定義のストアドをinternalな
RequestOption.PartitionKeyRangeId
指定して呼んでみた
- BulkExecutorはどうやってる?
var pkRanges = await client.ReadPartitionKeyRangeFeedAsync(UriFactory.CreateDocumentCollectionUri(DatabaseName, CollectionName));
foreach (var pkRange in pkRanges)
{
var requestOptions = new RequestOptions()
{
//PartitionKey = new PartitionKey(partitonKey),
};
typeof(RequestOptions).GetProperty("PartitionKeyRangeId", BindingFlags.NonPublic | BindingFlags.Instance).SetValue(requestOptions, pkRange.Id);
var response = await client.ExecuteStoredProcedureAsync<int>(
UriFactory.CreateStoredProcedureUri(DatabaseName, CollectionName, "CountDocs"),
requestOptions);
}
結果⇒Current request not allowed to set only partition key range id without partition key in header.
まとめ
- BulkExecutor
- スループットを使い切る、シンプルな一括操作
- 一括インポート、一括更新(削除もあるよ!)
- なぜ良いのか
- SDKでは論理パーティション単位での操作となるが、
BulkExecutor(システムストアド)は物理パーティション単位での操作ができる
なのでスロットリングを極力避けてRUを最大限消費できる
インピーダンスミスマッチというかなんというか…
- SDKでは論理パーティション単位での操作となるが、
希望/期待/不満
- 普通のストアドをBulkExecuteしたい
- UPDATEもPK/ID指定じゃなくて条件+オペレーションで更新したい
- mongo db の
findAndModify
みたいに
- mongo db の
- UPDATEのオペレーションのドキュメント拡充
-
getting-started に書かれていないオペレーションが
Microsoft.Azure.CosmosDB.BulkExecutor.BulkUpdate.UpdateOperationType
に列挙されている。CurrentDate
とか。Pop
とか
-
getting-started に書かれていないオペレーションが
- BulkDeleteAsyncの公開
- 論理キー不明で消したいシーンってあんまりないはずだし、使い方によっては危険だけど
ID/PK指定ではない、条件付き削除を効率よく実行したいとか
定義済みのストアド消さずにコレクションを消したいとか
- 論理キー不明で消したいシーンってあんまりないはずだし、使い方によっては危険だけど
参考資料
[Docs]Azure Cosmos DB Bulk Executor ライブラリの概要
[Docs]Bulk Executor .NET ライブラリを使用して Azure Cosmos DB で一括操作を実行する
[GitHub] Bulk Executor Utility for Azure Cosmos DB .NET SQL API
[GitHub/Issue]No BulkDeleteAsync
[Blog]How the CosmosDB Bulk Executor works under the hood