ストリーム処理で文章内にある単語の出現頻度をカウントする(Apache Apex) その2 -コーディング編- - 駄文型より。
ストリーム処理で文章内にある単語の出現頻度をカウントする(Apache Apex) - Qiita の続きです。前回は環境構築や実行のみを行い、サンプルコードの中身までは触れませんでした。そこで今回はサンプルコードを読み解き、 Apex ストリーミングアプリケーション開発の雰囲気を掴んでいきます。
概要
サンプルコードのファイルは以下の7つです。
- ApplicationWordCount.java
- LineReader.java
- WordReader.java
- WindowWordCount.java
- FileWordCount.java
- WordCountWriter.java
- WCPair.java
参考
ApplicationWordCount
まずはアプリケーション本体です。やっていることは単純で、オペレーターを作成し、それらをストリームで繋いで DAG を作っているだけです。
package com.example.myapexapp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.DAG;
import org.apache.hadoop.conf.Configuration;
@ApplicationAnnotation(name="SortedWordCount")
public class ApplicationWordCount implements StreamingApplication
{
private static final Logger LOG = LoggerFactory.getLogger(ApplicationWordCount.class);
@Override
public void populateDAG(DAG dag, Configuration conf)
{
// create operators
LineReader lineReader = dag.addOperator("lineReader", new LineReader());
WordReader wordReader = dag.addOperator("wordReader", new WordReader());
WindowWordCount windowWordCount = dag.addOperator("windowWordCount", new WindowWordCount());
FileWordCount fileWordCount = dag.addOperator("fileWordCount", new FileWordCount());
WordCountWriter wcWriter = dag.addOperator("wcWriter", new WordCountWriter());
// create streams
dag.addStream("lines", lineReader.output, wordReader.input);
dag.addStream("control", lineReader.control, fileWordCount.control);
dag.addStream("words", wordReader.output, windowWordCount.input);
dag.addStream("windowWordCounts", windowWordCount.output, fileWordCount.input);
dag.addStream("fileWordCounts", fileWordCount.fileOutput, wcWriter.input);
}
}
@ApplicationAnnotation(name="SortedWordCount")
でアプリケーション名を設定できるようです。これは前回実行時に出てきた名称と一致します。
apex> launch target/myapexapp-1.0-SNAPSHOT.apa
1. MyFirstApplication
2. SortedWordCount # これ
Choose application: 2
{"appId": "application_1496704660177_0001"}
apex (application_1496704660177_0001) >
各オペレーターとストリームの関係を図示すると下の図のようになります。ほぼ一直線ですが、 lineReader
から fileWordCount
へ control
というストリームが飛んでいます。 Apex ではこのようにオペレーターを繋いで DAG をつくりことでアプリケーションを組みます。オペレーターの接続点をポートと呼びます。
LineReader
先頭 lineReader
オペレーターから見ていきます。このオペレーターは、ファイルを読み込み、中身を output
ポートに流し、ファイルパスを control
に流します。
package com.example.myapexapp;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
// reads lines from input file and returns them; if end-of-file is reached, a control tuple
// is emitted on the control port
//
public class LineReader extends AbstractFileInputOperator<String>
{
private static final Logger LOG = LoggerFactory.getLogger(LineReader.class);
public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
@OutputPortFieldAnnotation(optional = true)
public final transient DefaultOutputPort<String> control = new DefaultOutputPort<>();
private transient BufferedReader br = null;
private Path path;
@Override
protected InputStream openFile(Path curPath) throws IOException
{
LOG.info("openFile: curPath = {}", curPath);
path = curPath;
InputStream is = super.openFile(path);
br = new BufferedReader(new InputStreamReader(is));
return is;
}
@Override
protected void closeFile(InputStream is) throws IOException
{
super.closeFile(is);
br.close();
br = null;
path = null;
}
// return empty string
@Override
protected String readEntity() throws IOException
{
// try to read a line
final String line = br.readLine();
if (null != line) { // common case
LOG.debug("readEntity: line = {}", line);
return line;
}
// end-of-file; send control tuple, containing only the last component of the path
// (only file name) on control port
//
if (control.isConnected()) {
LOG.info("readEntity: EOF for {}", path);
final String name = path.getName(); // final component of path
control.emit(name);
}
return null;
}
@Override
protected void emit(String tuple)
{
output.emit(tuple);
}
}
メンバー変数
このオペレータークラスのメンバは以下の5つです。
- LOG: ロガー
- output: アウトプットポート
- control: アウトプットポート
- br: バッファ
- path: ファイルのパス
特に説明は不要ですが、 control に対して
@OutputPortFieldAnnotation(optional = true)
というアノテーションが追加されています。オプショナルが true になっていますね。何故オプショナルにしているのかはよくわかりません。わかったら追記します。
メソッド
LineReader
では openFile
, closeFile
, readEntity
, emit
というメソッドが定義されており、それぞれでファイルを開いた時の処理、閉じたときの処理、開いたファイルから次のタプルを読み込む処理、タプルへの処理を記述しています。
readEntitiy
では、バッファから1行読み込み、それを返しています。 EOF まで読み込み終わったら contorl ポートにファイル名を流し、ストリームを終了させています。戻り値として返したものが次のタプルになり、 emit
メソッドで処理されます。 null を返すとストリームが終了します。
// return empty string
@Override
protected String readEntity() throws IOException
{
// try to read a line
final String line = br.readLine();
if (null != line) { // common case
LOG.debug("readEntity: line = {}", line);
return line;
}
// end-of-file; send control tuple, containing only the last component of the path
// (only file name) on control port
//
if (control.isConnected()) {
LOG.info("readEntity: EOF for {}", path);
final String name = path.getName(); // final component of path
control.emit(name);
}
return null;
}
WordReader
WordReader は、 LineReader から送られてきた1行分のデータを単語に分割しています。
package com.example.myapexapp;
import java.util.regex.Pattern;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;
// extracts words from input line
public class WordReader extends BaseOperator
{
// default pattern for word-separators
private static final Pattern nonWordDefault = Pattern.compile("[\\p{Punct}\\s]+");
private String nonWordStr; // configurable regex
private transient Pattern nonWord; // compiled regex
public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
public final transient DefaultInputPort<String>
input = new DefaultInputPort<String>() {
@Override
public void process(String line)
{
// line; split it into words and emit them
final String[] words = nonWord.split(line);
for (String word : words) {
if (word.isEmpty()) continue;
output.emit(word);
}
}
};
public String getNonWordStr() {
return nonWordStr;
}
public void setNonWordStr(String regex) {
nonWordStr = regex;
}
@Override
public void setup(OperatorContext context)
{
if (null == nonWordStr) {
nonWord = nonWordDefault;
} else {
nonWord = Pattern.compile(nonWordStr);
}
}
}
メンバー変数
- nonWordDefault: デフォルトの単語分割パターン
- nonWordStr: 単語分割用の正規表現
- nonWord: 単語分割パターン
- output: アウトプットポート
- input: インプットポート
これもほぼ説明は不要かと想いますが、インプットポートはアウトプットポート異なり、単に new するだけでなく、なにやら処理が記述されています。
public final transient DefaultInputPort<String>
input = new DefaultInputPort<String>() {
@Override
public void process(String line)
{
// line; split it into words and emit them
final String[] words = nonWord.split(line);
for (String word : words) {
if (word.isEmpty()) continue;
output.emit(word);
}
}
};
インプットポートには、入力されたデータをどう処理するかを記述します。これは他のオペレーターも同様で、 WordReader 以降の FileWordCount までのオペレーターも同様にインプットポートごとに process
メソッドが定義されています。ここでは、入力データをパターンで単語に分割して、アウトプットポートに流しているようです。
メソッド
- getNonWordStr
- setNonWordStr
- setup
getNonWordStr
と setNonWordStr
はただの getter と setter ですね。 setup
はセットアップ時に呼び出され、 nonWord
の設定を行っています。
@Override
public void setup(OperatorContext context)
{
if (null == nonWordStr) {
nonWord = nonWordDefault;
} else {
nonWord = Pattern.compile(nonWordStr);
}
}
WindowWordCount
WindowWordCount は、 WordReader から送られる単語をウィンドウ内で集計して単語と出現頻度のペアにまとめます。
package com.example.myapexapp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;
// Computes word frequency counts per window and emits them at each endWindow. The output is a
// list of pairs (word, frequency).
//
public class WindowWordCount extends BaseOperator
{
private static final Logger LOG = LoggerFactory.getLogger(WindowWordCount.class);
// wordMap : word => frequency
protected Map<String, WCPair> wordMap = new HashMap<>();
public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
{
@Override
public void process(String word)
{
WCPair pair = wordMap.get(word);
if (null != pair) { // word seen previously
pair.freq += 1;
return;
}
// new word
pair = new WCPair();
pair.word = word;
pair.freq = 1;
wordMap.put(word, pair);
}
};
// output port which emits the list of word frequencies for current window
// fileName => list of (word, freq) pairs
//
public final transient DefaultOutputPort<List<WCPair>> output = new DefaultOutputPort<>();
@Override
public void endWindow()
{
LOG.info("WindowWordCount: endWindow");
// got EOF; if no words found, do nothing
if (wordMap.isEmpty()) return;
// have some words; emit single map and reset for next file
final ArrayList<WCPair> list = new ArrayList<>(wordMap.values());
output.emit(list);
list.clear();
wordMap.clear();
}
}
メンバー変数
- LOG: ロガー
- wordMap: 単語と出現頻度のペアをマップにまとめたもの
- input: インプットポート
- output: アウトプットポート
wordMap
単語と頻度のペアは WCPair
として WCPair.java
で定義されています。
package com.example.myapexapp;
// a single (word, frequency) pair
public class WCPair {
public String word;
public int freq;
public WCPair() {}
public WCPair(String w, int f) {
word = w;
freq = f;
}
@Override
public String toString() {
return String.format("(%s, %d)", word, freq);
}
}
input
の中身を見ていきます。 WordReader
の出力は1単語なので、入力は単語がひとつになります。入力された単語を wordMap
から探索し、存在していれば頻度に1を加算し、存在していなければ新たに WCPair
を作成してマップに追加します。
public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
{
@Override
public void process(String word)
{
WCPair pair = wordMap.get(word);
if (null != pair) { // word seen previously
pair.freq += 1;
return;
}
// new word
pair = new WCPair();
pair.word = word;
pair.freq = 1;
wordMap.put(word, pair);
}
};
メソッド
メソッドは endWindow
のみです。このメソッドは、ウィンドウが終了したときに呼び出されます。マップをリストにしてアウトプットポートに流しています。ウィンドウの設定をしていないようなので、デフォルト値があると思われるのですが、デフォルト値がなんなのかよくわかりません。わかったら追記しておきます。
public void endWindow()
{
LOG.info("WindowWordCount: endWindow");
// got EOF; if no words found, do nothing
if (wordMap.isEmpty()) return;
// have some words; emit single map and reset for next file
final ArrayList<WCPair> list = new ArrayList<>(wordMap.values());
output.emit(list);
list.clear();
wordMap.clear();
}
FileWordCount
FileWordCount はファイル全体から単語の出現頻度を集計します。
public class FileWordCount extends BaseOperator
{
private static final Logger LOG = LoggerFactory.getLogger(FileWordCount.class);
// set to true when we get an EOF control tuple
protected boolean eof = false;
// last component of path (i.e. only file name)
// incoming value from control tuple
protected String fileName;
// wordMapFile : {word => frequency} map, current file, all words
protected Map<String, WCPair> wordMapFile = new HashMap<>();
// singleton map of fileName to sorted list of (word, frequency) pairs
protected transient Map<String, Object> resultFileFinal;
// final sorted list of (word,frequency) pairs
protected transient List<WCPair> fileFinalList;
public final transient DefaultInputPort<List<WCPair>> input = new DefaultInputPort<List<WCPair>>()
{
@Override
public void process(List<WCPair> list)
{
// blend incoming list into wordMapFile and wordMapGlobal
for (WCPair pair : list) {
final String word = pair.word;
WCPair filePair = wordMapFile.get(word);
if (null != filePair) { // word seen previously in current file
filePair.freq += pair.freq;
continue;
}
// new word in current file
filePair = new WCPair(word, pair.freq);
wordMapFile.put(word, filePair);
}
}
};
public final transient DefaultInputPort<String> control = new DefaultInputPort<String>()
{
@Override
public void process(String msg)
{
if (msg.isEmpty()) { // sanity check
throw new RuntimeException("Empty file path");
}
LOG.info("FileWordCount: EOF for {}", msg);
fileName = msg;
eof = true;
// NOTE: current version only supports processing one file at a time.
}
};
// fileOutput -- tuple is singleton map {<fileName> => fileFinalList}; emitted on EOF
public final transient DefaultOutputPort<Map<String, Object>>
fileOutput = new DefaultOutputPort<>();
@Override
public void setup(OperatorContext context)
{
// singleton map {<fileName> => fileFinalList}; cannot populate it yet since we need fileName
resultFileFinal = new HashMap<>(1);
fileFinalList = new ArrayList<>();
}
@Override
public void endWindow()
{
LOG.info("FileWordCount: endWindow for {}", fileName);
if (wordMapFile.isEmpty()) { // no words found
if (eof) { // write empty list to fileOutput port
// got EOF, so output empty list to output file
fileFinalList.clear();
resultFileFinal.put(fileName, fileFinalList);
fileOutput.emit(resultFileFinal);
// reset for next file
eof = false;
fileName = null;
resultFileFinal.clear();
}
LOG.info("FileWordCount: endWindow for {}, no words", fileName);
return;
}
LOG.info("FileWordCount: endWindow for {}, wordMapFile.size = {}, eof = {}",
fileName, wordMapFile.size(), eof);
if (eof) { // got EOF earlier
if (null == fileName) { // need file name to emit topN pairs to file writer
throw new RuntimeException("EOF but no fileName at endWindow");
}
// sort list from wordMapFile into fileFinalList and emit it
getList(wordMapFile);
resultFileFinal.put(fileName, fileFinalList);
fileOutput.emit(resultFileFinal);
// reset for next file
eof = false;
fileName = null;
wordMapFile.clear();
resultFileFinal.clear();
}
}
// populate fileFinalList with topN frequencies from argument
// This list is suitable input to WordCountWriter which writes it to a file
// MUST have map.size() > 0 here
//
private void getList(final Map<String, WCPair> map)
{
fileFinalList.clear();
fileFinalList.addAll(map.values());
// sort entries in descending order of frequency
Collections.sort(fileFinalList, new Comparator<WCPair>() {
@Override
public int compare(WCPair o1, WCPair o2) {
return (int)(o2.freq - o1.freq);
}
});
LOG.info("FileWordCount:getList: fileFinalList.size = {}", fileFinalList.size());
}
}
メンバー変数
- LOG: ロガー
- eof: ファイルの読み込み終わりフラグ
- fileName: ファイル名
- wordMapFile: ファイル単位での単語の出現頻度
- resultFileFinal: 集計結果を格納するマップ
- fileFinalList: 集計結果を格納するリスト
- input: インプットポート
- control: インプットポート、ファイル名
- fileOutput: アウトプットポート
input
を見ていきます。 WindowWordCount
とほぼ同じで、単語の出現頻度を集計してマップrに入れていきます。
public final transient DefaultInputPort<List<WCPair>> input = new DefaultInputPort<List<WCPair>>()
{
@Override
public void process(List<WCPair> list)
{
// blend incoming list into wordMapFile and wordMapGlobal
for (WCPair pair : list) {
final String word = pair.word;
WCPair filePair = wordMapFile.get(word);
if (null != filePair) { // word seen previously in current file
filePair.freq += pair.freq;
continue;
}
// new word in current file
filePair = new WCPair(word, pair.freq);
wordMapFile.put(word, filePair);
}
}
};
次に control
です。 lineReader
からファイル名が流れてくるので、それを保存して eof
フラグを ON にします。
public final transient DefaultInputPort<String> control = new DefaultInputPort<String>()
{
@Override
public void process(String msg)
{
if (msg.isEmpty()) { // sanity check
throw new RuntimeException("Empty file path");
}
LOG.info("FileWordCount: EOF for {}", msg);
fileName = msg;
eof = true;
// NOTE: current version only supports processing one file at a time.
}
};
メソッド
setup
はマップとリストの初期化を行っているだけなので省略します。
endWindow
を見ていきます。少々長いですが、やっていることは単純で、ウィンドウが終わったタイミングで eof
が ON になっている(つまりファイルの読み込みが終わっている)場合、集計結果をアウトプットポートに流します。流すときに getList
でソートなどの加工を行っています。
public void endWindow()
{
LOG.info("FileWordCount: endWindow for {}", fileName);
if (wordMapFile.isEmpty()) { // no words found
if (eof) { // write empty list to fileOutput port
// got EOF, so output empty list to output file
fileFinalList.clear();
resultFileFinal.put(fileName, fileFinalList);
fileOutput.emit(resultFileFinal);
// reset for next file
eof = false;
fileName = null;
resultFileFinal.clear();
}
LOG.info("FileWordCount: endWindow for {}, no words", fileName);
return;
}
LOG.info("FileWordCount: endWindow for {}, wordMapFile.size = {}, eof = {}",
fileName, wordMapFile.size(), eof);
if (eof) { // got EOF earlier
if (null == fileName) { // need file name to emit topN pairs to file writer
throw new RuntimeException("EOF but no fileName at endWindow");
}
// sort list from wordMapFile into fileFinalList and emit it
getList(wordMapFile);
resultFileFinal.put(fileName, fileFinalList);
fileOutput.emit(resultFileFinal);
// reset for next file
eof = false;
fileName = null;
wordMapFile.clear();
resultFileFinal.clear();
}
}
private void getList(final Map<String, WCPair> map)
{
fileFinalList.clear();
fileFinalList.addAll(map.values());
// sort entries in descending order of frequency
Collections.sort(fileFinalList, new Comparator<WCPair>() {
@Override
public int compare(WCPair o1, WCPair o2) {
return (int)(o2.freq - o1.freq);
}
});
LOG.info("FileWordCount:getList: fileFinalList.size = {}", fileFinalList.size());
}
WordCountWriter
最後は WordCountWriter です。集計結果をファイルに出力するオペレーターです。こちらは今までとは異なり、 AbstractFileOutputOperator
を継承しているので、少々実装が違ってきます。
public class WordCountWriter extends AbstractFileOutputOperator<Map<String, Object>>
{
private static final Logger LOG = LoggerFactory.getLogger(WordCountWriter.class);
private static final String charsetName = "UTF-8";
private static final String nl = System.lineSeparator();
private String fileName; // current file name
private transient final StringBuilder sb = new StringBuilder();
@Override
public void endWindow()
{
if (null != fileName) {
requestFinalize(fileName);
}
super.endWindow();
}
// input is a singleton list [M] where M is a singleton map {fileName => L} where L is a
// list of pairs: (word, frequency)
//
@Override
protected String getFileName(Map<String, Object> tuple)
{
LOG.info("getFileName: tuple.size = {}", tuple.size());
final Map.Entry<String, Object> entry = tuple.entrySet().iterator().next();
fileName = entry.getKey();
LOG.info("getFileName: fileName = {}", fileName);
return fileName;
}
@Override
protected byte[] getBytesForTuple(Map<String, Object> tuple)
{
LOG.info("getBytesForTuple: tuple.size = {}", tuple.size());
// get first and only pair; key is the fileName and is ignored here
final Map.Entry<String, Object> entry = tuple.entrySet().iterator().next();
final List<WCPair> list = (List<WCPair>) entry.getValue();
if (sb.length() > 0) { // clear buffer
sb.delete(0, sb.length());
}
for ( WCPair pair : list ) {
sb.append(pair.word); sb.append(" : ");
sb.append(pair.freq); sb.append(nl);
}
final String data = sb.toString();
LOG.info("getBytesForTuple: data = {}", data);
try {
final byte[] result = data.getBytes(charsetName);
return result;
} catch (UnsupportedEncodingException ex) {
throw new RuntimeException("Should never get here", ex);
}
}
}
メンバー変数
- LOG: ロガー
- charsetName: エンコード
- nl: 改行コード
- fileName: ファイル名
- sb: StringBuilder
メソッド
endWindow
では、 requestFinalize
を呼び出してファイル処理を終了します。
public void endWindow()
{
if (null != fileName) {
requestFinalize(fileName);
}
super.endWindow();
}
getFileName
には、ファイル名の取得方法を記述します。
protected String getFileName(Map<String, Object> tuple)
{
LOG.info("getFileName: tuple.size = {}", tuple.size());
final Map.Entry<String, Object> entry = tuple.entrySet().iterator().next();
fileName = entry.getKey();
LOG.info("getFileName: fileName = {}", fileName);
return fileName;
}
getBytesForTuple
では、タプルからファイルに出力する内容を記述します。ここではリストを整形して一つの文字列にして返しています。
protected byte[] getBytesForTuple(Map<String, Object> tuple)
{
LOG.info("getBytesForTuple: tuple.size = {}", tuple.size());
// get first and only pair; key is the fileName and is ignored here
final Map.Entry<String, Object> entry = tuple.entrySet().iterator().next();
final List<WCPair> list = (List<WCPair>) entry.getValue();
if (sb.length() > 0) { // clear buffer
sb.delete(0, sb.length());
}
for ( WCPair pair : list ) {
sb.append(pair.word); sb.append(" : ");
sb.append(pair.freq); sb.append(nl);
}
final String data = sb.toString();
LOG.info("getBytesForTuple: data = {}", data);
try {
final byte[] result = data.getBytes(charsetName);
return result;
} catch (UnsupportedEncodingException ex) {
throw new RuntimeException("Should never get here", ex);
}
}
まとめ
Apex ではまずアプリケーション全体をオペレーターの DAG で設計します。 DAG の構成が決まったら各オペレーターの処理を記述するだけです。他のストリーム処理エンジンに詳しくないので、比較はできませんが、分散処理をあまり意識しなくていいので書きやすそうです。また、よく使われそうな処理は Malhar にライブラリとして準備されているため、簡単に書くことができます。