NiFi

Apache NiFiのCustom Processorを作ってみた

More than 1 year has passed since last update.


この記事はなに?

Apache NiFiは,システム間のデータフローを管理するために作られたデータフローオーケストレーションツールです.

NiFiには,"Processor"と呼ばれる,「データにどのような加工を施すか」を記述するための要素が存在します.

多くの汎用的なProcessorについてはすでにNiFiに組込まれているので,それを使えばいいのですが,少し変わったことをしようとすると,自分でProcessorを実装し(= Custom Processor),これをNiFiに組込む必要があります.

NiFiについては,まだ比較的新しいツールであるためか,日本語の記事が少ないです.

また,Custom Processorについては,記事そのものが少ないため,その作り方について調べたことをまとめておきます.


注意点


環境


  • Mac OS X 10.11.6

  • Apache NiFi 1.5.0

  • JDK version 1.8.0_92


手順


Custom Processorのテンプレートを作る

Maven ArchetypeのApache NiFi extenstionを使うことで,Custom Processorのテンプレートを作ることができる.

https://cwiki.apache.org/confluence/display/NiFi/Maven+Projects+for+Extensions

基本的に上記ページの

https://cwiki.apache.org/confluence/display/NiFi/Maven+Projects+for+Extensions#MavenProjectsforExtensions-ProcessorProjects

の手順に従う.

$ mvn archetype:generate -DarchetypeGroupId=org.apache.nifi -DarchetypeArtifactId=nifi-processor-bundle-archetype -DnifiVersion=1.5.0

[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building Maven Stub Project (No POM) 1
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] >>> maven-archetype-plugin:3.0.1:generate (default-cli) > generate-sources @ standalone-pom >>>
[INFO]
[INFO] <<< maven-archetype-plugin:3.0.1:generate (default-cli) < generate-sources @ standalone-pom <<<
[INFO]
[INFO] --- maven-archetype-plugin:3.0.1:generate (default-cli) @ standalone-pom ---
[INFO] Generating project in Interactive mode
[INFO] Archetype [org.apache.nifi:nifi-processor-bundle-archetype:1.5.0] found in catalog remote
Define value for property 'groupId': com.sample
Define value for property 'artifactId': samplenifiprocessor
Define value for property 'version' 1.0-SNAPSHOT: : 1.0.0
Define value for property 'artifactBaseName': nifi
Define value for property 'package' com.sample.processors.nifi: :
[INFO] Using property: nifiVersion = 1.5.0
Confirm properties configuration:
groupId: com.sample
artifactId: samplenifiprocessor
version: 1.0.0
artifactBaseName: nifi
package: com.sample.processors.nifi
nifiVersion: 1.5.0
Y: :
[INFO] ----------------------------------------------------------------------------
[INFO] Using following parameters for creating project from Archetype: nifi-processor-bundle-archetype:0.3.0
[INFO] ----------------------------------------------------------------------------
[INFO] Parameter: groupId, Value: com.sample
[INFO] Parameter: artifactId, Value: samplenifiprocessor
[INFO] Parameter: version, Value: 1.0.0
[INFO] Parameter: package, Value: com.sample.processors.nifi
[INFO] Parameter: packageInPathFormat, Value: com/sample/processors/nifi
[INFO] Parameter: package, Value: com.sample.processors.nifi
[INFO] Parameter: artifactBaseName, Value: nifi
[INFO] Parameter: version, Value: 1.0.0
[INFO] Parameter: groupId, Value: com.sample
[INFO] Parameter: artifactId, Value: samplenifiprocessor
[INFO] Parameter: nifiVersion, Value: 1.5.0
[INFO] Project created from Archetype in dir: /Users/lethe2211/projects/samplenifiprocessor
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 09:21 min
[INFO] Finished at: 2018-01-12T23:18:25+09:00
[INFO] Final Memory: 17M/316M
[INFO] ------------------------------------------------------------------------

いろいろ聞かれるので,上記ページの内容に従って答える.

(nifiVersionのパラメタを1.5.0にしていることに注意.)

$ cd samplenifiprocessor

$ tree .
.
├── nifi-nifi-nar
│   └── pom.xml
├── nifi-nifi-processors
│   ├── pom.xml
│   └── src
│   ├── main
│   │   ├── java
│   │   │   └── com
│   │   │   └── sample
│   │   │   └── processors
│   │   │   └── nifi
│   │   │   └── MyProcessor.java
│   │   └── resources
│   │   └── META-INF
│   │   └── services
│   │   └── org.apache.nifi.processor.Processor
│   └── test
│   └── java
│   └── com
│   └── sample
│   └── processors
│   └── nifi
│   └── MyProcessorTest.java
└── pom.xml

こんな感じでテンプレができた.

ここで言うMyProcessor.javaがprocessorのロジックを記述する部分となる.


Custom Processorを実装する

ここまででボイラープレートは完成したので,ロジックの実装に移っていきたい.


参考

Custom Processorの実装には,NiFi自体への深い理解が必要.

最低限,事前に,

NiFi Developer Guide: https://nifi.apache.org/docs/nifi-docs/html/developer-guide.html

と,

NiFiのコードベース上にある組込済Processorのソースコードをいくつか読むことをおすすめする(Custom Processorも組込済Processorと同じ方法でビルドされるため,実装が参考になる).


コードから読み解く実装例

ここでは,CountText Processorを例に取り,ざっくりとどういう流れで実装すればいいかを解説する.

https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CountText.java

@EventDriven // このProcessorではイベントドリブンなProcessorのスケジューリングを行うことをNiFiに知らせる

@SideEffectFree // このProcessorではNiFiの外部の環境に変更を加えない(べき等である)ことをNiFiに知らせる.パフォーマンスに影響する
@SupportsBatching // ProcessSessionのコミットを同時に行って良いことをNiFiに知らせる
@Tags({"count", "text", "line", "word", "character"}) // Processorの検索画面に表示されるタグを指定する.Processorに関連のある言葉を指定しておくと検索しやすくなる
@InputRequirement(Requirement.INPUT_REQUIRED) // このProcessorには入力(前段のProcessorからの接続)が必要なことをNiFiに知らせる
@CapabilityDescription("Counts various metrics on incoming text. The requested results will be recorded as attributes. "
+ "The resulting flowfile will not have its content modified.") // Processorの検索画面に表示される説明を書く
@WritesAttributes({
@WritesAttribute(attribute = "text.line.count", description = "The number of lines of text present in the FlowFile content"),
@WritesAttribute(attribute = "text.line.nonempty.count", description = "The number of lines of text (with at least one non-whitespace character) present in the original FlowFile"),
@WritesAttribute(attribute = "text.word.count", description = "The number of words present in the original FlowFile"),
@WritesAttribute(attribute = "text.character.count", description = "The number of characters (given the specified character encoding) present in the original FlowFile"),
}) // このProcessorが,以上のAttributeに新たに値を書き込むことを運用者に知らせる
@SeeAlso(SplitText.class)
public class CountText extends AbstractProcessor { // ProcessorはAbstractProcessorを継承する必要がある
private static final List<Charset> STANDARD_CHARSETS = Arrays.asList(
StandardCharsets.UTF_8,
StandardCharsets.US_ASCII,
StandardCharsets.ISO_8859_1,
StandardCharsets.UTF_16,
StandardCharsets.UTF_16LE,
StandardCharsets.UTF_16BE);

private static final Pattern SYMBOL_PATTERN = Pattern.compile("[\\s-\\._]");
private static final Pattern WHITESPACE_ONLY_PATTERN = Pattern.compile("\\s");

// Attribute keys
public static final String TEXT_LINE_COUNT = "text.line.count";
public static final String TEXT_LINE_NONEMPTY_COUNT = "text.line.nonempty.count";
public static final String TEXT_WORD_COUNT = "text.word.count";
public static final String TEXT_CHARACTER_COUNT = "text.character.count";

// このProcessorのPropertyについての設定
public static final PropertyDescriptor TEXT_LINE_COUNT_PD = new PropertyDescriptor.Builder()
.name("text-line-count")
.displayName("Count Lines")
.description("If enabled, will count the number of lines present in the incoming text.")
.required(true) // 必須
.allowableValues("true", "false") // "true"か"false"かしか値として認めない
.defaultValue("true")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR) // BooleanとしてValidationを行う
.build();
public static final PropertyDescriptor TEXT_LINE_NONEMPTY_COUNT_PD = new PropertyDescriptor.Builder()
.name("text-line-nonempty-count")
.displayName("Count Non-Empty Lines")
.description("If enabled, will count the number of lines that contain a non-whitespace character present in the incoming text.")
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
public static final PropertyDescriptor TEXT_WORD_COUNT_PD = new PropertyDescriptor.Builder()
.name("text-word-count")
.displayName("Count Words")
.description("If enabled, will count the number of words (alphanumeric character groups bounded by whitespace)" +
" present in the incoming text. Common logical delimiters [_-.] do not bound a word unless 'Split Words on Symbols' is true.")
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
public static final PropertyDescriptor TEXT_CHARACTER_COUNT_PD = new PropertyDescriptor.Builder()
.name("text-character-count")
.displayName("Count Characters")
.description("If enabled, will count the number of characters (including whitespace and symbols, but not including newlines and carriage returns) present in the incoming text.")
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
public static final PropertyDescriptor SPLIT_WORDS_ON_SYMBOLS_PD = new PropertyDescriptor.Builder()
.name("split-words-on-symbols")
.displayName("Split Words on Symbols")
.description("If enabled, the word count will identify strings separated by common logical delimiters [ _ - . ] as independent words (ex. split-words-on-symbols = 4 words).")
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
public static final PropertyDescriptor CHARACTER_ENCODING_PD = new PropertyDescriptor.Builder()
.name("character-encoding")
.displayName("Character Encoding")
.description("Specifies a character encoding to use.")
.required(true)
.allowableValues(getStandardCharsetNames())
.defaultValue(StandardCharsets.UTF_8.displayName())
.build();

private static Set<String> getStandardCharsetNames() {
return STANDARD_CHARSETS.stream().map(c -> c.displayName()).collect(Collectors.toSet());
}

// このProcessorが持つRelationshipについての設定
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("The flowfile contains the original content with one or more attributes added containing the respective counts")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("If the flowfile text cannot be counted for some reason, the original file will be routed to this destination and nothing will be routed elsewhere")
.build();

private static final List<PropertyDescriptor> properties;
private static final Set<Relationship> relationships;

static {
properties = Collections.unmodifiableList(Arrays.asList(TEXT_LINE_COUNT_PD,
TEXT_LINE_NONEMPTY_COUNT_PD,
TEXT_WORD_COUNT_PD,
TEXT_CHARACTER_COUNT_PD,
SPLIT_WORDS_ON_SYMBOLS_PD,
CHARACTER_ENCODING_PD));

relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS,
REL_FAILURE)));
}

private volatile boolean countLines;
private volatile boolean countLinesNonEmpty;
private volatile boolean countWords;
private volatile boolean countCharacters;
private volatile boolean splitWordsOnSymbols;
private volatile String characterEncoding = StandardCharsets.UTF_8.name();

private volatile int lineCount;
private volatile int lineNonEmptyCount;
private volatile int wordCount;
private volatile int characterCount;

// 指定したRelationshipをNiFiに知らせる
@Override
public Set<Relationship> getRelationships() {
return relationships;
}

@OnScheduled // このProcessorがFlowFileを受け付けられる状態になったらこのメソッドが呼ばれる
public void onSchedule(ProcessContext context) {
this.countLines = context.getProperty(TEXT_LINE_COUNT_PD).isSet()
? context.getProperty(TEXT_LINE_COUNT_PD).asBoolean() : false;
this.countLinesNonEmpty = context.getProperty(TEXT_LINE_NONEMPTY_COUNT_PD).isSet()
? context.getProperty(TEXT_LINE_NONEMPTY_COUNT_PD).asBoolean() : false;
this.countWords = context.getProperty(TEXT_WORD_COUNT_PD).isSet()
? context.getProperty(TEXT_WORD_COUNT_PD).asBoolean() : false;
this.countCharacters = context.getProperty(TEXT_CHARACTER_COUNT_PD).isSet()
? context.getProperty(TEXT_CHARACTER_COUNT_PD).asBoolean() : false;
this.splitWordsOnSymbols = context.getProperty(SPLIT_WORDS_ON_SYMBOLS_PD).isSet()
? context.getProperty(SPLIT_WORDS_ON_SYMBOLS_PD).asBoolean() : false;
this.characterEncoding = context.getProperty(CHARACTER_ENCODING_PD).getValue();
}

// このProcessorがFlowFileを受け付ける度にonTriggerが呼ばれる
// 基本的にロジックはここに書くことになる
/**
* Will count text attributes of the incoming stream.
*/

@Override
public void onTrigger(ProcessContext context, ProcessSession processSession) throws ProcessException {
// FlowFileの情報を取得
FlowFile sourceFlowFile = processSession.get();
if (sourceFlowFile == null) {
return;
}
// Processorでの処理はスレッドセーフである必要がある
AtomicBoolean error = new AtomicBoolean();

lineCount = 0;
lineNonEmptyCount = 0;
wordCount = 0;
characterCount = 0;

// ProcessSessionを経由してのみFlowFileの内容読み取り,変更が可能.
processSession.read(sourceFlowFile, in -> {
long start = System.nanoTime();

// Iterate over the lines in the text input
try {
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(in, characterEncoding)); // FlowFileの内容を読み出す
String line;
while ((line = bufferedReader.readLine()) != null) {
if (countLines) {
lineCount++;
}

if (countLinesNonEmpty) {
if (line.trim().length() > 0) {
lineNonEmptyCount++;
}
}

if (countWords) {
wordCount += countWordsInLine(line, splitWordsOnSymbols);
}

if (countCharacters) {
characterCount += line.length();
}
}
long stop = System.nanoTime();
if (getLogger().isDebugEnabled()) {
final long durationNanos = stop - start;
DecimalFormat df = new DecimalFormat("#.###");
getLogger().debug("Computed metrics in " + durationNanos + " nanoseconds (" + df.format(durationNanos / 1_000_000_000.0) + " seconds).");
}
if (getLogger().isInfoEnabled()) {
String message = generateMetricsMessage();
getLogger().info(message);
}

// Update session counters
processSession.adjustCounter("Lines Counted", (long) lineCount, false);
processSession.adjustCounter("Lines (non-empty) Counted", (long) lineNonEmptyCount, false);
processSession.adjustCounter("Words Counted", (long) wordCount, false);
processSession.adjustCounter("Characters Counted", (long) characterCount, false);
} catch (IOException e) {
error.set(true);
getLogger().error(e.getMessage() + " Routing to failure.", e);
}
});

if (error.get()) {
processSession.transfer(sourceFlowFile, REL_FAILURE);
} else {
Map<String, String> metricAttributes = new HashMap<>();
if (countLines) {
metricAttributes.put(TEXT_LINE_COUNT, String.valueOf(lineCount));
}
if (countLinesNonEmpty) {
metricAttributes.put(TEXT_LINE_NONEMPTY_COUNT, String.valueOf(lineNonEmptyCount));
}
if (countWords) {
metricAttributes.put(TEXT_WORD_COUNT, String.valueOf(wordCount));
}
if (countCharacters) {
metricAttributes.put(TEXT_CHARACTER_COUNT, String.valueOf(characterCount));
}
FlowFile updatedFlowFile = processSession.putAllAttributes(sourceFlowFile, metricAttributes); // Attributeに書き込む

processSession.transfer(updatedFlowFile, REL_SUCCESS); // "success"のRelationshipへFlowFileを渡す
}
}

private String generateMetricsMessage() {
StringBuilder sb = new StringBuilder("Counted ");
List<String> metrics = new ArrayList<>();
if (countLines) {
metrics.add(lineCount + " lines");
}
if (countLinesNonEmpty) {
metrics.add(lineNonEmptyCount + " non-empty lines");
}
if (countWords) {
metrics.add(wordCount + " words");
}
if (countCharacters) {
metrics.add(characterCount + " characters");
}
sb.append(StringUtils.join(metrics, ", "));
return sb.toString();
}

int countWordsInLine(String line, boolean splitWordsOnSymbols) throws IOException {
if (line == null || line.trim().length() == 0) {
return 0;
} else {
Pattern regex = splitWordsOnSymbols ? SYMBOL_PATTERN : WHITESPACE_ONLY_PATTERN;
final String[] words = regex.split(line);
// TODO: Trim individual words before counting to eliminate whitespace words?
if (getLogger().isDebugEnabled()) {
getLogger().debug("Split [" + line + "] to [" + StringUtils.join(Arrays.asList(words), ", ") + "] (" + words.length + ")");
}
return words.length;
}
}

// 指定したProcessorをNiFiに知らせる
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
}


参考文献


Processorにおけるエラーハンドリング

Custom Processorでは,基本的にonTriggerメソッドにロジックを記述するが,エラーハンドリングをする際のルールを以下に示す.



  1. onTriggerメソッドにおいてProcessExceptionがthrowされ,catchされなかった場合,session(におけるProcessorに対する変更)はロールバックされ,Processorは"Penalty Duration"に書かれた時間だけ,同じFlowFileの受け付けを停止する(`Penalizeと呼ばれる.そのExceptionは開発者側で想定済とみなされるため,そのFlowFileの受け付けのみを一定時間停止する).


  2. onTriggerメソッドにおいてProcessException以外のExceptionがthrowされ,catchされなかった場合,session(におけるProcessorに対する変更)はロールバックされ,Processorは"Yield Duration"に書かれた時間だけ動作を停止する(Administratively yieldingと呼ばれる.そのExceptionは開発者側で想定外とみなされるため,Processorごと停止する).


  3. onTriggerメソッド内にコールバックがある場合,そこでIOExceptionがthrowされた場合,ProcessExceptionにラップされた状態でonTriggerメソッドから送出される.


Custom Processorのテスト

NiFi側がnifi-mockというProcessorのテスト用フレームワークを用意している.


参考文献

public class JsonProcessorTest {

@Test
public void testOnTrigger() throws IOException {
// モックされる対象のFlowFileのContent
InputStream content = new ByteArrayInputStream("{\"hello\":\"nifi rocks\"}".getBytes());

// JsonProcessor用のTest runner
TestRunner runner = TestRunners.newTestRunner(new JsonProcessor());

// プロパティを追加
runner.setProperty(JsonProcessor.JSON_PATH, "$.hello");

// FlowFileをJsonProcessorに投入
runner.enqueue(content);

// 指定した回数(ここでは1回),投入されたFlowFileを処理
runner.run(1);

// 処理後,キューが空であることを確認
runner.assertQueueEmpty();

// 処理されたFlowFileが,Relationship,"SUCCESS"に行ったことを確認
List<MockFlowFile> results = runner.getFlowFilesForRelationship(JsonProcessor.SUCCESS);
assertTrue("1 match", results.size() == 1);
MockFlowFile result = results.get(0);
String resultValue = new String(runner.getContentAsByteArray(result));
System.out.println("Match: " + IOUtils.toString(runner.getContentAsByteArray(result)));

// 処理後のFlowFileの,AttributeとContentの確認
result.assertAttributeEquals(JsonProcessor.MATCH_ATTR, "nifi rocks");
result.assertContentEquals("nifi rocks");
}

}

これ単体でもある程度の品質を担保できると思うが,これに加えて,ロジック部分を切り出して,ユニットテストを行うようにしてもよい.


NARファイルをデプロイする

作成したCustom ProcessorをNiFi上で使用するためには,"NARファイル"と呼ばれるNiFi用のファイルをビルドし,NiFi上に配置する必要がある.


NARファイルをビルドする

前述のApache NiFi extenstionを使えばすでに雛形ができているので,mvn installするだけで,NARファイルのビルドを行うことができる.

$ pwd

# /path/to/samplenifiprocessor/dir

$ mvn install
[INFO] Scanning for projects...
[INFO] Inspecting build with total of 3 modules...
[INFO] Installing Nexus Staging features:
[INFO] ... total of 3 executions of maven-deploy-plugin replaced with nexus-staging-maven-plugin
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Build Order:
[INFO]
[INFO] samplenifiprocessor
[INFO] nifi-nifi-processors
[INFO] nifi-nifi-nar
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building samplenifiprocessor 1.0.0
[INFO] ------------------------------------------------------------------------

...

------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 17.195 s
[INFO] Finished at: 2017-10-10T14:11:06+09:00
[INFO] Final Memory: 35M/543M
[INFO] ------------------------------------------------------------------------

$ ls -l nifi-nifi-nar/target/nifi-nifi-nar-1.0.0.nar
# 存在することを確認


NARファイルを配置する

$ nifi status

Java home: /Library/Java/JavaVirtualMachines/jdk1.8.0_92.jdk/Contents/Home
NiFi home: /usr/local/Cellar/nifi/1.5.0/libexec

Bootstrap Config File: /usr/local/Cellar/nifi/1.5.0/libexec/conf/bootstrap.conf

2017-10-10 15:57:46,657 INFO [main] org.apache.nifi.bootstrap.Command Apache NiFi is currently running, listening to Bootstrap on port 53593, PID=39325
# "NiFi home"の値を確認する

$ cd /path/to/samplenifiprocessor/dir/nifi-nifi-nar/target/nifi-nifi-nar-1.0.0.nar /usr/local/Cellar/nifi/1.5.0/libexec/lib/
# "NiFi home"/lib以下にNarファイルをコピー

$ nifi restart
# NiFiを再起動