LoginSignup
3
5

More than 5 years have passed since last update.

SparkSQLを使いデータを取得する

Last updated at Posted at 2018-10-25

SparkSQLを使用してDataFrameを扱おうとした際にいろいろ試してみた結果メモ。

表の中の値を取得するサンプル1

#%%
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType

sc = SparkContext("local", "test")

try:
    # RDD作成
    rdd = sc.parallelize([
        ("Alice", 20),
        ("Bob", 25),
        ("Carol", 30),
        ("Daniel", 30),
    ]).collect()
    print(rdd)

    # スキーマ作成
    schema = StructType([
        StructField("name", StringType(), False),
        StructField("age", IntegerType(), False),
    ])
    print(schema)

    # Sparkのインスタンス作成
    spark = SparkSession.builder \
        .master("local") \
        .appName("test") \
        .getOrCreate()
    print(spark)

    # RDDとスキーマを使用しデータフレーム作成
    print("# RDDとスキーマを使用しデータフレーム作成")
    print("spark.createDataFrame(rdd, schema)")
    df = spark.createDataFrame(rdd, schema)
    print(df)
    df.show()

    # データフレームをusersテーブルとして登録、SQLを使って表示
    df.registerTempTable("users")
    df = spark.sql("select * from users")
    df.show()

    # SQLで表示した値をusers_age_under_30テーブルとして登録、表示
    df = spark.sql("select * from users where age < 30")
    df.registerTempTable("users_age_under_30")
    df = spark.sql("select * from users_age_under_30")
    df.show()

    # dfに毎回=で代入してるので、usersテーブルが使用できるか確認
    df = spark.sql("select * from users")
    df.show()

    # SQL結果の列「Name」の行「0」をStringとして取得
    dfList = df.collect()
    row = dfList[0]
    print( row )
    name = row.name
    print( name )   
    # つまり df.collect()[0].Name で値が取得出来る

finally:
    sc.stop()

●以下は間違った解釈。

    # spark : インスタンス。createDataFrame(rdd, schema)でsparkに値が格納される?
    # df    : データフレーム。sparkから.sql等で値を取ってきたもの
    # df.registerTempTable("TableName")
    #       : データフレームに対してテーブル名を指定する。dfを上書きしたら使えなくなる

●以下は合っていると思われる解釈。

    # spark : インスタンス。createDataFrame(rdd, schema)でsparkに値が格納される?
    # df    : データフレーム。インスタンスを参照し、表示しているだけ?
    # df.registerTempTable("TableName")
    #       : dfに保存されている時点の参照先のインスタンスの表に対して、名前を付ける?

表の中の値を取得するサンプル2


#%%

from pyspark import SparkContext
from pyspark import SQLContext
from pyspark.sql import Row
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType

sc = SparkContext("local", "test")

try :
    print("Start get ")
    # import and convert to rdd from csv
    fileName = "sample_dataset.csv"

    spark = SparkSession.builder \
        .master("local") \
        .appName("test") \
        .getOrCreate()
    df = spark.read.option("header", "true").csv(fileName)
    df.registerTempTable("sample")
    df.show()

    df = spark.sql("select Name, Birthdate, Occupation, Salary \
        from sample \
        where birthdate > '1984/12/31' \
        order by Birthdate")

    if df.count() < int(spark.sql("select count(*) cnt from sample").collect()[0].cnt) :
        print("絞り込めました : " + str(df.count()) + "行")

finally:
    sc.stop()
  • csvデータは他の方が作成した以下のデータをお借りしています。
sample_dataset.csv
ID,Name,Birthdate,Sex,Occupation,Salary
ID-0001,Abe,1985/1/1,M,Engineer,8422213
ID-0002,Saito,1970/2/11,F,Professor,8222588
ID-0003,Yamada,1975/3/21,M,Doctor,9845288
ID-0004,Tanaka,1980/4/22,F,Sales,8505218
ID-0005,Okamoto,1995/5/25,M,Student,218103

3
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
5