0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Microsoft Fabric の SQL 分析エンドポイントが即時最新化されない問題への暫定対処

Posted at

はじめに

レイクハウスに対して T-SQL でアクセスした際、情報が古く、Spark では正常に表示されるのに T-SQL でだけ表示データが古くなっている問題について、現時点での対処を案内します。

20250326時点の情報です

発生しやすい状況

メダリオンアーキテクチャの構成例として、Bronze/Silver を レイクハウス、Gold をウェアハウスにする、というパターンを採用するとよく確認されます。

この状況では レイクハウステーブルをソースにして ストアドプロシージャでウェアハウスに書き込みをする、というデータ処理開発を実装することになります。

image.png

この時、Silver データは正常に準備できているのに、後続のGold生成データが古いデータにアクセスしてしまう、といった事象が本記事のターゲットとなる事象です。

この問題は SQL分析エンドポイントのメタデータ自動同期プロセス が遅延することにより発生します。

同期プロセスは原則 1 分以内に完了するため普段は認知しにくく、遅くても1時間程度で解消することがほとんどであるため夜間バッチで起きていた同期遅延も朝には同期完了しており再現できなくなるなど、問題の特定に至ることが難しい点で現場を非常に混乱させます。

遅延は主に

  1. ワークスペース上に同期対象が多く、キュー待ちが起きている
  2. 容量の負荷が高くなっている
  3. レイクハウス上のDelta Parquet が最適化されていない

などの理由で発生します。

何が起きている

レイクハウス上の テーブル(Delta Parquet ファイル)はSQL 分析エンドポイントにメタデータ同期されることで初めて T-SQL エンジンでアクセスができるようになります。

ウェアハウスがクロスデータベースの対象にできるのは同じく ウェアハウスに分類されるSQL分析エンドポイント上のテーブルであるため、レイクハウス上のデータをウェアハウスで表示しようとすると、そのデータ鮮度はメタデータ同期の状況に完全に依存することになります。

image.png

軽減策

まずは、 https://learn.microsoft.com/ja-jp/fabric/data-warehouse/sql-analytics-endpoint-performance#guidance を参考に軽減策を確認します。

  1. ワークスペース上に同期対象が多く、キュー待ちが起きている
    ワークスペースにレイクハウスが多い場合は分割を検討します。
  2. 容量の負荷が高くなっている
    SKU がそもそも適切でない可能性があるのでスケールアップを検討します。
  3. レイクハウス上のDelta Parquet が最適化されていない
    これは処理自体のパフォーマンスを保つうえでも非常に重要ですので、必ず最適化を実施しましょう。

Delta Parquet の最適化

以下のコマンドを実行するか、GUIでのボタンをクリックします。

  1. ETL 処理後など、広範囲にデータを更新したあと、Optimize コマンドを実行して断片化したファイルを整理する

    
    OPTIMIZE <table|fileOrFolderPath>
    
    

    または、セッション、テーブルプロパティレベルでauto compact を設定し、書き込み時に断片化を減らすことも可能です。

    Optimize コマンドは CPU 重視のワークロードとなります。メモリ重視のワークロードである普段のバッチETL ジョブとは分けることが賢明とされています。

    https://learn.microsoft.com/ja-jp/azure/databricks/delta/optimize#whats-the-best-instance-type-to-run-optimize-bin-packing-and-z-ordering-on

  2. Vacuum コマンドで不要ファイルを削除する

    https://docs.delta.io/latest/delta-utility.html#-delta-vacuum を使用して、参照されなくなった古いファイルを削除します。

    
    VACUUM <table|fileOrFolderPath>
    
    

対処策

軽減策と併せて、メタデータ同期処理を明示的にトリガーしてしまう方法も案内します

以下は、MS の中の人の書いたブログで案内されている、 非公式 の APIを利用したメタデータ同期処理をアレンジしたものです。

このコードを実行すると、ノートブックに接続した、既定のレイクハウスの SQL 分析エンドポイント上のテーブルをすべて更新されます。

ウェアハウスのストアド処理前に実行するようにしておくとよいかと思います。



########################################################################################################
# Sample script to call the syncronisation between the Fabric Lakehouse and the SQL Endpoint
#        
## THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
#  This script is a workaround until the documented API is released: https://learn.microsoft.com/en-us/fabric/release-plan/data-warehouse#refresh-sql-analytics-endpoint-rest-api
#
#sempy version 0.4.0 or higher

import json
import time
import struct
import sqlalchemy
import pyodbc
import notebookutils
import pandas as pd
from pyspark.sql import functions as fn
from datetime import datetime
import sempy.fabric as fabric
from sempy.fabric.exceptions import FabricHTTPException, WorkspaceNotFoundException

def pad_or_truncate_string(input_string, length, pad_char=' '):
    # Truncate if the string is longer than the specified length
    if len(input_string) > length:
        return input_string[:length]
    # Pad if the string is shorter than the specified length
    return input_string.ljust(length, pad_char)

## not needed, but usefull
# tenant_id=spark.conf.get("trident.tenant.id")
workspace_id=fabric.resolve_workspace_id()
lakehouse_id=fabric.get_lakehouse_id()
# lakehouse_name=spark.conf.get("trident.lakehouse.name")
#sql_endpoint= fabric.FabricRestClient().get(f"/v1/workspaces/{workspace_id}/lakehouses/{lakehouse_id}").json()['properties']['sqlEndpointProperties']['connectionString']

#Instantiate the client
client = fabric.FabricRestClient()

# This is the SQL endpoint I want to sync with the lakehouse, this needs to be the GUI
sqlendpoint = fabric.FabricRestClient().get(f"/v1/workspaces/{workspace_id}/lakehouses/{lakehouse_id}").json()['properties']['sqlEndpointProperties']['id']
j_son = fabric.FabricRestClient().get(f"/v1/workspaces/{workspace_id}/lakehouses/{lakehouse_id}").json()
#display(j_son)

# URI for the call
uri = f"/v1.0/myorg/lhdatamarts/{sqlendpoint}"
# This is the action, we want to take
payload = {"commands":[{"$type":"MetadataRefreshExternalCommand"}]}

# code for testing 
# Test 1 : test creating a new table
#spark.sql("drop table if exists test1")
#spark.sql("create table test1 as SELECT * FROM lakehouse.Date LIMIT 1000")
# Test 2 : create a duplicate of the table with a different case
#df = spark.sql("SELECT * FROM lakehouse.Date LIMIT 1000")
#df.write.save("Tables/Test1")

# Call the REST API
response = client.post(uri,json= payload)
## You should add some error handling here

# return the response from json into an object we can get values from
data = json.loads(response.text)

# We just need this, we pass this to call to check the status
batchId = data["batchId"]

# the state of the sync i.e. inProgress
progressState = data["progressState"]

# URL so we can get the status of the sync
statusuri = f"/v1.0/myorg/lhdatamarts/{sqlendpoint}/batches/{batchId}"

statusresponsedata = ""

while progressState == 'inProgress' :
    # For the demo, I have removed the 1 second sleep.
    time.sleep(1)

    # check to see if its sync'ed
    #statusresponse = client.get(statusuri)

    # turn response into object
    statusresponsedata = client.get(statusuri).json()

    # get the status of the check
    progressState = statusresponsedata["progressState"]
    # show the status
    display(f"Sync state: {progressState}")

# if its good, then create a temp results, with just the info we care about
if progressState == 'success':
    table_details = [
        {
          'tableName': table['tableName'],
         'warningMessages': table.get('warningMessages', []),
         'lastSuccessfulUpdate': table.get('lastSuccessfulUpdate', 'N/A'),
         'tableSyncState':  table['tableSyncState'],
         'sqlSyncState':  table['sqlSyncState']
        }
        for table in statusresponsedata['operationInformation'][0]['progressDetail']['tablesSyncStatus']
    ]

# if its good, then shows the tables
if progressState == 'success':
    # Print the extracted details
    print("Extracted Table Details:")
    for detail in table_details:
        print(f"Table: {pad_or_truncate_string(detail['tableName'],30)}   Last Update: {detail['lastSuccessfulUpdate']}  tableSyncState: {detail['tableSyncState']}   Warnings: {detail['warningMessages']}")

## if there is a problem, show all the errors
if progressState == 'failure':
    # display error if there is an error
    display(statusresponsedata)

非公式のAPIであるため、変更が予期されることを前提に利用する必要があります。

なお、https://blog.fabric.microsoft.com/en-us/blog/whats-new-in-the-fabric-sql-analytics-endpoint?ft=All では今後公式の REST API が予定されているようです。

ブログでは同様に手動での更新ボタンがリリースされていることも案内されています。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?