LoginSignup
2
2

More than 1 year has passed since last update.

[Airflow]PythonOperatorでBigQueryへの接続

Posted at

はじめに

CloudComposerを利用せずにGCE内のDockerにてAirflowを運用しております。
BigQueryをAirflowで制御する必要があり、接続に少し苦労しましたので書き残しておきます。

BigQueryOperatorで制御する方法はよくあるのですが、
PythonOperatorでGoogleSDKを使ってBigQueryを制御する方法はあまりありませんでした。
(それぞれのOperatorで制御しろよって話ですが、Pythonで自由に作りたい、、、)

TL;DR

<エラー内容>

client = bigquery.Client()
tables = client.list_tables(dataset_id)

↓ エラー出力

google.api_core.exceptions.Forbidden: 403 GET https://bigquery.googleapis.com/bigquery/v2/projects/[project]/datasets/[dataset]/tables?prettyPrint=false: Request had insufficient authentication scopes.

<はまった原因>

単純にサービスアカウントを読み込んでいなかっただけです。

<解消方法>

サービス アカウント キー ファイルを使用した認証に記載の内容を実施するだけです。

Detail

1. サービスアカウントキーの発行

サービスアカウントキーを発行します。
発行手順は他のサイトを参照下さい。

2. 出力されたjsonをディレクトリに配置します。

インスタンスのdockerとリンクしているディレクトリにjsonファイルを配置します。

リンクしているディレクトリは、docker-compose.yamlのvolumesになります。

Docker内のAirflowのリンクディレクトリについては、設定にもよりますがairflow.cfgの中に記載があると思います。
cfgを読むのが面倒くさい、、、なんか長い、、って方は、実際にdocker内に入って確認すると良いと思います。

docker ps
・・・ Docker名とかが出てくる
docker exec -it [web_serverのDocker名] bash

初期インストール方法は、別の方の記事を参考いただければと思います。

3. 実際にコードを書いてみる

普通にサービス アカウント キー ファイルを使用した認証を参考としただけですが、マスクした形で実際のコードを載せておきます。
oO(テストコードですし自分のためにも)

内容は、データセットのテーブル一覧の出力です。
認証の部分は、credentialsに設定するservice_accountにて、key.jsonを呼び出して設定しております。
その内容をbigquery.Clientの引数にて渡しております。

import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta, timezone
import pendulum

from google.oauth2 import service_account
from google.cloud import bigquery


JST = timezone(timedelta(hours=+9), 'JST')
batch_date = (datetime.now(JST)).strftime('%Y%m%d')
project = "projectName"
detaset = "datasetName"
credentials = service_account.Credentials.from_service_account_file(
    "/opt/airflow/credential/key.json", scopes=["https://www.googleapis.com/auth/cloud-platform"],
)


def start(**kwargs):
    print("start job")
    print(batch_date)


def getTables(dataset_id):
    datasetId = project + "." + detaset
    print(datasetId)  # projectName.datasetName
    client = bigquery.Client(credentials=credentials,
                             project=credentials.project_id,)
    tables = client.list_tables(datasetId)

    for table in tables:
        print("{}.{}.{}".format(table.project, table.dataset_id, table.table_id))
        ## projectName.datasetName.tableName


with DAG(
    'viewTables',
    catchup=False,
    start_date=datetime(2021, 4, 25, 00, 00,
                        tzinfo=pendulum.timezone('Asia/Tokyo')),
    schedule_interval=timedelta(minutes=60),
    default_args={'owner': 'airflow'},
    tags=['tag'],
) as dag:

    startTask = PythonOperator(
        task_id='start',
        provide_context=True,
        python_callable=start,
    )

    getTablesTask = PythonOperator(
        task_id='get_tables',
        provide_context=True,
        python_callable=getTables,
    )

startTask >> getTablesTask

4. ついでにBigQueryOperaterでも使えるようにしておく

PythonOperatorとは関係ないですが、BigQueryOperaterを使う場合もkey情報は必要です。
Cloud Composerは、この辺を自動でやってくれる?と思いますが、自前で立ち上げた場合は、UI上で以下の設定が必要です。
bigquery_defaultは変更可能ですが、Cloud Composerでの標準の名前でしょうか??:confused:
DAGs_-_Airflow.png

Edit_Connection_-_Airflow.png

このように利用します。

t2 = BigQueryOperator(
    task_id='copy_table',
    sql="""
    select * from `[source_projectName].[source_datasetName].[source_tableName]`
    """,
    destination_dataset_table='dest_projectName.dest_databaseName.dest_tableName',
    use_legacy_sql=False,
    bigquery_conn_id='bigquery_default',
    dag=dag,
)

最後に

airflowは色んな所で苦労します。
Cloud Composerのマネージドにすればこの辺が解消されるのでしょうか??:neutral_face:

2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2