はじめに
Autonomous Databaseでは、Delta Shareプロトコルを使ったデータ共有ができます。
Delta ShareはオープンソースのAPIなので、サポートされている任意の受信者(他のデータベース、データ分析ツール、オープンソース・プラットフォーム)とデータをセキュアに共有できます。
今回は、Autonomous Databaseを共有のプロバイダとして、Autonomous Database内の表を共有し、Pythonのpandasで読み込んでみたいと思います。
データ共有は、Autonomous Databaseの付属ツールであるDatabase Actionsから実行できます。
※コマンドでも同様に実行できます。
データ共有へのアクセス
ADMINユーザーで以下を実行することで、特定のユーザーがデータ共有へアクセスすることができます。
begin
DBMS_SHARE.ENABLE_SCHEMA(
SCHEMA_NAME => '<ユーザー名>',
ENABLED => TRUE
);
commit;
end;
共有の作成
-
上記で有効化されたユーザーでDatabase Actionsにサインインすると、以下のようにData Studio内に
データ共有
が表示されます。
-
クリックすると、
共有の提供
と共有の消費
が選択できます。今回はAutonomous Databaseから表データを共有するので、共有の提供
をクリックします。
-
以下のように各項目入力していきます。入力項目については、こちらをご参照ください。
-
資格証明の作成ができたので、クラウド・ストアを追加します。先ほど作成した資格証明が選択されていることを確認し、バケットURIを指定します。
namespace
やbucket-name
は適宜修正してください。
-
DEMO_OBJ_STORE
というOCIネイティブ資格証明を作成できたので、それを指定し、次
をクリックします。なお、今回はスケジューリングは有効化しませんが、毎n分/毎n時間/毎n日/毎n週や開始日/終了日を指定しスケジューリングも可能です。
-
共有する表を選択します。
使用可能な表
には、demo
スキーマの所有している表が表示されています。今回は、MOVIE_SALES_2020Q2
という表を共有します。
-
作成すると、
SAMPLE
ユーザーに受信権限が与えられます。ただし、新規受信者はアクティベーションする必要があります。メールアイコンをクリックすると、メーラーが起動し、受信者のアドレスにプロファイルのダウンロード・リンクを含むメールを送信することができます。
これで共有の作成は完了です。
続いて、受信者側として作業を行います。
共有の消費
-
Get Profile Information
をクリックすると、delta_share_profile.jsonファイルがダウンロードできます。 -
delta_share_profile.jsonを確認してみます。以下のように、エンドポイントと、ベアラートークンなどの情報が格納されています。
{ "shareCredentialsVersion": 1, "endpoint": "https://xxxxx.adb.ap-tokyo-1.oraclecloudapps.com/ords/demo/_delta_sharing", "tokenEndpoint": "https://xxxxx.adb.ap-tokyo-1.oraclecloudapps.com/ords/demo/oauth/token", "bearerToken": "aPFOYWgJz75f5_cLwZcpgw", "expirationTime": "2023-08-31T07:52:52.0Z", "clientID": "jONlE9VrRibYH0gDCILPTg..", "clientSecret": "WUwt2vsZyWUzOclt6Ea4mQ.." }
-
プロファイルを取得できたので、共有されている表の情報を取得したいと思います。今回はJupyter Notebookでpandasからロードできるか確認してみます。delta-sharingはpythonパッケージとして提供されており、pip経由でインストールできます。
pip3 install delta-sharing
-
以下のPythonプログラムをJupyter Notebook上で実行します。
delta_read.pyimport requests import os import delta_sharing import json import shutil import datetime import pandas as pd from datetime import datetime, timedelta pd.options.display.width = 100 def update_share_profile_token(profile_file, create_backup=True): # プロファイルの読み込み f = open(profile_file) data = json.load(f) f.close() # トークンのアップデート token_endpoint = data['tokenEndpoint'] client_id = data['clientID'] client_secret = data['clientSecret'] if token_endpoint is None or client_id is None or client_secret is None: # 何もしない return # 新しいトークンの取得 response = requests.post( token_endpoint, data={"grant_type": "client_credentials"}, auth=(client_id, client_secret)) response_json = response.json() print(response_json) if response_json["access_token"] is None: # アップデートしない return data['bearerToken']=response_json["access_token"] if response_json["expires_in"] is not None: # 有効期限のアップデート time_now = datetime.utcnow() expiration_time = time_now + timedelta(0,response_json["expires_in"]) # デルタ・タイム・フォーマット data["expirationTime"] = expiration_time.strftime("%Y-%m-%dT%H:%M:%S.0Z") if create_backup: # プロファイル・ファイルのバックアップ作成 shutil.copy(profile_file, profile_file+".bk") json_object = json.dumps(data, indent=2) with open(profile_file, "w") as outfile: outfile.write(json_object) # プロファイル・ファイルの指定 profile_file = "/home/opc/python/delta_share_profile.json" update_share_profile_token(profile_file) # 共有クライアントの作成 client = delta_sharing.SharingClient(profile_file) # すべての共有のリスト shares = client.list_shares() print(shares) # 最初の共有のスキーマの表示 schemas = client.list_schemas(shares[0]) print(schemas) # 最初の共有のテーブルの表示 tables = client.list_tables(schemas[0]) print(tables) ## pandasで10件だけロード for tab in tables: delta_path = profile_file + '#' + tab.share + '.' + tab.schema + '.' + tab.name df = delta_sharing.load_as_pandas(delta_path, limit=10) print(df)
-
Database Actions側で、共有を非公開にしてみます。3点リーダーをクリックし、
非公開
をクリックします。
この状態で、受信者側で先ほどと同様にdelta_read.pyを実行してみます。
以下のようにlist_schemasの部分でIndexErrorになりました。
おわりに
今回はAutonomous Database内の表を、pandasからロードするという内容でしたが、他にもSparkで読み込んだり、他データベース、BIツールから読み込むこともできます。