4
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Azure Cosmos DB Bulk Executor Libraryを理解する

Last updated at Posted at 2018-09-01

はじめに

Bulk Executor使ってますか?
触ってみて、中身見てみて、
どこが良いのか、なぜ普通のSDKではできないのか を理解したので書きます。

Bulk Executor Libraryとは

  • CosmosDBの一括挿入と一括更新のライブラリ
    • ドキュメント(SQL API)に対応
    • グラフ (Gremlin API)に対応(アナウンスまだ?ver1.0.2以降?)
    • .NET版(非Core)/Java版
  • 従来はスロットリング,タイムアウト,その他エラーの対処を自前で実装する必要があったが、
    組み込みの輻輳制御で、スロットリング、ネットワークエラーに対応
    • 1VM上のExecutor1つで50万 RU/s以上を消費可能
    • Executor&RU/sのスケールアウトで1時間以内に1TB超のデータを一括インポート可能

パーティション分割のざっくりおさらい

つい最近(7/28) Docs(日本語)の Azure Cosmos DB でのパーティション分割とスケーリング が更新されたので、既に読んでる人も再読してみると良いとおもう。

コレクション

  • 単一パーティションコレクション
    • パーティションキーを省略可能
    • 最小 400~最大10,000 RU/s
    • ストレージ容量は最大10GB(固定)
  • パーティション分割コレクション
    • 最小 1,000RU/s~最大50,000RU/s(それ以上はサポートへ連絡)
    • ストレージ容量は無制限

パーティション分割

  • 論理パーティション
    • パーティションキーで一意に特定されるパーティション
    • パーティションキーは内部でハッシュ化され、物理パーティションに割り当てられる(コンシステントハッシュ法)
    • 10 GB の論理パーティションの上限
  • 物理パーティション
    • 固定容量のストレージ、可変容量コンピューティング リソース (CPU およびメモリ) の組み合わせ
    • 容量の上限到達や、スループットの超過が発生すると、内部で物理パーティションをシームレ
      スに分割する

パーティションキー選択

  • ホットパーティションができないよう、いっぱい値があるものをキーにする
    • 日付単体ダメゼッタイ
  • トランザクションは論理パーティション単位なのでそこも考慮する
  • (あと、内部実装では、Stringは100文字で切り捨てぽいので、長けりゃ良いもんでもない(ぽい))

Bulk Import

使い方(ざっくり)

  • 初期化(InitializeAsync)して
  • インポート(BulkImportAsync)

サンプルコード

BulkImport
private async Task SampleCode()
{
    //Step1.接続ポリシー
    var connectionPolicy = new ConnectionPolicy()
    {
        ConnectionMode = ConnectionMode.Direct,//直接接続
        ConnectionProtocol = Protocol.Tcp,//TCP接続
    };
    //Step2.Documentクライアントの生成
    var client = new DocumentClient(new Uri(EndpointUrl), AuthorizationKey, connectionPolicy);
    //Step3.Collectionの取得
    var collection = client.CreateDocumentCollectionQuery(UriFactory.CreateDatabaseUri(DatabaseName))
        .Where(c => c.Id == CollectionName).AsEnumerable().FirstOrDefault(); ;
    //Step4.BulkExecutorの生成
    IBulkExecutor bulkExecutor = new BulkExecutor(client, collection);
    //Step5.BulkExecutorの初期化
    await bulkExecutor.InitializeAsync();
    //Step6.接続ポリシーのリトライオプションをなしに設定(輻輳制御をBulkExecutorに委譲)
    client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
    client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
    //Step7.BulkImport
    var bulkImportResponse = await bulkExecutor.BulkImportAsync(
            new string[] {
        "{\"sample\":\"pkey-1\",\"id\":\"id-1\"}",
        "{\"sample\":\"pkey-2\",\"id\":\"id-1\"}",
            }
        );
}

注意点

  • インポート前に接続ポリシーのリトライを0にすること(BulkExecutorにリトライを委譲するため)
  • disableAutomaticIdGenerationオプション(既定値:true)
    IDを指定しない場合、自動生成(GUID)
  • EnableUpsertオプションもあるよ

Bulk Update

使い方(ざっくり)

  • 初期化(InitializeAsync)して
  • パッチ内容生成して
  • アップデート(BulkUpdateAsync)

パッチ内容

  • ドキュメントのIDとパーティションキー毎にオペレーションを作成する
  • MongoDBのUpdate Operatorっぽい
  • 定義されているオペレーションは以下の通り
    • インクリメント:指定したフィールドを加減算 (3)
    • セット:指定したフィールドを任意の値に設定 (14)
    • アンセット:指定したフィールドを削除 (16)
    • 配列プッシュ:指定した配列フィールドに要素を追加 (10)
    • 配列リムーブ:指定した配列フィールドから要素を削除 (17)
    • (+アンドキュメンテッドなオペレーションがいっぱいあり? UpdateOperationType

サンプルコード

BulkUpdate
private async Task SampleCode()
{
    //Step1-6.はBulkInsertと同じ
    //Step7.BulkImport
    var bulkImportResponse = await bulkExecutor.BulkImportAsync(
            new string[] {
        "{\"sample\":\"pkey-1\",\"id\":\"id-1\",\"temp\":\"1\",\"prop-unset\":\"1\",\"prop-increment\":1,\"prop-remove\":[1,2]}",
        "{\"sample\":\"pkey-2\",\"id\":\"id-1\",\"temp\":\"2\",\"prop-unset\":\"1\",\"prop-increment\":1,\"prop-remove\":[1,2]}",
            }, enableUpsert: true
        );
    //Step8.オペレーションの作成
    var operationList = new List<UpdateOperation>
    {
        new SetUpdateOperation<string>("prop-set", "hello"),
        new UnsetUpdateOperation("prop-unset"),
        new IncUpdateOperation<int>("prop-increment",1),
        new PushUpdateOperation("prop-push",new object[]{1,2}),
        new RemoveUpdateOperation<int>("prop-remove",1),
        new UpdateOperation<object>(UpdateOperationType.CurrentDate,"prop-currentDate",null),
    };
    var items = new List<UpdateItem>();
    items.Add(new UpdateItem("id-1", "pkey-1", operationList));
    items.Add(new UpdateItem("id-1", "pkey-2", operationList));
    //Step9.BulkUpdate
    var bulkUpdateResponse = await bulkExecutor.BulkUpdateAsync(items);
}
Update前
{
    "sample": "pkey-2",
    "id": "id-1",
    "temp": "2",
    "prop-unset": "1",
    "prop-increment": 1,
    "prop-remove": [
        1,
        2
    ],
    "_rid": "1WFNAJbvWdIEAAAAAAAAAA==",
    "_self": "dbs/1WFNAA==/colls/1WFNAJbvWdI=/docs/1WFNAJbvWdIEAAAAAAAAAA==/",
    "_etag": "\"00004417-0000-0000-0000-5b8aa8bb0000\"",
    "_attachments": "attachments/",
    "_ts": 1535813819
}
Update後
{
    "sample": "pkey-1",
    "id": "id-1",
    "temp": "1",
    "prop-increment": 1,
    "prop-remove": [
        2
    ],
    "_rid": "1WFNAJbvWdIDAAAAAAAAAA==",
    "_self": "dbs/1WFNAA==/colls/1WFNAJbvWdI=/docs/1WFNAJbvWdIDAAAAAAAAAA==/",
    "_etag": "\"00004517-0000-0000-0000-5b8aa8c90000\"",
    "_attachments": "attachments/",
    "prop-set": "hello",
    "prop-push": [
        1,
        2
    ],
    "prop-currentDate": {
        "$timestamp": {
            "t": 1535813833,
            "i": 1
        }
    },
    "_ts": 1535813833
}

Bulk Executor Libraryの内部

動作原理

初期化(InitializeAsync)

  • パーティションマップを取得

HTTPS接続でReq/Resを確認した結果

Request
GET https://xxxxx.documents.azure.com/dbs/sJ1lAA==/colls/sJ1lAOQ7qy0=/pkranges HTTP/1.1
x-ms-max-item-count: -1
A-IM: Incremental Feed
x-ms-date: Sat, 01 Sep 2018 15:13:24 GMT
authorization: xxxxx
Cache-Control: no-cache
x-ms-consistency-level: Session
User-Agent: documentdb-dotnet-sdk/2.0.0 Host/32-bit MicrosoftWindowsNT/6.2.9200.0
x-ms-version: 2017-11-15
Accept: application/json
Host: xxxxx.documents.azure.com

ResponseHeader
HTTP/1.1 200 Ok
Cache-Control: no-store, no-cache
Pragma: no-cache
Content-Type: application/json
Content-Location: https://xxxxx.documents.azure.com/dbs/sJ1lAA==/colls/sJ1lAOQ7qy0=/pkranges
Server: Microsoft-HTTPAPI/2.0
Strict-Transport-Security: max-age=31536000
x-ms-last-state-change-utc: Tue, 28 Aug 2018 07:09:44.280 GMT
etag: "30"
lsn: 30
x-ms-item-count: 5
x-ms-schemaversion: 1.6
x-ms-alt-content-path: dbs/xxxxx/colls/xxxxx
x-ms-xp-role: 1
x-ms-global-Committed-lsn: 30
x-ms-number-of-read-regions: 0
x-ms-transport-request-id: 34959
x-ms-cosmos-llsn: 30
x-ms-serviceversion: version=2.0.0.0
x-ms-activity-id: 5245d708-4747-4696-b343-9318866b3043
x-ms-session-token: 0:30
x-ms-gatewayversion: version=2.0.0.0
Date: Sat, 01 Sep 2018 15:13:32 GMT
Content-Length: 1731
ResponseBody
{
  "_rid": "sJ1lAOQ7qy0=",
  "PartitionKeyRanges": [
    {
      "_rid": "sJ1lAOQ7qy0CAAAAAAAAUA==",
      "id": "0",
      "_etag": "\"00003600-0000-0000-0000-5b817fcf0000\"",
      "minInclusive": "",
      "maxExclusive": "05C1C9CD673398",
      "ridPrefix": 0,
      "_self": "dbs\/sJ1lAA==\/colls\/sJ1lAOQ7qy0=\/pkranges\/sJ1lAOQ7qy0CAAAAAAAAUA==\/",
      "throughputFraction": 0.2,
      "status": "online",
      "parents": [],
      "_ts": 1535213519,
      "_lsn": 23
    },
~~~略~~~
      "minInclusive": "05C1C9CD673398",
      "maxExclusive": "05C1D9CD673398",
~~~略~~~
    {
      "_rid": "sJ1lAOQ7qy0GAAAAAAAAUA==",
      "id": "4",
      "_etag": "\"00003a00-0000-0000-0000-5b817fcf0000\"",
      "minInclusive": "05C1E9CD673398",
      "maxExclusive": "FF",
      "ridPrefix": 4,
      "_self": "dbs\/sJ1lAA==\/colls\/sJ1lAOQ7qy0=\/pkranges\/sJ1lAOQ7qy0GAAAAAAAAUA==\/",
      "throughputFraction": 0.2,
      "status": "online",
      "parents": [],
      "_ts": 1535213519,
      "_lsn": 23
    }
  ],
  "_count": 5
}

物理パーティションが、minInclusivemaxExclusiveの範囲で表現されている。

BulkImportAsync/BulkUpdateAsync

  • 各ドキュメントのパーティションキーを元にどの物理パーティションか特定
  • 各ドキュメントを物理パーティション毎に仕分けする(バケット化)
  • バケットをさらに分割する(ミニバッチ化)
  • 物理パーティションごとに並列で、ミニバッチ単位でシステムストアドプロシージャを呼び出し
    • Import: __.sys.commonBulkInsert
    • Update:__.sys.bulkPatch
  • スロットリング(HttpStatusCode 429)が発生しても、成功するまでリトライ
    • AIMDStyleの輻輳制御(増やすときは加算的に、減らすときは乗算的に)
    • 内部的には物理パーティション毎にRU割り当てがあり、スロットリングもその単位で行われる

Bulk Delete

  • なぜかメソッド(BulkDeleteAsync)はinternalとなっているが Reflectionで呼び出せる
  • 条件付きのDELETEを全物理パーティションに効率よく投げられる
BulkDelete
private async Task SampleCode()
{
    //Step1-7.はBulkUpdateと同じ
    //Step8.BulkDelete (ver1.0.1 リトライが未実装)
    //var querySpec = new BulkDeleteQuerySpec("s", "s.temp='1'", null, null);
    //var method = typeof(BulkExecutor).GetMethod("BulkDeleteAsync", BindingFlags.NonPublic | BindingFlags.Instance);
    //await ((Task)method.Invoke(bulkExecutor, new object[] { querySpec }));
    //Step8.BulkDelete (ver1.0.2)
    var method = typeof(BulkExecutor).GetMethod("BulkDeleteAsync", BindingFlags.NonPublic | BindingFlags.Instance);
    await ((Task)method.Invoke(bulkExecutor
        , new object[] { "select * from a where a.temp='1'", null, null }));
}

疑問と実験

  • BulkExecutor実行中にパーティション分割されたら?

    • 物理パーティションの範囲と異なる操作をするとHttpStatusCode Gone(410)が返される
      →再度パーティションキーマップを取得する実装となっている(ぽい)
  • 通常のストアドプロシージャは、パーティションキーを指定して実行する(論理パーティション単位で実行)

    • スロットリングは物理パーティション単位
    • 効率的なRU消費の為には、物理パーティション毎にスロットリング制御する必要がある
      1. パーティションキーマップを元に、物理パーティションを特定
      2. 物理パーティション毎に並列化して、論理パーティションキー毎に並列でストアドを実行
        ⇒それってBulkExecutorの劣化実装…
  • 通常のストアドプロシージャは、物理パーティション単位で実行することは出来ないか?

    • BulkExecutorはどうやってる?
      • x-ms-documentdb-partitionkeyrangeidヘッダーを指定している
    • ユーザー定義のストアドをinternalなRequestOption.PartitionKeyRangeId指定して呼んでみた
PartitionKeyRangeId指定
var pkRanges = await client.ReadPartitionKeyRangeFeedAsync(UriFactory.CreateDocumentCollectionUri(DatabaseName, CollectionName));
foreach (var pkRange in pkRanges)
{
    var requestOptions = new RequestOptions()
    {
        //PartitionKey = new PartitionKey(partitonKey),
    };
    typeof(RequestOptions).GetProperty("PartitionKeyRangeId", BindingFlags.NonPublic | BindingFlags.Instance).SetValue(requestOptions, pkRange.Id);

    var response = await client.ExecuteStoredProcedureAsync<int>(
        UriFactory.CreateStoredProcedureUri(DatabaseName, CollectionName, "CountDocs"),
        requestOptions);
}

結果⇒Current request not allowed to set only partition key range id without partition key in header.

まとめ

  • BulkExecutor
    • スループットを使い切る、シンプルな一括操作
    • 一括インポート、一括更新(削除もあるよ!)
  • なぜ良いのか
    • SDKでは論理パーティション単位での操作となるが、

      BulkExecutor(システムストアド)は物理パーティション単位での操作ができる

      なのでスロットリングを極力避けてRUを最大限消費できる

      インピーダンスミスマッチというかなんというか…

希望/期待/不満

  • 普通のストアドをBulkExecuteしたい
  • UPDATEもPK/ID指定じゃなくて条件+オペレーションで更新したい
    • mongo db のfindAndModifyみたいに
  • UPDATEのオペレーションのドキュメント拡充
    • getting-started に書かれていないオペレーションが
      Microsoft.Azure.CosmosDB.BulkExecutor.BulkUpdate.UpdateOperationTypeに列挙されている。
      CurrentDateとか。Popとか
  • BulkDeleteAsyncの公開
    • 論理キー不明で消したいシーンってあんまりないはずだし、使い方によっては危険だけど
      ID/PK指定ではない、条件付き削除を効率よく実行したいとか
      定義済みのストアド消さずにコレクションを消したいとか

参考資料

[Docs]Azure Cosmos DB Bulk Executor ライブラリの概要
[Docs]Bulk Executor .NET ライブラリを使用して Azure Cosmos DB で一括操作を実行する
[GitHub] Bulk Executor Utility for Azure Cosmos DB .NET SQL API
[GitHub/Issue]No BulkDeleteAsync
[Blog]How the CosmosDB Bulk Executor works under the hood

4
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?