14
19

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

ローカルのSparkでS3のファイルを扱う

Last updated at Posted at 2016-05-16

Sparkは、デフォルトではS3のスキーマ(s3, s3n, s3a)の設定が入ってなく、クラスパスも入ってないようなので、自分で設定する必要がある。

(ただし、EMR上や既存のHadoopクラスタで動作させるときは、既に以下の設定がcore-site.xmlなどに書かれていると思うので、不要です。これはあくまでhadoopの設定をしてないローカルでの動作を想定しています。)

ローカルではhadoop-awsがクラスパスに入ってなかったので、別途追加してあげる必要がある。

import org.apache.spark.{SparkConf, SparkContext}

object Hello {
  def main(args: Array[String]): Unit = {

    val fromFile = args(0) // ex. <bucketName>/<filePath>
    val toFile = args(1) // ex. <bucketName>/<filePath>
    val conf = new SparkConf().setAppName("local")
    val sc = new SparkContext(conf)

    val accessKey = sys.env("AWS_ACCESS_KEY_ID")
    val secretKey = sys.env("AWS_SECRET_ACCESS_KEY")

    sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", accessKey)
    sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", secretKey)
    sc.hadoopConfiguration.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")

    sc.hadoopConfiguration.set("fs.s3a.awsAccessKeyId", accessKey)
    sc.hadoopConfiguration.set("fs.s3a.awsSecretAccessKey", secretKey)
    sc.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

    sc.hadoopConfiguration.set("fs.s3.awsAccessKeyId", accessKey)
    sc.hadoopConfiguration.set("fs.s3.awsSecretAccessKey", secretKey)
    sc.hadoopConfiguration.set("fs.s3.impl", "org.apache.hadoop.fs.s3.S3FileSystem")

    //s3スキーマは、hadoopで使われてるものとEMRで使われてるもので挙動が違うっぽいので
    //ローカルでの使用はオススメしない。
    //val logDataS3 = sc.textFile(s"s3://$fromFile")
    //logDataS3.saveAsTextFile(s"s3://${toFile}_s3")

    val logDataS3n = sc.textFile(s"s3n://$fromFile")
    logDataS3n.saveAsTextFile(s"s3n://${toFile}_s3n")

    val logDataS3a = sc.textFile(s"s3a://$fromFile")
    logDataS3a.saveAsTextFile(s"s3a://${toFile}_s3a")
  }
}

実行

./bin/spark-submit --class com.sample.Main --master local \
--packages org.apache.hadoop:hadoop-aws:2.7.1 \
yourSparkApp-assembly-1.0.jar \
<bucket>/input/path <bucket>/output/path

--packages org.apache.hadoop:hadoop-aws:2.7.1 を指定しているところが大事。
これがないとClassNotFoundになる。

(EMR上では最初からクラスパスに入ってるはずなので不要)

備考

・ローカルでs3nで試すと、S3にゴミファイルができてしまった。
(おそらく、Hadoopがメタな情報として置いていたのが消されずに残ってしまった。)
そもそもs3nは古いスキーマっぽく、パフォーマンスなどの面でs3aの方が良さそうなので、問題なければそちらを使うとよさそう。

・コード中のコメントに書いたが、s3スキーマはローカルで使用するのはオススメしない。

参考:
https://wiki.apache.org/hadoop/AmazonS3

14
19
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14
19

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?