Microsoft FabricのウェアハウスにおけるSQL接続とPythonからのデータ書き込み手法
Microsoft Fabricは,データ分析とAI処理のためのオールインワンの分析プラットフォームである.Fabricのウェアハウスでは,SQLエンドポイントを利用して外部からデータの読み書きが可能である.本記事では,Pythonアプリケーションから直接Fabricウェアハウスにデータを書き込む方法について詳しく解説する.
1. 前提条件
- Microsoft Fabricの有効なライセンス(ProまたはPremium)
- Fabricワークスペースの管理権限
- ウェアハウスが作成済みであること
- Python 3.6以上の実行環境
- 必要なPythonパッケージ:
pyodbc
またはpandas
2. Fabricウェアハウスへの接続準備
2.1 SQLエンドポイントの有効化と確認
まずFabricポータルで対象のウェアハウスにアクセスし,SQLエンドポイントを確認する必要がある.
- Microsoft Fabricポータル(https://app.fabric.microsoft.com)にログインする
- ワークスペースに移動し,対象のウェアハウスを開く
- 「設定」メニューを選択し,「接続」タブを確認する
- 「SQL接続文字列」セクションに表示される接続情報を確認する
2.2 ファイアウォール設定
外部からの接続を許可するため,ファイアウォール設定を構成する.
- ウェアハウスの「設定」から「接続」タブに移動
- 「ファイアウォール設定」セクションで「Azure サービスからのアクセスを許可」を有効にする
- 「クライアントIPアドレスの追加」をクリックして,現在使用しているPCのIPアドレスを許可リストに追加する
2.3 認証方法の選択
Fabricウェアハウスに接続するための認証方法を選択する.一般的に次の2つの方法がある:
- SQL認証 - ユーザー名とパスワードを使用
- Azure Active Directory認証 - Microsoft Entraアカウント(旧Azure AD)を使用
セキュリティ要件に応じて適切な認証方法を選択する.本記事ではどちらの方法も解説する.
3. PythonからのSQL接続とデータ書き込み
3.1 必要なライブラリのインストール
まず,必要なPythonライブラリをインストールする.
pip install pyodbc pandas
3.2 SQL認証による接続とデータ挿入
import pyodbc
import pandas as pd
# 接続文字列の設定
server = "ワークスペース名.sql.fabric.microsoft.com" # Fabricポータルで確認したサーバー名
database = "ウェアハウス名" # ウェアハウス名
username = "ユーザー名" # SQLユーザー名
password = "パスワード" # SQLパスワード
# 接続文字列
conn_str = f"DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={server};DATABASE={database};UID={username};PWD={password}"
# 接続オブジェクトを作成
conn = pyodbc.connect(conn_str)
cursor = conn.cursor()
# サンプルデータを挿入するSQL文
insert_query = """
INSERT INTO schema_name.table_name (column1, column2, column3)
VALUES (?, ?, ?)
"""
# 挿入するデータ
data_to_insert = [
('値1', 100, '2023-01-15'),
('値2', 200, '2023-01-16'),
('値3', 300, '2023-01-17')
]
# データ挿入の実行
try:
cursor.executemany(insert_query, data_to_insert)
conn.commit()
print(f"{len(data_to_insert)}行のデータを正常に挿入しました。")
except Exception as e:
conn.rollback()
print(f"エラーが発生しました: {str(e)}")
finally:
cursor.close()
conn.close()
3.3 Azure Active Directory認証による接続
Azure AD認証を使用する場合は,Microsoft Authentication Libraryを利用する方法が推奨される.
pip install msal pyodbc pandas
import pyodbc
import pandas as pd
import msal
# AAD認証情報
tenant_id = "テナントID" # Azure ADのテナントID
client_id = "クライアントID" # アプリケーション(クライアント)ID
client_secret = "クライアントシークレット" # クライアントシークレット
server = "ワークスペース名.sql.fabric.microsoft.com"
database = "ウェアハウス名"
# MSALを使用してトークンを取得
authority = f"https://login.microsoftonline.com/{tenant_id}"
scope = ["https://database.windows.net/.default"]
app = msal.ConfidentialClientApplication(
client_id=client_id,
client_credential=client_secret,
authority=authority
)
result = app.acquire_token_for_client(scopes=scope)
if "access_token" in result:
token = result["access_token"]
else:
print(f"トークン取得エラー: {result.get('error')}: {result.get('error_description')}")
exit()
# ODBCドライバーでAzure AD認証を使用して接続
conn_str = f"DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={server};DATABASE={database};Authentication=ActiveDirectoryServicePrincipal;UID={client_id};PWD={client_secret}"
# 接続と処理
conn = pyodbc.connect(conn_str)
cursor = conn.cursor()
# サンプルデータを挿入
insert_query = """
INSERT INTO schema_name.table_name (column1, column2, column3)
VALUES (?, ?, ?)
"""
data_to_insert = [
('AAD値1', 100, '2023-01-15'),
('AAD値2', 200, '2023-01-16')
]
try:
cursor.executemany(insert_query, data_to_insert)
conn.commit()
print(f"{len(data_to_insert)}行のデータを正常に挿入しました。")
except Exception as e:
conn.rollback()
print(f"エラーが発生しました: {str(e)}")
finally:
cursor.close()
conn.close()
3.4 Pandasを使用したデータフレームからの書き込み
大量のデータを効率的に書き込む場合,Pandasのto_sql
メソッドが便利である.
import pandas as pd
import urllib
import sqlalchemy
# サンプルデータフレームの作成
df = pd.DataFrame({
'column1': ['値A', '値B', '値C', '値D'],
'column2': [150, 250, 350, 450],
'column3': pd.date_range(start='2023-01-01', periods=4)
})
# SQL Server接続用のURLエンコードされた接続文字列
params = urllib.parse.quote_plus(
"DRIVER={ODBC Driver 17 for SQL Server};"
f"SERVER=ワークスペース名.sql.fabric.microsoft.com;"
f"DATABASE=ウェアハウス名;"
f"UID=ユーザー名;"
f"PWD=パスワード"
)
# SQLAlchemyエンジンを作成
engine = sqlalchemy.create_engine(f"mssql+pyodbc:///?odbc_connect={params}")
# データフレームをSQLテーブルに書き込む
try:
df.to_sql(
'table_name', # テーブル名
engine,
schema='schema_name', # スキーマ名
if_exists='append', # 既存テーブルに追加
index=False,
chunksize=1000 # 一度に処理する行数
)
print(f"{len(df)}行のデータを正常に挿入しました。")
except Exception as e:
print(f"エラーが発生しました: {str(e)}")
finally:
engine.dispose()
4. 高度な利用方法
4.1 バルクインサート
大量のデータを高速に挿入する場合は,ODBCドライバーのファストロード機能を使用する方法がある.
import pyodbc
import pandas as pd
import numpy as np
from datetime import datetime
# 大量のテストデータを生成
num_rows = 100000
test_data = {
'id': np.arange(1, num_rows + 1),
'name': [f'名前{i}' for i in range(1, num_rows + 1)],
'value': np.random.rand(num_rows) * 1000,
'date': [datetime.now().strftime('%Y-%m-%d') for _ in range(num_rows)]
}
df = pd.DataFrame(test_data)
# 接続文字列
conn_str = f"DRIVER={{ODBC Driver 17 for SQL Server}};SERVER=ワークスペース名.sql.fabric.microsoft.com;DATABASE=ウェアハウス名;UID=ユーザー名;PWD=パスワード"
conn = pyodbc.connect(conn_str)
# バルク挿入用の準備
cursor = conn.cursor()
# テーブルが存在しない場合は作成
create_table_sql = """
IF NOT EXISTS (SELECT * FROM sys.tables WHERE name = 'bulk_test' AND schema_id = SCHEMA_ID('dbo'))
BEGIN
CREATE TABLE dbo.bulk_test (
id INT PRIMARY KEY,
name NVARCHAR(100),
value FLOAT,
date DATE
)
END
"""
cursor.execute(create_table_sql)
conn.commit()
# バッチサイズ
batch_size = 5000
total_rows = len(df)
batches = (total_rows + batch_size - 1) // batch_size
start_time = datetime.now()
print(f"開始時刻: {start_time}")
for i in range(batches):
start_idx = i * batch_size
end_idx = min((i + 1) * batch_size, total_rows)
batch = df.iloc[start_idx:end_idx]
# バッチデータをタプルのリストに変換
values = [tuple(x) for x in batch.values]
# 複数行一括挿入
insert_sql = "INSERT INTO dbo.bulk_test (id, name, value, date) VALUES (?, ?, ?, ?)"
cursor.fast_executemany = True # 高速バルク挿入を有効化
cursor.executemany(insert_sql, values)
conn.commit()
print(f"バッチ {i+1}/{batches} 完了: {end_idx - start_idx}行挿入")
end_time = datetime.now()
duration = (end_time - start_time).total_seconds()
print(f"完了: 合計{total_rows}行を{duration:.2f}秒で挿入 (平均: {total_rows/duration:.2f}行/秒)")
cursor.close()
conn.close()
4.2 ストアドプロシージャの実行
Fabricウェアハウスでは,ストアドプロシージャを作成し,Pythonから実行することも可能である.
import pyodbc
# 接続設定
conn_str = f"DRIVER={{ODBC Driver 17 for SQL Server}};SERVER=ワークスペース名.sql.fabric.microsoft.com;DATABASE=ウェアハウス名;UID=ユーザー名;PWD=パスワード"
conn = pyodbc.connect(conn_str)
cursor = conn.cursor()
# ストアドプロシージャの作成例
create_proc = """
CREATE OR ALTER PROCEDURE dbo.InsertSampleData
@name NVARCHAR(100),
@value FLOAT,
@date DATE
AS
BEGIN
INSERT INTO dbo.sample_table (name, value, date_column)
VALUES (@name, @value, @date);
SELECT SCOPE_IDENTITY() AS new_id;
END
"""
try:
cursor.execute(create_proc)
conn.commit()
print("ストアドプロシージャを作成しました。")
# ストアドプロシージャの実行
params = ('テスト名', 123.45, '2023-04-01')
cursor.execute("{CALL dbo.InsertSampleData(?, ?, ?)}", params)
# 結果の取得(SCOPE_IDENTITYの値)
result = cursor.fetchone()
new_id = result.new_id if result else None
conn.commit()
print(f"データを挿入しました。新しいID: {new_id}")
except Exception as e:
conn.rollback()
print(f"エラーが発生しました: {str(e)}")
finally:
cursor.close()
conn.close()
5. トラブルシューティング
5.1 一般的な接続エラー
「サーバーに接続できません」エラー
- ファイアウォール設定を確認する:クライアントIPアドレスが許可リストに含まれているか
- サーバー名の正確性を確認する:完全修飾ドメイン名(FQDN)が正しいか
- ネットワーク接続を確認する:VPN経由の場合は,VPNが正しく構成されているか
「ログイン失敗」エラー
- ユーザー名とパスワードが正しいか確認する
- アカウントがロックされていないか確認する
- Azure AD認証の場合:正しい権限が付与されているか確認する
5.2 データ挿入エラー
「主キー違反」エラー
既に存在するキー値を挿入しようとした場合に発生する.私の場合,このエラーで外部からFabric への書き込みに苦戦した.
# 挿入前に存在確認を行う例
def insert_with_check(cursor, table_name, id_column, id_value, data_tuple):
# 既存データ確認
check_query = f"SELECT 1 FROM {table_name} WHERE {id_column} = ?"
cursor.execute(check_query, (id_value,))
exists = cursor.fetchone() is not None
if not exists:
# 存在しない場合のみ挿入
columns = ", ".join([f"{col}" for col in ["id", "name", "value", "date"]])
placeholders = ", ".join(["?" for _ in range(len(data_tuple))])
insert_query = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
cursor.execute(insert_query, data_tuple)
return True
return False
# 使用例
conn = pyodbc.connect(conn_str)
cursor = conn.cursor()
data = (101, '新しい名前', 123.45, '2023-05-15')
inserted = insert_with_check(cursor, "dbo.sample_table", "id", 101, data)
if inserted:
conn.commit()
print("データを挿入しました")
else:
print("IDが既に存在するため、挿入をスキップしました")
「列の値が特定の制約に違反しています」エラー
データ型の不一致や制約違反が原因である.
# データ型変換を明示的に行う例
def safe_insert(cursor, table_name, data_dict):
# カラムの型情報を取得
cursor.execute(f"SELECT COLUMN_NAME, DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '{table_name.split('.')[-1]}'")
columns_info = {row.COLUMN_NAME: row.DATA_TYPE for row in cursor.fetchall()}
# データを適切な型に変換
for col, value in data_dict.items():
if col in columns_info:
data_type = columns_info[col].lower()
if 'int' in data_type and not isinstance(value, int):
try:
data_dict[col] = int(value)
except:
data_dict[col] = None
elif 'float' in data_type or 'real' in data_type:
try:
data_dict[col] = float(value)
except:
data_dict[col] = None
# 挿入クエリの構築
columns = ", ".join(data_dict.keys())
placeholders = ", ".join(["?" for _ in range(len(data_dict))])
values = tuple(data_dict.values())
insert_query = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
cursor.execute(insert_query, values)
5.3 パフォーマンスの問題
大量データ挿入時のパフォーマンスを向上させるヒント:
- バッチ処理:一度に複数行を挿入する
- インデックスの一時無効化:大量挿入前にインデックスを無効化し,挿入後に再構築
- トランザクション制御:適切なサイズでトランザクションを分割
- コネクションプーリング:接続を再利用する
- 適切なドライバー設定:fast_executemanyオプションを使用する
6. まとめ
Microsoft Fabricのウェアハウスは,SQLエンドポイントを通じて外部アプリケーションからのデータ操作をサポートしている.本記事では,Pythonを使用してFabricウェアハウスに接続し,データを書き込む方法を解説した.
要点をまとめると:
-
接続設定:
- ファイアウォール設定の構成
- 適切な認証方法の選択(SQL認証またはAzure AD認証)
- 適切なODBCドライバーの使用
-
データ書き込み方法:
- pyodbcを使用した基本的な挿入
- Pandasを使用したデータフレームからの書き込み
- バルクインサート処理による大量データの高速挿入
-
高度な使用方法:
- ストアドプロシージャを利用した処理
- エラー処理とトランザクション管理
-
トラブルシューティング:
- 一般的な接続エラーへの対応
- データ挿入エラーの解決策
- パフォーマンス最適化の方法
Microsoft Fabricは比較的新しいプラットフォームであるが,標準的なSQL接続をサポートしているため,既存のPythonアプリケーションからのデータ書き込みが容易である.データウェアハウスへのETL処理やアプリケーション連携など,様々なシナリオで活用できる.
本記事が,Microsoft Fabricウェアハウスへの外部データ連携に役立てば幸いである.