#バッチ処理問題・・・
###Equalumの紹介・検証を行っていると、「バッチ処理・・」という質問・確認を頂くケースが増えており、そろそろ「それなりの提案」をしないといけないな・・と覚悟を決め始めている今日この頃なので、今回はその辺にフォーカスした検証等を実施してみたいと思います。
#Equalum環境でのバッチ処理について
Equalumの強みは、圧倒的に高い即時性と低遅延を優れたCDC技術や、最先端の実績あるクラウド系技術と有機的に組み合わせる事により、極めてハイスピードでノーコード設定されているFLOW処理を行いながら、指定されたターゲットのデータベース上にストリーミング処理を行える点に有ります。
また、ストリーミングの過程のタイミングを活かして、結構な種類のETL的FLOW処理をノーコードで設定出来ますので、かなりの領域までEqualumだけでデータ利活用をサポート出来るのでは?と考えております・・・
#でも! But!・・しかし!!!!
オリジナル側のデータベースの変化が、即時に同期されてし状況がターゲットデータベースに反映されても、存在しないデータを勝手に作る事はできませんので、集計系を中心に「ストリーミングだけでは可能にならない」作業も依然として存在することも事実です。
ですので、これらの「貯めてドン!系作業」をターゲットのデータシステム上で上手く実行する方法・・・を何とか出来ないかという発想と、Python環境で一般的に利用可能な「各種のデータ関連モジュール」もついでに組み込めれば助かる!という事で、今回のPrefect検証の実施に至りました。
そこで今回は、「ストリーミング後の利活用側データベース」上で効率良く「データ移動の為のバッチ処理」ではなく、「王道のデータ処理の為のバッチ(的)処理」環境構築・・という感じで検証を進めて行きたいと思います。
#Prefectなる謎のソリューション・・・??
色々とGoogle先生にお伺いを立てていたところ、PrefectなるPython環境が有る!という事をご提案頂き、今回はその線でPrefectの検証を行ってみたいと思います。
彼らのソリューションは、ホームページから頂くと・・・・
イノベーションのためのエンジン
と書かれており、概要的にはPrefectのPythonライブラリを何時もの様に環境に導入し、Pythonワールドでワークフローをアプリケーションレベルで設計、構築、テスト、実行するための全ての必要なモノが提供されている・・・・との事です。(素晴らしい!)
また、専用のUI環境も有るとの事なので、稼働後の維持管理も楽そうな感じなので・・・・非常に期待度大!なソリューションだと言えるでしょう。
#では、先ずは環境の導入から・・・・
今回は、オンプレミス環境でのお試しを想定していますので、彼らのオープンソース環境をローカルに導入する事にします。
トップページのPRODUCTからGET THE CODEを選択します。
左側のGET THE CODEボタンを選択します。
無事にGitHubへ移動できました。
一応GitHubが確認できましたので、Prefectの環境を実際に導入したいと思います。
お約束のPiP更新から・・
% pip install --upgrade pip
Prefectのインストール
% pip install prefect
因みに事務所のWindows環境で、ユーザ云々のエラーが出ましたので、
$ pip install --user prefect
のオプション付きを使って導入しました。Macの環境は最初のパターンで導入出来ると思います。
こんな感じで導入が始まります。
無事にインストールが終わったら、導入されたバージョンを確認してみます。
因みに、その他の導入方法としては・・・・
% conda install -c conda-forge prefect
% pipenv install --pre prefect
が有る様なので、環境に合わせて選択が可能です。
% prefect version
この時点では0.15.13が入ってきました。
無事に導入出来た様なので、次は彼らの概要にも書かれているUI環境を確認してみたいと思います。
#UI環境を確認してみる・・・・
彼らのドキュメントによれば、通常導入の場合デフォルトが彼らのクラウド環境を、利用の際のバックエンドとして使用するようになっている様なので、コマンドを使ってPrefect Serverをローカルで動く様に構成します。
% prefect backend server
クラウドにする場合は
% prefect backend cloud
次に、Prefectのサーバを起動します。
% prefect server start
必要な環境構築が自動的に始まります。
暫くすると自動的にサーバがローカルで立ち上がりますので、**http://localhost:8080**をアクセスしてUIを見てみます。
まっさらなダッシュボードが出てきました。
#エージェントなるものを走らせてみる・・・・
先程のダッシュボードに表示されている中央下段のロボットアイコンのagentが、現状起動されていない!という赤色表示ですので、コマンドを使って1個ローカルにエージェントを起動させてみます。
先程のサーバを起動したターミナルとは別のターミナルを起動し以下のコマンドを使います。
% prefect agent local start
非常にシンプルで解り易いコマンドを発行すると・・・・・
サクッと!エージェントが立ち上がってきました。
#では!お約束のHello World!を走らせてみる・・・・・
では、今回の最後の検証として「お約束のHello World!」を走らせてみたいと思います。
ソースコードは、彼らのドキュメントから使わせて頂きます。
import prefect
from prefect import task, Flow
@task
def hello_task():
logger = prefect.context.get("logger")
logger.info("Hello world!")
with Flow("hello-flow") as flow:
hello_task()
flow.run()
#今回のまとめ
今回は、検証環境としてのPrefect導入と、彼らの公式ドキュメントに出ているHello Worldを動かしてみました。次回以降では、もう少し処理ステップを複雑・高度化し、既存のPython環境とのコラボレーションに関する可能性を深掘りしてみたいと思います。
狙いとしては、Equalumでオリジナル側から即時・低遅延同期で展開されるターゲット側のテーブルを、Python環境をフルに活用しての「バッチ処理的」仕組みの検証まで行ければと考えております。
なお、下記の先輩記事も参考にさせて頂きました。この場をお借りして御礼申し上げます。