前回作成したClusterへデータを転送して、Sparkから参照します。
ローカル(Mac)からのwebhdfsを使ったhdfsへのファイル転送やります。
##ローカル(Mac)からファイル転送
webhdfsはRESTのインタフェースを持っています。
前回紹介したようにBluemix上で払い出したHadoopのClusterには
ディフォルトでwebhdfsのURLが払い出されますのでそれを使います。
###hdfsの構成
Bluemix上のHadoopではhdfsのrootディレクトリ下に下図の構成がディフォルトで用意されます。
また、userディレクトリしたにプロビジョニング時のユーザーディレクトリが用意されますので、以後そちらのディレクトリ(/user/tanayu)にファイルを置いていきます。
###webhdfsを使ったディレクトリの参照
ここではcurlを使ってwebhdfsのRESTインタフェースを操作します。
ベースとなるURL:https://bi-hadoop-prod-4148.bi.services.us-south.bluemix.net:8443/gateway/default/webhdfs/v1/
は共通で、その後ろにrootディレクトリからのpathを書きます。
最後にopでオペレーションを指定しています。ここではLISTSTATUSを使ってディレクトリの情報を取得しています。
以下の例では/userのディレクトリの情報を参照します。
ユーザー名、パスワードはご自身の環境のものに置き換えてください。
curl -i -k -s --user <user name>:<password> --max-time 45 https://bi-hadoop-prod-4148.bi.services.us-south.bluemix.net:8443/gateway/default/webhdfs/v1/user?op=LISTSTATUS
コマンドを実行するとHTTP Statusと/userのディレクトリ情報がJson形式で
出力されます。
かなり見づらいですが、戻される情報は基本的にhdfs dfs -lsコマンドと差はありません。
###webhdfsを使ったファイルの転送
次にローカルにあるファイルをwebhdfsを利用してBluemix上のClusterに配置します。
以下の例では/user/tanayuディレクトリの下にad_sample_log.csvファイルを作成しています。opはCREATEを利用します。
curl -i -L -k -s --user <user name>:<password> --max-time 45 -X PUT -T ad_sample_log.csv https://bi-hadoop-prod-4148.bi.services.us-south.bluemix.net:8443/gateway/default/webhdfs/v1/user/tanayu/ad_sample_log.csv?op=CREATE
下図は実行時のコンソールです。
ファイルが転送されたかの確認をLISTSTATUSで行います。
curl -i -k -s --user <user name>:<password> --max-time 45 https://bi-hadoop-prod-4148.bi.services.us-south.bluemix.net:8443/gateway/default/webhdfs/v1/user/tanayu?op=LISTSTATUS
以下はコマンドの戻り値です。
正常に作成されている事がわかります。
###webhdfsを使ったファイルの削除
最後にwebhdfsを使ったファイルの削除処理です。
先ほど転送したad_sample_log.csvファイルを削除します。
削除はop=DELETEでおこないます。
curl -i -L -k -s --user <user name>:<password> --max-time 45 -X DELETE https://bi-hadoop-prod-4148.bi.services.us-south.bluemix.net:8443/gateway/default/webhdfs/v1/user/tanayu/ad_sample_log.csv?op=DELETE
下図はコマンド実行時のコンソールです。
ほぼwebhdfsの機能をそのまま使えそう。
##Sparkのプログラム作成
今回はhdfs上に配置したをSparkから参照して行数を表示するだけの簡単なプログラムです。
package org.tanayu.qiita
import org.apache.log4j.{Level,Logger}
import org.apache.spark.{SparkConf, SparkContext}
object sample {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("qiita_sample")
val sc = new SparkContext(conf)
run(sc)
sc.stop
}
private[qiita]
def run(sc: SparkContext){
val txt = sc.textFile("hdfs://user/tanayu/ad_sample_log.csv")
println("count :" + txt.count());
}
}
上記をbuildした.jarをscpで転送します。
以下はコマンドの例です。
scp target/scala-2.11/bioc_sample-assembly-0.0.1.jar <user name>@bi-hadoop-prod-4148.bi.services.us-south.bluemix.net:
転送が終わるとhome_dirにファイルが転送されます。
以下は転送したjarの実行コマンドです。
ディフォルトだとINFOのログがいっぱい出てウザいので
log4jのレベルをWARNに設定しています。
spark-submit --conf spark.driver.extraJavaOptions=-Dlog4j.configuration=file:./log4j.properties --class org.tanayu.qiita.sample --master yarn-client bioc_sample-assembly-0.0.1.jar
Cloud上のHadoopのconfigurationは直接触れないので、
/etc/hadoop/conf/log4j.propertiesをhome_dirにコピーし中身を変更して
spark-submit時に渡しています。ちなみに書き換える部分は下記です。
hadoop.root.logger=WARN,console