LoginSignup
1
2

More than 1 year has passed since last update.

INFORMATION_SCHEMAを活用してテーブル/ビューの最終アクセス日を確認する

Last updated at Posted at 2022-07-28

BigQueryに作成されるテーブルは放置するとデータ保存必要がどんどん膨らむ。
そのため、テーブルの最終アクセス日を確認するクエリを作成。
プロジェクト横断して結果確認できるように組織権限が必要だったのと
データフレーム化のためにセッション権限が必要だったところがちょっと苦労した。

前提条件

to_dataframeを実行するときに、readsessions権限が必要。
次の1、2のいずれかで権限を付与しておく。

  1. 権限を個別指定する場合、次の3つを付与

    • bigquery.readsessions.create
    • bigquery.readsessions.getData
    • bigquery.readsessions.update
  2. 下記基本ロールのいずれかを付与

    • BigQuery 読み取りセッション ユーザー
    • BigQuery ユーザー
    • BigQuery 管理者

また、プロジェクトを横断してINFORMATION_SCHEMAを参照するため
組織権限で次の権限を付与したロールを作成し実行ユーザーに付与してもらう。

  • bigquery.datasets.get
  • bigquery.jobs.list
  • bigquery.jobs.listAll
  • bigquery.reservationAssignments.list
  • bigquery.routines.get
  • bigquery.routines.list
  • bigquery.tables.get
  • bigquery.tables.getData
  • bigquery.tables.list

必要なモジュールをインストール

$ pip install pandas db-dtypes google-cloud-bigquery

ソースコード

実行すると、プロジェクト横断したテーブル/ビューの最終アクセス日が確認できる。
注意点としては、2022年4月20日以降にスキーマまたはデータが変更されていないテーブルは
対象行が存在しないため、当該テーブル/ビューはnullになるケースがある

なお、ローカルから実行するときはBigQueryにアクセスする認証情報を承認するため
下記のコマンドを実行し、認証情報を取得しておくと良い。

$ gcloud auth application-default login
main.py
from google.cloud import bigquery

class SQL:
    
    @staticmethod
    def get_project_ids():
        _SQL = """
        SELECT
          DISTINCT project_id
        FROM
          `region-asia-northeast1`.INFORMATION_SCHEMA.JOBS_BY_ORGANIZATION
        """
        return(_SQL)
    
    @staticmethod
    def get_information_schema(project_id_list:list):
       
        repeat_string_org = """
          SELECT
            table_catalog as project_id
            ,table_schema as dataset_id
            ,table_name as table_id
            ,table_type
            ,DATE(DATETIME_TRUNC(creation_time, DAY, '+9')) as created_date
          FROM
            `{}.region-asia-northeast1`.INFORMATION_SCHEMA.TABLES
        """
        repeat_string = ""
        id_length = len(project_id_list)
        if id_length == 1:
            repeat_string = repeat_string_org.format(project_id_list[0])
        elif id_length > 1:
            for i,project_id in enumerate(project_id_list):
                if i == 0:
                    repeat_string = repeat_string_org.format(project_id) + 'UNION DISTINCT'
                elif i == id_length - 1:
                    repeat_string += repeat_string_org.format(project_id)
                else:
                    repeat_string += repeat_string_org.format(project_id) + 'UNION DISTINCT'
        else:
            raise IndexError('project_id does not exist.' )

        _SQL = f"""
        WITH
        /*
        ネストされた配列[referenced_table]をフラット化
        */
          ORG AS
          (
          SELECT
            start_time
            ,referenced_table.project_id
            ,referenced_table.dataset_id
            ,referenced_table.table_id
          FROM
             `region-asia-northeast1`.INFORMATION_SCHEMA.JOBS_BY_ORGANIZATION
          CROSS JOIN
            UNNEST(referenced_tables) as referenced_table
        /*
        CTASやINSERTなどの対象テーブルを取得
        ドキュメントは配列とあるが、間違っている?
        */
        UNION DISTINCT
          SELECT
            start_time
            ,destination_table.project_id
            ,destination_table.dataset_id
            ,destination_table.table_id
          FROM
             `region-asia-northeast1`.INFORMATION_SCHEMA.JOBS_BY_ORGANIZATION
          )
        , TBL AS
        ( /* テーブルの作成日を取得 */
         {repeat_string}
        ),
        GET_MAX AS
        (
          SELECT
             project_id
            ,dataset_id
            ,table_id
            ,DATE(max(DATETIME_TRUNC(start_time, DAY, '+9'))) as referenced_date
          FROM
            ORG
          GROUP BY 1,2,3
        )
        
        SELECT
            TBL.project_id
            ,TBL.dataset_id
            ,TBL.table_id
            ,TBL.table_type
            ,TBL.created_date
            ,GET_MAX.referenced_date
            ,DATE_DIFF(GET_MAX.referenced_date,TBL.created_date, DAY) as diff_date --参照されてないときはnull
        FROM
          TBL
        LEFT JOIN
          GET_MAX
        USING
          (project_id,dataset_id,table_id)        
        """
        return(_SQL)

class BQ:

    def __init__(self):
        self.__client = bigquery.Client()

    @property
    def client(self):
        return(self.__client)

    def query_execute(self, SQL:str, get_dateframe:bool=False):
        """
        Get Query Result

        Parameters
        ----------
        SQL : string
        get_dataframe : bool. if True, you get DataFrame, else, you get RowIterator

        Returns
        -------
        google.cloud.bigquery.table.RowIterator
        """
        if not get_dateframe:
            rows = self.client.query(SQL).result()
        else:
            rows = self.client.query(SQL).to_dataframe()
        return(rows)

def main():
    bq = BQ()
    
    # Get Project ID List
    res = bq.query_execute(SQL.get_project_ids())
    row_list = []
    for row in res:
        row_list.extend([row.project_id])
    # print(row_list)    
    # Show SQL
    # print(SQL.get_information_schema(row_list))
    # Get Result
    df = bq.query_execute(SQL.get_information_schema(row_list), True)
    print(df) # デーフレーム化せずに、BigQuery内にテーブル作ってもOK
    
    
if __name__=='main':
    main()    

参考

Pythonライブラリの使用
BigQuery IAMによるアクセス制御
GitHub googleapis/python-bigquery
Python Client for Google BigQuery
ぽ靴な缶

1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2