問題提起
Hadoop系でMapReduceする際は、一般的には純粋な関数にすることが推奨されます。しかし、実際のアプリケーションは色々と複雑で、外部リソースにアクセスしたくなる時が多々あります。
例えば、RDDではKeyだけもらってくるけど、実際にはDBからそのKeyのValueを取得してから処理したい場合などです。
理想を言えば、DriverNodeの処理の中で全データを一括取得できればいいのでしょうが、データ量によってはExecutorへ転送する量が多すぎて遅いとか、そもそもDriverのメモリに乗り切らないといった問題があります。
解決法
Executorの中で外部リソースにアクセスする。
まぁ当たり前ですよね。
ただ、普通のやり方ではよくありません。
- mapReduce内でコネクションを張る処理を書いてしまうと、RDDの各要素毎にコネクションを作るので遅い&外部リソースに負荷がかかる。
- DriverNodeでコネクションを作ると、それをExecutorに配布することが(同一コネクションの使い回し、シリアライズ不可能などの理由で)難しい。
そのため、ちょっと特殊なことをする必要があります。
やり方
一言で言うと、lazy valを使います。
ここでは例としてDynamoDBへのコネクションを張るやり方を紹介します。
ちなみに、AccessKeyなどの環境変数を使いたい場合は、実行時に以下のような方法でDriver, Executorに渡すことが出来ます。
spark-submit <色々> --conf spark.executorEnv.AWS_ACCESS_KEY_ID=hoge --conf spark.executorEnv.AWS_SECRET_ACCESS_KEY=hogehoge <色々>
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.regions.{Region, Regions}
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, PrimaryKey}
import org.apache.spark.{SparkConf, SparkContext}
object Hello {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
val rdd = sc.range(1, 1000, 1)
val accessKey = sys.env("AWS_ACCESS_KEY_ID")
val secretKey = sys.env("AWS_SECRET_ACCESS_KEY")
val file = args(0)
lazy val dynamo = DynamoUtils.setupDynamoClientConnection(accessKey, secretKey)
println("----Start----")
rdd.map(v => {
val table = dynamo.getTable("sample")
val key = new PrimaryKey("id", 1)
val ss = table.getItem(key)
ss.toString
}).saveAsTextFile(file)
}
}
object DynamoUtils {
def setupDynamoClientConnection(accessKey:String, secretKey:String): DynamoDB = {
val credentials = new BasicAWSCredentials(accessKey,secretKey)
val client = new AmazonDynamoDBClient(credentials)
client.setRegion(Region.getRegion(Regions.AP_NORTHEAST_1))
val dynamoDB = new DynamoDB(client)
val table = dynamoDB.getTable("sample")
val expressionAttributeNames = new util.HashMap[String,String]()
expressionAttributeNames.put("#p", "pageCount")
val expressionAttributeValues = new util.HashMap[String,Object]()
val num = 1.asInstanceOf[Object]
expressionAttributeValues.put(":val", num)
val outcome = table.updateItem(
"id", 1,
"set #p = #p + :val",
expressionAttributeNames,
expressionAttributeValues)
dynamoDB
}
}
lazy val dynamo = DynamoUtils.setupDynamoClientConnection(accessKey, secretKey)
となっているところがキモです。
こう書いてあげると、それぞれのExecutorの中で一度だけ実行されてコネクションが使えるようになります。
ここでは例として、コネクションを貼る際にDynamoのテーブルに対してatomic countの操作をしています。このBatchをsparkで実行してみると、該当レコードの値はExecutorの分だけ(ExecutorNodeの数だけ?)増えることが確認できます。
本筋と関係ないですが、この例では環境変数を読み取るのはDriverNode(valになってるので)で、その値が各Executorにbroadcastされているはずです。その値を使ってDynamoに繋ぎに行くのは、上で述べてるように各Executorです。(lazy valのため。)
注意点
Executorの数が増えるほどコネクションが増えてしまいます。
また、おそらく並列で同時に大量のリクエストが外部リソースに飛ぶことになります。
そのため、その辺りの負荷を気にしておく必要があります。
参照