Apache Beamを使ったJavaのバッチ処理プログラムの稼働環境をGCP(Google Cloud Platform)からOCI(Oracle Cloud Infrastructure)へ移行したので記事にしてみました。
Apache Beam
Apache Beam(以降Beam)は多様な入力元から読み込んだデータを何らかの処理をし多様な出力先に出力するPipelineを構成し実行するものです(雑な説明です)。
PCollectionと呼ばれる集合を入出力としPTransformという単純な処理をいろいろな形で連結しPipelineのチェーンを構成することで複雑な処理をすることも可能となります。
Beamを使うことで大量のデータを扱えるようになる他にBeam依存の書き方をすることで処理方式を統一できるメリットがあります。
ストリーミング対応やJava以外の言語もサポートしていることに関する説明は割愛します。
Dataflowとデータ・フロー
Beamの実行エンジンとしてはGCPのDataflowやApache Flink、Apache Spark(以降Spark)などから選択できます。OCIのデータ・フローはSparkを基盤としたデータ処理サービスなのでBeamを実行することが可能です。
DataflowとSparkの違い
SparkではDriverが複数のExecutorにジョブを割り当てて処理するのに対し、Dataflowでは自動スケーリングをサポートしたワーカーがスケーリングしながら全てのワーカー上でジョブを処理していきます。
Dataflowもデータ・フローも読み込むデータが増えたり処理が多くなってメモリやCPUが逼迫してくると自動的にスケーリングします。Dataflowでは処理中にワーカーが増減し継続的に処理しますがデータ・フローの場合はスケールすると最初からデータを読み直しているようでちょっと劣っている感じがあります。
バッチ処理プログラムの移行
処理内容
移行したプログラムの処理内容ですが、クラウドストレージ上のファイルを読み込んで何らかの処理をしながらデータベースに登録していくといったものです。この反対の流れの処理も行います。
稼働環境の移行に伴い、実行エンジンはDataflowからデータ・フローへ、RDBMSはCloud SQLからAutonomous Databaseへ、クラウドストレージはCloud Storageからオブジェクト・ストレージへ変更しました。
実行エンジンの変更
Beamでは実行エンジンごとにrunnerが提供されています。DataflowRunnerからSparkRunnerに変更することで実行エンジンを変更できます。本バッチ処理プログラムではローカルで実行することも考慮してrunnerを起動パラメータで指定していたのでSparkへの移行はpom.xmlへの追加と起動パラメータ変更で済みました。
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-spark-3</artifactId>
<version>2.52.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.13</artifactId>
<version>3.4.1</version>
</dependency>
$ java -jar ... --runner=DataflowRunner
$ java -jar ... --runner=SparkRunner
RDBMSの変更
Cloud SQLではPostgreSQLを使っていました。Autonomous Database(Oracle 19c)ではサポートされていないLIMITなどのSQLを書き換えました。
... LIMIT 10
... OFFSET 0 ROWS FETCH FIRST 10 ROWS ONLY
クラウドストレージの変更
BeamのFileIOクラスはローカルストレージやGCS、HDFSをサポートしています。OCIのオブジェクト・ストレージはサポートしていないのでプログラムの変更が必要でした。
HDFSコネクタを使うことでOCIのオブジェクト・ストレージをHDFSとして読み書きできるようになります。
<dependency>
<groupId>com.oracle.oci.sdk</groupId>
<artifactId>oci-hdfs-connector</artifactId>
<version>3.3.4.1.2.0</version>
</dependency>
以下のようなcore-site.xmlでHDFSの各パラーメターを設定しました。
Oracle社からは「fs.oci.client.auth.pemFileString」ではなく「fs.oci.client.auth.pemfilepath」を使用することが推奨されていますがデータ・フローのExecutorにpemfileを配置することはできないのでここにpemfileの内容を設定するしかないのでは。
<configuration>
<property>
<name>fs.oci.client.hostname</name>
<value>https://objectstorage.ap-tokyo-1.oraclecloud.com</value>
</property>
<property>
<name>fs.oci.client.auth.tenantId</name>
<value>ocid1.tenancy.oc1.*****</value>
</property>
<property>
<name>fs.oci.client.auth.userId</name>
<value>ocid1.user.oc1.*****</value>
</property>
<property>
<name>fs.oci.client.auth.fingerprint</name>
<value>*****</value>
</property>
<property>
<name>fs.oci.client.auth.pemFileString</name>
<value>
-----BEGIN PRIVATE KEY-----
*****
-----END PRIVATE KEY-----
</value>
</property>
</configuration>
オブジェクトストレージで使用する「oci://バケット名@ネームスペース/...」形式のURLを指定できるようにPipelineOptionsに以下を追加しました。
hdfsConfiguration=[{\"fs.defaultFS\":\"oci://バケット名@ネームスペース\"}]
以上で変更完了です。