BigQueryに作成されるテーブルは放置するとデータ保存必要がどんどん膨らむ。
そのため、テーブルの最終アクセス日を確認するクエリを作成。
プロジェクト横断して結果確認できるように組織権限が必要だったのと
データフレーム化のためにセッション権限が必要だったところがちょっと苦労した。
前提条件
to_dataframeを実行するときに、readsessions権限が必要。
次の1、2のいずれかで権限を付与しておく。
-
権限を個別指定する場合、次の3つを付与
- bigquery.readsessions.create
- bigquery.readsessions.getData
- bigquery.readsessions.update
-
下記基本ロールのいずれかを付与
- BigQuery 読み取りセッション ユーザー
- BigQuery ユーザー
- BigQuery 管理者
また、プロジェクトを横断してINFORMATION_SCHEMAを参照するため
組織権限で次の権限を付与したロールを作成し実行ユーザーに付与してもらう。
- bigquery.datasets.get
- bigquery.jobs.list
- bigquery.jobs.listAll
- bigquery.reservationAssignments.list
- bigquery.routines.get
- bigquery.routines.list
- bigquery.tables.get
- bigquery.tables.getData
- bigquery.tables.list
必要なモジュールをインストール
$ pip install pandas db-dtypes google-cloud-bigquery
ソースコード
実行すると、プロジェクト横断したテーブル/ビューの最終アクセス日が確認できる。
注意点としては、2022年4月20日以降にスキーマまたはデータが変更されていないテーブルは
対象行が存在しないため、当該テーブル/ビューはnullになるケースがある
なお、ローカルから実行するときはBigQueryにアクセスする認証情報を承認するため
下記のコマンドを実行し、認証情報を取得しておくと良い。
$ gcloud auth application-default login
from google.cloud import bigquery
class SQL:
@staticmethod
def get_project_ids():
_SQL = """
SELECT
DISTINCT project_id
FROM
`region-asia-northeast1`.INFORMATION_SCHEMA.JOBS_BY_ORGANIZATION
"""
return(_SQL)
@staticmethod
def get_information_schema(project_id_list:list):
repeat_string_org = """
SELECT
table_catalog as project_id
,table_schema as dataset_id
,table_name as table_id
,table_type
,DATE(DATETIME_TRUNC(creation_time, DAY, '+9')) as created_date
FROM
`{}.region-asia-northeast1`.INFORMATION_SCHEMA.TABLES
"""
repeat_string = ""
id_length = len(project_id_list)
if id_length == 1:
repeat_string = repeat_string_org.format(project_id_list[0])
elif id_length > 1:
for i,project_id in enumerate(project_id_list):
if i == 0:
repeat_string = repeat_string_org.format(project_id) + 'UNION DISTINCT'
elif i == id_length - 1:
repeat_string += repeat_string_org.format(project_id)
else:
repeat_string += repeat_string_org.format(project_id) + 'UNION DISTINCT'
else:
raise IndexError('project_id does not exist.' )
_SQL = f"""
WITH
/*
ネストされた配列[referenced_table]をフラット化
*/
ORG AS
(
SELECT
start_time
,referenced_table.project_id
,referenced_table.dataset_id
,referenced_table.table_id
FROM
`region-asia-northeast1`.INFORMATION_SCHEMA.JOBS_BY_ORGANIZATION
CROSS JOIN
UNNEST(referenced_tables) as referenced_table
/*
CTASやINSERTなどの対象テーブルを取得
ドキュメントは配列とあるが、間違っている?
*/
UNION DISTINCT
SELECT
start_time
,destination_table.project_id
,destination_table.dataset_id
,destination_table.table_id
FROM
`region-asia-northeast1`.INFORMATION_SCHEMA.JOBS_BY_ORGANIZATION
)
, TBL AS
( /* テーブルの作成日を取得 */
{repeat_string}
),
GET_MAX AS
(
SELECT
project_id
,dataset_id
,table_id
,DATE(max(DATETIME_TRUNC(start_time, DAY, '+9'))) as referenced_date
FROM
ORG
GROUP BY 1,2,3
)
SELECT
TBL.project_id
,TBL.dataset_id
,TBL.table_id
,TBL.table_type
,TBL.created_date
,GET_MAX.referenced_date
,DATE_DIFF(GET_MAX.referenced_date,TBL.created_date, DAY) as diff_date --参照されてないときはnull
FROM
TBL
LEFT JOIN
GET_MAX
USING
(project_id,dataset_id,table_id)
"""
return(_SQL)
class BQ:
def __init__(self):
self.__client = bigquery.Client()
@property
def client(self):
return(self.__client)
def query_execute(self, SQL:str, get_dateframe:bool=False):
"""
Get Query Result
Parameters
----------
SQL : string
get_dataframe : bool. if True, you get DataFrame, else, you get RowIterator
Returns
-------
google.cloud.bigquery.table.RowIterator
"""
if not get_dateframe:
rows = self.client.query(SQL).result()
else:
rows = self.client.query(SQL).to_dataframe()
return(rows)
def main():
bq = BQ()
# Get Project ID List
res = bq.query_execute(SQL.get_project_ids())
row_list = []
for row in res:
row_list.extend([row.project_id])
# print(row_list)
# Show SQL
# print(SQL.get_information_schema(row_list))
# Get Result
df = bq.query_execute(SQL.get_information_schema(row_list), True)
print(df) # デーフレーム化せずに、BigQuery内にテーブル作ってもOK
if __name__=='main':
main()
参考
Pythonライブラリの使用
BigQuery IAMによるアクセス制御
GitHub googleapis/python-bigquery
Python Client for Google BigQuery
ぽ靴な缶