8
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

SparkのExecutorから外部リソースにアクセスする方法

Last updated at Posted at 2016-03-29

問題提起

Hadoop系でMapReduceする際は、一般的には純粋な関数にすることが推奨されます。しかし、実際のアプリケーションは色々と複雑で、外部リソースにアクセスしたくなる時が多々あります。
例えば、RDDではKeyだけもらってくるけど、実際にはDBからそのKeyのValueを取得してから処理したい場合などです。
理想を言えば、DriverNodeの処理の中で全データを一括取得できればいいのでしょうが、データ量によってはExecutorへ転送する量が多すぎて遅いとか、そもそもDriverのメモリに乗り切らないといった問題があります。

解決法

Executorの中で外部リソースにアクセスする。

まぁ当たり前ですよね。

ただ、普通のやり方ではよくありません。

  • mapReduce内でコネクションを張る処理を書いてしまうと、RDDの各要素毎にコネクションを作るので遅い&外部リソースに負荷がかかる。
  • DriverNodeでコネクションを作ると、それをExecutorに配布することが(同一コネクションの使い回し、シリアライズ不可能などの理由で)難しい。

そのため、ちょっと特殊なことをする必要があります。

やり方

一言で言うと、lazy valを使います。
ここでは例としてDynamoDBへのコネクションを張るやり方を紹介します。

ちなみに、AccessKeyなどの環境変数を使いたい場合は、実行時に以下のような方法でDriver, Executorに渡すことが出来ます。

spark-submit <色々> --conf spark.executorEnv.AWS_ACCESS_KEY_ID=hoge --conf spark.executorEnv.AWS_SECRET_ACCESS_KEY=hogehoge <色々>

Githubで見たい方はこちら。
https://github.com/uryyyyyyy/hadoopSample/blob/master/spark/scala_2.11/batch_dynamo/src/main/scala/com/github/uryyyyyyy/hadoop/spark/batch/scala2_11/Hello.scala

import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.regions.{Region, Regions}
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, PrimaryKey}
import org.apache.spark.{SparkConf, SparkContext}

object Hello {
	def main(args: Array[String]): Unit = {
		val conf = new SparkConf().setAppName("Simple Application")
		val sc = new SparkContext(conf)
		val rdd = sc.range(1, 1000, 1)
		val accessKey = sys.env("AWS_ACCESS_KEY_ID")
		val secretKey = sys.env("AWS_SECRET_ACCESS_KEY")
		val file = args(0)
		lazy val dynamo = DynamoUtils.setupDynamoClientConnection(accessKey, secretKey)

		println("----Start----")
		rdd.map(v => {
			val table = dynamo.getTable("sample")

			val key = new PrimaryKey("id", 1)
			val ss = table.getItem(key)
			ss.toString
		}).saveAsTextFile(file)

	}
}

object DynamoUtils {

	def setupDynamoClientConnection(accessKey:String, secretKey:String): DynamoDB = {
		val credentials = new BasicAWSCredentials(accessKey,secretKey)
		val client = new AmazonDynamoDBClient(credentials)
		client.setRegion(Region.getRegion(Regions.AP_NORTHEAST_1))
		val dynamoDB = new DynamoDB(client)

		val table = dynamoDB.getTable("sample")

		val expressionAttributeNames = new util.HashMap[String,String]()
		expressionAttributeNames.put("#p", "pageCount")

		val expressionAttributeValues = new util.HashMap[String,Object]()
		val num = 1.asInstanceOf[Object]
		expressionAttributeValues.put(":val", num)

		val outcome = table.updateItem(
			"id", 1,
			"set #p = #p + :val",
			expressionAttributeNames,
			expressionAttributeValues)
		dynamoDB
	}
}

lazy val dynamo = DynamoUtils.setupDynamoClientConnection(accessKey, secretKey) となっているところがキモです。

こう書いてあげると、それぞれのExecutorの中で一度だけ実行されてコネクションが使えるようになります。
ここでは例として、コネクションを貼る際にDynamoのテーブルに対してatomic countの操作をしています。このBatchをsparkで実行してみると、該当レコードの値はExecutorの分だけ(ExecutorNodeの数だけ?)増えることが確認できます。

本筋と関係ないですが、この例では環境変数を読み取るのはDriverNode(valになってるので)で、その値が各Executorにbroadcastされているはずです。その値を使ってDynamoに繋ぎに行くのは、上で述べてるように各Executorです。(lazy valのため。)

注意点

Executorの数が増えるほどコネクションが増えてしまいます。
また、おそらく並列で同時に大量のリクエストが外部リソースに飛ぶことになります。
そのため、その辺りの負荷を気にしておく必要があります。

参照

8
8
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?