LoginSignup
14
14

More than 5 years have passed since last update.

ApacheCamel(camel-scala)で巨大CSV(big csv)を扱う

Last updated at Posted at 2015-09-09

ざっくり

  • データ連携とかする場合ってで数百万行のデータってあるよね
  • 何も考えずにやったらオンメモリでやって直ぐにメモリパンクするよね
  • Iteratableに処理をするのはどうするの?
  • できたー
  • でもRFC4180に対応してないよね
  • ってことで、自前のsplitterを作る
  • 紆余曲折あり腑に落ちないところが残りつつ、なんとか出来た

いまさら発見(;・∀・)

まぁ大量データのことを考えれば当たり前なんだけど、てっきり勘違いした。
bodyの中はファイルの中身じゃないのね。
よくサンプルでlog(INFO, "${body}")でファイルの中をデバッグしてたから、実際のbodyにsetされているオブジェクトとしてはorg.apache.camel.component.file.GenericFileで、ファイルのポインタだけもっているイメージ。
当たり前といえば当たり前。。。

という前提知識を持って以下解説。

回答

相変わらず楽ちん。

  "file:///tmp?fileName=large.csv&noop=true" ==> {
    // "\\n"とする。これがミソ。"\n"だと分割してくれない。
    split(tokenize("\\n")).streaming {
      // 改行コードで分割された1行
      unmarshal(new org.apache.camel.model.dataformat.CsvDataFormat)
      -->("mock:streaming")
    }
  }

だが、しかし!!!

CSVってRFC4180によれば、”で括っていれば、セル内で改行も許されているよね?

1,AAA,BBB,"cc
CC",DDD,EEE
2,AAA,BBB,CC,DDD,EEE

このファイルはデータとしては2行のデータ。

custom splitter

CustomSplitterという形で自分でIteratorを返すBeanを呼び出せば良い。
今回はAppacheCommonsのCSVを使ってみることにした。
なので、sbtに以下を追加。

  "org.apache.commons" % "commons-csv" % "1.2",

CsvIteratorFactory (bean)

package jp.den3umegumi.experimental.camel.java.bean;

import org.apache.camel.Body;
import org.apache.camel.component.file.GenericFile;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;

import java.io.File;
import java.nio.charset.Charset;
import java.util.Iterator;

public class CsvIteratorFactory {
    public Iterator createIterator(@Body Object body) {
        GenericFile<File> gf = (GenericFile<File>) body;
        File f = gf.getFile();
        try {
            CSVParser csvParser = CSVParser.parse(f, Charset.forName("UTF-8"), CSVFormat.RFC4180);
            return csvParser.iterator();
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
}

なぜJavaで書いているかというと、結局ScalaDSLではCustomSplitterは動かせなかった。
なので、試しにJavaDSLで書いてみたときにCustomSplitterをJava化ので、そのときのコードを再利用。

Route(scala)

↓のようにやれば良いはずだけど、前述のとおり、期待通りに動かなかった。
どうやったら、ScalaDSLでCustomSplitterを使えるのだろうか。
Following case.
ScalaDSL case did not run to as expected.
How do I use to custom-splitter in camel-scala. (<- for google search)

  // このimportにたどり着くのに結構時間かかる。そういうところがcamelの面倒なところ。
  import org.apache.camel.builder.ExpressionBuilder.beanExpression

  "file:///tmp?fileName= large.csv&noop=true" ==> {
    split(beanExpression(new CsvIteratorFactory, "createIterator")).streaming {
      log(LoggingLevel.DEBUG, "SCALA:${body}")
    }
    log(LoggingLevel.DEBUG, "END")
  }

why camel-scala! why!

Route(java)

でも、Javaだとできる。。。
But, JavaDSL run to as expected.

    @Override
    public void configure() throws Exception {
        from("file:///tmp?fileName= large.csv&noop=true")
                .split(beanExpression(new CsvIteratorFactory(), "createIterator"))
                .streaming()
                .log(LoggingLevel.DEBUG, "JAVA:${body}");
    }

DSL自作

beanは動かなかったけどtokenizeは出来たので、自分でDSLを作ってしまおうかと。

RouteBuilder

試行錯誤しつつ作ったDSLがtokenizeCsvcloseCsvの2つ。
tokenizeCsvでファイルを開けているので手動でcloseする必要がある。
この辺るはもっとうまくやりたい。

package jp.den3umegumi.experimental.camel.route

import jp.den3umegumi.experimental.camel.dsl.languages.RichLanguages
import org.apache.camel.impl.DefaultCamelContext
import org.apache.camel.scala.dsl.builder.ScalaRouteBuilder
import org.apache.camel.{CamelContext, LoggingLevel}
import org.apache.camel.builder.ExpressionBuilder.beanExpression

class LargeFileRouteBuilder(context: CamelContext = new DefaultCamelContext) extends ScalaRouteBuilder(context: CamelContext) with RichLanguages {

  "file:///tmp?fileName=large.csv&noop=true" ==> {
    split(tokenizeCsv).streaming {
      log(LoggingLevel.DEBUG, "SCALA:${body}")
    }
    closeCsv(_)
    log(LoggingLevel.DEBUG, "END")
  }
}

RichLanguages (自作DSL)

package jp.den3umegumi.experimental.camel.dsl.languages

import jp.den3umegumi.experimental.camel.language.tokenizer.CsvTokenizerLanguage
import org.apache.camel.Exchange
import org.apache.camel.scala.dsl.languages.{LanguageFunction, Languages}
import org.apache.commons.csv.CSVParser

trait RichLanguages extends Languages {
  def tokenizeCsv(exchange: Exchange) = RichLanguages.evaluate(exchange)("csv_tokenizer")

  def closeCsv(exchange: Exchange) = {
    val parser = exchange.getIn.getHeader("CSV_FILE_PARSER", classOf[CSVParser])
    parser.close
  }
}

object RichLanguages {
  def evaluate(exchange: Exchange)(expression: String): Any = {
    val language = expression match {
      case "csv_tokenizer" => new CsvTokenizerLanguage
      case _ => throw new IllegalArgumentException(s"unkown expression.[${expression}]")
    }
    new LanguageFunction(language, "")
  }
}
package jp.den3umegumi.experimental.camel.language.tokenizer;

import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.IsSingleton;
import org.apache.camel.Predicate;
import org.apache.camel.component.file.GenericFile;
import org.apache.camel.spi.Language;
import org.apache.camel.support.ExpressionAdapter;
import org.apache.camel.util.ExpressionToPredicateAdapter;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;

import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;

public class CsvTokenizerLanguage implements Language, IsSingleton {

    public Expression createExpression() {
        return new ExpressionAdapter() {
            public Object evaluate(Exchange exchange) {
                GenericFile<File> gf = exchange.getIn().getBody(GenericFile.class);
                File f = gf.getFile();
                CSVParser csvParser = null;
                try {
                    csvParser = CSVParser.parse(f, Charset.forName("UTF-8"), CSVFormat.RFC4180);
                } catch (IOException e) {
                }
                exchange.getIn().setHeader("CSV_FILE_PARSER", csvParser);
                return csvParser.iterator();
            }

            @Override
            public String toString() {
                return "csvTokenizer";
            }
        };
    }

    public Predicate createPredicate(String expression) {
        return ExpressionToPredicateAdapter.toPredicate(createExpression(expression));
    }

    @Override
    public Expression createExpression(String expression) {
        return createExpression();
    }

    public boolean isSingleton() {
        return false;
    }
}

今回は、まだコードの整理が出来ていないので、gitにpushしてない。

14
14
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14
14