S3 へアップロード
Airflow から Python をつかって S3 にファイルをアップロードする小さいサンプルです。BashOperator も混ざっていいますが、S3 にあげる処理とは関係ありません、サンプルとしていおいてあるだけです。
import datetime as dt
import boto3
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
default_args = {
'owner': 'me',
'start_date': dt.datetime(2021, 2, 11),
'retries': 1,
'retrby_delay': dt.timedelta(minutes=5)
}
s3 = boto3.resource('s3')
def upload_file_to_s3():
f = open("newFile.txt", "w")
f.write("Wow")
f.close()
s3.Bucket("bucket_name").upload_file("newFile.txt", "dst.txt")
with DAG('Sample',
default_args=default_args,
schedule_interval='*/1 * * * *',
) as dag:
touch_hoge = BashOperator(task_id='print_hello',
bash_command='echo hello')
upload_to_s3 = PythonOperator(task_id='upload_to_s3',
python_callable=upload_file_to_s3)
touch_hoge >> upload_to_s3
AWS の Airflow を使っているので、Airflow のロールに S3 にアクセスできるポリシーを追加しておくことで、上記により S3 へのアップロードが成功します。ポリシーなどが足りないとしっかりと以下のエラーがでます。
boto3.exceptions.S3UploadFailedError: Failed to upload newFile.txt to bucket-name/dst.txt:
An error occurred (AccessDenied) when calling the PutObject operation: Access Denied
cf. https://www.sicara.ai/blog/2019-01-28-automate-aws-tasks-thanks-to-airflow-hooks
Redshift 操作
クエリは何でもいいのですが、Redshift のクエリを叩くサンプルです。
import datetime as dt
import boto3
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
default_args = {
'owner': 'me',
'start_date': dt.datetime(2021, 2, 11),
'retries': 1,
'retrby_delay': dt.timedelta(minutes=5)
}
def insert_to_redshift():
rsd = boto3.client('redshift-data')
resp = rsd.execute_statement(
ClusterIdentifier='your_cluster_identifier',
Database='your_dev',
DbUser='your_suser',
Sql='insert into visitors (name) select name from users;'
)
print(resp)
return "OK"
with DAG('copyRedshiftTable',
default_args=default_args,
schedule_interval='*/10 * * * *'
) as dag:
insert_redshift = PythonOperator(task_id='insert_data',
python_callable=insert_to_redshift)
insert_redshift
これもロールを適当に付与しておくと、無事動きました。