1
1

More than 3 years have passed since last update.

(AWS の) Airflow DAG の小さいサンプル(S3 にファイルをアップロードや Redshift 操作)

Last updated at Posted at 2021-02-11

S3 へアップロード

Airflow から Python をつかって S3 にファイルをアップロードする小さいサンプルです。BashOperator も混ざっていいますが、S3 にあげる処理とは関係ありません、サンプルとしていおいてあるだけです。

import datetime as dt
import boto3
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

default_args = {
  'owner': 'me',
  'start_date': dt.datetime(2021, 2, 11),
  'retries': 1,
  'retrby_delay': dt.timedelta(minutes=5)
}

s3 = boto3.resource('s3')
def upload_file_to_s3():
  f = open("newFile.txt", "w")
  f.write("Wow")
  f.close()
  s3.Bucket("bucket_name").upload_file("newFile.txt", "dst.txt")

with DAG('Sample',
  default_args=default_args,
  schedule_interval='*/1 * * * *',
  ) as dag:

  touch_hoge = BashOperator(task_id='print_hello',
                              bash_command='echo hello')
  upload_to_s3 = PythonOperator(task_id='upload_to_s3',
                              python_callable=upload_file_to_s3)

touch_hoge >> upload_to_s3

AWS の Airflow を使っているので、Airflow のロールに S3 にアクセスできるポリシーを追加しておくことで、上記により S3 へのアップロードが成功します。ポリシーなどが足りないとしっかりと以下のエラーがでます。

boto3.exceptions.S3UploadFailedError: Failed to upload newFile.txt to bucket-name/dst.txt:
An error occurred (AccessDenied) when calling the PutObject operation: Access Denied

cf. https://www.sicara.ai/blog/2019-01-28-automate-aws-tasks-thanks-to-airflow-hooks

Redshift 操作

クエリは何でもいいのですが、Redshift のクエリを叩くサンプルです。

import datetime as dt
import boto3
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

default_args = {
  'owner': 'me',
  'start_date': dt.datetime(2021, 2, 11),
  'retries': 1,
  'retrby_delay': dt.timedelta(minutes=5)
}

def insert_to_redshift():
  rsd = boto3.client('redshift-data')
  resp = rsd.execute_statement(
    ClusterIdentifier='your_cluster_identifier',
    Database='your_dev',
    DbUser='your_suser',
    Sql='insert into visitors (name) select name from users;'
  )
  print(resp)
  return "OK"

with DAG('copyRedshiftTable',
  default_args=default_args,
  schedule_interval='*/10 * * * *'
  ) as dag:

  insert_redshift = PythonOperator(task_id='insert_data',
                                    python_callable=insert_to_redshift)

insert_redshift

これもロールを適当に付与しておくと、無事動きました。

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1