4
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

SparkからOpenStack SwiftベースのIBM Object Storageに接続してみたメモ

Last updated at Posted at 2016-11-23

データソースはいろいろありながらも、取り急ぎファイルのままストレージに入れておくのがハードルは低そう。Sparkのようなビッグデータ前提であればログファイルなどが想定されることも多いのだろうけど、一般的なビジネスデータは避けて通れないのでまずはCSV、ということでCSV周りを少し試してみたときのメモ。( IBM Data Scientist Experience , Python 2 with Spark 2.0 にて)

準備

CSVファイルをSwiftベースのIBM Object Storageへアップロードしておく。画面右の"Drop you file here or browse your files to add a new file "を使用。スクリーンショットのようにbaseball.csv , cars.csv , whiskey.csvをアップロードした。
Screen Shot 2016-11-23 at 21.22.24.png

Object Storage からデータを読み込んでみた

まずはDSXコードからインサートされるコードで試してみた
Screen Shot 2016-11-23 at 21.20.47.png

こちらコンフィグレーションをセットする処理と、CSVデータをDataFrameにロードする処理がある。
(1) SparkからHadoopのコンフィグレーションをセット
Accessing OpenStack Swift from Sparkに記載された構成パラメータが、IBM Object Storage にアップロードしたファイルに合わせてセットされている模様。

test.py
# @hidden_cell
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

# This function accesses a file in your Object Storage. The definition contains your credentials.
# You might want to remove those credentials before you share (name):
def set_hadoop_config_with_credentials_xxxxxxxxxxxxxxxxxxcxxxxxcc(name):
    """This function sets the Hadoop configuration so it is possible to
    access data from Bluemix Object Storage V3 using Spark"""

    prefix = 'fs.swift.service.' + name
    hconf = sc._jsc.hadoopConfiguration()
    hconf.set(prefix + '.auth.url', 'https://identity.open.softlayer.com'+'/v3/auth/tokens')
    hconf.set(prefix + '.auth.endpoint.prefix', 'endpoints')
    hconf.set(prefix + '.tenant', '4dbbbca3a8ec42eea9349120fb91dcf9')
    hconf.set(prefix + '.username', 'xxxxxxxxxcxxxxxxcccxxxxxccc')
    hconf.set(prefix + '.password', 'Xxxxxxxxxxxxxxxxx')
    hconf.setInt(prefix + '.http.port', 8080)
    hconf.set(prefix + '.region', 'dallas')
    hconf.setBoolean(prefix + '.public', True)

# you can choose any name
name = 'keystone'
set_hadoop_config_with_credentials_Xxxxxxxxxxxxxxxxxxcxxxxxc(name)

(2) Spark DataFrameへCSVデータをロード

test.py
df_data_1=sqlContext.read.format('com.databricks.spark.csv')\
    .options(header='true',inferschema='true')\
    .load("swift://PredictiveAnalyticsProject2." + name + "/whiskey.csv")

これはSpark 2.0以前は必要だったspark-csvというパッケージを利用しているもののようです.
spark2.0からはSpark DataFrameが直接csvを扱えるようで、read.csv()も使えて便利になっている。
Screen Shot 2016-11-23 at 21.38.46.png

Object Storage へデータを書き出してみた

whisky.csvを読み込んだSpark DataFrameの内容を, whiskey_new.csvという名前でObject Storage に書き出した。
(1) Spark DataFrameのwriteで出力してみた
シンプル にwrite.csv()と書けばOK.
Screen Shot 2016-11-23 at 21.50.24.png

modeはこちらにあるようなSave Modeが利用できるようです。
出力されるファイルを見ると、以下のスクリーンショットのように一つのCSVではなく、複数のファイルに分割されて出力されていました。複数のノードで処理しているからなのだと思います。(不思議なもので?、これをSparkから再度read.csvで読むときは一つのtextFile, csvとして扱われるから問題ないみたいです)。
Screen Shot 2016-11-23 at 21.49.21.png
でも一つのファイルで出したいと考える人も私以外にもいるようで、一つのファイルに出力できないか問い合わせているQ&Aもありました(http://stackoverflow.com/questions/31674530/write-single-csv-file-using-spark-csv)

(2) Sparkを利用せず、REST APIを試してみた
大量データを扱う場合には向いていないと思いますが、以下のようなコードでObject Storage上に一つのCSVとしてputできます。(DSXがPandas DataFrame作成用にインサートするコードのget部分をputに変更しているだけです。1回目のAPIコール/POSTをして認証情報をもらいつつ、レスポンスからアップロードするためのput先/url2を組み立てているようです)

put_sample.py
def put_object_storage_file_with_credentials_xxxxxxxxxx(container, filename, indata):
    url1 = ''.join(['https://identity.open.softlayer.com', '/v3/auth/tokens'])
    data = {'auth': {'identity': {'methods': ['password'],
            'password': {'user': {'name': 'member_1825cd3bc875420fc629ccfd22c22e20433a7ac9','domain': {'id': '07e33cca1abe47d293b86de49f1aa8bc'},
            'password': 'xxxxxxxxxx'}}}}}
    headers1 = {'Content-Type': 'application/json'}
    resp1 = requests.post(url=url1, data=json.dumps(data), headers=headers1)
    resp1_body = resp1.json()
    for e1 in resp1_body['token']['catalog']:
        if(e1['type']=='object-store'):
            for e2 in e1['endpoints']:
                        if(e2['interface']=='public'and e2['region']=='dallas'):
                            url2 = ''.join([e2['url'],'/', container, '/', filename])
    s_subject_token = resp1.headers['x-subject-token']
    headers2 = {'X-Auth-Token': s_subject_token, 'accept': 'application/json'}
    resp2 = requests.put(url=url2, headers=headers2 , data=indata)
    print resp2
    return (resp2.content)


put_object_storage_file_with_credentials_xxxxxxxxxx( 'PredictiveAnalyticsProject2', 'whiskey_new_s.csv' , df_data_2.to_csv( index = False ) )

追記

上記のREST APIでPutするイメージとして Pythonのswiftclientを利用する方法もあるようですが、IBMのData Scientist Experience環境では使えないようでした。
Using IBM Object Storage in Bluemix, with Python
Screen Shot 2016-11-23 at 22.36.18.png

4
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?