Spark streaming で取得するファイルの名前をデバッグ用と思われるメソッドを使用して、やや強引に?取得してみます。
StreamingContext の生成
StreamingGetFileName.java
// create StreamingContext
SparkConf conf = new SparkConf().setAppName("StreamingGetFileName");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
とりあえず、対象ディレクトリをチェックする間隔を 10 秒にしています。
JavaDStream(RDDのまとまり) の取得
StreamingGetFileName.java
// create DStream from text file
String logDir = "/tmp/logs";
JavaDStream<String> logData = jssc.textFileStream(logDir);
ストリーミングで取得するファイルが格納されるディレクトリを指定します。
ファイル名の取得
StreamingGetFileName.java
// try to get file name
logData.foreachRDD(rdd -> {
Pattern p = Pattern.compile("file:.*?\\s+");
Matcher m = p.matcher(rdd.toDebugString());
if (m.find()){
System.out.println(m.group());
}
return null;
});
toDebugString() は↓みたいな文字列を返してくれるので、適当に必要な文字列だけを取り出しています。
file:/tmp/logs/20170707000324 NewHadoopRDD[0] at textFileStream at StreamingGetFileName.java:24
結果
こんな感じで取得することが出来ました。
result
file:/tmp/logs/20170710235536
file:/tmp/logs/20170710235546
file:/tmp/logs/20170710235556
file:/tmp/logs/20170710235606
file:/tmp/logs/20170710235616