1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Microsoft Fabric 容量を自動スケールする

Last updated at Posted at 2025-12-15

はじめに

以下の記事で監視の方法を紹介しました。
今度は、監視のアクションを容量のサイズ変更、つまり 自動スケーリング を構成してみます

方式

image.png

  1. Fabric 容量イベントを複数構成し、容量の使用率などの情報を容量別に供給します
  2. イベントストリームは容量イベントを統合し、監視構成のためのアクティベーター送信と、蓄積のためのイベントハウス送信を行います。
  3. イベントハウスでは、対象の容量を分析可能な状態に保管します。
  4. アクティベーターは容量ごと使用率を追跡し、スロットリング到達率に対する閾値条件で サイズ変更ノートブックを実行します。
  5. ノートブックで Fabric 容量(Azure リソース)を更新します。

イベントストリームでの複数のデータ統合や送信はそれなりに CU 消費があるので、今回の記事を参考に実装したらベンチマークをとることをおすすめします。

https://learn.microsoft.com/ja-jp/fabric/real-time-intelligence/event-streams/monitor-capacity-consumption

実装

サービスプリンシパル

を参考にサービスプリンシパルを作成し、 Fabric 容量リソースに 共同作成者 などの、リソースの更新ができる権限を付与します。

ノートブック

  1. python ノートブックを作成します。(UDF でもいいかも)
    image.png

  2. 先頭のセルをパラメータにします。

    python
    
    operation = "scale_up"
    
    

    image.png

  3. Fabric の操作を行う モジュールを作りました。

    python
    import json
    import time
    import re
    import requests
    from azure.identity import ClientSecretCredential
    
    
    class FabricOperations:
        def __init__(
            self,
            credential: ClientSecretCredential,
            subscription_id: str,
            resource_group_name: str,
            capacity_name: str,
            api_version: str = "2023-11-01",
            timeout: int = 60,
            allowed_f_skus=(2, 4, 8, 16, 32, 64, 128),  # 必要なら拡張
        ):
            self.credential = credential
            self.subscription_id = subscription_id
            self.resource_group_name = resource_group_name
            self.capacity_name = capacity_name
            self.api_version = api_version
            self.timeout = timeout
            self.allowed_f_skus = tuple(sorted(set(int(x) for x in allowed_f_skus)))
    
        @property
        def capacity_url(self) -> str:
            return (
                f"https://management.azure.com/subscriptions/{self.subscription_id}"
                f"/resourceGroups/{self.resource_group_name}"
                f"/providers/Microsoft.Fabric/capacities/{self.capacity_name}"
                f"?api-version={self.api_version}"
            )
    
        def _headers(self) -> dict:
            token = self.credential.get_token("https://management.azure.com/.default").token
            return {
                "Authorization": f"Bearer {token}",
                "Content-Type": "application/json",
                "Accept": "application/json",
            }
    
        # --- 基本 ---
        def get_capacity(self) -> dict:
            resp = requests.get(self.capacity_url, headers=self._headers(), timeout=self.timeout)
            if not resp.ok:
                raise RuntimeError(f"GET failed: {resp.status_code} {resp.text}")
            return resp.json()
    
        def change_sku(
            self,
            sku_name: str,
            tier: str = "Fabric",
            poll_interval_sec: int = 5,
            max_wait_sec: int = 600,
        ) -> dict:
            body = {"sku": {"name": sku_name, "tier": tier}}
    
            resp = requests.patch(
                self.capacity_url,
                headers=self._headers(),
                json=body,
                timeout=self.timeout,
            )
    
            if resp.status_code == 200:
                return resp.json()
    
            if resp.status_code == 202:
                poll_url = resp.headers.get("Azure-AsyncOperation") or resp.headers.get("Location")
                if not poll_url:
                    raise RuntimeError(f"202 but no polling URL. headers={dict(resp.headers)}")
    
                retry_after = int(resp.headers.get("Retry-After", str(poll_interval_sec)))
                deadline = time.time() + max_wait_sec
    
                while True:
                    if time.time() > deadline:
                        raise TimeoutError(f"LRO timeout after {max_wait_sec}s. poll_url={poll_url}")
    
                    time.sleep(retry_after)
                    poll = requests.get(poll_url, headers=self._headers(), timeout=self.timeout)
                    poll.raise_for_status()
                    pj = poll.json()
                    status = pj.get("status")  # Running / Succeeded / Failed / Canceled
    
                    if status in ("Succeeded", "Failed", "Canceled"):
                        if status != "Succeeded":
                            raise RuntimeError(f"LRO ended with {status}: {pj}")
                        break
    
                    retry_after = int(poll.headers.get("Retry-After", str(poll_interval_sec)))
    
                return self.get_capacity()
    
            raise RuntimeError(f"PATCH failed: {resp.status_code} {resp.text}")
    
        # --- SKUユーティリティ ---
        @staticmethod
        def _parse_f_sku(sku: str) -> int:
            # "F2" -> 2
            m = re.fullmatch(r"F(\d+)", (sku or "").strip().upper())
            if not m:
                raise ValueError(f"Unsupported sku format: {sku!r} (expected like 'F2')")
            return int(m.group(1))
    
        @staticmethod
        def _make_f_sku(size: int) -> str:
            return f"F{int(size)}"
    
        def get_current_sku(self) -> str:
            cap = self.get_capacity()
            sku = (cap.get("sku") or {}).get("name")
            if not sku:
                raise RuntimeError(f"SKU not found in response: {cap}")
            return sku
    
        def _clamp_to_allowed(self, size: int, min_f: int, max_f: int) -> int:
            lo = max(int(min_f), self.allowed_f_skus[0])
            hi = min(int(max_f), self.allowed_f_skus[-1])
            if lo > hi:
                raise ValueError(f"Invalid bounds: min_f={min_f}, max_f={max_f}, allowed={self.allowed_f_skus}")
    
            # bounds内 & allowedに存在する最大/最小へ寄せる(存在しないF値を作らない)
            candidates = [x for x in self.allowed_f_skus if lo <= x <= hi]
            if not candidates:
                raise ValueError(f"No allowed SKUs within bounds: min_f={min_f}, max_f={max_f}")
    
            # size を candidates の範囲内へ丸め
            if size <= candidates[0]:
                return candidates[0]
            if size >= candidates[-1]:
                return candidates[-1]
            # candidates にない size が来たら「一番近い下側」に寄せる(安全側)
            lower = [x for x in candidates if x <= size]
            return lower[-1] if lower else candidates[0]
    
        # --- ここが要望:scale up/down(上限・下限付き) ---
        def scale_up(
            self,
            max_f: int = 128,
            min_f: int = 2,
            dry_run: bool = False,
            **change_kwargs,
        ) -> dict:
            """
            例: F2 -> F4, F4 -> F8(2倍)
            max_f/min_f: 上限・下限
            dry_run: Trueなら変更せずに計算結果だけ返す
            change_kwargs: change_sku に渡す(poll_interval_sec, max_wait_sec など)
            """
            current = self.get_current_sku()
            cur_size = self._parse_f_sku(current)
    
            target_size = cur_size * 2
            target_size = self._clamp_to_allowed(target_size, min_f=min_f, max_f=max_f)
            target = self._make_f_sku(target_size)
    
            if dry_run:
                return {"current": current, "target": target, "action": "scale_up", "applied": False}
    
            if target == current:
                return {"current": current, "target": target, "action": "scale_up", "applied": False, "reason": "at_bound"}
    
            result = self.change_sku(target, **change_kwargs)
            return {"current": current, "target": target, "action": "scale_up", "applied": True, "result": result}
    
        def scale_down(
            self,
            min_f: int = 2,
            max_f: int = 128,
            dry_run: bool = False,
            **change_kwargs,
        ) -> dict:
            """
            例: F4 -> F2(半分)
            min_f/max_f: 下限・上限
            """
            current = self.get_current_sku()
            cur_size = self._parse_f_sku(current)
    
            target_size = max(1, cur_size // 2)
            target_size = self._clamp_to_allowed(target_size, min_f=min_f, max_f=max_f)
            target = self._make_f_sku(target_size)
    
            if dry_run:
                return {"current": current, "target": target, "action": "scale_down", "applied": False}
    
            if target == current:
                return {"current": current, "target": target, "action": "scale_down", "applied": False, "reason": "at_bound"}
    
            result = self.change_sku(target, **change_kwargs)
            return {"current": current, "target": target, "action": "scale_down", "applied": True, "result": result}
    
    
    

    生成 AI を使用してコードを作成しています。業務利用時には内容をご確認のうえ参考にしてください。

  4. パラメータ変数の内容に応じて

    python
    
    cred = ClientSecretCredential(
        tenant_id="<tenant id>", 
        client_id="<サービスプリンシパルのcliend id>", 
        client_secret="<サービスプリンシパルのシークレット>"
    )
    ops = FabricOperations(
        credential=cred,
        subscription_id="<サブスクリプションID>",
        resource_group_name="<リソースグループ名>",
        capacity_name="<容量リソース名>",
        allowed_f_skus=(2,4,8,16,32,64),
    )
    
    # 現在状態
    current_sku = ops.get_current_sku()
    print(f"現在の SKU: {current_sku}")
    
    if operation == "scale_up" :
        result = ops.scale_up()
    elif operation == "scale_down" :
        result = ops.scale_down()
    
    new_sku = ops.get_current_sku() 
    
    print(f"新しい SKU: {new_sku}")
    
    
    
    

アクティベーター (スケールアップ)

このノートブックをスケールアップとして実行する条件を検討します。

  1. ウィンドウサイズと条件=Trueが継続した時間を観察してみます。
    9:14 頃から9:24 の10分状態が継続したタイミングでアラートされることがわかります。
    その後は連続で9:34 までアラートされています。

    image.png

    仮にこの条件でノートブックを実行してスケールアップするような操作をした場合、スケールアップを待つ間に、もう一度スケールアップされてしまう可能性があります。

    ノートブックに現在の SKU を渡してしまい、ターゲットの SKU を算出するような処理にしてもいいですが、無駄なノートブックは起こしたくありません。

    したがって、ノートブックの更新にかかる時間を考慮の上、スケーリングの条件監視間隔を検討する必要があります。
    つまり、ウィンドウサイズはノートブックの処理時間より広い必要です。ノートブックは早いときは数10秒で完了しますが、最短で5分少し余裕を見て10分程度が妥当でしょう

  2. 継続時間も考慮して今回はこのようにしてみます。閾値を10分以上超えたらアラートです。矢印のタイミングでスケールさせます。(閾値はテストしやすく50にしてみます)
    image.png

    image.png

  3. F64 以上にはしたくないので、プロパティフィルターもかけてみます。
    image.png

    このあたりはスロットリングのアラートも最大SKUの場合のみアラートなどで構成するのもいいかもしれません

  4. 最後に、ノートブックのアクションを構成します。
    image.png

  5. 確認のためテストしてみます。うまく構成できているようです。
    image.png

    image.png
    スケールアップにより、使用率も半減していることがわかります。
    image.png

アクティベーター (スケールダウン)

このノートブックをスケールダウンとして実行する条件を検討します。

  1. スケールダウンは条件を逆にしてみます。最大値が10分以上50%を下回っていたらサイズダウンします。
    image.png
    image.png

  2. テストを確認します。大丈夫そうですね
    image.png
    image.png

確認

  1. しばらく未使用のまま放っておきました。F8->F2までスケールダウンしています。

    image.png
    image.png

  2. 次に、負荷をあげてみます。F2->F8までスケールアップしています。それに伴い、使用率も低下しています。
    image.png
    image.png
    image.png

このように自動スケールを構成することができました。調整することでより適切なスケーリングを実行できるかと思います。

以上です。参考になれば幸いです。

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?