FileOutputCommitterの新アルゴリズムについて (MAPREDUCE-5485)

  • 4
    いいね
  • 0
    コメント
この記事は最終更新日から1年以上が経過しています。

Hadoop 2.7.3/2.8.0 (どちらもまだリリースされていない) 以降では、mapreduce.fileoutputcommitter.algorithm.versionを指定することで、OutputCommitterのアルゴリズムを変更できる。設定可能な値は1(Hadoop 2系のデフォルト)と、2(Hadoop 3系のデフォルト)のみ。以下で、詳細について解説する。

新アルゴリズムの解説

FileOutputCommitter#isCommitJobRepeatable
  @Override
  public boolean isCommitJobRepeatable(JobContext context) throws IOException {
    return algorithmVersion == 2;
  }

2だと、commitJobがrepeatableである。なんじゃそりゃ。本機能が導入されたチケット(MAPREDUCE-5485)を読もう。

There are chances MRAppMaster crush during job committing, or NodeManager restart cause the committing AM exit due to container expire. In these cases, the job will fail. However, some jobs can redo commit so failing the job becomes unnecessary. Let clients tell AM to allow redo commit or not is a better choice.

うーん。そもそもcommitJobって何だろう。

FileOutputCommitter#commitJob
  /**
   * The job has completed, so do works in commitJobInternal().
   * Could retry on failure if using algorithm 2.
   * @param context the job's context
   */
  public void commitJob(JobContext context) throws IOException {
FileOutputCommitter#commitJobInternal
  /**
   * The job has completed, so do following commit job, include:
   * Move all committed tasks to the final output dir (algorithm 1 only).
   * Delete the temporary directory, including all of the work directories.
   * Create a _SUCCESS file to make it as successful.
   * @param context the job's context
   */
  @VisibleForTesting
  protected void commitJobInternal(JobContext context) throws IOException {

MapReduceジョブでは、タスクの出力結果を一時領域に書き込み、タスクが完了すると、commitJobInternalを実行して、最終的な出力先に移動する。

ここまで読んでおわかりいただけただろうか。つまり、commitJobの途中でMRAppMasterがクラッシュした場合、MapReduceジョブ自体を最初からやり直すのではなく、commitJobの処理だけリトライすればよいので、それを可能にしようというのがOutputCommitterの新アルゴリズムの概要である。

FileOutputCommitter#commitJob
  public void commitJob(JobContext context) throws IOException {
    int maxAttemptsOnFailure = isCommitJobRepeatable(context) ?
        context.getConfiguration().getInt(FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS,
            FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS_DEFAULT) : 1;
    int attempt = 0;
    boolean jobCommitNotFinished = true;
    while (jobCommitNotFinished) {
      try {
        commitJobInternal(context);
        jobCommitNotFinished = false;
      } catch (Exception e) {
        if (++attempt >= maxAttemptsOnFailure) {
          throw e;
        } else {
          LOG.warn("Exception get thrown in job commit, retry (" + attempt +
              ") time.", e);
        }
      }
    }
  }

新アルゴリズムを採用すると、maxAttemptsOnFailureに、プロパティmapreduce.fileoutputcommitter.failures.attemptsで指定した値が入る。ここに2以上を設定することで、commitJobInternalメソッドが失敗しても、リトライされる。

もっと詳しく知りたい人は、ソースコードを読んでください。FileOutputCommitter.java