ざっくり
- データ連携とかする場合ってで数百万行のデータってあるよね
- 何も考えずにやったらオンメモリでやって直ぐにメモリパンクするよね
- Iteratableに処理をするのはどうするの?
- できたー
- でもRFC4180に対応してないよね
- ってことで、自前のsplitterを作る
- 紆余曲折あり腑に落ちないところが残りつつ、なんとか出来た
いまさら発見(;・∀・)
まぁ大量データのことを考えれば当たり前なんだけど、てっきり勘違いした。
bodyの中はファイルの中身じゃないのね。
よくサンプルでlog(INFO, "${body}")
でファイルの中をデバッグしてたから、実際のbodyにsetされているオブジェクトとしてはorg.apache.camel.component.file.GenericFile
で、ファイルのポインタだけもっているイメージ。
当たり前といえば当たり前。。。
という前提知識を持って以下解説。
回答
相変わらず楽ちん。
"file:///tmp?fileName=large.csv&noop=true" ==> {
// "\\n"とする。これがミソ。"\n"だと分割してくれない。
split(tokenize("\\n")).streaming {
// 改行コードで分割された1行
unmarshal(new org.apache.camel.model.dataformat.CsvDataFormat)
-->("mock:streaming")
}
}
だが、しかし!!!
CSVってRFC4180によれば、”で括っていれば、セル内で改行も許されているよね?
1,AAA,BBB,"cc
CC",DDD,EEE
2,AAA,BBB,CC,DDD,EEE
このファイルはデータとしては2行のデータ。
custom splitter
CustomSplitterという形で自分でIteratorを返すBeanを呼び出せば良い。
今回はAppacheCommonsのCSVを使ってみることにした。
なので、sbtに以下を追加。
"org.apache.commons" % "commons-csv" % "1.2",
CsvIteratorFactory (bean)
package jp.den3umegumi.experimental.camel.java.bean;
import org.apache.camel.Body;
import org.apache.camel.component.file.GenericFile;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import java.io.File;
import java.nio.charset.Charset;
import java.util.Iterator;
public class CsvIteratorFactory {
public Iterator createIterator(@Body Object body) {
GenericFile<File> gf = (GenericFile<File>) body;
File f = gf.getFile();
try {
CSVParser csvParser = CSVParser.parse(f, Charset.forName("UTF-8"), CSVFormat.RFC4180);
return csvParser.iterator();
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
}
なぜJavaで書いているかというと、結局ScalaDSLではCustomSplitterは動かせなかった。
なので、試しにJavaDSLで書いてみたときにCustomSplitterをJava化ので、そのときのコードを再利用。
Route(scala)
↓のようにやれば良いはずだけど、前述のとおり、期待通りに動かなかった。
どうやったら、ScalaDSLでCustomSplitterを使えるのだろうか。
Following case.
ScalaDSL case did not run to as expected.
How do I use to custom-splitter in camel-scala. (<- for google search)
// このimportにたどり着くのに結構時間かかる。そういうところがcamelの面倒なところ。
import org.apache.camel.builder.ExpressionBuilder.beanExpression
"file:///tmp?fileName= large.csv&noop=true" ==> {
split(beanExpression(new CsvIteratorFactory, "createIterator")).streaming {
log(LoggingLevel.DEBUG, "SCALA:${body}")
}
log(LoggingLevel.DEBUG, "END")
}
why camel-scala! why!
Route(java)
でも、Javaだとできる。。。
But, JavaDSL run to as expected.
@Override
public void configure() throws Exception {
from("file:///tmp?fileName= large.csv&noop=true")
.split(beanExpression(new CsvIteratorFactory(), "createIterator"))
.streaming()
.log(LoggingLevel.DEBUG, "JAVA:${body}");
}
DSL自作
beanは動かなかったけどtokenizeは出来たので、自分でDSLを作ってしまおうかと。
RouteBuilder
試行錯誤しつつ作ったDSLがtokenizeCsv
とcloseCsv
の2つ。
tokenizeCsvでファイルを開けているので手動でcloseする必要がある。
この辺るはもっとうまくやりたい。
package jp.den3umegumi.experimental.camel.route
import jp.den3umegumi.experimental.camel.dsl.languages.RichLanguages
import org.apache.camel.impl.DefaultCamelContext
import org.apache.camel.scala.dsl.builder.ScalaRouteBuilder
import org.apache.camel.{CamelContext, LoggingLevel}
import org.apache.camel.builder.ExpressionBuilder.beanExpression
class LargeFileRouteBuilder(context: CamelContext = new DefaultCamelContext) extends ScalaRouteBuilder(context: CamelContext) with RichLanguages {
"file:///tmp?fileName=large.csv&noop=true" ==> {
split(tokenizeCsv).streaming {
log(LoggingLevel.DEBUG, "SCALA:${body}")
}
closeCsv(_)
log(LoggingLevel.DEBUG, "END")
}
}
RichLanguages (自作DSL)
package jp.den3umegumi.experimental.camel.dsl.languages
import jp.den3umegumi.experimental.camel.language.tokenizer.CsvTokenizerLanguage
import org.apache.camel.Exchange
import org.apache.camel.scala.dsl.languages.{LanguageFunction, Languages}
import org.apache.commons.csv.CSVParser
trait RichLanguages extends Languages {
def tokenizeCsv(exchange: Exchange) = RichLanguages.evaluate(exchange)("csv_tokenizer")
def closeCsv(exchange: Exchange) = {
val parser = exchange.getIn.getHeader("CSV_FILE_PARSER", classOf[CSVParser])
parser.close
}
}
object RichLanguages {
def evaluate(exchange: Exchange)(expression: String): Any = {
val language = expression match {
case "csv_tokenizer" => new CsvTokenizerLanguage
case _ => throw new IllegalArgumentException(s"unkown expression.[${expression}]")
}
new LanguageFunction(language, "")
}
}
package jp.den3umegumi.experimental.camel.language.tokenizer;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.IsSingleton;
import org.apache.camel.Predicate;
import org.apache.camel.component.file.GenericFile;
import org.apache.camel.spi.Language;
import org.apache.camel.support.ExpressionAdapter;
import org.apache.camel.util.ExpressionToPredicateAdapter;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
public class CsvTokenizerLanguage implements Language, IsSingleton {
public Expression createExpression() {
return new ExpressionAdapter() {
public Object evaluate(Exchange exchange) {
GenericFile<File> gf = exchange.getIn().getBody(GenericFile.class);
File f = gf.getFile();
CSVParser csvParser = null;
try {
csvParser = CSVParser.parse(f, Charset.forName("UTF-8"), CSVFormat.RFC4180);
} catch (IOException e) {
}
exchange.getIn().setHeader("CSV_FILE_PARSER", csvParser);
return csvParser.iterator();
}
@Override
public String toString() {
return "csvTokenizer";
}
};
}
public Predicate createPredicate(String expression) {
return ExpressionToPredicateAdapter.toPredicate(createExpression(expression));
}
@Override
public Expression createExpression(String expression) {
return createExpression();
}
public boolean isSingleton() {
return false;
}
}
今回は、まだコードの整理が出来ていないので、gitにpushしてない。