2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Microsoft Fabric Python Notebook で SharePoint ドキュメントライブラリのファイルの取込み処理を作ってみる【手動認可コードフロー】

Last updated at Posted at 2025-06-19

はじめに

で着想を得たので、自前で OAuth っぽい仕組みを作って SharePoint ドキュメントをレイクハウスに増分取込する仕組みを実装してみました。

以下のメリットがあるかなと思います

  • データパイプラインでの実装はここまでくると大変だがNotebookであればカスタマイズが楽
  • python カーネルで動作させるので CU 消費が比較的軽い

今回作成したコードは多分に GPT に実装をまかせています。シークレット周りのハンドリングやパフォーマンス検証など十分に確認のうえご利用ください。

セットアップ

  1. アプリケーションの作成
    https://learn.microsoft.com/ja-jp/entra/identity-platform/quickstart-register-app を参考に、アプリを登録します。その際、リダイレクト URL を設定してください。
    後続の設定とそろえる必要があるのですが、localhostで構いません。
    image.png

  2. Key Vault のセットアップ
    アプリのシークレットを作成したらシークレット値をkey vault に保存します。※RBACによる権限が必要です。ここでも後続の設定と名称をそろえる必要があります。画像ではほかのシークレットも映ってますが、ここではclient-secretだけです

    image.png

リフレッシュトークンの設定

以下の流れで Graph API を利用するための リフレッシュトークンを保存します。

  1. Python ノートブックとして起動
    image.png

  2. SharePoint サイトの ID を取得するための URL を発行します

    python
    
    site_url = "https://example.sharepoint.com/sites/FabricDemo"  #https://example.sharepoint.com/sites/example
    
    check_site_id_url = f"{site_url}//_api/site/id"
    print(check_site_id_url)
    
    

    image.png

  3. URL にアクセスして site_id を取得したらメモしておいてください
    image.png

  4. 作成したアプリケーションを利用するための情報を設定します

    python
    # 作業結果から入力
    
    client_id=""
    tenant_id=""
    key_vault_url="https://commonakv.vault.azure.net/",
    
    # 設定内容に応じて変更
    
    redirect_url = "http://localhost"
    client_secret_name = "client-secret"
    refresh_token_name = "refresh-token"
    
  5. 認可URLを発行します

    python
    
    
    # ================= Do not edit code below this line. ==================
    
    # Authorization endpoint URL
    
    authorization_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/authorize"
    
    scopes = ["Files.Read.All"]
    scope = " ".join(["https://graph.microsoft.com/{}".format(s) for s in scopes])
    scope += ("&offline_access")
    
    # Construct the authorization request URL.
    
    auth_params = {
    "client_id": client_id,
    "redirect_uri": redirect_url,
    "response_type": "code",
    "scope": scope
    }
    
    auth_url = authorization_url + "?" + "&".join([f"{key}={value}" for key, value in auth_params.items()])
    
    print(auth_url)
    
    

    image.png

  6. 表示されたURL にアクセスしてファイル読取を委任する許可を与えます
    image.png

  7. 次の画面でエラーとなりますが、URL から 認可コードが取得できます。※赤枠は途中で囲ってます
    image.png

  8. 認可コードを設定

    python
    # 前のセルからアクセスした先の URL の code 部分を設定。末尾の &session_state= などのコードではない部分が入らないように注意
    
    code =  ''
    
  9. 認可コードを利用してリフレッシュトークンを取得し、Key Vault に登録します。

    python
    # Use the authentication code to get the refresh token.
    
    # ================= Do not edit code below this line. ==================
    
    import requests
    
    token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
    
    client_secret = notebookutils.credentials.getSecret(key_vault_url,client_secret_name)
    
    # Request parameters
    token_params = {
        "client_id": client_id,
        "client_secret": client_secret,
        "code": code,
        "grant_type": "authorization_code",
        "redirect_uri": redirect_url,
        "scope": "profile openid email https://graph.microsoft.com/Files.Read.All offline_access"
    }
    
    
    # Send POST request to token endpoint
    response = requests.post(token_url, data=token_params)
    
    
    # Handle response
    if response.status_code == 200:
        token_data = response.json()
        refresh_token = token_data.get("refresh_token")
        notebookutils.credentials.putSecret(key_vault_url, refresh_token_name, refresh_token)
        print("Created secret refresh token")
    else:
        print("Error:", response.status_code, response.text)
    
    

    ここでは、以下を行っています。

    • 得られた認可コード(code)を使って、プログラムからトークンエンドポイント(/token)へPOSTリクエストを送る。
    • 成功すると**リフレッシュトークン(refresh_token)**が返る。
    • 得られたリフレッシュトークンをKey Vaultに保存し、今後はこれを使ってアクセストークンを自動取得・更新できるようにする。

ファイル取込の実施

トークンの設定ができたら Graph API を使用してファイルを取り込めます。以下を工夫してみました。

  • 扱いやすくするためにSharePointSiteDownloader としてクラス化
  • 取得済みのファイル情報を管理テーブルに反映し、増分取込に対応
  1. 先ほども設定した、各種プロパティを記載しておきます。table_nameが管理テーブルの名称になります。

    python
    
    # 作業結果から入力
    
    site_id = ""
    client_id = ""
    tenant_id = ""
    key_vault_url="https://<keyvault name>.vault.azure.net/"
    
    
    # 設定内容に応じて変更
    
    redirect_url = "http://localhost"
    client_secret_name = "client-secret"
    refresh_token_name = "refresh-token"
    access_token_name = "access-token"
    
    table_name ="sharepoint_ingested"
    
  2. 関数は以下のようにしています。前述の クラスの実装のほか、DeltaTableを使うことで Python カーネル上で Delta の書き込み処理をします。

    python
    import os
    import requests
    import pandas as pd
    from datetime import datetime, timezone
    from deltalake import DeltaTable ,write_deltalake
    
    class SharePointSiteDownloader:
        def __init__(
            self,
            tenant_id,
            client_id,
            redirect_url,
            key_vault_url,
            client_secret_name,
            refresh_token_name,
            access_token_name,
            site_id
        ):
            self.tenant_id = tenant_id
            self.client_id = client_id
            self.redirect_url = redirect_url   # 固定値
            
            self.key_vault_url = key_vault_url
            self.client_secret_name = client_secret_name
            self.refresh_token_name = refresh_token_name
            self.access_token_name = access_token_name
    
            self.site_id = site_id
            self.graph_url = "https://graph.microsoft.com/v1.0"
    
            # --- Key Vaultからシークレットを取得 ---
            # client_secret
            self.client_secret = notebookutils.credentials.getSecret(key_vault_url, client_secret_name)
            # refresh_token
            self.refresh_token = notebookutils.credentials.getSecret(key_vault_url, refresh_token_name)
    
            # --- refresh_tokenでaccess_tokenを取得&両方Key Vaultに上書き ---
            self.access_token, new_refresh_token = self.get_access_token_and_update_refresh_token()
    
            # 最新化(必ずKey Vaultに上書きする
            notebookutils.credentials.putSecret(key_vault_url, access_token_name, self.access_token)
            if new_refresh_token and new_refresh_token != self.refresh_token:
                notebookutils.credentials.putSecret(key_vault_url, refresh_token_name, new_refresh_token)
                self.refresh_token = new_refresh_token
    
        def get_access_token_and_update_refresh_token(self):
            token_url = f"https://login.microsoftonline.com/{self.tenant_id}/oauth2/v2.0/token"
            params = {
                "client_id": self.client_id,
                "client_secret": self.client_secret,
                "refresh_token": self.refresh_token,
                "grant_type": "refresh_token",
                "redirect_uri": self.redirect_url,
                "scope": "https://graph.microsoft.com/Files.Read.All offline_access"
            }
            response = requests.post(token_url, data=params)
            if response.status_code == 200:
                token_data = response.json()
                access_token = token_data.get("access_token")
                refresh_token = token_data.get("refresh_token")
                return access_token, refresh_token
            else:
                raise Exception(f"トークン取得失敗: {response.status_code}, {response.text}")
    
        def get_headers(self):
            # 毎回Key Vaultから取得した最新access_tokenでヘッダー作成
            access_token = notebookutils.credentials.getSecret(self.key_vault_url, self.access_token_name)
            return {"Authorization": f"Bearer {access_token}"}
    
        def list_drives(self):
            url = f"{self.graph_url}/sites/{self.site_id}/drives"
            response = requests.get(url, headers=self.get_headers())
            if response.status_code == 200:
                drives = response.json()
                print("ドライブ一覧:")
                for drive in drives.get("value", []):
                    print(f"- {drive['name']} (ID: {drive['id']})")
            else:
                print("Error:", response.status_code, response.text)
    
    
        def get_and_download_all_items(
            self,
            drive_id,
            save_root="/lakehouse/default/Files/download",
            prev_max_lmts=None,
        ):
            """
            指定ドライブの全ファイルを(差分判定ありで)ダウンロードし、pandas DataFrameでメタデータを返す
            """
            base_url = f"{self.graph_url}/sites/{self.site_id}/drives/{drive_id}"
            metadata_rows = []
            if prev_max_lmts is not None:
                if not isinstance(prev_max_lmts, pd.Timestamp):
                    prev_max_lmts = pd.to_datetime(prev_max_lmts, utc=True, errors="coerce")
                if prev_max_lmts.tz is None:
                    prev_max_lmts = prev_max_lmts.tz_localize('UTC')
    
            def download_file_and_record(item, current_path):
                file_url = f"{base_url}/items/{item['id']}/content"
                response = requests.get(file_url, headers=self.get_headers(), stream=True)
                if response.status_code == 200:
                    os.makedirs(current_path, exist_ok=True)
                    file_path = os.path.join(current_path, item["name"])
                    with open(file_path, "wb") as f:
                        for chunk in response.iter_content(8192):
                            f.write(chunk)
                    # print(f"✅ Downloaded: {file_path}")
    
                    imported_at = datetime.now(timezone.utc)
                    row = {
                        "file_id": item.get("id"),
                            "name": item.get("name"),
                            "size_in_bytes": item.get("size"),
                            "created_timestamp": item.get("createdDateTime"),
                            "last_modified_timestamp": item.get("lastModifiedDateTime"),
                            "site_id": self.site_id,
                            "drive_id": drive_id,
                            "file_folder_path": item.get("parentReference", {}).get("path"),
                            "quick_xor_hash": item.get("file", {}).get("hashes", {}).get("quickXorHash"),
                            "mime_type": item.get("file", {}).get("mimeType"),
                            "web_url": item.get("webUrl"),
                            "download_path": file_path
    
                    }
                    metadata_rows.append(row)
                else:
                    print(f"❌ Failed to download {item['name']} ({response.status_code})")
    
            def get_items_with_paging(url):
                while url:
                    response = requests.get(url, headers=self.get_headers())
                    if response.status_code != 200:
                        print("Error:", response.status_code, response.text)
                        return
                    data = response.json()
                    yield from data.get("value", [])
                    url = data.get("@odata.nextLink")
    
            def traverse_folder(parent_path="/root", current_path=save_root):
                url = f"{base_url}{parent_path}/children"
                for item in get_items_with_paging(url):
                    lmts = item.get("lastModifiedDateTime")
                    lmts_dt = pd.to_datetime(lmts, utc=True, errors="coerce") if lmts else None
                    is_new_or_updated = (prev_max_lmts is None) or (lmts_dt and lmts_dt > prev_max_lmts)
    
                    if "folder" in item:
                        subfolder_path = os.path.join(current_path, item["name"])
                        traverse_folder(f"/items/{item['id']}", subfolder_path)
                    else:
                        if is_new_or_updated:
                            download_file_and_record(item, current_path)
                        # else:
                        #     print(f"スキップ: {item.get('name')} (lastModifiedDateTime={lmts})")
    
            
    
    
            traverse_folder()  
            df =pd.DataFrame(metadata_rows)
    
            return df
    
    
    
    def merge_table(df, table_name):
        table_path = f"/lakehouse/default/Tables/{table_name}"
        storage_options = {
            "bearer_token": notebookutils.credentials.getToken('storage'),
            "use_fabric_endpoint": "true",
            "allow_unsafe_rename": "true"
        }
    
        try:
            dt = DeltaTable(table_path, storage_options=storage_options)
            is_delta_exists = True
        except Exception as e:
            print("⚠️ Delta Table not found. Creating new one. Detail:",str(e))
            is_delta_exists = False
        print(f"merge into {table_name}")
        if is_delta_exists:
            (dt.merge(
                source=df,
                predicate='target.file_id = source.file_id',
                source_alias="source",
                target_alias="target",
            )
            .when_matched_update_all()
            .when_not_matched_insert_all()
            .execute())
        else:
            write_deltalake(
                table_path,
                df,
                mode='append',
                schema_mode='merge',
                engine='rust',
                storage_options=storage_options
                )
    
  3. 最初に設定した情報を使って、client をインスタンス化します。ここまでの設定がうまくいっていれば、セルが成功し、 Key Vault にも Access token のシークレットが追加されます

    python
    
    client = SharePointSiteDownloader(
        tenant_id=tenant_id,
        client_id=client_id,
        redirect_url=redirect_url,
        key_vault_url=key_vault_url,
        client_secret_name=client_secret_name,
        refresh_token_name=refresh_token_name,
        access_token_name=access_token_name,
        site_id=site_id
    )
    
  4. 取込対象のdrive を探索します

    python
    
    client.list_drives()
    
    

    image.png

  5. 取込対象のdrive_id をセットします。

    python
    drive_id=''
    
  6. 管理テーブルの最終更新日をベースに(管理テーブルがまだなければ全件)、取得処理を実行します。

    python
    
    import duckdb
    
    table_path = f"/lakehouse/default/Tables/{table_name}"
    
    if notebookutils.fs.exists(table_path):
        print("Get max last_modified_timestamp...")
        query = f"""
        select max(cast(last_modified_timestamp as datetime)) as max_last_modified_timestamp
        from delta_scan('{table_path}') 
        limit 1 """
    
        max_lmts = duckdb.sql(query).df()['max_last_modified_timestamp'].iloc[0]
        print(f"max last_modified_timestamp:{max_lmts}")
    else:
        print("Set max last_modified_timestamp is None")
        max_lmts = None
    
    result_df = client.get_and_download_all_items(
        drive_id=drive_id,
        prev_max_lmts=max_lmts
    )
    
    display(result_df)
    merge_table(result_df,table_name)
    

    image.png

  7. レイクハウスを確認すると、ファイルおよびテーブルへの反映ができていることがわかります。
    image.png
    SharePoint の状態
    image.png

以上、参考になれば幸いです。

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?