0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Microsoft FabricのウェアハウスにおけるSQL接続とPythonからのデータ書き込み手法

Last updated at Posted at 2025-04-05

Microsoft FabricのウェアハウスにおけるSQL接続とPythonからのデータ書き込み手法

Microsoft Fabricは,データ分析とAI処理のためのオールインワンの分析プラットフォームである.Fabricのウェアハウスでは,SQLエンドポイントを利用して外部からデータの読み書きが可能である.本記事では,Pythonアプリケーションから直接Fabricウェアハウスにデータを書き込む方法について詳しく解説する.

image.png

1. 前提条件

  • Microsoft Fabricの有効なライセンス(ProまたはPremium)
  • Fabricワークスペースの管理権限
  • ウェアハウスが作成済みであること
  • Python 3.6以上の実行環境
  • 必要なPythonパッケージ:pyodbcまたはpandas

2. Fabricウェアハウスへの接続準備

2.1 SQLエンドポイントの有効化と確認

まずFabricポータルで対象のウェアハウスにアクセスし,SQLエンドポイントを確認する必要がある.

  1. Microsoft Fabricポータル(https://app.fabric.microsoft.com)にログインする
  2. ワークスペースに移動し,対象のウェアハウスを開く
  3. 「設定」メニューを選択し,「接続」タブを確認する
  4. 「SQL接続文字列」セクションに表示される接続情報を確認する

image.png

2.2 ファイアウォール設定

外部からの接続を許可するため,ファイアウォール設定を構成する.

  1. ウェアハウスの「設定」から「接続」タブに移動
  2. 「ファイアウォール設定」セクションで「Azure サービスからのアクセスを許可」を有効にする
  3. 「クライアントIPアドレスの追加」をクリックして,現在使用しているPCのIPアドレスを許可リストに追加する

2.3 認証方法の選択

Fabricウェアハウスに接続するための認証方法を選択する.一般的に次の2つの方法がある:

  1. SQL認証 - ユーザー名とパスワードを使用
  2. Azure Active Directory認証 - Microsoft Entraアカウント(旧Azure AD)を使用

セキュリティ要件に応じて適切な認証方法を選択する.本記事ではどちらの方法も解説する.

3. PythonからのSQL接続とデータ書き込み

3.1 必要なライブラリのインストール

まず,必要なPythonライブラリをインストールする.

pip install pyodbc pandas

3.2 SQL認証による接続とデータ挿入

import pyodbc
import pandas as pd

# 接続文字列の設定
server = "ワークスペース名.sql.fabric.microsoft.com"  # Fabricポータルで確認したサーバー名
database = "ウェアハウス名"  # ウェアハウス名
username = "ユーザー名"  # SQLユーザー名
password = "パスワード"  # SQLパスワード

# 接続文字列
conn_str = f"DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={server};DATABASE={database};UID={username};PWD={password}"

# 接続オブジェクトを作成
conn = pyodbc.connect(conn_str)
cursor = conn.cursor()

# サンプルデータを挿入するSQL文
insert_query = """
INSERT INTO schema_name.table_name (column1, column2, column3)
VALUES (?, ?, ?)
"""

# 挿入するデータ
data_to_insert = [
    ('値1', 100, '2023-01-15'),
    ('値2', 200, '2023-01-16'),
    ('値3', 300, '2023-01-17')
]

# データ挿入の実行
try:
    cursor.executemany(insert_query, data_to_insert)
    conn.commit()
    print(f"{len(data_to_insert)}行のデータを正常に挿入しました。")
except Exception as e:
    conn.rollback()
    print(f"エラーが発生しました: {str(e)}")
finally:
    cursor.close()
    conn.close()

3.3 Azure Active Directory認証による接続

Azure AD認証を使用する場合は,Microsoft Authentication Libraryを利用する方法が推奨される.

pip install msal pyodbc pandas
import pyodbc
import pandas as pd
import msal

# AAD認証情報
tenant_id = "テナントID"  # Azure ADのテナントID
client_id = "クライアントID"  # アプリケーション(クライアント)ID
client_secret = "クライアントシークレット"  # クライアントシークレット
server = "ワークスペース名.sql.fabric.microsoft.com"
database = "ウェアハウス名"

# MSALを使用してトークンを取得
authority = f"https://login.microsoftonline.com/{tenant_id}"
scope = ["https://database.windows.net/.default"]

app = msal.ConfidentialClientApplication(
    client_id=client_id,
    client_credential=client_secret,
    authority=authority
)

result = app.acquire_token_for_client(scopes=scope)

if "access_token" in result:
    token = result["access_token"]
else:
    print(f"トークン取得エラー: {result.get('error')}: {result.get('error_description')}")
    exit()

# ODBCドライバーでAzure AD認証を使用して接続
conn_str = f"DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={server};DATABASE={database};Authentication=ActiveDirectoryServicePrincipal;UID={client_id};PWD={client_secret}"

# 接続と処理
conn = pyodbc.connect(conn_str)
cursor = conn.cursor()

# サンプルデータを挿入
insert_query = """
INSERT INTO schema_name.table_name (column1, column2, column3)
VALUES (?, ?, ?)
"""

data_to_insert = [
    ('AAD値1', 100, '2023-01-15'),
    ('AAD値2', 200, '2023-01-16')
]

try:
    cursor.executemany(insert_query, data_to_insert)
    conn.commit()
    print(f"{len(data_to_insert)}行のデータを正常に挿入しました。")
except Exception as e:
    conn.rollback()
    print(f"エラーが発生しました: {str(e)}")
finally:
    cursor.close()
    conn.close()

3.4 Pandasを使用したデータフレームからの書き込み

大量のデータを効率的に書き込む場合,Pandasのto_sqlメソッドが便利である.

import pandas as pd
import urllib
import sqlalchemy

# サンプルデータフレームの作成
df = pd.DataFrame({
    'column1': ['値A', '値B', '値C', '値D'],
    'column2': [150, 250, 350, 450],
    'column3': pd.date_range(start='2023-01-01', periods=4)
})

# SQL Server接続用のURLエンコードされた接続文字列
params = urllib.parse.quote_plus(
    "DRIVER={ODBC Driver 17 for SQL Server};"
    f"SERVER=ワークスペース名.sql.fabric.microsoft.com;"
    f"DATABASE=ウェアハウス名;"
    f"UID=ユーザー名;"
    f"PWD=パスワード"
)

# SQLAlchemyエンジンを作成
engine = sqlalchemy.create_engine(f"mssql+pyodbc:///?odbc_connect={params}")

# データフレームをSQLテーブルに書き込む
try:
    df.to_sql(
        'table_name',  # テーブル名
        engine,
        schema='schema_name',  # スキーマ名
        if_exists='append',    # 既存テーブルに追加
        index=False,
        chunksize=1000         # 一度に処理する行数
    )
    print(f"{len(df)}行のデータを正常に挿入しました。")
except Exception as e:
    print(f"エラーが発生しました: {str(e)}")
finally:
    engine.dispose()

4. 高度な利用方法

4.1 バルクインサート

大量のデータを高速に挿入する場合は,ODBCドライバーのファストロード機能を使用する方法がある.

import pyodbc
import pandas as pd
import numpy as np
from datetime import datetime

# 大量のテストデータを生成
num_rows = 100000
test_data = {
    'id': np.arange(1, num_rows + 1),
    'name': [f'名前{i}' for i in range(1, num_rows + 1)],
    'value': np.random.rand(num_rows) * 1000,
    'date': [datetime.now().strftime('%Y-%m-%d') for _ in range(num_rows)]
}

df = pd.DataFrame(test_data)

# 接続文字列
conn_str = f"DRIVER={{ODBC Driver 17 for SQL Server}};SERVER=ワークスペース名.sql.fabric.microsoft.com;DATABASE=ウェアハウス名;UID=ユーザー名;PWD=パスワード"
conn = pyodbc.connect(conn_str)

# バルク挿入用の準備
cursor = conn.cursor()

# テーブルが存在しない場合は作成
create_table_sql = """
IF NOT EXISTS (SELECT * FROM sys.tables WHERE name = 'bulk_test' AND schema_id = SCHEMA_ID('dbo'))
BEGIN
    CREATE TABLE dbo.bulk_test (
        id INT PRIMARY KEY,
        name NVARCHAR(100),
        value FLOAT,
        date DATE
    )
END
"""

cursor.execute(create_table_sql)
conn.commit()

# バッチサイズ
batch_size = 5000
total_rows = len(df)
batches = (total_rows + batch_size - 1) // batch_size

start_time = datetime.now()
print(f"開始時刻: {start_time}")

for i in range(batches):
    start_idx = i * batch_size
    end_idx = min((i + 1) * batch_size, total_rows)
    batch = df.iloc[start_idx:end_idx]
    
    # バッチデータをタプルのリストに変換
    values = [tuple(x) for x in batch.values]
    
    # 複数行一括挿入
    insert_sql = "INSERT INTO dbo.bulk_test (id, name, value, date) VALUES (?, ?, ?, ?)"
    cursor.fast_executemany = True  # 高速バルク挿入を有効化
    cursor.executemany(insert_sql, values)
    conn.commit()
    
    print(f"バッチ {i+1}/{batches} 完了: {end_idx - start_idx}行挿入")

end_time = datetime.now()
duration = (end_time - start_time).total_seconds()
print(f"完了: 合計{total_rows}行を{duration:.2f}秒で挿入 (平均: {total_rows/duration:.2f}行/秒)")

cursor.close()
conn.close()

4.2 ストアドプロシージャの実行

Fabricウェアハウスでは,ストアドプロシージャを作成し,Pythonから実行することも可能である.

import pyodbc

# 接続設定
conn_str = f"DRIVER={{ODBC Driver 17 for SQL Server}};SERVER=ワークスペース名.sql.fabric.microsoft.com;DATABASE=ウェアハウス名;UID=ユーザー名;PWD=パスワード"
conn = pyodbc.connect(conn_str)
cursor = conn.cursor()

# ストアドプロシージャの作成例
create_proc = """
CREATE OR ALTER PROCEDURE dbo.InsertSampleData
    @name NVARCHAR(100),
    @value FLOAT,
    @date DATE
AS
BEGIN
    INSERT INTO dbo.sample_table (name, value, date_column)
    VALUES (@name, @value, @date);
    
    SELECT SCOPE_IDENTITY() AS new_id;
END
"""

try:
    cursor.execute(create_proc)
    conn.commit()
    print("ストアドプロシージャを作成しました。")
    
    # ストアドプロシージャの実行
    params = ('テスト名', 123.45, '2023-04-01')
    cursor.execute("{CALL dbo.InsertSampleData(?, ?, ?)}", params)
    
    # 結果の取得(SCOPE_IDENTITYの値)
    result = cursor.fetchone()
    new_id = result.new_id if result else None
    
    conn.commit()
    print(f"データを挿入しました。新しいID: {new_id}")
    
except Exception as e:
    conn.rollback()
    print(f"エラーが発生しました: {str(e)}")
finally:
    cursor.close()
    conn.close()

5. トラブルシューティング

5.1 一般的な接続エラー

「サーバーに接続できません」エラー

  • ファイアウォール設定を確認する:クライアントIPアドレスが許可リストに含まれているか
  • サーバー名の正確性を確認する:完全修飾ドメイン名(FQDN)が正しいか
  • ネットワーク接続を確認する:VPN経由の場合は,VPNが正しく構成されているか

「ログイン失敗」エラー

  • ユーザー名とパスワードが正しいか確認する
  • アカウントがロックされていないか確認する
  • Azure AD認証の場合:正しい権限が付与されているか確認する

5.2 データ挿入エラー

「主キー違反」エラー

既に存在するキー値を挿入しようとした場合に発生する.私の場合,このエラーで外部からFabric への書き込みに苦戦した.

# 挿入前に存在確認を行う例
def insert_with_check(cursor, table_name, id_column, id_value, data_tuple):
    # 既存データ確認
    check_query = f"SELECT 1 FROM {table_name} WHERE {id_column} = ?"
    cursor.execute(check_query, (id_value,))
    exists = cursor.fetchone() is not None
    
    if not exists:
        # 存在しない場合のみ挿入
        columns = ", ".join([f"{col}" for col in ["id", "name", "value", "date"]])
        placeholders = ", ".join(["?" for _ in range(len(data_tuple))])
        insert_query = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
        cursor.execute(insert_query, data_tuple)
        return True
    return False

# 使用例
conn = pyodbc.connect(conn_str)
cursor = conn.cursor()
data = (101, '新しい名前', 123.45, '2023-05-15')
inserted = insert_with_check(cursor, "dbo.sample_table", "id", 101, data)
if inserted:
    conn.commit()
    print("データを挿入しました")
else:
    print("IDが既に存在するため、挿入をスキップしました")

「列の値が特定の制約に違反しています」エラー

データ型の不一致や制約違反が原因である.

# データ型変換を明示的に行う例
def safe_insert(cursor, table_name, data_dict):
    # カラムの型情報を取得
    cursor.execute(f"SELECT COLUMN_NAME, DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '{table_name.split('.')[-1]}'")
    columns_info = {row.COLUMN_NAME: row.DATA_TYPE for row in cursor.fetchall()}
    
    # データを適切な型に変換
    for col, value in data_dict.items():
        if col in columns_info:
            data_type = columns_info[col].lower()
            if 'int' in data_type and not isinstance(value, int):
                try:
                    data_dict[col] = int(value)
                except:
                    data_dict[col] = None
            elif 'float' in data_type or 'real' in data_type:
                try:
                    data_dict[col] = float(value)
                except:
                    data_dict[col] = None
    
    # 挿入クエリの構築
    columns = ", ".join(data_dict.keys())
    placeholders = ", ".join(["?" for _ in range(len(data_dict))])
    values = tuple(data_dict.values())
    
    insert_query = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
    cursor.execute(insert_query, values)

5.3 パフォーマンスの問題

大量データ挿入時のパフォーマンスを向上させるヒント:

  1. バッチ処理:一度に複数行を挿入する
  2. インデックスの一時無効化:大量挿入前にインデックスを無効化し,挿入後に再構築
  3. トランザクション制御:適切なサイズでトランザクションを分割
  4. コネクションプーリング:接続を再利用する
  5. 適切なドライバー設定:fast_executemanyオプションを使用する

6. まとめ

Microsoft Fabricのウェアハウスは,SQLエンドポイントを通じて外部アプリケーションからのデータ操作をサポートしている.本記事では,Pythonを使用してFabricウェアハウスに接続し,データを書き込む方法を解説した.

要点をまとめると:

  1. 接続設定

    • ファイアウォール設定の構成
    • 適切な認証方法の選択(SQL認証またはAzure AD認証)
    • 適切なODBCドライバーの使用
  2. データ書き込み方法

    • pyodbcを使用した基本的な挿入
    • Pandasを使用したデータフレームからの書き込み
    • バルクインサート処理による大量データの高速挿入
  3. 高度な使用方法

    • ストアドプロシージャを利用した処理
    • エラー処理とトランザクション管理
  4. トラブルシューティング

    • 一般的な接続エラーへの対応
    • データ挿入エラーの解決策
    • パフォーマンス最適化の方法

Microsoft Fabricは比較的新しいプラットフォームであるが,標準的なSQL接続をサポートしているため,既存のPythonアプリケーションからのデータ書き込みが容易である.データウェアハウスへのETL処理やアプリケーション連携など,様々なシナリオで活用できる.


本記事が,Microsoft Fabricウェアハウスへの外部データ連携に役立てば幸いである.

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?