18
16

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

EmbulkのFilter Pluginを試行錯誤しつつJavaで書いた結果

Last updated at Posted at 2015-03-07

ここ何日か、並列データ転送ツールのEmbulkを触っていますが、フィルタプラグインのJavaでの書き方が見付からなかったので、試行錯誤して書いてみた結果のメモです。自分でもEmbulkについてまだあまり把握できていないので、作法的に色々間違っている可能性が高いです。あまり信用しないでください。

input.csv
A,4
B,2
A,8
C,3
B,1
B,2
C,5

こういう入力を受け取って、

+-----------+------------+
| id:string | score:long |
+-----------+------------+
|         A |         12 |
|         B |          5 |
|         C |          8 |
+-----------+------------+

上のような感じに集計するフィルターを書いてみました。

$ embulk.bat new java-filter sum

でテンプレートを生成して、以下のように実装。

SumFilterPlugin.java
package org.embulk.filter;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import org.embulk.config.ConfigSource;
import org.embulk.config.Task;
import org.embulk.config.TaskSource;
import org.embulk.spi.Exec;
import org.embulk.spi.FilterPlugin;
import org.embulk.spi.Page;
import org.embulk.spi.PageBuilder;
import org.embulk.spi.PageOutput;
import org.embulk.spi.PageReader;
import org.embulk.spi.Schema;

public class SumFilterPlugin implements FilterPlugin {
	public interface PluginTask extends Task {
	}

	@Override
	public void transaction(ConfigSource config, Schema inputSchema,
			FilterPlugin.Control control) {
		PluginTask task = config.loadConfig(PluginTask.class);

		Schema outputSchema = inputSchema;

		control.run(task.dump(), outputSchema);
	}

	@Override
	public PageOutput open(TaskSource taskSource, final Schema inputSchema,
			final Schema outputSchema, final PageOutput output) {
		// PluginTask task = taskSource.loadTask(PluginTask.class);

		return new PageOutput() {
			private PageReader reader = new PageReader(inputSchema);

			@Override
			public void finish() {
				output.finish();
			}

			@Override
			public void close() {
				output.close();
			}

			@Override
			public void add(Page page) {
				reader.setPage(page);

				Map<String, Long> map = new HashMap<>();

				while (reader.nextRecord()) {
					String id = reader.getString(0);
					Long score = reader.getLong(1);

					Long current = map.get(id);
					current = current == null ? 0 : current;

					map.put(id, current + score);
				}

				try (final PageBuilder builder = new PageBuilder(
						Exec.getBufferAllocator(), outputSchema, output)) {
					for (Entry<String, Long> entry : map.entrySet()) {
						String id = entry.getKey();
						Long score = entry.getValue();

						builder.setString(0, id);
						builder.setLong(1, score);
						builder.addRecord();
					}

					builder.finish();
				}
			}
		};
	}
}

設定は以下。

config.yml
in:
  type: file
  path_prefix: input.csv
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: ''
    null_string: 'NULL'
    header_line: false
    columns:
    - {name: id, type: string}
    - {name: score, type: long}
exec: {}
filters:
  - type: sum
out: {type: stdout}

ビルドして、

$ ./gradlew.bat gem
:compileJava
:processResources UP-TO-DATE
:classes
:jar
:assemble
:compileTestJava
:processTestResources UP-TO-DATE
:testClasses
:test
:check
:build
:classpath
:gemspec
:gemWARNING:  no homepage specified

  Successfully built RubyGem
  Name: embulk-filter-sum
  Version: 0.1.0
  File: embulk-filter-sum-0.1.0.gem

BUILD SUCCESSFUL

Total time: 24.719 secs

プレビューしてみる。

$ embulk.bat preview -I lib config.yml
2015-03-07 19:50:11.981 +0900: Embulk v0.5.1
2015-03-07 19:50:14.842 +0900 [INFO] (preview): Listing local files at directory '.' filtering filename by prefix 'input.csv'
2015-03-07 19:50:14.849 +0900 [INFO] (preview): Loading files [input.csv]
+-----------+------------+
| id:string | score:long |
+-----------+------------+
|         A |         12 |
|         B |          5 |
|         C |          8 |
+-----------+------------+

実行結果は以下。

$ embulk.bat run -I lib config.yml
2015-03-07 19:50:39.210 +0900: Embulk v0.5.1
2015-03-07 19:50:42.147 +0900 [INFO] (transaction): Listing local files at directory '.' filtering filename by prefix 'input.csv'
2015-03-07 19:50:42.153 +0900 [INFO] (transaction): Loading files [input.csv]
2015-03-07 19:50:42.306 +0900 [INFO] (transaction): {done:  0 / 1, running: 0}
A,12
B,5
C,8
2015-03-07 19:50:42.435 +0900 [INFO] (transaction): {done:  1 / 1, running: 0}
2015-03-07 19:50:42.480 +0900 [INFO] (main): Committed.
2015-03-07 19:50:42.480 +0900 [INFO] (main): Next config diff: {"in":{"last_path":"input.csv"},"out":{}}

それっぽく動いてるみたい。

FilterPlugin#open()に渡されてくるPageOutputをラッピングしてあげる感じで良いのかな? PageBuilderの使い方はまだきちんと把握できてないです。一応、PageBuilder#finish()とかPageBuilder#flush()で渡したPageOutputにページを出力する処理は入るみたいだけども。

Embulkの第一印象は、凄い楽に使えるようにできてるなー、という感じ。.jarの.batへのリネームとか正直目から鱗でした。上手く活用すれば色々捗りそうです。

18
16
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
18
16

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?