ストリーム処理で文章内にある単語の出現頻度をカウントする(Apache Apex) その2 コーディング編

ストリーム処理で文章内にある単語の出現頻度をカウントする(Apache Apex) - Qiita の続きです。前回は環境構築や実行のみを行い、サンプルコードの中身までは触れませんでした。そこで今回はサンプルコードを読み解き、 Apex ストリーミングアプリケーション開発の雰囲気を掴んでいきます。



  • ApplicationWordCount.java
  • LineReader.java
  • WordReader.java
  • WindowWordCount.java
  • FileWordCount.java
  • WordCountWriter.java
  • WCPair.java

topnwords - GitHub



まずはアプリケーション本体です。やっていることは単純で、オペレーターを作成し、それらをストリームで繋いで DAG を作っているだけです。

package com.example.myapexapp;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.DAG;

import org.apache.hadoop.conf.Configuration;

public class ApplicationWordCount implements StreamingApplication
  private static final Logger LOG = LoggerFactory.getLogger(ApplicationWordCount.class);

  public void populateDAG(DAG dag, Configuration conf)
    // create operators

    LineReader lineReader            = dag.addOperator("lineReader", new LineReader());
    WordReader wordReader            = dag.addOperator("wordReader", new WordReader());
    WindowWordCount windowWordCount  = dag.addOperator("windowWordCount", new WindowWordCount());
    FileWordCount fileWordCount      = dag.addOperator("fileWordCount", new FileWordCount());
    WordCountWriter wcWriter         = dag.addOperator("wcWriter", new WordCountWriter());

    // create streams

    dag.addStream("lines",   lineReader.output,  wordReader.input);
    dag.addStream("control", lineReader.control, fileWordCount.control);
    dag.addStream("words",   wordReader.output,  windowWordCount.input);
    dag.addStream("windowWordCounts", windowWordCount.output, fileWordCount.input);
    dag.addStream("fileWordCounts", fileWordCount.fileOutput, wcWriter.input);


@ApplicationAnnotation(name="SortedWordCount") でアプリケーション名を設定できるようです。これは前回実行時に出てきた名称と一致します。

apex> launch target/myapexapp-1.0-SNAPSHOT.apa
  1. MyFirstApplication
  2. SortedWordCount     # これ
Choose application: 2
{"appId": "application_1496704660177_0001"}
apex (application_1496704660177_0001) > 

各オペレーターとストリームの関係を図示すると下の図のようになります。ほぼ一直線ですが、 lineReader から fileWordCountcontrol というストリームが飛んでいます。 Apex ではこのようにオペレーターを繋いで DAG をつくりことでアプリケーションを組みます。オペレーターの接続点をポートと呼びます。



先頭 lineReader オペレーターから見ていきます。このオペレーターは、ファイルを読み込み、中身を output ポートに流し、ファイルパスを control に流します。

package com.example.myapexapp;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

import org.apache.hadoop.fs.Path;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.lib.io.fs.AbstractFileInputOperator;

// reads lines from input file and returns them; if end-of-file is reached, a control tuple
// is emitted on the control port
public class LineReader extends AbstractFileInputOperator<String>
  private static final Logger LOG = LoggerFactory.getLogger(LineReader.class);

  public final transient DefaultOutputPort<String> output  = new DefaultOutputPort<>();

  @OutputPortFieldAnnotation(optional = true)
  public final transient DefaultOutputPort<String> control = new DefaultOutputPort<>();

  private transient BufferedReader br = null;

  private Path path;

  protected InputStream openFile(Path curPath) throws IOException
    LOG.info("openFile: curPath = {}", curPath);
    path = curPath;
    InputStream is = super.openFile(path);
    br = new BufferedReader(new InputStreamReader(is));
    return is;

  protected void closeFile(InputStream is) throws IOException
    br = null;
    path = null;

  // return empty string 
  protected String readEntity() throws IOException
    // try to read a line
    final String line = br.readLine();
    if (null != line) {    // common case
      LOG.debug("readEntity: line = {}", line);
      return line;

    // end-of-file; send control tuple, containing only the last component of the path
    // (only file name) on control port
    if (control.isConnected()) {
      LOG.info("readEntity: EOF for {}", path);
      final String name = path.getName();    // final component of path

    return null;

  protected void emit(String tuple)



  • LOG: ロガー
  • output: アウトプットポート
  • control: アウトプットポート
  • br: バッファ
  • path: ファイルのパス

特に説明は不要ですが、 control に対して

@OutputPortFieldAnnotation(optional = true)

というアノテーションが追加されています。オプショナルが true になっていますね。何故オプショナルにしているのかはよくわかりません。わかったら追記します。


LineReader では openFile , closeFile , readEntity , emit というメソッドが定義されており、それぞれでファイルを開いた時の処理、閉じたときの処理、開いたファイルから次のタプルを読み込む処理、タプルへの処理を記述しています。

readEntitiy では、バッファから1行読み込み、それを返しています。 EOF まで読み込み終わったら contorl ポートにファイル名を流し、ストリームを終了させています。戻り値として返したものが次のタプルになり、 emit メソッドで処理されます。 null を返すとストリームが終了します。

  // return empty string 
  protected String readEntity() throws IOException
    // try to read a line
    final String line = br.readLine();
    if (null != line) {    // common case
      LOG.debug("readEntity: line = {}", line);
      return line;

    // end-of-file; send control tuple, containing only the last component of the path
    // (only file name) on control port
    if (control.isConnected()) {
      LOG.info("readEntity: EOF for {}", path);
      final String name = path.getName();    // final component of path

    return null;


WordReader は、 LineReader から送られてきた1行分のデータを単語に分割しています。

package com.example.myapexapp;

import java.util.regex.Pattern;

import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

// extracts words from input line
public class WordReader extends BaseOperator
  // default pattern for word-separators
  private static final Pattern nonWordDefault = Pattern.compile("[\\p{Punct}\\s]+");

  private String nonWordStr;    // configurable regex
  private transient Pattern nonWord;      // compiled regex

  public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();

  public final transient DefaultInputPort<String>
    input = new DefaultInputPort<String>() {

    public void process(String line)
      // line; split it into words and emit them
      final String[] words = nonWord.split(line);
      for (String word : words) {
        if (word.isEmpty()) continue;

  public String getNonWordStr() {
    return nonWordStr;

  public void setNonWordStr(String regex) {
    nonWordStr = regex;

  public void setup(OperatorContext context)
    if (null == nonWordStr) {
      nonWord = nonWordDefault;
    } else {
      nonWord = Pattern.compile(nonWordStr);



  • nonWordDefault: デフォルトの単語分割パターン
  • nonWordStr: 単語分割用の正規表現
  • nonWord: 単語分割パターン
  • output: アウトプットポート
  • input: インプットポート

これもほぼ説明は不要かと想いますが、インプットポートはアウトプットポート異なり、単に new するだけでなく、なにやら処理が記述されています。

  public final transient DefaultInputPort<String>
    input = new DefaultInputPort<String>() {

    public void process(String line)
      // line; split it into words and emit them
      final String[] words = nonWord.split(line);
      for (String word : words) {
        if (word.isEmpty()) continue;

インプットポートには、入力されたデータをどう処理するかを記述します。これは他のオペレーターも同様で、 WordReader 以降の FileWordCount までのオペレーターも同様にインプットポートごとに process メソッドが定義されています。ここでは、入力データをパターンで単語に分割して、アウトプットポートに流しているようです。


  • getNonWordStr
  • setNonWordStr
  • setup

getNonWordStrsetNonWordStr はただの getter と setter ですね。 setup はセットアップ時に呼び出され、 nonWord の設定を行っています。

  public void setup(OperatorContext context)
    if (null == nonWordStr) {
      nonWord = nonWordDefault;
    } else {
      nonWord = Pattern.compile(nonWordStr);


WindowWordCount は、 WordReader から送られる単語をウィンドウ内で集計して単語と出現頻度のペアにまとめます。

package com.example.myapexapp;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

// Computes word frequency counts per window and emits them at each endWindow. The output is a
// list of pairs (word, frequency).
public class WindowWordCount extends BaseOperator
  private static final Logger LOG = LoggerFactory.getLogger(WindowWordCount.class);

  // wordMap : word => frequency
  protected Map<String, WCPair> wordMap = new HashMap<>();

  public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
    public void process(String word)
      WCPair pair = wordMap.get(word);
      if (null != pair) {    // word seen previously
        pair.freq += 1;

      // new word
      pair = new WCPair();
      pair.word = word;
      pair.freq = 1;
      wordMap.put(word, pair);

  // output port which emits the list of word frequencies for current window
  // fileName => list of (word, freq) pairs
  public final transient DefaultOutputPort<List<WCPair>> output = new DefaultOutputPort<>();

  public void endWindow()
    LOG.info("WindowWordCount: endWindow");

    // got EOF; if no words found, do nothing
    if (wordMap.isEmpty()) return;

    // have some words; emit single map and reset for next file
    final ArrayList<WCPair> list = new ArrayList<>(wordMap.values());



  • LOG: ロガー
  • wordMap: 単語と出現頻度のペアをマップにまとめたもの
  • input: インプットポート
  • output: アウトプットポート

wordMap 単語と頻度のペアは WCPair として WCPair.java で定義されています。

package com.example.myapexapp;

// a single (word, frequency) pair
public class WCPair {
  public String word;
  public int freq;

  public WCPair() {}

  public WCPair(String w, int f) {
    word = w;
    freq = f;

  public String toString() {
    return String.format("(%s, %d)", word, freq);

input の中身を見ていきます。 WordReader の出力は1単語なので、入力は単語がひとつになります。入力された単語を wordMap から探索し、存在していれば頻度に1を加算し、存在していなければ新たに WCPair を作成してマップに追加します。

  public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
    public void process(String word)
      WCPair pair = wordMap.get(word);
      if (null != pair) {    // word seen previously
        pair.freq += 1;

      // new word
      pair = new WCPair();
      pair.word = word;
      pair.freq = 1;
      wordMap.put(word, pair);


メソッドは endWindow のみです。このメソッドは、ウィンドウが終了したときに呼び出されます。マップをリストにしてアウトプットポートに流しています。ウィンドウの設定をしていないようなので、デフォルト値があると思われるのですが、デフォルト値がなんなのかよくわかりません。わかったら追記しておきます。

  public void endWindow()
    LOG.info("WindowWordCount: endWindow");

    // got EOF; if no words found, do nothing
    if (wordMap.isEmpty()) return;

    // have some words; emit single map and reset for next file
    final ArrayList<WCPair> list = new ArrayList<>(wordMap.values());


FileWordCount はファイル全体から単語の出現頻度を集計します。

public class FileWordCount extends BaseOperator
  private static final Logger LOG = LoggerFactory.getLogger(FileWordCount.class);

  // set to true when we get an EOF control tuple
  protected boolean eof = false;

  // last component of path (i.e. only file name)
  // incoming value from control tuple
  protected String fileName;

  // wordMapFile   : {word => frequency} map, current file, all words
  protected Map<String, WCPair> wordMapFile = new HashMap<>();

  // singleton map of fileName to sorted list of (word, frequency) pairs
  protected transient Map<String, Object> resultFileFinal;

  // final sorted list of (word,frequency) pairs
  protected transient List<WCPair> fileFinalList;

  public final transient DefaultInputPort<List<WCPair>> input = new DefaultInputPort<List<WCPair>>()
    public void process(List<WCPair> list)
      // blend incoming list into wordMapFile and wordMapGlobal
      for (WCPair pair : list) {
        final String word = pair.word;
        WCPair filePair = wordMapFile.get(word);
        if (null != filePair) {    // word seen previously in current file
          filePair.freq += pair.freq;

        // new word in current file
        filePair = new WCPair(word, pair.freq);
        wordMapFile.put(word, filePair);

  public final transient DefaultInputPort<String> control = new DefaultInputPort<String>()
    public void process(String msg)
      if (msg.isEmpty()) {    // sanity check
        throw new RuntimeException("Empty file path");
      LOG.info("FileWordCount: EOF for {}", msg);
      fileName = msg;
      eof = true;
      // NOTE: current version only supports processing one file at a time.

  // fileOutput -- tuple is singleton map {<fileName> => fileFinalList}; emitted on EOF
  public final transient DefaultOutputPort<Map<String, Object>>
    fileOutput = new DefaultOutputPort<>();

  public void setup(OperatorContext context)
    // singleton map {<fileName> => fileFinalList}; cannot populate it yet since we need fileName
    resultFileFinal = new HashMap<>(1);
    fileFinalList = new ArrayList<>();

  public void endWindow()
    LOG.info("FileWordCount: endWindow for {}", fileName);

    if (wordMapFile.isEmpty()) {    // no words found
      if (eof) {                    // write empty list to fileOutput port
        // got EOF, so output empty list to output file
        resultFileFinal.put(fileName, fileFinalList);

        // reset for next file
        eof = false;
        fileName = null;
      LOG.info("FileWordCount: endWindow for {}, no words", fileName);

    LOG.info("FileWordCount: endWindow for {}, wordMapFile.size = {}, eof = {}",
             fileName, wordMapFile.size(), eof);

    if (eof) {                     // got EOF earlier
      if (null == fileName) {      // need file name to emit topN pairs to file writer
        throw new RuntimeException("EOF but no fileName at endWindow");

      // sort list from wordMapFile into fileFinalList and emit it
      resultFileFinal.put(fileName, fileFinalList);

      // reset for next file
      eof = false;
      fileName = null;

  // populate fileFinalList with topN frequencies from argument
  // This list is suitable input to WordCountWriter which writes it to a file
  // MUST have map.size() > 0 here
  private void getList(final Map<String, WCPair> map)

    // sort entries in descending order of frequency
    Collections.sort(fileFinalList, new Comparator<WCPair>() {
        public int compare(WCPair o1, WCPair o2) {
          return (int)(o2.freq - o1.freq);

    LOG.info("FileWordCount:getList: fileFinalList.size = {}", fileFinalList.size());


  • LOG: ロガー
  • eof: ファイルの読み込み終わりフラグ
  • fileName: ファイル名
  • wordMapFile: ファイル単位での単語の出現頻度
  • resultFileFinal: 集計結果を格納するマップ
  • fileFinalList: 集計結果を格納するリスト
  • input: インプットポート
  • control: インプットポート、ファイル名
  • fileOutput: アウトプットポート

input を見ていきます。 WindowWordCount とほぼ同じで、単語の出現頻度を集計してマップrに入れていきます。

  public final transient DefaultInputPort<List<WCPair>> input = new DefaultInputPort<List<WCPair>>()
    public void process(List<WCPair> list)
      // blend incoming list into wordMapFile and wordMapGlobal
      for (WCPair pair : list) {
        final String word = pair.word;
        WCPair filePair = wordMapFile.get(word);
        if (null != filePair) {    // word seen previously in current file
          filePair.freq += pair.freq;

        // new word in current file
        filePair = new WCPair(word, pair.freq);
        wordMapFile.put(word, filePair);

次に control です。 lineReader からファイル名が流れてくるので、それを保存して eof フラグを ON にします。

  public final transient DefaultInputPort<String> control = new DefaultInputPort<String>()
    public void process(String msg)
      if (msg.isEmpty()) {    // sanity check
        throw new RuntimeException("Empty file path");
      LOG.info("FileWordCount: EOF for {}", msg);
      fileName = msg;
      eof = true;
      // NOTE: current version only supports processing one file at a time.


setup はマップとリストの初期化を行っているだけなので省略します。

endWindow を見ていきます。少々長いですが、やっていることは単純で、ウィンドウが終わったタイミングで eof が ON になっている(つまりファイルの読み込みが終わっている)場合、集計結果をアウトプットポートに流します。流すときに getList でソートなどの加工を行っています。

  public void endWindow()
    LOG.info("FileWordCount: endWindow for {}", fileName);

    if (wordMapFile.isEmpty()) {    // no words found
      if (eof) {                    // write empty list to fileOutput port
        // got EOF, so output empty list to output file
        resultFileFinal.put(fileName, fileFinalList);

        // reset for next file
        eof = false;
        fileName = null;
      LOG.info("FileWordCount: endWindow for {}, no words", fileName);

    LOG.info("FileWordCount: endWindow for {}, wordMapFile.size = {}, eof = {}",
             fileName, wordMapFile.size(), eof);

    if (eof) {                     // got EOF earlier
      if (null == fileName) {      // need file name to emit topN pairs to file writer
        throw new RuntimeException("EOF but no fileName at endWindow");

      // sort list from wordMapFile into fileFinalList and emit it
      resultFileFinal.put(fileName, fileFinalList);

      // reset for next file
      eof = false;
      fileName = null;

  private void getList(final Map<String, WCPair> map)

    // sort entries in descending order of frequency
    Collections.sort(fileFinalList, new Comparator<WCPair>() {
        public int compare(WCPair o1, WCPair o2) {
          return (int)(o2.freq - o1.freq);

    LOG.info("FileWordCount:getList: fileFinalList.size = {}", fileFinalList.size());


最後は WordCountWriter です。集計結果をファイルに出力するオペレーターです。こちらは今までとは異なり、 AbstractFileOutputOperator を継承しているので、少々実装が違ってきます。

public class WordCountWriter extends AbstractFileOutputOperator<Map<String, Object>>
  private static final Logger LOG = LoggerFactory.getLogger(WordCountWriter.class);
  private static final String charsetName = "UTF-8";
  private static final String nl = System.lineSeparator();

  private String fileName;    // current file name
  private transient final StringBuilder sb = new StringBuilder();

  public void endWindow()
    if (null != fileName) {

  // input is a singleton list [M] where M is a singleton map {fileName => L} where L is a
  // list of pairs: (word, frequency)
  protected String getFileName(Map<String, Object> tuple)
    LOG.info("getFileName: tuple.size = {}", tuple.size());

    final Map.Entry<String, Object> entry = tuple.entrySet().iterator().next();
    fileName = entry.getKey();
    LOG.info("getFileName: fileName = {}", fileName);
    return fileName;

  protected byte[] getBytesForTuple(Map<String, Object> tuple)
    LOG.info("getBytesForTuple: tuple.size = {}", tuple.size());

    // get first and only pair; key is the fileName and is ignored here
    final Map.Entry<String, Object> entry = tuple.entrySet().iterator().next();
    final List<WCPair> list = (List<WCPair>) entry.getValue();

    if (sb.length() > 0) {        // clear buffer
      sb.delete(0, sb.length());

    for ( WCPair pair : list ) {
      sb.append(pair.word); sb.append(" : ");
      sb.append(pair.freq); sb.append(nl);

    final String data = sb.toString();
    LOG.info("getBytesForTuple: data = {}", data);
    try {
      final byte[] result = data.getBytes(charsetName);
      return result;
    } catch (UnsupportedEncodingException ex) {
      throw new RuntimeException("Should never get here", ex);



  • LOG: ロガー
  • charsetName: エンコード
  • nl: 改行コード
  • fileName: ファイル名
  • sb: StringBuilder


endWindow では、 requestFinalize を呼び出してファイル処理を終了します。

  public void endWindow()
    if (null != fileName) {

getFileName には、ファイル名の取得方法を記述します。

  protected String getFileName(Map<String, Object> tuple)
    LOG.info("getFileName: tuple.size = {}", tuple.size());

    final Map.Entry<String, Object> entry = tuple.entrySet().iterator().next();
    fileName = entry.getKey();
    LOG.info("getFileName: fileName = {}", fileName);
    return fileName;

getBytesForTuple では、タプルからファイルに出力する内容を記述します。ここではリストを整形して一つの文字列にして返しています。

  protected byte[] getBytesForTuple(Map<String, Object> tuple)
    LOG.info("getBytesForTuple: tuple.size = {}", tuple.size());

    // get first and only pair; key is the fileName and is ignored here
    final Map.Entry<String, Object> entry = tuple.entrySet().iterator().next();
    final List<WCPair> list = (List<WCPair>) entry.getValue();

    if (sb.length() > 0) {        // clear buffer
      sb.delete(0, sb.length());

    for ( WCPair pair : list ) {
      sb.append(pair.word); sb.append(" : ");
      sb.append(pair.freq); sb.append(nl);

    final String data = sb.toString();
    LOG.info("getBytesForTuple: data = {}", data);
    try {
      final byte[] result = data.getBytes(charsetName);
      return result;
    } catch (UnsupportedEncodingException ex) {
      throw new RuntimeException("Should never get here", ex);


Apex ではまずアプリケーション全体をオペレーターの DAG で設計します。 DAG の構成が決まったら各オペレーターの処理を記述するだけです。他のストリーム処理エンジンに詳しくないので、比較はできませんが、分散処理をあまり意識しなくていいので書きやすそうです。また、よく使われそうな処理は Malhar にライブラリとして準備されているため、簡単に書くことができます。


