はじめに
で着想を得たので、自前で OAuth っぽい仕組みを作って SharePoint ドキュメントをレイクハウスに増分取込する仕組みを実装してみました。
以下のメリットがあるかなと思います
- データパイプラインでの実装はここまでくると大変だがNotebookであればカスタマイズが楽
- python カーネルで動作させるので CU 消費が比較的軽い
今回作成したコードは多分に GPT に実装をまかせています。シークレット周りのハンドリングやパフォーマンス検証など十分に確認のうえご利用ください。
セットアップ
-
アプリケーションの作成
https://learn.microsoft.com/ja-jp/entra/identity-platform/quickstart-register-app を参考に、アプリを登録します。その際、リダイレクト URL を設定してください。
後続の設定とそろえる必要があるのですが、localhostで構いません。
-
Key Vault のセットアップ
アプリのシークレットを作成したらシークレット値をkey vault に保存します。※RBACによる権限が必要です。ここでも後続の設定と名称をそろえる必要があります。画像ではほかのシークレットも映ってますが、ここではclient-secretだけです
リフレッシュトークンの設定
以下の流れで Graph API を利用するための リフレッシュトークンを保存します。
-
SharePoint サイトの ID を取得するための URL を発行します
pythonsite_url = "https://example.sharepoint.com/sites/FabricDemo" #https://example.sharepoint.com/sites/example check_site_id_url = f"{site_url}//_api/site/id" print(check_site_id_url)
-
作成したアプリケーションを利用するための情報を設定します
python# 作業結果から入力 client_id="" tenant_id="" key_vault_url="https://commonakv.vault.azure.net/", # 設定内容に応じて変更 redirect_url = "http://localhost" client_secret_name = "client-secret" refresh_token_name = "refresh-token"
-
認可URLを発行します
python# ================= Do not edit code below this line. ================== # Authorization endpoint URL authorization_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/authorize" scopes = ["Files.Read.All"] scope = " ".join(["https://graph.microsoft.com/{}".format(s) for s in scopes]) scope += ("&offline_access") # Construct the authorization request URL. auth_params = { "client_id": client_id, "redirect_uri": redirect_url, "response_type": "code", "scope": scope } auth_url = authorization_url + "?" + "&".join([f"{key}={value}" for key, value in auth_params.items()]) print(auth_url)
-
認可コードを設定
python# 前のセルからアクセスした先の URL の code 部分を設定。末尾の &session_state= などのコードではない部分が入らないように注意 code = ''
-
認可コードを利用してリフレッシュトークンを取得し、Key Vault に登録します。
python# Use the authentication code to get the refresh token. # ================= Do not edit code below this line. ================== import requests token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token" client_secret = notebookutils.credentials.getSecret(key_vault_url,client_secret_name) # Request parameters token_params = { "client_id": client_id, "client_secret": client_secret, "code": code, "grant_type": "authorization_code", "redirect_uri": redirect_url, "scope": "profile openid email https://graph.microsoft.com/Files.Read.All offline_access" } # Send POST request to token endpoint response = requests.post(token_url, data=token_params) # Handle response if response.status_code == 200: token_data = response.json() refresh_token = token_data.get("refresh_token") notebookutils.credentials.putSecret(key_vault_url, refresh_token_name, refresh_token) print("Created secret refresh token") else: print("Error:", response.status_code, response.text)
ここでは、以下を行っています。
- 得られた認可コード(code)を使って、プログラムからトークンエンドポイント(/token)へPOSTリクエストを送る。
- 成功すると**リフレッシュトークン(refresh_token)**が返る。
- 得られたリフレッシュトークンをKey Vaultに保存し、今後はこれを使ってアクセストークンを自動取得・更新できるようにする。
ファイル取込の実施
トークンの設定ができたら Graph API を使用してファイルを取り込めます。以下を工夫してみました。
- 扱いやすくするためにSharePointSiteDownloader としてクラス化
- 取得済みのファイル情報を管理テーブルに反映し、増分取込に対応
-
先ほども設定した、各種プロパティを記載しておきます。table_nameが管理テーブルの名称になります。
python# 作業結果から入力 site_id = "" client_id = "" tenant_id = "" key_vault_url="https://<keyvault name>.vault.azure.net/" # 設定内容に応じて変更 redirect_url = "http://localhost" client_secret_name = "client-secret" refresh_token_name = "refresh-token" access_token_name = "access-token" table_name ="sharepoint_ingested"
-
関数は以下のようにしています。前述の クラスの実装のほか、DeltaTableを使うことで Python カーネル上で Delta の書き込み処理をします。
pythonimport os import requests import pandas as pd from datetime import datetime, timezone from deltalake import DeltaTable ,write_deltalake class SharePointSiteDownloader: def __init__( self, tenant_id, client_id, redirect_url, key_vault_url, client_secret_name, refresh_token_name, access_token_name, site_id ): self.tenant_id = tenant_id self.client_id = client_id self.redirect_url = redirect_url # 固定値 self.key_vault_url = key_vault_url self.client_secret_name = client_secret_name self.refresh_token_name = refresh_token_name self.access_token_name = access_token_name self.site_id = site_id self.graph_url = "https://graph.microsoft.com/v1.0" # --- Key Vaultからシークレットを取得 --- # client_secret self.client_secret = notebookutils.credentials.getSecret(key_vault_url, client_secret_name) # refresh_token self.refresh_token = notebookutils.credentials.getSecret(key_vault_url, refresh_token_name) # --- refresh_tokenでaccess_tokenを取得&両方Key Vaultに上書き --- self.access_token, new_refresh_token = self.get_access_token_and_update_refresh_token() # 最新化(必ずKey Vaultに上書きする notebookutils.credentials.putSecret(key_vault_url, access_token_name, self.access_token) if new_refresh_token and new_refresh_token != self.refresh_token: notebookutils.credentials.putSecret(key_vault_url, refresh_token_name, new_refresh_token) self.refresh_token = new_refresh_token def get_access_token_and_update_refresh_token(self): token_url = f"https://login.microsoftonline.com/{self.tenant_id}/oauth2/v2.0/token" params = { "client_id": self.client_id, "client_secret": self.client_secret, "refresh_token": self.refresh_token, "grant_type": "refresh_token", "redirect_uri": self.redirect_url, "scope": "https://graph.microsoft.com/Files.Read.All offline_access" } response = requests.post(token_url, data=params) if response.status_code == 200: token_data = response.json() access_token = token_data.get("access_token") refresh_token = token_data.get("refresh_token") return access_token, refresh_token else: raise Exception(f"トークン取得失敗: {response.status_code}, {response.text}") def get_headers(self): # 毎回Key Vaultから取得した最新access_tokenでヘッダー作成 access_token = notebookutils.credentials.getSecret(self.key_vault_url, self.access_token_name) return {"Authorization": f"Bearer {access_token}"} def list_drives(self): url = f"{self.graph_url}/sites/{self.site_id}/drives" response = requests.get(url, headers=self.get_headers()) if response.status_code == 200: drives = response.json() print("ドライブ一覧:") for drive in drives.get("value", []): print(f"- {drive['name']} (ID: {drive['id']})") else: print("Error:", response.status_code, response.text) def get_and_download_all_items( self, drive_id, save_root="/lakehouse/default/Files/download", prev_max_lmts=None, ): """ 指定ドライブの全ファイルを(差分判定ありで)ダウンロードし、pandas DataFrameでメタデータを返す """ base_url = f"{self.graph_url}/sites/{self.site_id}/drives/{drive_id}" metadata_rows = [] if prev_max_lmts is not None: if not isinstance(prev_max_lmts, pd.Timestamp): prev_max_lmts = pd.to_datetime(prev_max_lmts, utc=True, errors="coerce") if prev_max_lmts.tz is None: prev_max_lmts = prev_max_lmts.tz_localize('UTC') def download_file_and_record(item, current_path): file_url = f"{base_url}/items/{item['id']}/content" response = requests.get(file_url, headers=self.get_headers(), stream=True) if response.status_code == 200: os.makedirs(current_path, exist_ok=True) file_path = os.path.join(current_path, item["name"]) with open(file_path, "wb") as f: for chunk in response.iter_content(8192): f.write(chunk) # print(f"✅ Downloaded: {file_path}") imported_at = datetime.now(timezone.utc) row = { "file_id": item.get("id"), "name": item.get("name"), "size_in_bytes": item.get("size"), "created_timestamp": item.get("createdDateTime"), "last_modified_timestamp": item.get("lastModifiedDateTime"), "site_id": self.site_id, "drive_id": drive_id, "file_folder_path": item.get("parentReference", {}).get("path"), "quick_xor_hash": item.get("file", {}).get("hashes", {}).get("quickXorHash"), "mime_type": item.get("file", {}).get("mimeType"), "web_url": item.get("webUrl"), "download_path": file_path } metadata_rows.append(row) else: print(f"❌ Failed to download {item['name']} ({response.status_code})") def get_items_with_paging(url): while url: response = requests.get(url, headers=self.get_headers()) if response.status_code != 200: print("Error:", response.status_code, response.text) return data = response.json() yield from data.get("value", []) url = data.get("@odata.nextLink") def traverse_folder(parent_path="/root", current_path=save_root): url = f"{base_url}{parent_path}/children" for item in get_items_with_paging(url): lmts = item.get("lastModifiedDateTime") lmts_dt = pd.to_datetime(lmts, utc=True, errors="coerce") if lmts else None is_new_or_updated = (prev_max_lmts is None) or (lmts_dt and lmts_dt > prev_max_lmts) if "folder" in item: subfolder_path = os.path.join(current_path, item["name"]) traverse_folder(f"/items/{item['id']}", subfolder_path) else: if is_new_or_updated: download_file_and_record(item, current_path) # else: # print(f"スキップ: {item.get('name')} (lastModifiedDateTime={lmts})") traverse_folder() df =pd.DataFrame(metadata_rows) return df def merge_table(df, table_name): table_path = f"/lakehouse/default/Tables/{table_name}" storage_options = { "bearer_token": notebookutils.credentials.getToken('storage'), "use_fabric_endpoint": "true", "allow_unsafe_rename": "true" } try: dt = DeltaTable(table_path, storage_options=storage_options) is_delta_exists = True except Exception as e: print("⚠️ Delta Table not found. Creating new one. Detail:",str(e)) is_delta_exists = False print(f"merge into {table_name}") if is_delta_exists: (dt.merge( source=df, predicate='target.file_id = source.file_id', source_alias="source", target_alias="target", ) .when_matched_update_all() .when_not_matched_insert_all() .execute()) else: write_deltalake( table_path, df, mode='append', schema_mode='merge', engine='rust', storage_options=storage_options )
-
最初に設定した情報を使って、client をインスタンス化します。ここまでの設定がうまくいっていれば、セルが成功し、 Key Vault にも Access token のシークレットが追加されます
pythonclient = SharePointSiteDownloader( tenant_id=tenant_id, client_id=client_id, redirect_url=redirect_url, key_vault_url=key_vault_url, client_secret_name=client_secret_name, refresh_token_name=refresh_token_name, access_token_name=access_token_name, site_id=site_id )
-
取込対象のdrive を探索します
pythonclient.list_drives()
-
取込対象のdrive_id をセットします。
pythondrive_id=''
-
管理テーブルの最終更新日をベースに(管理テーブルがまだなければ全件)、取得処理を実行します。
pythonimport duckdb table_path = f"/lakehouse/default/Tables/{table_name}" if notebookutils.fs.exists(table_path): print("Get max last_modified_timestamp...") query = f""" select max(cast(last_modified_timestamp as datetime)) as max_last_modified_timestamp from delta_scan('{table_path}') limit 1 """ max_lmts = duckdb.sql(query).df()['max_last_modified_timestamp'].iloc[0] print(f"max last_modified_timestamp:{max_lmts}") else: print("Set max last_modified_timestamp is None") max_lmts = None result_df = client.get_and_download_all_items( drive_id=drive_id, prev_max_lmts=max_lmts ) display(result_df) merge_table(result_df,table_name)
以上、参考になれば幸いです。