Posted at

Java上でカラム情報を付与してPostgresqlのcopyコマンドを実行する方法


はじめに

題材としてピンポイント過ぎて弱い気がしますが執筆。

PostgreSQLのcopyコマンドは、

ファイルをテーブルに格納する際に高速で処理を行う事ができます。

しかし、CSVをそのまま登録することはできるが、

値を補完したいって場合に、


  1. CSVファイルを読み込む。

  2. カラムを付与してCSVファイルを作成する。

  3. CopyManagerでcopyコマンドを実行してCSVをロードする。

って、順番で処理をしていると、

ストレージへの書込が2度(ファイルとDB)走るので、

一本化した方が良いと思い実装。


Javaでcopyコマンドをナイーブに実行するには

簡単に書くとこんな感じ、

  public static long copy(Connection conn, String filePath, String tableName) throws Exception {

CopyManager copyManager = new CopyManager((BaseConnection) conn);
Reader reader = new BufferedReader(new InputStreamReader(new FileInputStream(filePath), "UTF8"));
String sql = "copy " + tableName + " FROM STDIN WITH DELIMITER ','";
long result = copyManager.copyIn(sql, reader);
reader.close();
return result;
}

CopyManager にコネクションを渡して、

CSVのReaderクラスインターフェースのインスタンスを作成して、

copyInに渡して流し込む。

この時、CSVがそのままテーブルのカラム構成になっている事が前提。


どこで付与するか?

Readerを自前で作成して、

Readするタイミングでシステムカラムを付与してあげようと思い、

以下のようなReadメソッドを実装したReaderクラスを作成。

public class CsvFileWithSysColReader extends Reader {

/** カラムを付与した文字列を格納するキュー */
private final Queue<Character> csvBuffer;
/** CSVのReader コンストラクタで引数で設定してあげる */
private final BufferedReader reader = new ArrayDeque<>();
//~~略~~
/**
* コンストラクタ
*
* @param reader ラップするBufferdReader
* @throws IOException
*/

public CsvFileWithSysColReader(final BufferedReader reader)
throws IOException {
// 引数にskipHeaderとかのBooleanを持たせてヘッダスキップ機能を持たせるのも良いかも。
this.reader = reader;
}

@Override
public int read(final char[] cbuf, final int off, final int len) throws IOException {
int readCount = len;
// CSVの読込バッファサイズがCopyコマンドの読込桁数未満だったらファイルから読み取る。
while (this.csvBuffer.size() < len) {
// ファイルの最終だった場合は、終了して現在の桁数のみ返す。
if (loadLine() == 0) {
readCount = this.csvBuffer.size();
break;
}
}
// 読込があった場合はその桁数を返す。
if (readCount != 0) {
for (int i = off; i < readCount; i++) {
cbuf[i] = this.csvBuffer.poll();
}
return readCount;
} else {
// 無かった場合は終端の-1を返却する。
return -1;
}
}

/**
* 行ロード
*
* @return ロード結果文字数
* @throws IOException
*/

private int loadLine() throws IOException {
final String line = this.reader.readLine();
if (line == null) {
// 取得行が無ければ0を返却して終了する。
return 0;
} else {
// addSysCol:システムカラムを付与する関数です、好きに編集してください。
// この辺りでチェックとかしてエラーカラムを付与するとかも可能
final String lineWithSysCol= addSysCol(line);
for (int i = 0; i < lineWithSysCol.length(); i++) {
this.csvBuffer.add(lineWithSysCol.charAt(i));
}
return lineWithSysCol.length();
}
}
//~~略~~
}

そしてこんな感じで実行する。

  public static long copy(final Connection conn, final String filePath, final String tableName) throws Exception {

final CopyManager copyManager = new CopyManager((BaseConnection) conn);
final Reader reader = new CsvFileWithSysColReader(new BufferedReader(new InputStreamReader(new FileInputStream(filePath), "UTF8")));
final String sql = "copy " + tableName + " FROM STDIN WITH DELIMITER ','";
final long result = copyManager.copyIn(sql, reader);
reader.close();
return result;
}

上記CsvFileWithSysColReader.addSysCol(String line)の実装で以下のように幅が広がります。


  1. CSVに無い情報を外部から渡して付与する。

  2. 妥当性検証して、結果をロードするテーブルカラムに設定する

  3. テーブルカラム桁数に合わせて桁あふれをカットする

  4. 連番を振る

  5. etc...


おわりに

アプリケーション開発において、

I/Oが固定される事が多いので、

Interfaceでラップクラスを作ってそこで処理を吸収すれば

どうにかなる事が多いです。

ちな、

実際は色々な機能を付けて実装したのですが、

そこから機能を落として要点だけを書くようにしたのですが、

抜けがあったらごめんなさい。。。