はじめに
以下の記事で監視の方法を紹介しました。
今度は、監視のアクションを容量のサイズ変更、つまり 自動スケーリング を構成してみます
方式
- Fabric 容量イベントを複数構成し、容量の使用率などの情報を容量別に供給します
- イベントストリームは容量イベントを統合し、監視構成のためのアクティベーター送信と、蓄積のためのイベントハウス送信を行います。
- イベントハウスでは、対象の容量を分析可能な状態に保管します。
- アクティベーターは容量ごと使用率を追跡し、スロットリング到達率に対する閾値条件で サイズ変更ノートブックを実行します。
- ノートブックで Fabric 容量(Azure リソース)を更新します。
イベントストリームでの複数のデータ統合や送信はそれなりに CU 消費があるので、今回の記事を参考に実装したらベンチマークをとることをおすすめします。
実装
サービスプリンシパル
を参考にサービスプリンシパルを作成し、 Fabric 容量リソースに 共同作成者 などの、リソースの更新ができる権限を付与します。
ノートブック
-
先頭のセルをパラメータにします。
pythonoperation = "scale_up" -
Fabric の操作を行う モジュールを作りました。
pythonimport json import time import re import requests from azure.identity import ClientSecretCredential class FabricOperations: def __init__( self, credential: ClientSecretCredential, subscription_id: str, resource_group_name: str, capacity_name: str, api_version: str = "2023-11-01", timeout: int = 60, allowed_f_skus=(2, 4, 8, 16, 32, 64, 128), # 必要なら拡張 ): self.credential = credential self.subscription_id = subscription_id self.resource_group_name = resource_group_name self.capacity_name = capacity_name self.api_version = api_version self.timeout = timeout self.allowed_f_skus = tuple(sorted(set(int(x) for x in allowed_f_skus))) @property def capacity_url(self) -> str: return ( f"https://management.azure.com/subscriptions/{self.subscription_id}" f"/resourceGroups/{self.resource_group_name}" f"/providers/Microsoft.Fabric/capacities/{self.capacity_name}" f"?api-version={self.api_version}" ) def _headers(self) -> dict: token = self.credential.get_token("https://management.azure.com/.default").token return { "Authorization": f"Bearer {token}", "Content-Type": "application/json", "Accept": "application/json", } # --- 基本 --- def get_capacity(self) -> dict: resp = requests.get(self.capacity_url, headers=self._headers(), timeout=self.timeout) if not resp.ok: raise RuntimeError(f"GET failed: {resp.status_code} {resp.text}") return resp.json() def change_sku( self, sku_name: str, tier: str = "Fabric", poll_interval_sec: int = 5, max_wait_sec: int = 600, ) -> dict: body = {"sku": {"name": sku_name, "tier": tier}} resp = requests.patch( self.capacity_url, headers=self._headers(), json=body, timeout=self.timeout, ) if resp.status_code == 200: return resp.json() if resp.status_code == 202: poll_url = resp.headers.get("Azure-AsyncOperation") or resp.headers.get("Location") if not poll_url: raise RuntimeError(f"202 but no polling URL. headers={dict(resp.headers)}") retry_after = int(resp.headers.get("Retry-After", str(poll_interval_sec))) deadline = time.time() + max_wait_sec while True: if time.time() > deadline: raise TimeoutError(f"LRO timeout after {max_wait_sec}s. poll_url={poll_url}") time.sleep(retry_after) poll = requests.get(poll_url, headers=self._headers(), timeout=self.timeout) poll.raise_for_status() pj = poll.json() status = pj.get("status") # Running / Succeeded / Failed / Canceled if status in ("Succeeded", "Failed", "Canceled"): if status != "Succeeded": raise RuntimeError(f"LRO ended with {status}: {pj}") break retry_after = int(poll.headers.get("Retry-After", str(poll_interval_sec))) return self.get_capacity() raise RuntimeError(f"PATCH failed: {resp.status_code} {resp.text}") # --- SKUユーティリティ --- @staticmethod def _parse_f_sku(sku: str) -> int: # "F2" -> 2 m = re.fullmatch(r"F(\d+)", (sku or "").strip().upper()) if not m: raise ValueError(f"Unsupported sku format: {sku!r} (expected like 'F2')") return int(m.group(1)) @staticmethod def _make_f_sku(size: int) -> str: return f"F{int(size)}" def get_current_sku(self) -> str: cap = self.get_capacity() sku = (cap.get("sku") or {}).get("name") if not sku: raise RuntimeError(f"SKU not found in response: {cap}") return sku def _clamp_to_allowed(self, size: int, min_f: int, max_f: int) -> int: lo = max(int(min_f), self.allowed_f_skus[0]) hi = min(int(max_f), self.allowed_f_skus[-1]) if lo > hi: raise ValueError(f"Invalid bounds: min_f={min_f}, max_f={max_f}, allowed={self.allowed_f_skus}") # bounds内 & allowedに存在する最大/最小へ寄せる(存在しないF値を作らない) candidates = [x for x in self.allowed_f_skus if lo <= x <= hi] if not candidates: raise ValueError(f"No allowed SKUs within bounds: min_f={min_f}, max_f={max_f}") # size を candidates の範囲内へ丸め if size <= candidates[0]: return candidates[0] if size >= candidates[-1]: return candidates[-1] # candidates にない size が来たら「一番近い下側」に寄せる(安全側) lower = [x for x in candidates if x <= size] return lower[-1] if lower else candidates[0] # --- ここが要望:scale up/down(上限・下限付き) --- def scale_up( self, max_f: int = 128, min_f: int = 2, dry_run: bool = False, **change_kwargs, ) -> dict: """ 例: F2 -> F4, F4 -> F8(2倍) max_f/min_f: 上限・下限 dry_run: Trueなら変更せずに計算結果だけ返す change_kwargs: change_sku に渡す(poll_interval_sec, max_wait_sec など) """ current = self.get_current_sku() cur_size = self._parse_f_sku(current) target_size = cur_size * 2 target_size = self._clamp_to_allowed(target_size, min_f=min_f, max_f=max_f) target = self._make_f_sku(target_size) if dry_run: return {"current": current, "target": target, "action": "scale_up", "applied": False} if target == current: return {"current": current, "target": target, "action": "scale_up", "applied": False, "reason": "at_bound"} result = self.change_sku(target, **change_kwargs) return {"current": current, "target": target, "action": "scale_up", "applied": True, "result": result} def scale_down( self, min_f: int = 2, max_f: int = 128, dry_run: bool = False, **change_kwargs, ) -> dict: """ 例: F4 -> F2(半分) min_f/max_f: 下限・上限 """ current = self.get_current_sku() cur_size = self._parse_f_sku(current) target_size = max(1, cur_size // 2) target_size = self._clamp_to_allowed(target_size, min_f=min_f, max_f=max_f) target = self._make_f_sku(target_size) if dry_run: return {"current": current, "target": target, "action": "scale_down", "applied": False} if target == current: return {"current": current, "target": target, "action": "scale_down", "applied": False, "reason": "at_bound"} result = self.change_sku(target, **change_kwargs) return {"current": current, "target": target, "action": "scale_down", "applied": True, "result": result}生成 AI を使用してコードを作成しています。業務利用時には内容をご確認のうえ参考にしてください。
-
パラメータ変数の内容に応じて
pythoncred = ClientSecretCredential( tenant_id="<tenant id>", client_id="<サービスプリンシパルのcliend id>", client_secret="<サービスプリンシパルのシークレット>" ) ops = FabricOperations( credential=cred, subscription_id="<サブスクリプションID>", resource_group_name="<リソースグループ名>", capacity_name="<容量リソース名>", allowed_f_skus=(2,4,8,16,32,64), ) # 現在状態 current_sku = ops.get_current_sku() print(f"現在の SKU: {current_sku}") if operation == "scale_up" : result = ops.scale_up() elif operation == "scale_down" : result = ops.scale_down() new_sku = ops.get_current_sku() print(f"新しい SKU: {new_sku}")
アクティベーター (スケールアップ)
このノートブックをスケールアップとして実行する条件を検討します。
-
ウィンドウサイズと条件=Trueが継続した時間を観察してみます。
9:14 頃から9:24 の10分状態が継続したタイミングでアラートされることがわかります。
その後は連続で9:34 までアラートされています。仮にこの条件でノートブックを実行してスケールアップするような操作をした場合、スケールアップを待つ間に、もう一度スケールアップされてしまう可能性があります。
ノートブックに現在の SKU を渡してしまい、ターゲットの SKU を算出するような処理にしてもいいですが、無駄なノートブックは起こしたくありません。
したがって、ノートブックの更新にかかる時間を考慮の上、スケーリングの条件監視間隔を検討する必要があります。
つまり、ウィンドウサイズはノートブックの処理時間より広い必要です。ノートブックは早いときは数10秒で完了しますが、最短で5分少し余裕を見て10分程度が妥当でしょう -
継続時間も考慮して今回はこのようにしてみます。閾値を10分以上超えたらアラートです。矢印のタイミングでスケールさせます。(閾値はテストしやすく50にしてみます)

-
F64 以上にはしたくないので、プロパティフィルターもかけてみます。

このあたりはスロットリングのアラートも最大SKUの場合のみアラートなどで構成するのもいいかもしれません
アクティベーター (スケールダウン)
このノートブックをスケールダウンとして実行する条件を検討します。
確認
このように自動スケールを構成することができました。調整することでより適切なスケーリングを実行できるかと思います。
以上です。参考になれば幸いです。

















