ここ何日か、並列データ転送ツールのEmbulkを触っていますが、フィルタプラグインのJavaでの書き方が見付からなかったので、試行錯誤して書いてみた結果のメモです。自分でもEmbulkについてまだあまり把握できていないので、作法的に色々間違っている可能性が高いです。あまり信用しないでください。
input.csv
A,4
B,2
A,8
C,3
B,1
B,2
C,5
こういう入力を受け取って、
+-----------+------------+
| id:string | score:long |
+-----------+------------+
| A | 12 |
| B | 5 |
| C | 8 |
+-----------+------------+
上のような感じに集計するフィルターを書いてみました。
$ embulk.bat new java-filter sum
でテンプレートを生成して、以下のように実装。
SumFilterPlugin.java
package org.embulk.filter;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.embulk.config.ConfigSource;
import org.embulk.config.Task;
import org.embulk.config.TaskSource;
import org.embulk.spi.Exec;
import org.embulk.spi.FilterPlugin;
import org.embulk.spi.Page;
import org.embulk.spi.PageBuilder;
import org.embulk.spi.PageOutput;
import org.embulk.spi.PageReader;
import org.embulk.spi.Schema;
public class SumFilterPlugin implements FilterPlugin {
public interface PluginTask extends Task {
}
@Override
public void transaction(ConfigSource config, Schema inputSchema,
FilterPlugin.Control control) {
PluginTask task = config.loadConfig(PluginTask.class);
Schema outputSchema = inputSchema;
control.run(task.dump(), outputSchema);
}
@Override
public PageOutput open(TaskSource taskSource, final Schema inputSchema,
final Schema outputSchema, final PageOutput output) {
// PluginTask task = taskSource.loadTask(PluginTask.class);
return new PageOutput() {
private PageReader reader = new PageReader(inputSchema);
@Override
public void finish() {
output.finish();
}
@Override
public void close() {
output.close();
}
@Override
public void add(Page page) {
reader.setPage(page);
Map<String, Long> map = new HashMap<>();
while (reader.nextRecord()) {
String id = reader.getString(0);
Long score = reader.getLong(1);
Long current = map.get(id);
current = current == null ? 0 : current;
map.put(id, current + score);
}
try (final PageBuilder builder = new PageBuilder(
Exec.getBufferAllocator(), outputSchema, output)) {
for (Entry<String, Long> entry : map.entrySet()) {
String id = entry.getKey();
Long score = entry.getValue();
builder.setString(0, id);
builder.setLong(1, score);
builder.addRecord();
}
builder.finish();
}
}
};
}
}
設定は以下。
config.yml
in:
type: file
path_prefix: input.csv
parser:
charset: UTF-8
newline: CRLF
type: csv
delimiter: ','
quote: '"'
escape: ''
null_string: 'NULL'
header_line: false
columns:
- {name: id, type: string}
- {name: score, type: long}
exec: {}
filters:
- type: sum
out: {type: stdout}
ビルドして、
$ ./gradlew.bat gem
:compileJava
:processResources UP-TO-DATE
:classes
:jar
:assemble
:compileTestJava
:processTestResources UP-TO-DATE
:testClasses
:test
:check
:build
:classpath
:gemspec
:gemWARNING: no homepage specified
Successfully built RubyGem
Name: embulk-filter-sum
Version: 0.1.0
File: embulk-filter-sum-0.1.0.gem
BUILD SUCCESSFUL
Total time: 24.719 secs
プレビューしてみる。
$ embulk.bat preview -I lib config.yml
2015-03-07 19:50:11.981 +0900: Embulk v0.5.1
2015-03-07 19:50:14.842 +0900 [INFO] (preview): Listing local files at directory '.' filtering filename by prefix 'input.csv'
2015-03-07 19:50:14.849 +0900 [INFO] (preview): Loading files [input.csv]
+-----------+------------+
| id:string | score:long |
+-----------+------------+
| A | 12 |
| B | 5 |
| C | 8 |
+-----------+------------+
実行結果は以下。
$ embulk.bat run -I lib config.yml
2015-03-07 19:50:39.210 +0900: Embulk v0.5.1
2015-03-07 19:50:42.147 +0900 [INFO] (transaction): Listing local files at directory '.' filtering filename by prefix 'input.csv'
2015-03-07 19:50:42.153 +0900 [INFO] (transaction): Loading files [input.csv]
2015-03-07 19:50:42.306 +0900 [INFO] (transaction): {done: 0 / 1, running: 0}
A,12
B,5
C,8
2015-03-07 19:50:42.435 +0900 [INFO] (transaction): {done: 1 / 1, running: 0}
2015-03-07 19:50:42.480 +0900 [INFO] (main): Committed.
2015-03-07 19:50:42.480 +0900 [INFO] (main): Next config diff: {"in":{"last_path":"input.csv"},"out":{}}
それっぽく動いてるみたい。
FilterPlugin#open()に渡されてくるPageOutputをラッピングしてあげる感じで良いのかな? PageBuilderの使い方はまだきちんと把握できてないです。一応、PageBuilder#finish()とかPageBuilder#flush()で渡したPageOutput
にページを出力する処理は入るみたいだけども。
Embulkの第一印象は、凄い楽に使えるようにできてるなー、という感じ。.jarの.batへのリネームとか正直目から鱗でした。上手く活用すれば色々捗りそうです。