GCPにはCloud DLPという個人情報(っぽく見える情報)が含まれていないかを機械学習で自動的に検知するサービスがあります。
このサービスを使うと、BigQueryやCloud Storageなどに個人情報が格納されてないかをチェックできます。
GCPのwebコンソールからスキャンするとテーブル1つ1つ単位でしかチェックができません。
データセットの中身全てをチェックしようとすると煩雑だったために、データセット内の全てのテーブルをスキャンするスクリプトを作りました。
import google.cloud.dlp
from google.cloud import bigquery
def scan(project_id, dataset_name, table_name):
dlp = google.cloud.dlp_v2.DlpServiceClient()
# 以下を参考に検知したい情報を列挙する
# https://cloud.google.com/dlp/docs/infotypes-reference
info_types = [
"CREDIT_CARD_NUMBER",
"EMAIL_ADDRESS",
"GCP_CREDENTIALS",
"IMEI_HARDWARE_ID",
"IP_ADDRESS",
"MAC_ADDRESS_LOCAL",
"MAC_ADDRESS",
"PHONE_NUMBER",
"PASSPORT",
"ADVERTISING_ID",
"AGE",
"CREDIT_CARD_TRACK_NUMBER",
"DATE_OF_BIRTH",
"FEMALE_NAME",
"FIRST_NAME",
"GENDER",
"JAPAN_BANK_ACCOUNT",
"JAPAN_DRIVERS_LICENSE_NUMBER",
"JAPAN_INDIVIDUAL_NUMBER",
"JAPAN_PASSPORT",
"LAST_NAME",
"MALE_NAME",
"PERSON_NAME",
"STREET_ADDRESS",
]
info_types = [{"name": info_type} for info_type in info_types]
inspect_config = {
"info_types": info_types,
"min_likelihood": "POSSIBLE",
"limits": {},
"include_quote": True,
}
storage_config = {
"big_query_options": {
"table_reference": {
"project_id": project_id,
"dataset_id": dataset_name,
"table_id": table_name,
},
"sample_method": "RANDOM_START",
"rows_limit_percent": 1, # テーブル全体の何%をスキャンするかを指定。大きくしすぎると費用がかかるので注意。
}
}
# 発見した個人情報を格納するデーブルを指定
actions = [
{
"save_findings": {
"output_config": {
"table": {
"project_id": project_id,
"dataset_id": "dlp_result",
"table_id": f"{dataset_name}_{table_name}",
}
}
}
}
]
inspect_job = {
"inspect_config": inspect_config,
"storage_config": storage_config,
"actions": actions,
}
parent = f"projects/{project_id}/locations/global"
operation = dlp.create_dlp_job(
request={"parent": parent, "inspect_job": inspect_job}
)
print("Inspection operation started: {} {}.{}".format(operation.name, dataset_name, table_name))
project_id = 'プロジェクトIDを指定'
dataset_id = 'データセット名を指定'
sql = f"SELECT table_name FROM {project_id}.`region-us`.INFORMATION_SCHEMA.TABLES;"
bq_client = bigquery.Client()
for row in bq_client.query(sql):
table_name = row[0]
scan(project_id, dataset_name, table_name)