データソースはいろいろありながらも、取り急ぎファイルのままストレージに入れておくのがハードルは低そう。Sparkのようなビッグデータ前提であればログファイルなどが想定されることも多いのだろうけど、一般的なビジネスデータは避けて通れないのでまずはCSV、ということでCSV周りを少し試してみたときのメモ。( IBM Data Scientist Experience , Python 2 with Spark 2.0 にて)
準備
CSVファイルをSwiftベースのIBM Object Storageへアップロードしておく。画面右の"Drop you file here or browse your files to add a new file "を使用。スクリーンショットのようにbaseball.csv , cars.csv , whiskey.csvをアップロードした。
Object Storage からデータを読み込んでみた
こちらコンフィグレーションをセットする処理と、CSVデータをDataFrameにロードする処理がある。
(1) SparkからHadoopのコンフィグレーションをセット
Accessing OpenStack Swift from Sparkに記載された構成パラメータが、IBM Object Storage にアップロードしたファイルに合わせてセットされている模様。
# @hidden_cell
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
# This function accesses a file in your Object Storage. The definition contains your credentials.
# You might want to remove those credentials before you share (name):
def set_hadoop_config_with_credentials_xxxxxxxxxxxxxxxxxxcxxxxxcc(name):
"""This function sets the Hadoop configuration so it is possible to
access data from Bluemix Object Storage V3 using Spark"""
prefix = 'fs.swift.service.' + name
hconf = sc._jsc.hadoopConfiguration()
hconf.set(prefix + '.auth.url', 'https://identity.open.softlayer.com'+'/v3/auth/tokens')
hconf.set(prefix + '.auth.endpoint.prefix', 'endpoints')
hconf.set(prefix + '.tenant', '4dbbbca3a8ec42eea9349120fb91dcf9')
hconf.set(prefix + '.username', 'xxxxxxxxxcxxxxxxcccxxxxxccc')
hconf.set(prefix + '.password', 'Xxxxxxxxxxxxxxxxx')
hconf.setInt(prefix + '.http.port', 8080)
hconf.set(prefix + '.region', 'dallas')
hconf.setBoolean(prefix + '.public', True)
# you can choose any name
name = 'keystone'
set_hadoop_config_with_credentials_Xxxxxxxxxxxxxxxxxxcxxxxxc(name)
(2) Spark DataFrameへCSVデータをロード
df_data_1=sqlContext.read.format('com.databricks.spark.csv')\
.options(header='true',inferschema='true')\
.load("swift://PredictiveAnalyticsProject2." + name + "/whiskey.csv")
これはSpark 2.0以前は必要だったspark-csvというパッケージを利用しているもののようです.
spark2.0からはSpark DataFrameが直接csvを扱えるようで、read.csv()も使えて便利になっている。
Object Storage へデータを書き出してみた
whisky.csvを読み込んだSpark DataFrameの内容を, whiskey_new.csvという名前でObject Storage に書き出した。
(1) Spark DataFrameのwriteで出力してみた
シンプル にwrite.csv()と書けばOK.
modeはこちらにあるようなSave Modeが利用できるようです。
出力されるファイルを見ると、以下のスクリーンショットのように一つのCSVではなく、複数のファイルに分割されて出力されていました。複数のノードで処理しているからなのだと思います。(不思議なもので?、これをSparkから再度read.csvで読むときは一つのtextFile, csvとして扱われるから問題ないみたいです)。
でも一つのファイルで出したいと考える人も私以外にもいるようで、一つのファイルに出力できないか問い合わせているQ&Aもありました(http://stackoverflow.com/questions/31674530/write-single-csv-file-using-spark-csv)
(2) Sparkを利用せず、REST APIを試してみた
大量データを扱う場合には向いていないと思いますが、以下のようなコードでObject Storage上に一つのCSVとしてputできます。(DSXがPandas DataFrame作成用にインサートするコードのget部分をputに変更しているだけです。1回目のAPIコール/POSTをして認証情報をもらいつつ、レスポンスからアップロードするためのput先/url2を組み立てているようです)
def put_object_storage_file_with_credentials_xxxxxxxxxx(container, filename, indata):
url1 = ''.join(['https://identity.open.softlayer.com', '/v3/auth/tokens'])
data = {'auth': {'identity': {'methods': ['password'],
'password': {'user': {'name': 'member_1825cd3bc875420fc629ccfd22c22e20433a7ac9','domain': {'id': '07e33cca1abe47d293b86de49f1aa8bc'},
'password': 'xxxxxxxxxx'}}}}}
headers1 = {'Content-Type': 'application/json'}
resp1 = requests.post(url=url1, data=json.dumps(data), headers=headers1)
resp1_body = resp1.json()
for e1 in resp1_body['token']['catalog']:
if(e1['type']=='object-store'):
for e2 in e1['endpoints']:
if(e2['interface']=='public'and e2['region']=='dallas'):
url2 = ''.join([e2['url'],'/', container, '/', filename])
s_subject_token = resp1.headers['x-subject-token']
headers2 = {'X-Auth-Token': s_subject_token, 'accept': 'application/json'}
resp2 = requests.put(url=url2, headers=headers2 , data=indata)
print resp2
return (resp2.content)
put_object_storage_file_with_credentials_xxxxxxxxxx( 'PredictiveAnalyticsProject2', 'whiskey_new_s.csv' , df_data_2.to_csv( index = False ) )
追記
上記のREST APIでPutするイメージとして Pythonのswiftclientを利用する方法もあるようですが、IBMのData Scientist Experience環境では使えないようでした。
Using IBM Object Storage in Bluemix, with Python