Qiita Teams that are logged in
You are not logged in to any team

Log in to Qiita Team
Community
OrganizationAdvent CalendarQiitadon (β)
Service
Qiita JobsQiita ZineQiita Blog
19
Help us understand the problem. What is going on with this article?
@uryyyyyyy

ローカルのSparkでS3のファイルを扱う

More than 3 years have passed since last update.

Sparkは、デフォルトではS3のスキーマ(s3, s3n, s3a)の設定が入ってなく、クラスパスも入ってないようなので、自分で設定する必要がある。

(ただし、EMR上や既存のHadoopクラスタで動作させるときは、既に以下の設定がcore-site.xmlなどに書かれていると思うので、不要です。これはあくまでhadoopの設定をしてないローカルでの動作を想定しています。)

ローカルではhadoop-awsがクラスパスに入ってなかったので、別途追加してあげる必要がある。

import org.apache.spark.{SparkConf, SparkContext}

object Hello {
  def main(args: Array[String]): Unit = {

    val fromFile = args(0) // ex. <bucketName>/<filePath>
    val toFile = args(1) // ex. <bucketName>/<filePath>
    val conf = new SparkConf().setAppName("local")
    val sc = new SparkContext(conf)

    val accessKey = sys.env("AWS_ACCESS_KEY_ID")
    val secretKey = sys.env("AWS_SECRET_ACCESS_KEY")

    sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", accessKey)
    sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", secretKey)
    sc.hadoopConfiguration.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")

    sc.hadoopConfiguration.set("fs.s3a.awsAccessKeyId", accessKey)
    sc.hadoopConfiguration.set("fs.s3a.awsSecretAccessKey", secretKey)
    sc.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

    sc.hadoopConfiguration.set("fs.s3.awsAccessKeyId", accessKey)
    sc.hadoopConfiguration.set("fs.s3.awsSecretAccessKey", secretKey)
    sc.hadoopConfiguration.set("fs.s3.impl", "org.apache.hadoop.fs.s3.S3FileSystem")

    //s3スキーマは、hadoopで使われてるものとEMRで使われてるもので挙動が違うっぽいので
    //ローカルでの使用はオススメしない。
    //val logDataS3 = sc.textFile(s"s3://$fromFile")
    //logDataS3.saveAsTextFile(s"s3://${toFile}_s3")

    val logDataS3n = sc.textFile(s"s3n://$fromFile")
    logDataS3n.saveAsTextFile(s"s3n://${toFile}_s3n")

    val logDataS3a = sc.textFile(s"s3a://$fromFile")
    logDataS3a.saveAsTextFile(s"s3a://${toFile}_s3a")
  }
}

実行

./bin/spark-submit --class com.sample.Main --master local \
--packages org.apache.hadoop:hadoop-aws:2.7.1 \
yourSparkApp-assembly-1.0.jar \
<bucket>/input/path <bucket>/output/path

--packages org.apache.hadoop:hadoop-aws:2.7.1 を指定しているところが大事。
これがないとClassNotFoundになる。

(EMR上では最初からクラスパスに入ってるはずなので不要)

備考

・ローカルでs3nで試すと、S3にゴミファイルができてしまった。
(おそらく、Hadoopがメタな情報として置いていたのが消されずに残ってしまった。)
そもそもs3nは古いスキーマっぽく、パフォーマンスなどの面でs3aの方が良さそうなので、問題なければそちらを使うとよさそう。

・コード中のコメントに書いたが、s3スキーマはローカルで使用するのはオススメしない。

参考:
https://wiki.apache.org/hadoop/AmazonS3

19
Help us understand the problem. What is going on with this article?
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
anymindgroup
エンタメテック・マーケテック・D2C・HRテックを展開するグローバルIT企業

Comments

No comments
Sign up for free and join this conversation.
Sign Up
If you already have a Qiita account Login
19
Help us understand the problem. What is going on with this article?