0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

OracleからSnowflakeへテーブルを自動レプリケーションする方法

Posted at

みなさん、こんにちは!

Snowflake Summit 2025で発表されたOpenflowという機能があります。
様々なデータソースから一元的にデータを取り込むことができる便利な機能ですが、残念ながら執筆時点でOracleコネクタは完成しておらず、Oracleと接続することができません。

そこで本記事では、そんなOpenflowのOracleコネクタの代替となる方法を考えてみました。
OracleからSnowflakeへ特定のテーブルを自動レプリケーションするものです。

以降で詳しい手順についてご紹介します。

手順

事前準備

OracleはAWS RDSでインスタンスを作成し、起動している状態です。

今回はJavaプログラムを作成するため、Javaの統合開発環境であるEclipseを利用します。
Eclipseの導入方法と使用方法についてはこちらの記事の「準備」部分をご参照ください。

Oralceでテーブル作成

以下のSQLを実行し、Oracleにレプリケーション元となるテーブルとデータを作成します。

qiita.rb
CREATE TABLE sample_data (
    id NUMBER(6) CONSTRAINT pk1 PRIMARY KEY,
    content VARCHAR2(255) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

INSERT ALL
INTO sample_data (id, content) VALUES (1, 'Test 1')
INTO sample_data (id, content) VALUES (2, 'Test 2')
SELECT * FROM DUAL;

Snowflakeでデータベース作成

ワークシートで以下のSQLを実行し、レプリケーション先となるデータベースを作成しておきます。

qiita.rb
CREATE DATABASE TEST;
USE DATABASE TEST;

Javaプログラム作成

プロジェクトフォルダ直下に以下のxmlファイルを配置します。

pom.xml

qiita.rb
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example</groupId>
  <artifactId>snowpipe-streaming-oracle</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <!-- 文字コードとJavaのバージョンの設定 -->
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.8</java.version>
  </properties>

  <!-- 依存関係の設定 -->
  <dependencies>
    <dependency>
      <groupId>com.oracle.database.jdbc</groupId>
      <artifactId>ojdbc8</artifactId>
      <version>23.8.0.25.04</version>
    </dependency>
    <dependency>
      <groupId>net.snowflake</groupId>
      <artifactId>snowflake-jdbc</artifactId>
      <version>3.25.0</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.3.4</version>
    </dependency>
    <dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>aws-lambda-java-events</artifactId>
      <version>3.15.0</version>
    </dependency>
  </dependencies>

  <!-- ビルド設定 -->
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.3.0</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

ReplicationHandler.java

qiita.rb
import com.fasterxml.jackson.databind.node.ObjectNode;

public class ReplicationHandler {
    public static void main(String args[]) {
        DBManager dbManager = new DBManager();
        SnowflakeManager sfManager = new SnowflakeManager();

        try {
            dbManager.getConnection();
            sfManager.getConnection();

            String table = "SAMPLE_DATA"; // 対象テーブル

            // Oracleテーブルデータ取得
            ObjectNode tableData = dbManager.fetchTableData(table);

            // Snowflakeにテーブルをレプリケーション
            sfManager.replicateTable(table, tableData);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            dbManager.close();
            sfManager.close();
        }
    }
}

SnowflakeManager.java

qiita.rb
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class SnowflakeManager {
    // 接続情報
    private static final String USER = "<USER>";
    private static final String PASSWORD = "<PASSWORD>";
    private static final String ACCOUNT = "<ACCOUNT>";
    private static final String WAREHOUSE = "COMPUTE_WH";
    private static final String DB = "TEST";
    private static final String SCHEMA = "PUBLIC";
    private static final String ROLE = "ACCOUNTADMIN";
    private static final String DRIVER = "net.snowflake.client.jdbc.SnowflakeDriver";
    private static final String URL = "jdbc:snowflake://<ACCOUNT_ID>.snowflakecomputing.com/";

    private Connection connection = null;

    // 型の対応Oracle : Snowflake
    Map<String, String> typeMap = new HashMap<String, String>() {
        {
            put("VARCHAR2", "STRING");
            put("TIMESTAMP(6)", "TIMESTAMP");
        }
    };

    // Snowflake接続メソッド
    public void getConnection() throws ClassNotFoundException, SQLException {
        Properties properties = new Properties();
        properties.put("user", USER);
        properties.put("password", PASSWORD);
        properties.put("account", ACCOUNT);
        properties.put("warehouse", WAREHOUSE);
        properties.put("db", DB);
        properties.put("schema", SCHEMA);
        properties.put("role", ROLE);

        Class.forName(DRIVER);
        this.connection = DriverManager.getConnection(URL, properties);
        System.out.println("Snowflakeへの接続に成功しました。");
    }

    // レプリケーションメソッド
    public void replicateTable(String tableName, ObjectNode tableData) throws SQLException {
        JsonNode columns = tableData.get("columns");
        JsonNode records = tableData.get("records");

        Statement statement = connection.createStatement();

        // テーブル作成
        String sql = "CREATE OR REPLACE TABLE " + tableName + "(";
        for (JsonNode column : columns) {
            String columnName = column.fields().next().getKey();
            String columnType = column.fields().next().getValue().asText();
            sql += columnName + " " + typeMap.getOrDefault(columnType, columnType) + ", ";
        }
        sql = sql.substring(0, sql.length() - 2) + ")";
        statement.executeUpdate(sql);

        // データ挿入
        sql = "INSERT INTO " + tableName + " VALUES ";
        for (JsonNode record : records) {
            sql += "(";
            for (JsonNode value : record) {
                sql += value + ", ";
            }
            sql = sql.substring(0, sql.length() - 2) + "), ";
        }
        sql = sql.substring(0, sql.length() - 2);
        sql = sql.replace("\"", "\'");
        statement.executeUpdate(sql);

        statement.close();
        System.out.println("テーブル'" + tableName + "'のレプリケーションに成功しました。");
    }

    // Snowflake接続切断メソッド
    public void close() {
        if (this.connection != null) {
            try {
                this.connection.close();
                System.out.println("Snowflakeへの接続を切断しました。");
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}

DBManager.java

qiita.rb
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class DBManager {
    // 接続情報
    private static final String DRIVER = "oracle.jdbc.driver.OracleDriver";
    private static final String URL = "jdbc:oracle:thin:@//<ENDPOINT>:<PORT>/orcl";
    private static final String USERNAME = "admin";
    private static final String PASSWORD = "<ADMIN_PASSWORD>";

    private Connection connection = null;

    // DB接続メソッド
    public void getConnection() throws ClassNotFoundException, SQLException {
        Class.forName(DRIVER);
        this.connection = DriverManager.getConnection(URL, USERNAME, PASSWORD);
        System.out.println("Oracleデータベースへの接続に成功しました。");
    }

    // テーブルデータ取得メソッド
    public ObjectNode fetchTableData(String tableName) throws SQLException {
        ObjectMapper mapper = new ObjectMapper();
        ObjectNode tableNode = mapper.createObjectNode();

        DatabaseMetaData metaData = this.connection.getMetaData();
        ResultSet tables = metaData.getTables(null, null, tableName, new String[] { "TABLE" });

        if (tables.next()) {
            // カラム定義取得
            ArrayNode columnsNode = mapper.createArrayNode();
            ResultSet columns = metaData.getColumns(null, null, tableName, null);
            while (columns.next()) {
                String columnName = columns.getString("COLUMN_NAME");
                String columnType = columns.getString("TYPE_NAME");
                columnsNode.add(mapper.createObjectNode().put(columnName, columnType));
            }
            columns.close();

            // テーブルデータ取得
            ArrayNode recordsNode = mapper.createArrayNode();
            Statement statement = this.connection.createStatement();
            ResultSet record = statement.executeQuery("SELECT * FROM " + tableName);
            while (record.next()) {
                ArrayNode rowNode = mapper.createArrayNode();
                for (JsonNode columnNode : columnsNode) {
                    String columnName = columnNode.fields().next().getKey();
                    String columnType = columnNode.fields().next().getValue().asText();
                    switch (columnType) {
                    case "NUMBER":
                        rowNode.add(record.getInt(columnName));
                        break;
                    default:
                        rowNode.add(record.getString(columnName));
                    }
                }
                recordsNode.add(rowNode);
            }
            statement.close();
            record.close();

            tableNode.set("columns", columnsNode);
            tableNode.set("records", recordsNode);
        }

        tables.close();

        return tableNode;
    }

    // DB接続切断メソッド
    public void close() {
        if (this.connection != null) {
            try {
                this.connection.close();
                System.out.println("Oracleデータベースへの接続を切断しました。");
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}

各ファイル配置後、以下を実行します。

  1. pom.xmlを開き、右クリック→「Maven install」
  2. ReplicationHandler.javaを開き、右クリック→「実行」
    image.png

OracleとSnowflakeに接続し、テーブルがレプリケーションされていればOKです。
image.png

パッケージ作成

作成したプログラムをMavenでパッケージ化し、AWS Lambdaで利用できるようにします。

ReplicationHandler.java を以下のように修正します。

qiita.rb
import java.sql.Timestamp;
import java.util.HashMap;

import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyResponseEvent;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class ReplicationHandler {
    public APIGatewayProxyResponseEvent main() {
        DBManager dbManager = new DBManager();
        SnowflakeManager sfManager = new SnowflakeManager();

        try {
            dbManager.getConnection();
            sfManager.getConnection();

            String table = "SAMPLE_DATA"; // 対象テーブル

            // Oracleテーブルデータ取得
            ObjectNode tableData = dbManager.fetchTableData(table);

            // Snowflakeにテーブルをレプリケーション
            sfManager.replicateTable(table, tableData);
        } catch (Exception e) {
            return buildResponse(500, e.getMessage());
        } finally {
            dbManager.close();
            sfManager.close();
        }

        return buildResponse(200, "Success");
    }

    public APIGatewayProxyResponseEvent buildResponse(int statusCode, String body) {
        APIGatewayProxyResponseEvent response = new APIGatewayProxyResponseEvent();
        response.setIsBase64Encoded(false);
        response.setStatusCode(statusCode);
        HashMap<String, String> headers = new HashMap<String, String>();
        headers.put("Content-Type", "application/json");
        response.setHeaders(headers);
        long millis = System.currentTimeMillis();
        Timestamp timestamp = new Timestamp(millis);
        response
                .setBody("{\"data\": [[0, {\"result\": \"" + body + "\", \"timestamp\": \"" + timestamp.toString() + "\"}]]}");

        return response;
    }
}

pom.xml を開き、以下を実行します。

  1. 「Maven clean」
  2. 「Maven install」

target/直下にjarファイルが作成されていればOKです。
image.png
2つ作成されていますが、依存関係パッケージもまとめられているXXX-jar-with-dependencies.jarが対象です。

Lambda関数作成

上記のjarファイルをS3にアップロードし、オブジェクトURLを控えておきます。
image.png

Lambdaのコンソールを開き、関数を作成します。ランタイムは最新のJava21を選択しています。
image.png

関数が作成されたら、「コード」→「コードソース」→「アップロード元」で「Amazon S3 の場所」を選択し、jarファイルのURLを入力します。
image.png

「ランタイム設定」で「編集」をクリックし、ハンドラを以下に変更します。

ReplicationHandler::main

また、デフォルトのタイムアウトは15秒で短いので、「設定」→「一般設定」の「編集」からタイムアウトを最大値の15分に変更しておきます。

API Gateway作成

Snowflakeから呼び出すためのAPIを作成します。
こちらの記事と同じ要領でAPI Gatewayの作成を行います。

外部関数作成

Snowflakeに戻り、ワークシートで以下のSQLを実行します。

qiita.rb
CREATE OR REPLACE EXTERNAL FUNCTION replicate_oracle_table()
  RETURNS STRING
  API_INTEGRATION = int_replication_api
  AS '<APIの呼び出しURL>';

動作確認

作成した外部関数を実行してみます。

qiita.rb
SELECT oracle_snowflake_replication()

image.png

正常に実行されたことが確認できました!

定期実行

タスクを作成することで、外部関数を定期実行させることが可能です。

qiita.rb
-- タスク作成
CREATE OR REPLACE TASK oracle_replication_task
  WAREHOUSE = compute_wh -- 適切なウェアハウスを指定
  SCHEDULE = 'USING CRON */10 * * * * UTC' -- 10分ごとに実行 (UTCタイムゾーン)
AS
  SELECT replicate_oracle_table();

-- タスク有効化
ALTER TASK oracle_replication_task RESUME;

-- タスク一時停止
ALTER TASK oracle_replication_task SUSPEND;

補足

今回作成したプログラムはあくまで簡易的なものなので、本格的な自動レプリケーションを行う場合は以下の点に注意が必要です。

  • OracleとSnowflakeのデータ型のマッピングを完全にする
  • API Gatewayのタイムアウトは30秒であるため、処理に時間がかかる場合はタイムアウト回避の検討が必要
    ・呼び出し専用Lambda関数を用意し、本体の処理は非同期で実行させるなど

さいごに

OracleからSnowflakeへテーブルを自動レプリケーションする方法についてご紹介しました。

OpenflowのOracleコネクタが正式にリリースされれば必要なくなってしまう方法かもしれませんが、Oracleコネクタで対応しきれない細かいカスタマイズを行いたい場合や、Oracleコネクタを使うに及ばない処理で課金を節約したい場合には有効な手段になるかと思います。ユースケースに応じて、本記事の方法も参考にしていただければ幸いです。

なお、OpenflowのセットアップからDB連携までを解説した記事も公開しています。こちらも併せてご覧ください。

Snowflake Openflowのセットアップ~DB連携まで

参考

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?