Reading from a Hive table and writing to a Relational Database using pySpark

I was once asked for a tutorial that described how to use pySpark to read data from a Hive table and write to a JDBC datasource like PostgreSQL or SQL Server. Not being able to find a suitable tutorial, I decided to write one.

This demo creates a python script which uses pySpark to read data from a Hive table into a DataFrame, perform operations on the DataFrame, and write the results out to a JDBC DataSource (PostgreSQL database). I am using Spark 1.6.2.


Getting some CSV data to populate into Hive

I will use crime data from the City of Chicago in this tutorial. The data is available in CSV format from the following link:

https://catalog.data.gov/dataset/crimes-2001-to-present-398a4

[root@jyoung-hdp25-node-1 tmp]# mkdir -p /tmp/crime

[root@jyoung-hdp25-node-1 tmp]# cd /tmp/crime
[root@jyoung-hdp25-node-1 crime]# curl -o crime -L "https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD"
[root@jyoung-hdp25-node-1 crime]# cd /tmp/
[root@jyoung-hdp25-node-1 tmp]# chmod -R 777 crime/


Copy the CSV data to HDFS

I'm using a kerberized Hortonworks Data Platform (HDP) cluster, so I'll use kinit to authenticate.

[root@jyoung-hdp25-node-1 tmp]# su - hdfs

[hdfs@jyoung-hdp25-node-1 ~]$ kinit -kt /etc/security/keytabs/hdfs.headless.keytab hdfs-cluster1@EXAMPLE.COM
[hdfs@jyoung-hdp25-node-1 ~]$ klist
Ticket cache: FILE:/tmp/krb5cc_1006
Default principal: hdfs-cluster1@EXAMPLE.COM

Valid starting Expires Service principal
10/02/2016 04:15:38 10/03/2016 04:15:38 krbtgt/EXAMPLE.COM@EXAMPLE.COM

I create some HDFS directories to store my data and make them world readable and writeable just to make things easier.

[hdfs@jyoung-hdp25-node-1 ~]$ hadoop fs -mkdir -p /tmp/crime

[hdfs@jyoung-hdp25-node-1 ~]$ hadoop fs -chmod -R 777 /tmp/crime
[hdfs@jyoung-hdp25-node-1 ~]$ hadoop fs -copyFromLocal /tmp/crime/crime /tmp/crime/
[hdfs@jyoung-hdp25-node-1 ~]$ hadoop fs -chown -R ambari-qa:hdfs /tmp/crime
[hdfs@jyoung-hdp25-node-1 ~]$ hadoop fs -chown -R ambari-qa:hdfs /tmp/crime/crime
[hdfs@jyoung-hdp25-node-1 ~]$ hadoop fs -chmod -R 777 /tmp/crime/crime
[hdfs@jyoung-hdp25-node-1 ~]$ hadoop fs -ls /tmp/crime/
Found 1 items
-rwxrwxrwx 3 ambari-qa hdfs 1452392942 2016-10-02 04:27 /tmp/crime/crime
[hdfs@jyoung-hdp25-node-1 ~]$ exit


Create an HQL file of DDL commands for creating and populating the Hive table

I use kinit again to authenticate, this time as the hive user.

[root@jyoung-hdp25-node-1 tmp]# su - hive

[hive@jyoung-hdp25-node-1 ~]$ kinit -kt /etc/security/keytabs/hive.service.keytab hive/jyoung-hdp25-node-1.openstacklocal@EXAMPLE.COM
[hive@jyoung-hdp25-node-1 ~]$ klist
Ticket cache: FILE:/tmp/krb5cc_1000
Default principal: hive/jyoung-hdp25-node-1.openstacklocal@EXAMPLE.COM

Valid starting Expires Service principal
10/02/2016 04:34:16 10/03/2016 04:34:16 krbtgt/EXAMPLE.COM@EXAMPLE.COM

I use a heredoc to populate load_crime_table.txt with my Hive CREATE TABLE DDL.

Once you execute the first line, you can copy and paste the remaining lines up to the EOF and they'll be written to the file.

[hive@jyoung-hdp25-node-1 ~]$ cat << EOF > /tmp/load_crime_table.txt

CREATE EXTERNAL TABLE IF NOT EXISTS crime(
ID STRING,
Case_Number STRING,
Case_Date STRING,
Block STRING,
IUCR INT,
Primary_Type STRING,
Description STRING,
Location_Description STRING,
Arrest BOOLEAN,
Domestic BOOLEAN,
Beat STRING,
District STRING,
Ward STRING,
Community_Area STRING,
FBI_Code STRING,
X_Coordinate INT,
Y_Coordinate INT,
Case_Year INT,
Updated_On STRING,
Latitude DOUBLE,
Longitude DOUBLE,
Location STRING)
COMMENT 'This is crime data for the city of Chicago.'
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
STORED AS TEXTFILE
LOCATION '/tmp/crime'
TBLPROPERTIES("skip.header.line.count"="1");
EOF
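The two table options that matter most here are the CSV SerDe and skip.header.line.count. The sketch below imitates both with Python's csv module to show why they are needed. The two sample lines are made up by me in the shape of the crime file (they are not real records), and parse_rows is a hypothetical helper, not part of Hive or Spark. As far as I know, OpenCSVSerde also exposes every column to queries as a string, whatever type the DDL declares, so the INT, BOOLEAN and DOUBLE declarations above may not behave as typed columns.

```python
import csv

# Made-up sample in the shape of the crime CSV (assumption, not real data).
SAMPLE_LINES = [
    'ID,Case Number,Primary Type,Description,Location',
    '1,XX000001,THEFT,$500 AND UNDER,"(41.0, -87.0)"',
]


def parse_rows(lines, skip_header_lines=1):
    """Parse CSV lines with quote handling and drop the leading header lines."""
    rows = list(csv.reader(lines))
    return rows[skip_header_lines:]


rows = parse_rows(SAMPLE_LINES)

# Splitting on commas alone breaks the quoted Location value in two.
naive = SAMPLE_LINES[1].split(",")

print(len(rows[0]))  # fields seen by a CSV-aware parser
print(len(naive))    # fields seen by a plain comma split
```

The CSV-aware parser keeps the quoted coordinate pair as one field, while the plain split produces one field too many, which would shift every later column.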


Populate the Hive table from the HQL file

I use beeline to execute my load_crime_table.txt which creates an external Hive table. Because I'm in a kerberized environment, I include my Hive Service Principal in beeline's JDBC connection URL.

[hive@jyoung-hdp25-node-1 ~]$ /usr/bin/beeline -u "jdbc:hive2://jyoung-hdp25-node-2.openstacklocal:10000/default;principal=hive/_HOST@EXAMPLE.COM" -f "/tmp/load_crime_table.txt"

[hive@jyoung-hdp25-node-1 ~]$ exit


Create an SQL file of DDL commands for creating the RDBMS table (PostgreSQL)

I'll use another heredoc to create a file of SQL commands containing the DDL statements for creating my relational database table.

[root@jyoung-hdp25-node-1 ~]# cat << EOF > /tmp/create_pettytheft.sql

CREATE table pettytheft(
id BIGINT ,
case_number VARCHAR(255) NOT NULL ,
primary_type VARCHAR(255) NOT NULL,
description VARCHAR(255) NOT NULL,
location_description VARCHAR(255) NOT NULL,
beat VARCHAR(255) NOT NULL,
district VARCHAR(255) NOT NULL,
ward VARCHAR(255) NOT NULL,
community_area VARCHAR(255) NOT NULL,
PRIMARY KEY(id)
);

GRANT ALL ON pettytheft TO ambari;
ALTER TABLE pettytheft OWNER TO ambari;
EOF
[root@jyoung-hdp25-node-1 ~]# chmod 777 /tmp/create_pettytheft.sql


Create the Database table using the SQL file

I use PostgreSQL's psql utility to create the table from the SQL file created above.

[root@jyoung-hdp25-node-1 ~]# su - postgres

-bash-4.2$ psql -d postgres -f /tmp/create_pettytheft.sql
-bash-4.2$ exit


Create a python script to read from Hive and write to the JDBC DataSource (PostgreSQL table)

I will create the python script as /tmp/pyspark_hive_jdbc_demo.py. Let's go through the logic section-by-section.

Import the necessary modules for pyspark.

from pyspark import SparkContext

from pyspark.sql import HiveContext

Create a Spark Context object, which tells Spark how to access a cluster.

Use the Spark Context to create a Hive Context object, which allows you to execute SQL queries as well as Hive commands.

sc = SparkContext("local", "pySpark Hive JDBC Demo App")

# Create a Hive Context
hive_context = HiveContext(sc)

Read from the Hive table "crime" on the "default" Hive database. The result is a DataFrame.

print("Reading Hive table...")

crime = hive_context.table("default.crime")

Register the DataFrame crime as a temporary table crime_temp.

print("Registering DataFrame as a table...")

crime.registerTempTable("crime_temp")

SQL can be run over DataFrames that have been registered as a table.

Execute an SQL query over this temporary table to get a list of thefts of property worth $500 USD or less.

The results will be another DataFrame.

print("Executing SQL SELECT query on DataFrame registered as a temporary table...")

pettythefts = hive_context.sql('SELECT * FROM crime_temp WHERE Primary_Type = "THEFT" AND Description = "$500 AND UNDER"')

Create a new DataFrame containing only the columns we wish to write to the JDBC connected datasource using select([list of columns]).

This will be a subset of the columns available in the source Hive table.

print("Creating a DataFrame of only the columns of our resultset to be persisted to JDBC DataSource...")

pettythefts_table_df = pettythefts.select("id", "case_number", "primary_type", "description", "location_description", "beat", "district", "ward", "community_area")

Prepare the connection properties for the JDBC datasource.

I'm using a Postgres database so the connection URL below is valid for Postgres.

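My database table name (as defined in /tmp/create_pettytheft.sql) is public.pettytheft.

Note that in Spark 1.6 the 'overwrite' mode drops the target table if it exists and recreates it from the DataFrame's schema, so the column types and constraints from /tmp/create_pettytheft.sql are replaced. Use 'append' instead if you want to keep the table exactly as created.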

mode = 'overwrite'

url = 'jdbc:postgresql://<database server IP address>:5432/postgres?searchpath=public'
properties = {"user": "<username>", "password": "<password>", "driver": "org.postgresql.Driver"}
table = 'public.pettytheft'
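If you would rather not hard-code the credentials in the script, the settings can be assembled from environment variables. This is only a sketch: postgres_jdbc_settings and the variable names PETTYTHEFT_DB_USER and PETTYTHEFT_DB_PASSWORD are names I made up for illustration. I leave the query string off the URL here because the table name is already schema-qualified.

```python
import os


def postgres_jdbc_settings(host, database="postgres", port=5432, environ=None):
    """Return (url, properties) suitable for DataFrame.write.jdbc on PostgreSQL.

    The environment variable names are hypothetical; pick your own.
    """
    env = os.environ if environ is None else environ
    url = "jdbc:postgresql://{0}:{1}/{2}".format(host, port, database)
    properties = {
        "user": env["PETTYTHEFT_DB_USER"],
        "password": env["PETTYTHEFT_DB_PASSWORD"],
        "driver": "org.postgresql.Driver",
    }
    return url, properties
```

Calling url, properties = postgres_jdbc_settings("<database server IP address>") would then replace the two hand-written assignments, and a missing variable fails early with a KeyError instead of a failed database login.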

Write the contents of the DataFrame to the JDBC datasource (Postgres) using the connection URL defined above.

print("Writing DataFrame to JDBC Datasource...")

pettythefts_table_df.write.jdbc(url=url, table=table, mode=mode, properties=properties)

print("Exiting...")

Save the python script as /tmp/pyspark_hive_jdbc_demo.py
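For reference, the read, filter, select and write steps can also be gathered into one function that takes the Hive context as an argument. This is a sketch of the same logic rather than a different method; the function name copy_petty_thefts and the two module-level constants are my own. It only uses the calls already shown in this tutorial (table, registerTempTable, sql, select, write.jdbc).

```python
# Columns to persist; a subset of the Hive table's columns.
PETTY_THEFT_COLUMNS = [
    "id", "case_number", "primary_type", "description",
    "location_description", "beat", "district", "ward", "community_area",
]

# Same query as in the walkthrough, kept in one place.
PETTY_THEFT_QUERY = (
    'SELECT * FROM crime_temp '
    'WHERE Primary_Type = "THEFT" AND Description = "$500 AND UNDER"'
)


def copy_petty_thefts(hive_context, url, table, properties, mode="overwrite"):
    """Read default.crime, keep the petty thefts and write them over JDBC."""
    crime = hive_context.table("default.crime")
    crime.registerTempTable("crime_temp")
    pettythefts = hive_context.sql(PETTY_THEFT_QUERY)
    subset = pettythefts.select(*PETTY_THEFT_COLUMNS)
    subset.write.jdbc(url=url, table=table, mode=mode, properties=properties)
    return subset
```

In the script you would call copy_petty_thefts(hive_context, url, table, properties, mode) after creating hive_context. Passing the context in, rather than creating it inside the function, also makes the function easy to exercise with a stand-in object.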


Run the pySpark application locally

Be sure to include postgresql-jdbc.jar both in the distributed cache, using the --jars option, and on the driver's class path, using the --driver-class-path option.

[root@jyoung-hdp25-node-1 ~]# su - yarn

[yarn@jyoung-hdp25-node-1 ~]$ /bin/spark-submit --jars /usr/share/java/postgresql-jdbc.jar --driver-class-path /usr/share/java/postgresql-jdbc.jar --master local[1] /tmp/pyspark_hive_jdbc_demo.py
[yarn@jyoung-hdp25-node-1 ~]$ exit
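If you launch the job from another Python program instead of an interactive shell, the same command line can be built as an argument list. The helper name spark_submit_args is hypothetical; the flags and paths are the ones used above. Building a list rather than a shell string means the brackets in local[1] are never interpreted by a shell.

```python
def spark_submit_args(script, jdbc_jar, master="local[1]",
                      spark_submit="/bin/spark-submit"):
    """Build the spark-submit argument list used in this tutorial."""
    return [
        spark_submit,
        "--jars", jdbc_jar,                # ship the JDBC driver with the job
        "--driver-class-path", jdbc_jar,   # and put it on the driver's class path
        "--master", master,
        script,
    ]
```

The resulting list can be passed directly to subprocess.call, for example subprocess.call(spark_submit_args("/tmp/pyspark_hive_jdbc_demo.py", "/usr/share/java/postgresql-jdbc.jar")).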


Verify results in the RDBMS

I execute a query against my pettytheft table in Postgres to verify that pySpark wrote out the data to the database as expected.

[root@jyoung-hdp25-node-1 ~]# su - postgres

-bash-4.2$ psql -d postgres -U <username> -c "select * from public.pettytheft limit 5;"
Password for user ambari:
id | case_number | primary_type | description | location_description | beat | district | ward | community_area
---------+-------------+--------------+----------------+----------------------+------+----------+------+----------------
8503954 | HV180410 | THEFT | $500 AND UNDER | STREET | 1114 | 011 | 28 | 26
8503999 | HV180357 | THEFT | $500 AND UNDER | STREET | 2412 | 024 | 50 | 2
8504003 | HV180508 | THEFT | $500 AND UNDER | STREET | 0725 | 007 | 17 | 67
8504108 | HV180682 | THEFT | $500 AND UNDER | SIDEWALK | 0123 | 001 | 2 | 32
8504109 | HV180672 | THEFT | $500 AND UNDER | STREET | 1911 | 019 | 47 | 5
(5 rows)


Wrap-up

In the exercise above we:

  • created a Hive table
  • populated the Hive table with CSV data
  • created a relational database table
  • created a pySpark script that:
    • read the Hive table into a DataFrame
    • registered the DataFrame as a temporary table
    • executed an SQL query over the temporary table
    • executed a select() on the resulting DataFrame
    • wrote the contents out to the relational database
  • verified the data was written to our database

If you'd like to quickly get up-and-running with Spark development, I encourage you to download our HDP Sandbox. It comes with everything you need to start developing Spark applications today!