EmbulkのFilter Pluginを試行錯誤しつつJavaで書いた結果

  • 16
    いいね
  • 1
    コメント
この記事は最終更新日から1年以上が経過しています。

ここ何日か、並列データ転送ツールのEmbulkを触っていますが、フィルタプラグインのJavaでの書き方が見付からなかったので、試行錯誤して書いてみた結果のメモです。自分でもEmbulkについてまだあまり把握できていないので、作法的に色々間違っている可能性が高いです。あまり信用しないでください。

input.csv
A,4
B,2
A,8
C,3
B,1
B,2
C,5

こういう入力を受け取って、

+-----------+------------+
| id:string | score:long |
+-----------+------------+
|         A |         12 |
|         B |          5 |
|         C |          8 |
+-----------+------------+

上のような感じに集計するフィルターを書いてみました。

$ embulk.bat new java-filter sum

でテンプレートを生成して、以下のように実装。

SumFilterPlugin.java
package org.embulk.filter;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import org.embulk.config.ConfigSource;
import org.embulk.config.Task;
import org.embulk.config.TaskSource;
import org.embulk.spi.Exec;
import org.embulk.spi.FilterPlugin;
import org.embulk.spi.Page;
import org.embulk.spi.PageBuilder;
import org.embulk.spi.PageOutput;
import org.embulk.spi.PageReader;
import org.embulk.spi.Schema;

public class SumFilterPlugin implements FilterPlugin {
    public interface PluginTask extends Task {
    }

    @Override
    public void transaction(ConfigSource config, Schema inputSchema,
            FilterPlugin.Control control) {
        PluginTask task = config.loadConfig(PluginTask.class);

        Schema outputSchema = inputSchema;

        control.run(task.dump(), outputSchema);
    }

    @Override
    public PageOutput open(TaskSource taskSource, final Schema inputSchema,
            final Schema outputSchema, final PageOutput output) {
        // PluginTask task = taskSource.loadTask(PluginTask.class);

        return new PageOutput() {
            private PageReader reader = new PageReader(inputSchema);

            @Override
            public void finish() {
                output.finish();
            }

            @Override
            public void close() {
                output.close();
            }

            @Override
            public void add(Page page) {
                reader.setPage(page);

                Map<String, Long> map = new HashMap<>();

                while (reader.nextRecord()) {
                    String id = reader.getString(0);
                    Long score = reader.getLong(1);

                    Long current = map.get(id);
                    current = current == null ? 0 : current;

                    map.put(id, current + score);
                }

                try (final PageBuilder builder = new PageBuilder(
                        Exec.getBufferAllocator(), outputSchema, output)) {
                    for (Entry<String, Long> entry : map.entrySet()) {
                        String id = entry.getKey();
                        Long score = entry.getValue();

                        builder.setString(0, id);
                        builder.setLong(1, score);
                        builder.addRecord();
                    }

                    builder.finish();
                }
            }
        };
    }
}

設定は以下。

config.yml
in:
  type: file
  path_prefix: input.csv
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: ''
    null_string: 'NULL'
    header_line: false
    columns:
    - {name: id, type: string}
    - {name: score, type: long}
exec: {}
filters:
  - type: sum
out: {type: stdout}

ビルドして、

$ ./gradlew.bat gem
:compileJava
:processResources UP-TO-DATE
:classes
:jar
:assemble
:compileTestJava
:processTestResources UP-TO-DATE
:testClasses
:test
:check
:build
:classpath
:gemspec
:gemWARNING:  no homepage specified

  Successfully built RubyGem
  Name: embulk-filter-sum
  Version: 0.1.0
  File: embulk-filter-sum-0.1.0.gem

BUILD SUCCESSFUL

Total time: 24.719 secs

プレビューしてみる。

$ embulk.bat preview -I lib config.yml
2015-03-07 19:50:11.981 +0900: Embulk v0.5.1
2015-03-07 19:50:14.842 +0900 [INFO] (preview): Listing local files at directory '.' filtering filename by prefix 'input.csv'
2015-03-07 19:50:14.849 +0900 [INFO] (preview): Loading files [input.csv]
+-----------+------------+
| id:string | score:long |
+-----------+------------+
|         A |         12 |
|         B |          5 |
|         C |          8 |
+-----------+------------+

実行結果は以下。

$ embulk.bat run -I lib config.yml
2015-03-07 19:50:39.210 +0900: Embulk v0.5.1
2015-03-07 19:50:42.147 +0900 [INFO] (transaction): Listing local files at directory '.' filtering filename by prefix 'input.csv'
2015-03-07 19:50:42.153 +0900 [INFO] (transaction): Loading files [input.csv]
2015-03-07 19:50:42.306 +0900 [INFO] (transaction): {done:  0 / 1, running: 0}
A,12
B,5
C,8
2015-03-07 19:50:42.435 +0900 [INFO] (transaction): {done:  1 / 1, running: 0}
2015-03-07 19:50:42.480 +0900 [INFO] (main): Committed.
2015-03-07 19:50:42.480 +0900 [INFO] (main): Next config diff: {"in":{"last_path":"input.csv"},"out":{}}

それっぽく動いてるみたい。

FilterPlugin#open()に渡されてくるPageOutputをラッピングしてあげる感じで良いのかな? PageBuilderの使い方はまだきちんと把握できてないです。一応、PageBuilder#finish()とかPageBuilder#flush()で渡したPageOutputにページを出力する処理は入るみたいだけども。

Embulkの第一印象は、凄い楽に使えるようにできてるなー、という感じ。.jarの.batへのリネームとか正直目から鱗でした。上手く活用すれば色々捗りそうです。