Sparkは、デフォルトではS3のスキーマ(s3, s3n, s3a)の設定が入ってなく、クラスパスも入ってないようなので、自分で設定する必要がある。
(ただし、EMR上や既存のHadoopクラスタで動作させるときは、既に以下の設定がcore-site.xmlなどに書かれていると思うので、不要です。これはあくまでhadoopの設定をしてないローカルでの動作を想定しています。)
ローカルではhadoop-awsがクラスパスに入ってなかったので、別途追加してあげる必要がある。
import org.apache.spark.{SparkConf, SparkContext}
object Hello {
def main(args: Array[String]): Unit = {
val fromFile = args(0) // ex. <bucketName>/<filePath>
val toFile = args(1) // ex. <bucketName>/<filePath>
val conf = new SparkConf().setAppName("local")
val sc = new SparkContext(conf)
val accessKey = sys.env("AWS_ACCESS_KEY_ID")
val secretKey = sys.env("AWS_SECRET_ACCESS_KEY")
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", accessKey)
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", secretKey)
sc.hadoopConfiguration.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
sc.hadoopConfiguration.set("fs.s3a.awsAccessKeyId", accessKey)
sc.hadoopConfiguration.set("fs.s3a.awsSecretAccessKey", secretKey)
sc.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc.hadoopConfiguration.set("fs.s3.awsAccessKeyId", accessKey)
sc.hadoopConfiguration.set("fs.s3.awsSecretAccessKey", secretKey)
sc.hadoopConfiguration.set("fs.s3.impl", "org.apache.hadoop.fs.s3.S3FileSystem")
//s3スキーマは、hadoopで使われてるものとEMRで使われてるもので挙動が違うっぽいので
//ローカルでの使用はオススメしない。
//val logDataS3 = sc.textFile(s"s3://$fromFile")
//logDataS3.saveAsTextFile(s"s3://${toFile}_s3")
val logDataS3n = sc.textFile(s"s3n://$fromFile")
logDataS3n.saveAsTextFile(s"s3n://${toFile}_s3n")
val logDataS3a = sc.textFile(s"s3a://$fromFile")
logDataS3a.saveAsTextFile(s"s3a://${toFile}_s3a")
}
}
実行
./bin/spark-submit --class com.sample.Main --master local \
--packages org.apache.hadoop:hadoop-aws:2.7.1 \
yourSparkApp-assembly-1.0.jar \
<bucket>/input/path <bucket>/output/path
--packages org.apache.hadoop:hadoop-aws:2.7.1
を指定しているところが大事。
これがないとClassNotFoundになる。
(EMR上では最初からクラスパスに入ってるはずなので不要)
備考
・ローカルでs3nで試すと、S3にゴミファイルができてしまった。
(おそらく、Hadoopがメタな情報として置いていたのが消されずに残ってしまった。)
そもそもs3nは古いスキーマっぽく、パフォーマンスなどの面でs3aの方が良さそうなので、問題なければそちらを使うとよさそう。
・コード中のコメントに書いたが、s3スキーマはローカルで使用するのはオススメしない。