はじめに
CloudComposerを利用せずにGCE内のDockerにてAirflowを運用しております。
BigQueryをAirflowで制御する必要があり、接続に少し苦労しましたので書き残しておきます。
BigQueryOperatorで制御する方法はよくあるのですが、
PythonOperatorでGoogleSDKを使ってBigQueryを制御する方法はあまりありませんでした。
(それぞれのOperatorで制御しろよって話ですが、Pythonで自由に作りたい、、、)
TL;DR
<エラー内容>
client = bigquery.Client()
tables = client.list_tables(dataset_id)
↓ エラー出力
google.api_core.exceptions.Forbidden: 403 GET https://bigquery.googleapis.com/bigquery/v2/projects/[project]/datasets/[dataset]/tables?prettyPrint=false: Request had insufficient authentication scopes.
<はまった原因>
単純にサービスアカウントを読み込んでいなかっただけです。
<解消方法>
サービス アカウント キー ファイルを使用した認証に記載の内容を実施するだけです。
Detail
1. サービスアカウントキーの発行
サービスアカウントキーを発行します。
発行手順は他のサイトを参照下さい。
2. 出力されたjsonをディレクトリに配置します。
インスタンスのdockerとリンクしているディレクトリにjsonファイルを配置します。
リンクしているディレクトリは、docker-compose.yamlのvolumesになります。
Docker内のAirflowのリンクディレクトリについては、設定にもよりますがairflow.cfgの中に記載があると思います。
cfgを読むのが面倒くさい、、、なんか長い、、って方は、実際にdocker内に入って確認すると良いと思います。
docker ps
・・・ Docker名とかが出てくる
docker exec -it [web_serverのDocker名] bash
初期インストール方法は、別の方の記事を参考いただければと思います。
3. 実際にコードを書いてみる
普通にサービス アカウント キー ファイルを使用した認証を参考としただけですが、マスクした形で実際のコードを載せておきます。
oO(テストコードですし自分のためにも)
内容は、データセットのテーブル一覧の出力です。
認証の部分は、credentialsに設定するservice_accountにて、key.jsonを呼び出して設定しております。
その内容をbigquery.Clientの引数にて渡しております。
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta, timezone
import pendulum
from google.oauth2 import service_account
from google.cloud import bigquery
JST = timezone(timedelta(hours=+9), 'JST')
batch_date = (datetime.now(JST)).strftime('%Y%m%d')
project = "projectName"
detaset = "datasetName"
credentials = service_account.Credentials.from_service_account_file(
"/opt/airflow/credential/key.json", scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
def start(**kwargs):
print("start job")
print(batch_date)
def getTables(dataset_id):
datasetId = project + "." + detaset
print(datasetId) # projectName.datasetName
client = bigquery.Client(credentials=credentials,
project=credentials.project_id,)
tables = client.list_tables(datasetId)
for table in tables:
print("{}.{}.{}".format(table.project, table.dataset_id, table.table_id))
## projectName.datasetName.tableName
with DAG(
'viewTables',
catchup=False,
start_date=datetime(2021, 4, 25, 00, 00,
tzinfo=pendulum.timezone('Asia/Tokyo')),
schedule_interval=timedelta(minutes=60),
default_args={'owner': 'airflow'},
tags=['tag'],
) as dag:
startTask = PythonOperator(
task_id='start',
provide_context=True,
python_callable=start,
)
getTablesTask = PythonOperator(
task_id='get_tables',
provide_context=True,
python_callable=getTables,
)
startTask >> getTablesTask
4. ついでにBigQueryOperaterでも使えるようにしておく
PythonOperatorとは関係ないですが、BigQueryOperaterを使う場合もkey情報は必要です。
Cloud Composerは、この辺を自動でやってくれる?と思いますが、自前で立ち上げた場合は、UI上で以下の設定が必要です。
bigquery_defaultは変更可能ですが、Cloud Composerでの標準の名前でしょうか??
↓
このように利用します。
t2 = BigQueryOperator(
task_id='copy_table',
sql="""
select * from `[source_projectName].[source_datasetName].[source_tableName]`
""",
destination_dataset_table='dest_projectName.dest_databaseName.dest_tableName',
use_legacy_sql=False,
bigquery_conn_id='bigquery_default',
dag=dag,
)
最後に
airflowは色んな所で苦労します。
Cloud Composerのマネージドにすればこの辺が解消されるのでしょうか??