はじめに
去年に、SOAPとBulk APIのサンプルを書いたので、その続編になります。コードは書いてみたものの、動作確認する時間がとれず、ちゃんと動くかは不明なのであしからず。
PK-chunkingとは
通常のBulk APIは、クエリの結果を1GBごとのファイル(最大15ファイル)に分割して、それらをダウンロードする形になります。
PK-chunkingは、SalesforceのIDを使って、そのクエリを分割する仕組みです。
通常のクエリが下記の場合、
SELECT Name FROM Account
PK-chunkingでは次のように分割するイメージです。
SELECT Name FROM Account WHERE Id >= 001300000000000 AND Id < 00130000000132G
SELECT Name FROM Account WHERE Id >= 00130000000132G AND Id < 00130000000264W
SELECT Name FROM Account WHERE Id >= 00130000000264W AND Id < 00130000000396m
...
SELECT Name FROM Account WHERE Id >= 00130000000euQ4 AND Id < 00130000000fxSK
条件なしだと、クエリの完了までに時間がかかっても、PKを使った分割の条件をいれることによって、それぞれのクエリにかかる時間を短くすることができます。(一つのジョブを、マルチプロセスで分割することで処理時間を短くするイメージです)
また、分割の最大数は25万件なので、結果ファイルのファイルサイズも小さくできることも見込めます。
PK-chunkingの処理概要
公式サイトでは、curlコマンドで一連の流れを実行する手順を説明しています。
例えば、100万件のデータをPK-chunkingしようとした場合、
- requestヘッダーにPK-chunkingとchunksizeの設定追加
- PK-chunkingのジョブが作成される
- そのジョブに、4つのバッチが登録される
- 各バッチで、25万件ずつ取得するためのクエリが実行される(並列)
- バッチが完了すると、クエリの結果ファイルをダウンロードするためのURLが取得できる
- 各URLからファイルをダウンロード
となります。
コードの実装方針
クエリの結果が、非同期に複数のファイルで作成されるので、そこの取扱が難しいです。
実用性を考慮して、この複数の結果を一つのファイルに集約・かつgzip圧縮するサンプルを作成しました。
https://github.com/JunNakamura/sfsample/blob/master/src/main/java/BulkChunkSaveSample.java
複数ファイルをひとつにまとめる
実はこういうケースはパイプを使うというのが、Javaに限らず、定番のやり方です。
シェルだと、named pipe、mkfifoなどになります。
Javaの場合は、
1.PipedOutputStreamに、各ファイルの内容を書き込み
2.別プロセスで、PipedInputStreamを読み込みをすることで上記の内容を取得できる
となります。
try (PipedOutputStream pipedOut = new PipedOutputStream();
PipedInputStream pipedIn = new PipedInputStream(pipedOut);
....
ExecutorService executor = Executors.newFixedThreadPool(batchList.size() + 1);
// 読み取り用パイプの内容をファイルに書き込みを別スレッドで開始
executor.submit(() -> {
try {
String line;
while ((line = pipedReader.readLine()) != null) {
bw.write(line);
bw.newLine();
}
} catch (Exception e) {
logger.error("Failed.", e);
}
});
// バッチごとにステータスチェック+結果をパイプに書き込み
for (BatchInfo chunkBatch: batchList) {
// ネットワークの通信量に制約がなければ非同期で行う.
// executor.submit(() -> BulkChunkSaveSample.retrieveResult(job, connection, chunkBatch, pipedWriter));
BulkChunkSaveSample.retrieveResult(job, connection, chunkBatch, pipedWriter);
}
別のやり方
まとめるファイルの数が2~3個など少なく、読み込みが直列でもいいなら、SequenceInputStreamというのがあります。このクラスでラップすることで、複数ファイルの読み込みを、論理的にひとつにできます。内部的には、ひとつずつ順番に読み込んでいます。コンストラクタの引数が、Enumerationか2変数の2パターンしかないので、やや使いづらいです。
gzip圧縮
Javaの場合は、GZIPOutputStreamでラップすればよいです。
文字コードを指定する場合は、OutputStreamWriterで更にラップします。
CSVの読み書きのライブラリを使いたい場合、たいていはWriterを引数にとるコンストラクタがあるので、OutputStreamWriterかBufferWriterを渡すだけです。(とはいえコード量が多くて、ややうんざり感はあります)
OutputStream os = Files.newOutputStream(resultFile, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE);
GZIPOutputStream gzip = new GZIPOutputStream(os);
OutputStreamWriter ow = new OutputStreamWriter(gzip, StandardCharsets.UTF_8);
BufferedWriter bw = new BufferedWriter(ow)
分割されたバッチの取扱
PK-chunkingをすると、
- 分割するためのバッチがまず登録される
- そのバッチのステータスがNOT PROCESSEDになる
- そのあとに、分割されたクエリを実行するバッチが登録される
となります。なので、最初の例でいうと、実際にはバッチは5個作成され、2~5番のバッチから、結果ファイルを取得するということになります。
コードでいうと、
1.ジョブから、登録されているバッチIDをすべて取得
2.先頭のバッチIDのステータスがNot Processedなら、先頭を除いたバッチIDのリストを返却
3.ステータスがエラーなら、処理中断
4.ステータスがそれ以外なら、一定時間待機
となります。
BatchInfoList batchInfoList = connection.getBatchInfoList(job.getId());
List<BatchInfo> infoList = new ArrayList<>(Arrays.asList(batchInfoList.getBatchInfo()));
BatchInfo batchInfo = infoList.get(0);
switch (batchInfo.getState()) {
case NotProcessed:
// 先頭以降のバッチがクエリ結果に関するもの
infoList.remove(0);
result.complete(infoList);
break;
case Failed:
logger.warn("batch:" + job.getId() + " failed.");
result.complete(Collections.emptyList());
break;
default:
logger.info("-- waiting --");
logger.info("state: " + batchInfo.getState());
}
ひとつのバッチIDに対して、
- ステータスが完了になるまで定期的にポーリング
- 完了したら、結果ファイルのURLを取得
は、通常のBulk APIと同じで、PK-chunkingだと、それが複数になるだけです。
結果ファイルの取得を、非同期でやるか、できたものから順に直列でやるかは、要件次第でしょう。
速さ優先なら非同期になります。
厄介なのは、論理的には各バッチも結果ファイルが複数になる可能性があります。
最初の例でいうと、分割後の25万件になっても結果ファイルのサイズが1GBを超える場合です。
(その場合、全体のサイズが4GBをこえているので、分割してもBulkクエリの制限にひっかかりそうな気もしますが...)
ですが、今回のサンプルではパイプをつかっているので、各ファイルの結果をパイプに書き込むだけで、その場合も対応できます。
// パイプへの書き込み
for (String resultId: resultIds) {
try (InputStream is = connection.getQueryResultStream(job.getId(), chunkBatch.getId(), resultId);
BufferedReader br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8));) {
String line;
while ((line = br.readLine()) != null) {
pipedWriter.write(line);
pipedWriter.newLine();
}
}
まとめ
ネタ元はSalesforceですが、技術要素はほぼJavaな記事になりました...
いろいろ決め打ちすれば、もっと適当な実装で短くかけなくもないのですが、すこしガチ目にやってみることにしました。おかけで、いくつか新しいテクニックを取得できたので、それは良かったです。
Salesforceには、dataloaderという、Salesforceのデータ操作ができるGUIツールがあります。もちろん、Bulk APIを使えますが、PK-chunkingがこの記事を書いた時点ではサポートしていないです。(PRはあがっているようです: https://github.com/forcedotcom/dataloader/pull/138)
いろいろ面倒なのでサポートしていないのもなんとなくわかったような気もします。
p.s
dataloaderのreadmeに、cliで使うためのやり方がでてきました。実態はexecutable jarなのは知っていたので、できると思っていたのですが、公式ででしたくれたのはありがたいです。configファイルのサンプルもでているようです。