LoginSignup
0
1

More than 1 year has passed since last update.

DataFlowのパイプラインでCerberusの機能を使って、csvをチェックしてみた。

Last updated at Posted at 2022-11-08

はじめに

現在業務でGCPのDataFlowを使ってデータパイプラインを構築しており、CSVファイルがアプリケーションから来るのですが、たまにユーザが手で直したりなんてことがあるので、バリデーションして、不適当な行があれば省いて、後続のデータパイプラインに流したいと思っています。(上流の運用なおしたいのはやまやまですが。。。)
そこでCerberusというバリデーションライブラリがあったので、いっちょやってみようと思った次第です。

Cerberusとは

Pythonのデータ検証用のライブラリで
「Cerberusは、強力でありながらシンプルで軽量なデータ検証機能をすぐに提供し、簡単に拡張できるように設計されているため、カスタム検証が可能です。依存関係はなく、Python 2.7から3.8、PyPyおよびPyPy3まで徹底的にテストされています。」とのこと
(引用元https://docs.python-cerberus.org/en/stable/index.html)

簡単に機能と使い方を説明すると、データとチェック用のファイルを比較して、OK/NGを判断してくれます。
データをJson形式で用意します

data
{
   "id":"a1",
   "name":"YamadaTaro",
   "address":"Tokyo",
   "sex":"man",
   "age":35
}

またrequiredやnullableなどチェックしたい要素を記載した検証用のスキーマを用意します。

schema
{
    "id": {
        "type": "string",
        "required": true,
        "empty": false,
        "nullable": false
    },
    "name": {
        "type": "string",
        "required": true,
        "empty": false,
        "nullable": false
    },
    "address": {
        "type": "string",
        "required": true,
        "empty": false,
        "nullable": false
    },
    "sex": {
        "type": "string",
        "required": true,
        "empty": false,
        "nullable": false
    },
    "age": {
        "type": "integer",
        "required": true,
        "empty": false,
        "nullable": false
    }
}

Pythonで以下のようにコードを作成し、実行すると、結果が評価されます。

from cerberus import Validator

v = Validator(schema)
v.validate(data)
--> True

本題

前提

利用する外部ライブラリとそのバージョンを記載します。

Python = 3.10
ceruberus = 1.3.4
google-cloud-bigquery = 2.34.4
apache-beam = 2.41.0
google-cloud-storage = 2.5.0
pandas = 1.4.4

今回DataFlowとGCSとBigQueryを利用するので、それを実行できるサービスアカウントを準備してください。
また上記サービスのAPIが利用できることも確認してください。

パイプライン

今回構築するアーキテクチャはこんな感じを予定しています。
アーキテクチャ_dataflow.jpg

GCSから取得したcsvファイルを取得して、それをバリデーションチェックをかけて、OK・NGに振り分けます。
振り分けてOKなものはBigQueryへ、NGなものはGCSへ流していきたいです。

Step1 バケット作成

まずはGCSにバケットを作成します。
image.png

  • 取り込み予定のファイルを配置するCSVフォルダ
  • Schemaを配置するフォルダ

をそれぞれ作っておきます。

Step2 ファイル配置

csvファイルの配置します。
こちらはすでに正常な行と異常な行を含んだファイルとなっています。
これはCSVバケットの中に入れてください。

sample.csv
id,name,address,sex,age
a1,YamadaTaro,Tokyo,man,35
a2,TanakaIchiro,Tokyo,man,52
a3,SatoHanako,Osaka,woman,35
,KurodaJunko,Chiba,woman,22
b1,,Osaka,man,12
b2,OhtaJiro,,man,32
c3,NakataEiko,Hokkaido,,43
d1,AraiTakashi,Akita,man,77

空白になっている項目がありますね。

bigqueryにアップロードする際のスキーマファイルを配置します。
コードの中でも指定することができますが、見づらくなるので外部から指定します。
これはSchemaバケットの中に入れてください。

schema.json
{
	"fields": [{
			"name": "id",
			"type": "STRING",
			"mode": "REQUIRED"
		},
		{
			"name": "name",
			"type": "STRING",
			"mode": "REQUIRED"
		},
		{
			"name": "address",
			"type": "STRING",
			"mode": "REQUIRED"
		},
		{
			"name": "sex",
			"type": "STRING",
			"mode": "REQUIRED"
		},
		{
			"name": "age",
			"type": "STRING",
			"mode": "REQUIRED"
		}
	]
}

cerberusで使用するチェック用のスキーマファイルです。
これもSchemaバケットの中に入れてください。

validation.json
{
	"fields": [{
			"name": "id",
			"type": "STRING",
			"mode": "REQUIRED"
		},
		{
			"name": "name",
			"type": "STRING",
			"mode": "REQUIRED"
		},
		{
			"name": "address",
			"type": "STRING",
			"mode": "REQUIRED"
		},
		{
			"name": "sex",
			"type": "STRING",
			"mode": "REQUIRED"
		},
		{
			"name": "age",
			"type": "STRING",
			"mode": "REQUIRED"
		}
	]
}

Step2 実装

お手持ちのローカル開発環境を準備して、以下のようにディレクトリを作成してください。

.
└── dataflow
    ├── conf
    ├── csv
    ├── schema
    └── validation

ローカル環境にdataflow直下にコードを作成します。

dataflow_validation.py
import apache_beam as apachebeam

from google.cloud import bigquery
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam import DoFn
from google.cloud import storage as gcs
from apache_beam.dataframe import convert
import cerberus
import configparser
import json
import pandas as pd

class DataFlowOptions():
    # DataFlowのオプションを指定
    def __init__(self, settings):
        self.options = PipelineOptions()
        self.options.view_as(GoogleCloudOptions).project = settings.project_id
        self.options.view_as(GoogleCloudOptions).job_name = settings.job_name
        self.options.view_as(GoogleCloudOptions).staging_location = settings.staging_location
        self.options.view_as(GoogleCloudOptions).temp_location = settings.temp_location
        self.options.view_as(GoogleCloudOptions).region = settings.region
        self.options.view_as(GoogleCloudOptions).service_account_email = settings.service_account_email
        self.options.view_as(WorkerOptions).autoscaling_algorithm = settings.autoscaling_algorithm 
        self.options.view_as(WorkerOptions).disk_size_gb = settings.disk_size_gb
        self.options.view_as(WorkerOptions).max_num_workers = settings.max_num_workers
        # self.options.view_as(GoogleCloudOptions).template_location = settings.template_location
        self.options.view_as(StandardOptions).runner = settings.runner 
        setup_option = self.options.view_as(SetupOptions)
        # 外部モジュールをが必要な場合、以下のパラメータが必要
        setup_option.requirements_file = settings.requirements_file 

class LoadSetting():
    # 設定ファイルの読み込み、変数に格納
    def __init__(self):
        inifile=configparser.SafeConfigParser()
        inifile.read('./conf/settings.ini')
        self.table_id = inifile['DEFAULT']['table_id']
        self.job_name = inifile['DEFAULT']['job_name']
        self.gs_output = inifile['DEFAULT']['gs_output_folder']
        self.project_id = inifile['DEFAULT']['project_id']
        self.service_account_email = inifile['DEFAULT']['service_account_email']
        self.gs_bucket = inifile['DEFAULT']['gs_dataflow_bucket']
        gs_staging = inifile['DEFAULT']['gs_staging_folder']
        gs_temp = inifile['DEFAULT']['gs_temp_folder']
        template_location = inifile['DEFAULT']['template_location']
        self.template_name = inifile['DEFAULT']['template_name']
        self.gs_csv_uri = inifile['DEFAULT']['gs_csv_uri']
        self.gs_schema_uri = inifile['DEFAULT']['gs_schema_uri']
        self.gs_error_uri = inifile['DEFAULT']['gs_error_uri']
        self.gs_validation_uri = inifile['DEFAULT']['gs_validation_uri']
        self.staging_location = 'gs://{}/{}'.format(self.gs_bucket, gs_staging)
        self.temp_location = 'gs://{}/{}'.format(self.gs_bucket, gs_temp)
        self.template_location = 'gs://{}/{}'.format(self.gs_bucket, template_location)
        self.disk_size_gb = int(inifile['DEFAULT']['disk_size_gb'])
        self.max_num_workers = int(inifile['DEFAULT']['max_num_workers'])
        self.runner = inifile['DEFAULT']['runner']
        self.region = inifile['DEFAULT']['region']
        self.autoscaling_algorithm = inifile['DEFAULT']['autoscaling_algorithm']
        self.requirements_file = inifile['DEFAULT']['requirements_file']

class ValidateDoFn(DoFn):
    # バリデーション用のFunction
    def process(self, element, validationschema):
        from cerberus import Validator
        import apache_beam as apachebeam
        v = Validator(validationschema)
        print(element)
        if v.validate(element):
            print("This Row is Valid!")
            yield apachebeam.pvalue.TaggedOutput('valid',element)
        else:
            print("This Row is Invalid!")
            yield apachebeam.pvalue.TaggedOutput('invalid',element)

class DataFlowClient():
    def __init__(self):
        # 設定ファイルを読み込み
        self.setting = LoadSetting()
        # パイプラインに必要なファイルを一括ダウンロード
        self.downloadFromGCS(self.setting)

    def downloadFromGCS(self,setting):
        gcsClient = gcs.Client(setting.project_id)
        bucketClient = gcsClient.get_bucket(setting.gs_bucket)
        print("Download csv file")
        # CSVファイルをダウンロード
        csvBlob = bucketClient.get_blob(setting.gs_csv_uri)
        csvBlob.download_to_filename(setting.gs_csv_uri)
        print("Completed to download csv file")
        print("Download Json File")
        # Jsonファイルをダウンロード
        jsonBlob = bucketClient.get_blob(setting.gs_schema_uri)
        jsonBlob.download_to_filename(setting.gs_schema_uri)
        print("Completed to download Json File")
        print("Download validation file")
        # バリデーション用のスキーマファイルをダウンロード
        validationBlob = bucketClient.get_blob(setting.gs_validation_uri)
        validationBlob.download_to_filename(setting.gs_validation_uri)
        print("Completed to download validation file")

    def jsonLoad(self,path):
        json_open = open('./'+ path, 'r')
        return json.load(json_open)


    def run(self):
        setting = self.setting
        # DataFlowのオプションを読み込み
        options = DataFlowOptions(setting)
        p = apachebeam.Pipeline(options=options.options)
        
        # DataFrameに格納
        df = pd.read_csv(setting.gs_csv_uri)
        # バリデーション用のスキーマをJson形式に変換
        validationSchema = self.jsonLoad(setting.gs_validation_uri)
        validate = ( 
            # DataFrameをPCollectionに変換
            convert.to_pcollection(df, pipeline = p )
            # Json形式に変換
            | 'To dictionaries' >> apachebeam.Map(lambda x: dict(x._asdict()))
            # バリデーションチェックを行う
            | 'Validate' >> apachebeam.ParDo(ValidateDoFn(), validationSchema).with_outputs()
        )
        
        tableSchema = self.jsonLoad(setting.gs_schema_uri)
        # バリデーションチェックをクリアした行はBigQueryにロード
        (validate.valid 
            | 'Output: WriteTable' >> apachebeam.io.gcp.bigquery.WriteToBigQuery(
                project = setting.project_id,
                table = setting.table_id,
                schema = tableSchema,
                create_disposition=apachebeam.io.BigQueryDisposition.CREATE_IF_NEEDED)
        )

        outputGCS = 'gs://{}/{}'.format(setting.gs_bucket, setting.gs_error_uri)
        # バリデーションチェックをクリアできなかった行はGCSにアウトプット
        (validate.invalid 
            | 'Output: InvalidDataToGCS' >>  apachebeam.io.WriteToText(outputGCS)
        )
        print('Run pipeline')
        result = p.run()
        result.wait_until_finish()
        print('Pipeline successfully finished')


if __name__ == '__main__':
    # 設定ファイルのロード
    client = DataFlowClient()
    client.run()

パイプラインの説明を少し。
csvファイルを読み込んで、それをDataFrameに置き換え、パイプラインで弄れるようにPCollectionに変換してます。
それをJson形式に変換します。
そのあとValidation関数でOK、NGを判定しています。
OK、NGをyieldで行ごとに保持しておき、OK、NGで処理先をBigQueryとGCSを出力してます。

あと特徴的なのは
class ValidateDoFn(DoFn):の箇所だと思います。
というのも関数内にimport文を書いているところかなと思います。
公式文書では以下のように記載されています。
「When you build user code for a Beam transform, you should keep in mind the distributed nature of execution. For example, there might be many copies of your function running on a lot of different machines in parallel, and those copies function independently, without communicating or sharing state with any of the other copies. Depending on the Pipeline Runner and processing back-end you choose for your pipeline, each copy of your user code function may be retried or run multiple times. As such, you should be cautious about including things like state dependency in your user code.」
(引用:https://beam.apache.org/documentation/programming-guide/#requirements-for-writing-user-code-for-beam-transforms)

私のつたない英語読解力で申し訳ないのですが、分散処理をする過程で異なるマシンに関数をコピーするので、依存関係をdef内で完結する必要があるとのこと。

上記のクラスの関数内でimportを消してしまうと、name is not defineになってしまいます。

コードを書いたところで次にconf配下に設定ファイルを配置しましょう。
DataFlowを動かすための設定となっています。

settings
[DEFAULT]
table_id = <your_dataset>.<your_table_id>
job_name = dataflow-sample-jobname
gs_dataflow_bucket = <your_gcs_bucket>
gs_staging_folder = staging
gs_temp_folder = temp
gs_output_folder = output
gs_csv_uri = csv/sample.csv
gs_schema_uri = schema/schema.json
gs_error_uri = error/error.csv
gs_validation_uri = validation/validation.json
project_id = <your_project_id>
service_account_email = <your_service_account_email>
template_name = sample_template_name
region = us-east1
disk_size_gb = 25
max_num_workers = 2
runner = DataflowRunner
autoscaling_algorithm = THROUGHPUT_BASED
requirements_file = requirements.txt

以下の設定値については、各人の設定に書き換えてください。
他の設定についても変えていただいても良いですが、変えなくても動くはずです。

  • table_id
  • gs_dataflow_bucket
  • service_account_email
  • project_id
    (オプションを詳しく知りたい場合は https://cloud.google.com/dataflow/docs/guides/setting-pipeline-options )
    またDataFlowではなく、ApacheBeamとしてローカル環境で動かしたい場合は、runnerをDirectRunnerに変更して実行することができます。

dataflowディレクトリに外部ライブラリダウンロード用のファイルを配置します。

requirements.txt
Cerberus==1.3.4

dataflowで外部ライブラリが必要な場合、こういった形で指定するとよしなにダウンロードしてくれます。

Step4 実行

復習ですが、ストレージに配置されているファイルを読み込んで、OKならBigQueryへ、NGならGCSへ行きます。
アーキテクチャ_dataflow.jpg
NGのファイルがあると、先ほど作成したバケットにerrorディレクトリが作成され、そこに出力されます。

では実行していきましょう。
image.png

問題なければPipeline successfully finishedと出力されているはずです。

DataFlowのサービスを表示をすると
image.png
ってな感じになっていたら大成功です。

BigQueryでクエリを実行すると
image.png
登録されていることを確認してください。

今後

Cerberusを使うことで柔軟にバリデーションができるかなと思います。
Cerberusはregixだったり桁数だったりかなり柔軟にチェックができるので、DataFlowと掛け合わせることで簡単ですが、データがそもそもおかしかったりするデータをはじくことができます。
今回はNGデータはGCSに投げましたが、NGデータについて変換処理をすることもできます。
またComposerと組み合わせることで、NGデータを修正するステップを組み合わせて、また取り込みさせるなんてことができます。

個人的にはDataFlowは外部ライブラリ使うことでより一層柔軟にデータ処理ができるんだなと思って、もっと好きになりました。

すこし説明が少ない部分があったかと思いますが、参考になればうれしいです。

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1