LoginSignup
2
3

More than 5 years have passed since last update.

# 【分散型アーキテクチャを試してみる】第3章 〜Apache Hadoop,YARN,SparkでCSVインポートからPostgresqlにデータ取り込み編〜

Last updated at Posted at 2016-07-24

【分散型アーキテクチャを試してみる】第3章 〜Apache Hadoop,YARN,SparkでCSVインポートからPostgresqlにデータ取り込み編〜

あらすじ

最近は日本にいなかったり、インタビュー講演ばっかり受けてて、Qiitaサボリ気味でしたが、少しずつ再開しようと思ってます。

膨大なデータを取り扱ったり、大規模向けシステムが当たり前になってきている中、機会がないと触れることがない分散型アーキテクチャを試してみようという事で、勉強会で使った資料を公開していきます。

※後々、リクエストに応じて更新することが多いのでストックしておくことをおすすめします。

自分は某社でCTOをしていますが、頭でっかちに理論ばっかり学習するよりは、イメージがなんとなく掴めるように学習し、実践の中で知識を深めていく方が効率的に学習出来ると考えています。

未経験者の教育についてインタビューされた記事もあるので紹介しておきます。ご興味ある方は御覧ください。
エンジニアは「即戦力」より理念に共感した「未経験者」を育てるほうが費用対効果が高い。

教育スタイルとしては正しい事をきっちりかっちり教えるのではなく、未経験レベルの人がなんとなく掴めるように、資料を構成していきます。

以下のようなシリーズネタで進めます。

No. 記事
1 【分散型アーキテクチャを試してみる】第1章 〜Apache Mesos,Zookeeper,Marathon,Chronos編〜
2 【分散型アーキテクチャを試してみる】第2章 〜Apache Ambari,Hadoop,YARN Zookeeper,Spark編〜
3 【分散型アーキテクチャを試してみる】第3章 〜SparkでCSVインポートからPostgresqlにそのままデータ取り込み編〜

では、今回もはじめていきましょう!

はじめに

今回は
前回行った【分散型アーキテクチャを試してみる】第2章 〜Apache Ambari,Hadoop,YARN Zookeeper,Spark編〜の環境でSpark-Shellを使ってCSVを読み込み、Postgresqlにそのまま格納する事にします。

Postgresqlの環境をもう一台vagrantにって思ったのですが、そんなにメモリ積んでねえよボケってなったので、Ambariによって入れたnode1(172.168.1.11)にPostgresqlを使って試してみましょう。

No. hostname ip 備考
1 node1 172.168.1.11 マスターノード
2 node2 172.168.1.12 ノード
3 node3 172.168.1.13 ノード

SparkShellでsum count

HDFS上にあるCSVファイルを読み込んで
いきなり、Postgresqlに突っ込むってやっても辛いので、少しステップを踏んでいきましょう。

まず最初にSparkに含まれているSparkShellという対話式のプログラムを使って
1から10まで全部足したらいくつ的なのをやります。

vagrant machineにSSH接続

# vagrant ssh node1

spark-shell実行を試す

前回のAmbariで構築した環境ですとYRANで構築されているので、クラスターリソース管理はYARNを使い、Sparkクライアントを立ち上げてみます。

# sudo su - spark
$ cd /usr/hdp/current/spark-client/
$ ./bin/spark-shell --master yarn --deploy-mode client --conf spark.executor.memory=512m --conf spark.executor.cores=1

上記のコマンドを実行すると、YARNによりSparkの実行環境が整えられます。
以下はコンソールに流れるログなのでそのまま記載しておきます。

※どうせみてもあれなので、よみ飛ばしてください。

16/07/22 02:22:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/07/22 02:22:37 INFO SecurityManager: Changing view acls to: spark
16/07/22 02:22:37 INFO SecurityManager: Changing modify acls to: spark
16/07/22 02:22:37 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(spark); users with modify permissions: Set(spark)
16/07/22 02:22:37 INFO HttpServer: Starting HTTP Server
16/07/22 02:22:37 INFO Server: jetty-8.y.z-SNAPSHOT
16/07/22 02:22:37 INFO AbstractConnector: Started SocketConnector@0.0.0.0:48061
16/07/22 02:22:37 INFO Utils: Successfully started service 'HTTP class server' on port 48061.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/

Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_60)
Type in expressions to have them evaluated.
Type :help for more information.
16/07/22 02:22:42 INFO SparkContext: Running Spark version 1.6.1
16/07/22 02:22:42 INFO SecurityManager: Changing view acls to: spark
16/07/22 02:22:42 INFO SecurityManager: Changing modify acls to: spark
16/07/22 02:22:42 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(spark); users with modify permissions: Set(spark)
16/07/22 02:22:42 INFO Utils: Successfully started service 'sparkDriver' on port 46345.
16/07/22 02:22:43 INFO Slf4jLogger: Slf4jLogger started
16/07/22 02:22:43 INFO Remoting: Starting remoting
16/07/22 02:22:43 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 33284.
16/07/22 02:22:43 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@172.168.1.11:33284]
16/07/22 02:22:43 INFO SparkEnv: Registering MapOutputTracker
16/07/22 02:22:43 INFO SparkEnv: Registering BlockManagerMaster
16/07/22 02:22:43 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-ccb8e2f6-8499-4f77-be75-28f03f63186d
16/07/22 02:22:43 INFO MemoryStore: MemoryStore started with capacity 511.1 MB
16/07/22 02:22:44 INFO SparkEnv: Registering OutputCommitCoordinator
16/07/22 02:22:44 INFO Server: jetty-8.y.z-SNAPSHOT
16/07/22 02:22:44 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
16/07/22 02:22:44 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/07/22 02:22:44 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://172.168.1.11:4040
spark.yarn.driver.memoryOverhead is set but does not apply in client mode.
16/07/22 02:22:45 INFO TimelineClientImpl: Timeline service address: http://node2:8188/ws/v1/timeline/
16/07/22 02:22:45 INFO RMProxy: Connecting to ResourceManager at node2/172.168.1.12:8050
16/07/22 02:22:46 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
16/07/22 02:22:46 INFO Client: Requesting a new application from cluster with 3 NodeManagers
16/07/22 02:22:46 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (1536 MB per container)
16/07/22 02:22:46 INFO Client: Will allocate AM container, with 896 MB memory including 384 MB overhead
16/07/22 02:22:46 INFO Client: Setting up container launch context for our AM
16/07/22 02:22:46 INFO Client: Setting up the launch environment for our AM container
16/07/22 02:22:46 INFO Client: Using the spark assembly jar on HDFS because you are using HDP, defaultSparkAssembly:hdfs://node1:8020/hdp/apps/2.4.2.0-258/spark/spark-hdp-assembly.jar
16/07/22 02:22:46 INFO Client: Preparing resources for our AM container
16/07/22 02:22:46 INFO Client: Using the spark assembly jar on HDFS because you are using HDP, defaultSparkAssembly:hdfs://node1:8020/hdp/apps/2.4.2.0-258/spark/spark-hdp-assembly.jar
16/07/22 02:22:46 INFO Client: Source and destination file systems are the same. Not copying hdfs://node1:8020/hdp/apps/2.4.2.0-258/spark/spark-hdp-assembly.jar
16/07/22 02:22:46 INFO Client: Uploading resource file:/tmp/spark-b7e33f77-ee76-40b9-86ee-12a601a95ed4/__spark_conf__308131288477383306.zip -> hdfs://node1:8020/user/spark/.sparkStaging/application_1469152619478_0002/__spark_conf__308131288477383306.zip
16/07/22 02:22:47 INFO SecurityManager: Changing view acls to: spark
16/07/22 02:22:47 INFO SecurityManager: Changing modify acls to: spark
16/07/22 02:22:47 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(spark); users with modify permissions: Set(spark)
16/07/22 02:22:47 INFO Client: Submitting application 2 to ResourceManager
16/07/22 02:22:47 INFO YarnClientImpl: Submitted application application_1469152619478_0002
16/07/22 02:22:47 INFO SchedulerExtensionServices: Starting Yarn extension services with app application_1469152619478_0002 and attemptId None
16/07/22 02:22:48 INFO Client: Application report for application_1469152619478_0002 (state: ACCEPTED)
16/07/22 02:22:48 INFO Client:
     client token: N/A
     diagnostics: N/A
     ApplicationMaster host: N/A
     ApplicationMaster RPC port: -1
     queue: default
     start time: 1469154167464
     final status: UNDEFINED
     tracking URL: http://node2:8088/proxy/application_1469152619478_0002/
     user: spark
16/07/22 02:22:49 INFO Client: Application report for application_1469152619478_0002 (state: ACCEPTED)
16/07/22 02:22:50 INFO Client: Application report for application_1469152619478_0002 (state: ACCEPTED)
16/07/22 02:22:51 INFO YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster registered as NettyRpcEndpointRef(null)
16/07/22 02:22:51 INFO YarnClientSchedulerBackend: Add WebUI Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS -> node2, PROXY_URI_BASES -> http://node2:8088/proxy/application_1469152619478_0002), /proxy/application_1469152619478_0002
16/07/22 02:22:51 INFO JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
16/07/22 02:22:51 INFO Client: Application report for application_1469152619478_0002 (state: RUNNING)
16/07/22 02:22:51 INFO Client:
     client token: N/A
     diagnostics: N/A
     ApplicationMaster host: 172.168.1.13
     ApplicationMaster RPC port: 0
     queue: default
     start time: 1469154167464
     final status: UNDEFINED
     tracking URL: http://node2:8088/proxy/application_1469152619478_0002/
     user: spark
16/07/22 02:22:51 INFO YarnClientSchedulerBackend: Application application_1469152619478_0002 has started running.
16/07/22 02:22:51 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 59567.
16/07/22 02:22:51 INFO NettyBlockTransferService: Server created on 59567
16/07/22 02:22:51 INFO BlockManagerMaster: Trying to register BlockManager
16/07/22 02:22:51 INFO BlockManagerMasterEndpoint: Registering block manager 172.168.1.11:59567 with 511.1 MB RAM, BlockManagerId(driver, 172.168.1.11, 59567)
16/07/22 02:22:51 INFO BlockManagerMaster: Registered BlockManager
16/07/22 02:22:51 INFO EventLoggingListener: Logging events to hdfs:///spark-history/application_1469152619478_0002
16/07/22 02:22:58 INFO YarnClientSchedulerBackend: Registered executor NettyRpcEndpointRef(null) (node2:43252) with ID 1
16/07/22 02:22:58 INFO BlockManagerMasterEndpoint: Registering block manager node2:56631 with 143.3 MB RAM, BlockManagerId(1, node2, 56631)
16/07/22 02:23:14 INFO YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: 30000(ms)
16/07/22 02:23:14 INFO SparkILoop: Created spark context..
Spark context available as sc.
16/07/22 02:23:15 INFO HiveContext: Initializing execution hive, version 1.2.1
16/07/22 02:23:15 INFO ClientWrapper: Inspected Hadoop version: 2.7.1.2.4.2.0-258
16/07/22 02:23:15 INFO ClientWrapper: Loaded org.apache.hadoop.hive.shims.Hadoop23Shims for Hadoop version 2.7.1.2.4.2.0-258
16/07/22 02:23:15 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
16/07/22 02:23:15 INFO ObjectStore: ObjectStore, initialize called
16/07/22 02:23:16 INFO Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored
16/07/22 02:23:16 INFO Persistence: Property datanucleus.cache.level2 unknown - will be ignored
16/07/22 02:23:16 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
Fri Jul 22 02:23:16 UTC 2016 Thread[main,5,main] java.io.FileNotFoundException: derby.log (Permission denied)
16/07/22 02:23:16 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
----------------------------------------------------------------
Fri Jul 22 02:23:16 UTC 2016:
Booting Derby version The Apache Software Foundation - Apache Derby - 10.10.1.1 - (1458268): instance a816c00e-0156-1069-fcc3-000028e90f30
on database directory /tmp/spark-581617c5-bffa-4b88-ae7f-b0b4d29b539f/metastore with class loader sun.misc.Launcher$AppClassLoader@70dea4e
Loaded from file:/usr/hdp/2.4.2.0-258/spark/lib/spark-assembly-1.6.1.2.4.2.0-258-hadoop2.7.1.2.4.2.0-258.jar
java.vendor=Oracle Corporation
java.runtime.version=1.8.0_60-b27
user.dir=/usr/hdp/2.4.2.0-258/spark
os.name=Linux
os.arch=amd64
os.version=3.10.0-229.el7.x86_64
derby.system.home=null
Database Class Loader started - derby.database.classpath=''
16/07/22 02:23:18 INFO ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes="Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order"
16/07/22 02:23:19 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
16/07/22 02:23:19 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
16/07/22 02:23:20 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
16/07/22 02:23:20 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
16/07/22 02:23:21 INFO MetaStoreDirectSql: Using direct SQL, underlying DB is DERBY
16/07/22 02:23:21 INFO ObjectStore: Initialized ObjectStore
16/07/22 02:23:21 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
16/07/22 02:23:21 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
16/07/22 02:23:22 INFO HiveMetaStore: Added admin role in metastore
16/07/22 02:23:22 INFO HiveMetaStore: Added public role in metastore
16/07/22 02:23:22 INFO HiveMetaStore: No user is added in admin role, since config is empty
16/07/22 02:23:22 INFO HiveMetaStore: 0: get_all_databases
16/07/22 02:23:22 INFO audit: ugi=spark ip=unknown-ip-addr  cmd=get_all_databases
16/07/22 02:23:22 INFO HiveMetaStore: 0: get_functions: db=default pat=*
16/07/22 02:23:22 INFO audit: ugi=spark ip=unknown-ip-addr  cmd=get_functions: db=default pat=*
16/07/22 02:23:22 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MResourceUri" is tagged as "embedded-only" so does not have its own datastore table.
16/07/22 02:23:22 INFO SessionState: Created local directory: /tmp/4e8db152-09bd-43d1-a726-cd4c9025850d_resources
16/07/22 02:23:22 INFO SessionState: Created HDFS directory: /tmp/hive/spark/4e8db152-09bd-43d1-a726-cd4c9025850d
16/07/22 02:23:22 INFO SessionState: Created local directory: /tmp/spark/4e8db152-09bd-43d1-a726-cd4c9025850d
16/07/22 02:23:22 INFO SessionState: Created HDFS directory: /tmp/hive/spark/4e8db152-09bd-43d1-a726-cd4c9025850d/_tmp_space.db
16/07/22 02:23:22 INFO HiveContext: default warehouse location is /user/hive/warehouse
16/07/22 02:23:22 INFO HiveContext: Initializing HiveMetastoreConnection version 1.2.1 using Spark classes.
16/07/22 02:23:22 INFO ClientWrapper: Inspected Hadoop version: 2.7.1.2.4.2.0-258
16/07/22 02:23:22 INFO ClientWrapper: Loaded org.apache.hadoop.hive.shims.Hadoop23Shims for Hadoop version 2.7.1.2.4.2.0-258
16/07/22 02:23:23 INFO metastore: Trying to connect to metastore with URI thrift://node2:9083
16/07/22 02:23:23 INFO metastore: Connected to metastore.
16/07/22 02:23:23 INFO SessionState: Created local directory: /tmp/f1e8e3c9-519b-435a-8994-79638e479bf3_resources
16/07/22 02:23:23 INFO SessionState: Created HDFS directory: /tmp/hive/spark/f1e8e3c9-519b-435a-8994-79638e479bf3
16/07/22 02:23:23 INFO SessionState: Created local directory: /tmp/spark/f1e8e3c9-519b-435a-8994-79638e479bf3
16/07/22 02:23:23 INFO SessionState: Created HDFS directory: /tmp/hive/spark/f1e8e3c9-519b-435a-8994-79638e479bf3/_tmp_space.db
16/07/22 02:23:23 INFO SparkILoop: Created sql context (with Hive support)..
SQL context available as sqlContext

1から10までの整数の和を求める

つらつらコンソールが長々と流れた後に

scala>

と表示されればOKです。

以下の

scala> sc.parallelize(1 to 10).sum

を実行してみましょう。

scala> sc.parallelize(1 to 10).sum
16/07/22 02:26:33 INFO SparkContext: Starting job: sum at <console>:28

<<中略>>

16/07/22 02:26:35 INFO DAGScheduler: Job 0 finished: sum at <console>:28, took 1.620451 s
res0: Double = 55.0

以下のコマンドでspark-shellから抜けてみましょう。

scala> exit

無事に結果が55.0と表示されましたか?

「took 1.620451 s」と1.6秒ほどかかったのが分かりますね。

※おせーよと思ったから健全な反応です。

PostgresqlにDBユーザーと接続設定をする

冒頭にも記載しましたが、node1には手順通り作業していれば、既にPostgresqlがインストールされています。node1で一旦rootユーザーに変更してからpostgresユーザーで接続しなおしましょう。

sparkユーザーからrootに戻る

[spark@node1 ~]$ exit
logout
[root@node1 ~]#

postgresのDBユーザー作成

sparkというDBユーザーを作成します。
パスワードを設定するのでお忘れなく。

※本ドキュメントではsparkというパスワードを設定してます。

# su - postgres
-bash-4.2$ createuser spark --interactive -P
Enter password for new role:
Enter it again:
Shall the new role be a superuser? (y/n) n
Shall the new role be allowed to create databases? (y/n) y
Shall the new role be allowed to create more new roles? (y/n) n

postgresの設定情報を追記

pg_hba.confを編集して、先ほど作成したsparkユーザーを接続出来るようにしましょう。

-bash-4.2$ vi data/pg_hba.conf

以下を追記
---
local  all  spark md5
host  all   spark 0.0.0.0/0  md5
host  all   spark ::/0 md5
---

postgresqlサービスの再起動

postgresqユーザーからrootへあがり、サービスの再起動を行います。

-bash-4.2$  exit
#
# systemctl restart postgresql.service

sparkというDBを作成

# su - postgres
-bash-4.2$ createdb -U spark spark
Password:

psqlで接続確認

-bash-4.2$ psql
psql (9.2.15)
Type "help" for help.

postgres=# \l
                                  List of databases
   Name    |  Owner   | Encoding |   Collate   |    Ctype    |   Access privileges

-----------+----------+----------+-------------+-------------+----------------------
-
 ambari    | postgres | UTF8     | en_US.UTF-8 | en_US.UTF-8 | =Tc/postgres
+
           |          |          |             |             | postgres=CTc/postgres
+
           |          |          |             |             | ambari=CTc/postgres
 ambarirca | postgres | UTF8     | en_US.UTF-8 | en_US.UTF-8 | =Tc/postgres
+
           |          |          |             |             | postgres=CTc/postgres
+
           |          |          |             |             | mapred=CTc/postgres
 postgres  | postgres | UTF8     | en_US.UTF-8 | en_US.UTF-8 |
 spark     | spark    | UTF8     | en_US.UTF-8 | en_US.UTF-8 |
 template0 | postgres | UTF8     | en_US.UTF-8 | en_US.UTF-8 | =c/postgres
+
           |          |          |             |             | postgres=CTc/postgres
 template1 | postgres | UTF8     | en_US.UTF-8 | en_US.UTF-8 | =c/postgres
+
           |          |          |             |             | postgres=CTc/postgres
postgres=# \q

-bash-4.2$ psql -U spark spark
spark=> \d
No relations found.

Postgresqlに接続する環境を整える

全てのノードで同じ作業が必要が必要です。
今回はsparkユーザーで作業するので以下から
https://jdbc.postgresql.org/download.html
postgresql-9.4.1209.jarをDLして各nodeに配置しましょう。

ここでの作業は割愛します。MacでDLしてSCPでアップするなりしてください。

読み込むCSVを準備する

今回は
去年くらいに話題になった
賞金稼ぎから仕事探しまで、世界のデータサイエンティストが「Kaggle」に集まる理由で紹介されている。

リクルートさんが公開してるデータを使ってみましょう。

例のごとくMacでダウンロードしてzipを解答して以下のようにscpでアップしてください。

$ scp ./coupon_detail_train.csv vagrant@172.168.1.11:~/

sparkユーザーの/home/sparkに移動させる

$ sudo su -
# mv coupon_area_test.csv coupon_area_train.csv coupon_detail_train.csv coupon_list_test.csv coupon_list_train.csv user_list.csv prefecture_locations.csv /home/spark/
# chown spark:hadoop /home/spark/*.csv
# ls -l /home/spark/
-rw-r--r-- 1 spark hadoop   119943 Jul 22 03:26 coupon_area_test.csv
-rw-r--r-- 1 spark hadoop  7542291 Jul 22 03:26 coupon_area_train.csv
-rw-r--r-- 1 spark hadoop 23844951 Jul 22 03:26 coupon_detail_train.csv
-rw-r--r-- 1 spark hadoop    59163 Jul 22 03:25 coupon_list_test.csv
-rw-r--r-- 1 spark hadoop  3680603 Jul 22 03:26 coupon_list_train.csv
-rw-r--r-- 1 spark hadoop   689278 Jul 15 13:03 postgresql-9.4.1209.jar
-rw-r--r-- 1 spark hadoop     2046 Jul 22 04:14 prefecture_locations.csv
-rw-r--r-- 1 spark hadoop  1579780 Jul 22 04:13 user_list.csv

hadoopコマンドでファイルを登録する

HDFSを使ったことない人はいきなりなんだって思いますが、各ノードのどこからでもアクセス出来るようにHDFSにファイルを登録しましょう。

# sudo su - spark
$ hadoop fs -put  ./*.csv /user/spark/
$ hadoop fs -ls  /user/spark/
Found 8 items
drwxr-xr-x   - spark hdfs          0 2016-07-22 03:41 /user/spark/.sparkStaging
-rw-r--r--   3 spark hdfs     119943 2016-07-22 04:22 /user/spark/coupon_area_test.csv
-rw-r--r--   3 spark hdfs    7542291 2016-07-22 04:22 /user/spark/coupon_area_train.csv
-rw-r--r--   3 spark hdfs   23844951 2016-07-22 04:22 /user/spark/coupon_detail_train.csv
-rw-r--r--   3 spark hdfs      59163 2016-07-22 04:22 /user/spark/coupon_list_test.csv
-rw-r--r--   3 spark hdfs    3680603 2016-07-22 04:22 /user/spark/coupon_list_train.csv
-rw-r--r--   3 spark hdfs       2046 2016-07-22 04:22 /user/spark/prefecture_locations.csv
-rw-r--r--   3 spark hdfs    1579780 2016-07-22 04:22 /user/spark/user_list.csv

ライブラリを読み込んだ状態でSparkShellを起動

ここまで長かったですね。よく頑張りました。

今回はCSVを読み込みするので「com.databricks:spark-csv_2.11:1.3.0」とdownloadし配置した
「postgresql-9.4.1209.jar」を読み込んで起動します。

※com.databricks:spark-csv_2.11:1.4.0だとcsv出力出来ないのでcom.databricks:spark-csv_2.11:1.3.0に変更 by 2016/07/30 10:25

# sudo su - spark
$ cd ~/
$ wget https://jdbc.postgresql.org/download/postgresql-9.4.1209.jar
$ cd /usr/hdp/current/spark-client/
$ ./bin/spark-shell --master yarn --deploy-mode client --conf spark.executor.memory=512m --conf spark.executor.cores=1 --packages com.databricks:spark-csv_2.11:1.4.0 --driver-class-path /home/spark/postgresql-9.4.1209.jar --jars /home/spark/postgresql-9.4.1209.jar

dataframeでcsvを読み込んでみる。

sparkで用意されているdataframeという仕組みを使ってCSVを読み込んで見ましょう。

csvを読み込みdfオブジェクト作成

scala> val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("user_list.csv")

<<中略>>

16/07/22 04:24:40 INFO DAGScheduler: Job 1 finished: aggregate at InferSchema.scala:41, took 1.572468 s
df: org.apache.spark.sql.DataFrame = [REG_DATE: timestamp, SEX_ID: string, AGE: int, WITHDRAW_DATE: string, PREF_NAME: string, USER_ID_hash: string]

dfオブジェクトの中身を5件だけ見てみる

scala> df.take(5)
16/07/22 04:25:08 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 314.3 KB, free 1013.4 KB)
16/07/22 04:25:08 INFO MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 26.5 KB, free 1039.9 KB)
16/07/22 04:25:08 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on 172.168.1.11:54982 (size: 26.5 KB, free: 511.0 MB)

<<中略>>

16/07/22 04:25:08 INFO DAGScheduler: Job 2 finished: take at <console>:28, took 0.245868 s
res0: Array[org.apache.spark.sql.Row] = Array([2012-03-28 14:14:18.0,f,25,NA,,d9dca3cb44bab12ba313eaa681f663eb], [2011-05-18 00:41:48.0,f,34,NA,東京都,560574a339f1b25e57b0221e486907ed], [2011-06-13 16:36:58.0,m,41,NA,愛知県,e66ae91b978b3229f8fd858c80615b73], [2012-02-08 12:56:15.0,m,25,NA,,43fc18f32eafb05713ec02935e2c2825], [2011-05-22 23:43:56.0,m,62,NA,神奈川県,dc6df8aa860f8db0d710ce9d4839840f])

dfオブジェクトをpostgresqlに格納

user_listというテーブルに読みこんだCSVの中身を格納してみましょう。

import java.util.Properties
import org.apache.spark.sql.SaveMode

lazy val dbAuth = {
val prop = new Properties()
prop.setProperty ("user","spark")
prop.setProperty ("password","spark")
prop.setProperty ("driver","org.postgresql.Driver")
prop
}
df.write.mode(SaveMode.Overwrite).jdbc("jdbc:postgresql://172.168.1.11:5432/spark", "user_list", dbAuth)

以下のコマンドでspark-shellを抜けましょう。

scala> exit

psqlで確認

では実際にpostgresに登録されているか確認して見ましょう。
一度rootに上がってからpostgresユーザーになってからpsqlで確認してみます。

$ exit
# sudo su - postgres
-bash-4.2$ psql -U spark spark
Password for user spark:

spark=> \d
         List of relations
 Schema |   Name    | Type  | Owner
--------+-----------+-------+-------
 public | user_list | table | spark
(1 row)

spark=> select count(*) from user_list;
 count
-------
 22873
(1 row)

無事に登録されていますね。

今回は以上です。postgresやhdfsやたくさん新しいのが出てきて
おえってなったと思いますが、懲りずに勉強しましょう。

※ただのSparkのstandalone構成だったら簡単なんだけどね。。。

編集後記

第2章の後にどういう方向でネタ作るかなぁと思ったんですが、鉄板なCSVなどのファイルから読み込んで、加工や集計を行った結果をDBに格納するみたいな流れが実務的で良いかなと思ってこんな感じにしてみました。

実際にSparkに触れてみて、遅いじゃんって思った人も多いと思います。
vagrant上でCluster組んでるっていうのもあるし、まだYarnのClusterを引き出すような内容をやっているのでそう感じるだけです。

RMDBだけでも出来るよってお思いのあなた。はい、やろうと思えば出来ます。
ただ、本番環境でRMDBだけで、ブンブンと集計処理を回したりして、フロント側のサービスが遅くなったりしたら、意味がないですよね。適材適所でアーキテクチャ考えて、処理させるのがいいと思います。

次回は第4章 〜Apache Hadoop,YARN,Sparkで集計してサマリーをpostgresqlに格納してみる編を公開する予定です。

2
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
3