【分散型アーキテクチャを試してみる】第3章 〜Apache Hadoop,YARN,SparkでCSVインポートからPostgresqlにデータ取り込み編〜
あらすじ
最近は日本にいなかったり、インタビューや講演ばっかり受けてて、Qiitaサボリ気味でしたが、少しずつ再開しようと思ってます。
膨大なデータを取り扱ったり、大規模向けシステムが当たり前になってきている中、機会がないと触れることがない分散型アーキテクチャを試してみようという事で、勉強会で使った資料を公開していきます。
※後々、リクエストに応じて更新することが多いのでストックしておくことをおすすめします。
自分は某社でCTOをしていますが、頭でっかちに理論ばっかり学習するよりは、イメージがなんとなく掴めるように学習し、実践の中で知識を深めていく方が効率的に学習出来ると考えています。
未経験者の教育についてインタビューされた記事もあるので紹介しておきます。ご興味ある方は御覧ください。
エンジニアは「即戦力」より理念に共感した「未経験者」を育てるほうが費用対効果が高い。
教育スタイルとしては正しい事をきっちりかっちり教えるのではなく、未経験レベルの人がなんとなく掴めるように、資料を構成していきます。
以下のようなシリーズネタで進めます。
では、今回もはじめていきましょう!
はじめに
今回は
前回行った【分散型アーキテクチャを試してみる】第2章 〜Apache Ambari,Hadoop,YARN Zookeeper,Spark編〜の環境でSpark-Shellを使ってCSVを読み込み、Postgresqlにそのまま格納する事にします。
Postgresqlの環境をもう一台vagrantにって思ったのですが、そんなにメモリ積んでねえよボケってなったので、Ambariによって入れたnode1(172.168.1.11)にPostgresqlを使って試してみましょう。
No. | hostname | ip | 備考 |
---|---|---|---|
1 | node1 | 172.168.1.11 | マスターノード |
2 | node2 | 172.168.1.12 | ノード |
3 | node3 | 172.168.1.13 | ノード |
SparkShellでsum count
HDFS上にあるCSVファイルを読み込んで
いきなり、Postgresqlに突っ込むってやっても辛いので、少しステップを踏んでいきましょう。
まず最初にSparkに含まれているSparkShellという対話式のプログラムを使って
1から10まで全部足したらいくつ的なのをやります。
vagrant machineにSSH接続
# vagrant ssh node1
spark-shell実行を試す
前回のAmbariで構築した環境ですとYRANで構築されているので、クラスターリソース管理はYARNを使い、Sparkクライアントを立ち上げてみます。
# sudo su - spark
$ cd /usr/hdp/current/spark-client/
$ ./bin/spark-shell --master yarn --deploy-mode client --conf spark.executor.memory=512m --conf spark.executor.cores=1
上記のコマンドを実行すると、YARNによりSparkの実行環境が整えられます。
以下はコンソールに流れるログなのでそのまま記載しておきます。
※どうせみてもあれなので、よみ飛ばしてください。
16/07/22 02:22:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/07/22 02:22:37 INFO SecurityManager: Changing view acls to: spark
16/07/22 02:22:37 INFO SecurityManager: Changing modify acls to: spark
16/07/22 02:22:37 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(spark); users with modify permissions: Set(spark)
16/07/22 02:22:37 INFO HttpServer: Starting HTTP Server
16/07/22 02:22:37 INFO Server: jetty-8.y.z-SNAPSHOT
16/07/22 02:22:37 INFO AbstractConnector: Started SocketConnector@0.0.0.0:48061
16/07/22 02:22:37 INFO Utils: Successfully started service 'HTTP class server' on port 48061.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.6.1
/_/
Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_60)
Type in expressions to have them evaluated.
Type :help for more information.
16/07/22 02:22:42 INFO SparkContext: Running Spark version 1.6.1
16/07/22 02:22:42 INFO SecurityManager: Changing view acls to: spark
16/07/22 02:22:42 INFO SecurityManager: Changing modify acls to: spark
16/07/22 02:22:42 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(spark); users with modify permissions: Set(spark)
16/07/22 02:22:42 INFO Utils: Successfully started service 'sparkDriver' on port 46345.
16/07/22 02:22:43 INFO Slf4jLogger: Slf4jLogger started
16/07/22 02:22:43 INFO Remoting: Starting remoting
16/07/22 02:22:43 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 33284.
16/07/22 02:22:43 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@172.168.1.11:33284]
16/07/22 02:22:43 INFO SparkEnv: Registering MapOutputTracker
16/07/22 02:22:43 INFO SparkEnv: Registering BlockManagerMaster
16/07/22 02:22:43 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-ccb8e2f6-8499-4f77-be75-28f03f63186d
16/07/22 02:22:43 INFO MemoryStore: MemoryStore started with capacity 511.1 MB
16/07/22 02:22:44 INFO SparkEnv: Registering OutputCommitCoordinator
16/07/22 02:22:44 INFO Server: jetty-8.y.z-SNAPSHOT
16/07/22 02:22:44 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
16/07/22 02:22:44 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/07/22 02:22:44 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://172.168.1.11:4040
spark.yarn.driver.memoryOverhead is set but does not apply in client mode.
16/07/22 02:22:45 INFO TimelineClientImpl: Timeline service address: http://node2:8188/ws/v1/timeline/
16/07/22 02:22:45 INFO RMProxy: Connecting to ResourceManager at node2/172.168.1.12:8050
16/07/22 02:22:46 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
16/07/22 02:22:46 INFO Client: Requesting a new application from cluster with 3 NodeManagers
16/07/22 02:22:46 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (1536 MB per container)
16/07/22 02:22:46 INFO Client: Will allocate AM container, with 896 MB memory including 384 MB overhead
16/07/22 02:22:46 INFO Client: Setting up container launch context for our AM
16/07/22 02:22:46 INFO Client: Setting up the launch environment for our AM container
16/07/22 02:22:46 INFO Client: Using the spark assembly jar on HDFS because you are using HDP, defaultSparkAssembly:hdfs://node1:8020/hdp/apps/2.4.2.0-258/spark/spark-hdp-assembly.jar
16/07/22 02:22:46 INFO Client: Preparing resources for our AM container
16/07/22 02:22:46 INFO Client: Using the spark assembly jar on HDFS because you are using HDP, defaultSparkAssembly:hdfs://node1:8020/hdp/apps/2.4.2.0-258/spark/spark-hdp-assembly.jar
16/07/22 02:22:46 INFO Client: Source and destination file systems are the same. Not copying hdfs://node1:8020/hdp/apps/2.4.2.0-258/spark/spark-hdp-assembly.jar
16/07/22 02:22:46 INFO Client: Uploading resource file:/tmp/spark-b7e33f77-ee76-40b9-86ee-12a601a95ed4/__spark_conf__308131288477383306.zip -> hdfs://node1:8020/user/spark/.sparkStaging/application_1469152619478_0002/__spark_conf__308131288477383306.zip
16/07/22 02:22:47 INFO SecurityManager: Changing view acls to: spark
16/07/22 02:22:47 INFO SecurityManager: Changing modify acls to: spark
16/07/22 02:22:47 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(spark); users with modify permissions: Set(spark)
16/07/22 02:22:47 INFO Client: Submitting application 2 to ResourceManager
16/07/22 02:22:47 INFO YarnClientImpl: Submitted application application_1469152619478_0002
16/07/22 02:22:47 INFO SchedulerExtensionServices: Starting Yarn extension services with app application_1469152619478_0002 and attemptId None
16/07/22 02:22:48 INFO Client: Application report for application_1469152619478_0002 (state: ACCEPTED)
16/07/22 02:22:48 INFO Client:
client token: N/A
diagnostics: N/A
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: default
start time: 1469154167464
final status: UNDEFINED
tracking URL: http://node2:8088/proxy/application_1469152619478_0002/
user: spark
16/07/22 02:22:49 INFO Client: Application report for application_1469152619478_0002 (state: ACCEPTED)
16/07/22 02:22:50 INFO Client: Application report for application_1469152619478_0002 (state: ACCEPTED)
16/07/22 02:22:51 INFO YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster registered as NettyRpcEndpointRef(null)
16/07/22 02:22:51 INFO YarnClientSchedulerBackend: Add WebUI Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS -> node2, PROXY_URI_BASES -> http://node2:8088/proxy/application_1469152619478_0002), /proxy/application_1469152619478_0002
16/07/22 02:22:51 INFO JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
16/07/22 02:22:51 INFO Client: Application report for application_1469152619478_0002 (state: RUNNING)
16/07/22 02:22:51 INFO Client:
client token: N/A
diagnostics: N/A
ApplicationMaster host: 172.168.1.13
ApplicationMaster RPC port: 0
queue: default
start time: 1469154167464
final status: UNDEFINED
tracking URL: http://node2:8088/proxy/application_1469152619478_0002/
user: spark
16/07/22 02:22:51 INFO YarnClientSchedulerBackend: Application application_1469152619478_0002 has started running.
16/07/22 02:22:51 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 59567.
16/07/22 02:22:51 INFO NettyBlockTransferService: Server created on 59567
16/07/22 02:22:51 INFO BlockManagerMaster: Trying to register BlockManager
16/07/22 02:22:51 INFO BlockManagerMasterEndpoint: Registering block manager 172.168.1.11:59567 with 511.1 MB RAM, BlockManagerId(driver, 172.168.1.11, 59567)
16/07/22 02:22:51 INFO BlockManagerMaster: Registered BlockManager
16/07/22 02:22:51 INFO EventLoggingListener: Logging events to hdfs:///spark-history/application_1469152619478_0002
16/07/22 02:22:58 INFO YarnClientSchedulerBackend: Registered executor NettyRpcEndpointRef(null) (node2:43252) with ID 1
16/07/22 02:22:58 INFO BlockManagerMasterEndpoint: Registering block manager node2:56631 with 143.3 MB RAM, BlockManagerId(1, node2, 56631)
16/07/22 02:23:14 INFO YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: 30000(ms)
16/07/22 02:23:14 INFO SparkILoop: Created spark context..
Spark context available as sc.
16/07/22 02:23:15 INFO HiveContext: Initializing execution hive, version 1.2.1
16/07/22 02:23:15 INFO ClientWrapper: Inspected Hadoop version: 2.7.1.2.4.2.0-258
16/07/22 02:23:15 INFO ClientWrapper: Loaded org.apache.hadoop.hive.shims.Hadoop23Shims for Hadoop version 2.7.1.2.4.2.0-258
16/07/22 02:23:15 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
16/07/22 02:23:15 INFO ObjectStore: ObjectStore, initialize called
16/07/22 02:23:16 INFO Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored
16/07/22 02:23:16 INFO Persistence: Property datanucleus.cache.level2 unknown - will be ignored
16/07/22 02:23:16 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
Fri Jul 22 02:23:16 UTC 2016 Thread[main,5,main] java.io.FileNotFoundException: derby.log (Permission denied)
16/07/22 02:23:16 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
----------------------------------------------------------------
Fri Jul 22 02:23:16 UTC 2016:
Booting Derby version The Apache Software Foundation - Apache Derby - 10.10.1.1 - (1458268): instance a816c00e-0156-1069-fcc3-000028e90f30
on database directory /tmp/spark-581617c5-bffa-4b88-ae7f-b0b4d29b539f/metastore with class loader sun.misc.Launcher$AppClassLoader@70dea4e
Loaded from file:/usr/hdp/2.4.2.0-258/spark/lib/spark-assembly-1.6.1.2.4.2.0-258-hadoop2.7.1.2.4.2.0-258.jar
java.vendor=Oracle Corporation
java.runtime.version=1.8.0_60-b27
user.dir=/usr/hdp/2.4.2.0-258/spark
os.name=Linux
os.arch=amd64
os.version=3.10.0-229.el7.x86_64
derby.system.home=null
Database Class Loader started - derby.database.classpath=''
16/07/22 02:23:18 INFO ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes="Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order"
16/07/22 02:23:19 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
16/07/22 02:23:19 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
16/07/22 02:23:20 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
16/07/22 02:23:20 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
16/07/22 02:23:21 INFO MetaStoreDirectSql: Using direct SQL, underlying DB is DERBY
16/07/22 02:23:21 INFO ObjectStore: Initialized ObjectStore
16/07/22 02:23:21 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
16/07/22 02:23:21 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
16/07/22 02:23:22 INFO HiveMetaStore: Added admin role in metastore
16/07/22 02:23:22 INFO HiveMetaStore: Added public role in metastore
16/07/22 02:23:22 INFO HiveMetaStore: No user is added in admin role, since config is empty
16/07/22 02:23:22 INFO HiveMetaStore: 0: get_all_databases
16/07/22 02:23:22 INFO audit: ugi=spark ip=unknown-ip-addr cmd=get_all_databases
16/07/22 02:23:22 INFO HiveMetaStore: 0: get_functions: db=default pat=*
16/07/22 02:23:22 INFO audit: ugi=spark ip=unknown-ip-addr cmd=get_functions: db=default pat=*
16/07/22 02:23:22 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MResourceUri" is tagged as "embedded-only" so does not have its own datastore table.
16/07/22 02:23:22 INFO SessionState: Created local directory: /tmp/4e8db152-09bd-43d1-a726-cd4c9025850d_resources
16/07/22 02:23:22 INFO SessionState: Created HDFS directory: /tmp/hive/spark/4e8db152-09bd-43d1-a726-cd4c9025850d
16/07/22 02:23:22 INFO SessionState: Created local directory: /tmp/spark/4e8db152-09bd-43d1-a726-cd4c9025850d
16/07/22 02:23:22 INFO SessionState: Created HDFS directory: /tmp/hive/spark/4e8db152-09bd-43d1-a726-cd4c9025850d/_tmp_space.db
16/07/22 02:23:22 INFO HiveContext: default warehouse location is /user/hive/warehouse
16/07/22 02:23:22 INFO HiveContext: Initializing HiveMetastoreConnection version 1.2.1 using Spark classes.
16/07/22 02:23:22 INFO ClientWrapper: Inspected Hadoop version: 2.7.1.2.4.2.0-258
16/07/22 02:23:22 INFO ClientWrapper: Loaded org.apache.hadoop.hive.shims.Hadoop23Shims for Hadoop version 2.7.1.2.4.2.0-258
16/07/22 02:23:23 INFO metastore: Trying to connect to metastore with URI thrift://node2:9083
16/07/22 02:23:23 INFO metastore: Connected to metastore.
16/07/22 02:23:23 INFO SessionState: Created local directory: /tmp/f1e8e3c9-519b-435a-8994-79638e479bf3_resources
16/07/22 02:23:23 INFO SessionState: Created HDFS directory: /tmp/hive/spark/f1e8e3c9-519b-435a-8994-79638e479bf3
16/07/22 02:23:23 INFO SessionState: Created local directory: /tmp/spark/f1e8e3c9-519b-435a-8994-79638e479bf3
16/07/22 02:23:23 INFO SessionState: Created HDFS directory: /tmp/hive/spark/f1e8e3c9-519b-435a-8994-79638e479bf3/_tmp_space.db
16/07/22 02:23:23 INFO SparkILoop: Created sql context (with Hive support)..
SQL context available as sqlContext
1から10までの整数の和を求める
つらつらコンソールが長々と流れた後に
scala>
と表示されればOKです。
以下の
scala> sc.parallelize(1 to 10).sum
を実行してみましょう。
scala> sc.parallelize(1 to 10).sum
16/07/22 02:26:33 INFO SparkContext: Starting job: sum at <console>:28
<<中略>>
16/07/22 02:26:35 INFO DAGScheduler: Job 0 finished: sum at <console>:28, took 1.620451 s
res0: Double = 55.0
以下のコマンドでspark-shellから抜けてみましょう。
scala> exit
無事に結果が55.0と表示されましたか?
「took 1.620451 s」と1.6秒ほどかかったのが分かりますね。
※おせーよと思ったから健全な反応です。
PostgresqlにDBユーザーと接続設定をする
冒頭にも記載しましたが、node1には手順通り作業していれば、既にPostgresqlがインストールされています。node1で一旦rootユーザーに変更してからpostgresユーザーで接続しなおしましょう。
sparkユーザーからrootに戻る
[spark@node1 ~]$ exit
logout
[root@node1 ~]#
postgresのDBユーザー作成
sparkというDBユーザーを作成します。
パスワードを設定するのでお忘れなく。
※本ドキュメントではsparkというパスワードを設定してます。
# su - postgres
-bash-4.2$ createuser spark --interactive -P
Enter password for new role:
Enter it again:
Shall the new role be a superuser? (y/n) n
Shall the new role be allowed to create databases? (y/n) y
Shall the new role be allowed to create more new roles? (y/n) n
postgresの設定情報を追記
pg_hba.confを編集して、先ほど作成したsparkユーザーを接続出来るようにしましょう。
-bash-4.2$ vi data/pg_hba.conf
以下を追記
---
local all spark md5
host all spark 0.0.0.0/0 md5
host all spark ::/0 md5
---
postgresqlサービスの再起動
postgresqユーザーからrootへあがり、サービスの再起動を行います。
-bash-4.2$ exit
#
# systemctl restart postgresql.service
sparkというDBを作成
# su - postgres
-bash-4.2$ createdb -U spark spark
Password:
psqlで接続確認
-bash-4.2$ psql
psql (9.2.15)
Type "help" for help.
postgres=# \l
List of databases
Name | Owner | Encoding | Collate | Ctype | Access privileges
-----------+----------+----------+-------------+-------------+----------------------
-
ambari | postgres | UTF8 | en_US.UTF-8 | en_US.UTF-8 | =Tc/postgres
+
| | | | | postgres=CTc/postgres
+
| | | | | ambari=CTc/postgres
ambarirca | postgres | UTF8 | en_US.UTF-8 | en_US.UTF-8 | =Tc/postgres
+
| | | | | postgres=CTc/postgres
+
| | | | | mapred=CTc/postgres
postgres | postgres | UTF8 | en_US.UTF-8 | en_US.UTF-8 |
spark | spark | UTF8 | en_US.UTF-8 | en_US.UTF-8 |
template0 | postgres | UTF8 | en_US.UTF-8 | en_US.UTF-8 | =c/postgres
+
| | | | | postgres=CTc/postgres
template1 | postgres | UTF8 | en_US.UTF-8 | en_US.UTF-8 | =c/postgres
+
| | | | | postgres=CTc/postgres
postgres=# \q
-bash-4.2$ psql -U spark spark
spark=> \d
No relations found.
Postgresqlに接続する環境を整える
全てのノードで同じ作業が必要が必要です。
今回はsparkユーザーで作業するので以下から
https://jdbc.postgresql.org/download.html
postgresql-9.4.1209.jarをDLして各nodeに配置しましょう。
ここでの作業は割愛します。MacでDLしてSCPでアップするなりしてください。
読み込むCSVを準備する
今回は
去年くらいに話題になった
賞金稼ぎから仕事探しまで、世界のデータサイエンティストが「Kaggle」に集まる理由で紹介されている。
リクルートさんが公開してるデータを使ってみましょう。
例のごとくMacでダウンロードしてzipを解答して以下のようにscpでアップしてください。
$ scp ./coupon_detail_train.csv vagrant@172.168.1.11:~/
sparkユーザーの/home/sparkに移動させる
$ sudo su -
# mv coupon_area_test.csv coupon_area_train.csv coupon_detail_train.csv coupon_list_test.csv coupon_list_train.csv user_list.csv prefecture_locations.csv /home/spark/
# chown spark:hadoop /home/spark/*.csv
# ls -l /home/spark/
-rw-r--r-- 1 spark hadoop 119943 Jul 22 03:26 coupon_area_test.csv
-rw-r--r-- 1 spark hadoop 7542291 Jul 22 03:26 coupon_area_train.csv
-rw-r--r-- 1 spark hadoop 23844951 Jul 22 03:26 coupon_detail_train.csv
-rw-r--r-- 1 spark hadoop 59163 Jul 22 03:25 coupon_list_test.csv
-rw-r--r-- 1 spark hadoop 3680603 Jul 22 03:26 coupon_list_train.csv
-rw-r--r-- 1 spark hadoop 689278 Jul 15 13:03 postgresql-9.4.1209.jar
-rw-r--r-- 1 spark hadoop 2046 Jul 22 04:14 prefecture_locations.csv
-rw-r--r-- 1 spark hadoop 1579780 Jul 22 04:13 user_list.csv
hadoopコマンドでファイルを登録する
HDFSを使ったことない人はいきなりなんだって思いますが、各ノードのどこからでもアクセス出来るようにHDFSにファイルを登録しましょう。
# sudo su - spark
$ hadoop fs -put ./*.csv /user/spark/
$ hadoop fs -ls /user/spark/
Found 8 items
drwxr-xr-x - spark hdfs 0 2016-07-22 03:41 /user/spark/.sparkStaging
-rw-r--r-- 3 spark hdfs 119943 2016-07-22 04:22 /user/spark/coupon_area_test.csv
-rw-r--r-- 3 spark hdfs 7542291 2016-07-22 04:22 /user/spark/coupon_area_train.csv
-rw-r--r-- 3 spark hdfs 23844951 2016-07-22 04:22 /user/spark/coupon_detail_train.csv
-rw-r--r-- 3 spark hdfs 59163 2016-07-22 04:22 /user/spark/coupon_list_test.csv
-rw-r--r-- 3 spark hdfs 3680603 2016-07-22 04:22 /user/spark/coupon_list_train.csv
-rw-r--r-- 3 spark hdfs 2046 2016-07-22 04:22 /user/spark/prefecture_locations.csv
-rw-r--r-- 3 spark hdfs 1579780 2016-07-22 04:22 /user/spark/user_list.csv
ライブラリを読み込んだ状態でSparkShellを起動
ここまで長かったですね。よく頑張りました。
今回はCSVを読み込みするので「com.databricks:spark-csv_2.11:1.3.0」とdownloadし配置した
「postgresql-9.4.1209.jar」を読み込んで起動します。
※com.databricks:spark-csv_2.11:1.4.0だとcsv出力出来ないのでcom.databricks:spark-csv_2.11:1.3.0に変更 by 2016/07/30 10:25
# sudo su - spark
$ cd ~/
$ wget https://jdbc.postgresql.org/download/postgresql-9.4.1209.jar
$ cd /usr/hdp/current/spark-client/
$ ./bin/spark-shell --master yarn --deploy-mode client --conf spark.executor.memory=512m --conf spark.executor.cores=1 --packages com.databricks:spark-csv_2.11:1.4.0 --driver-class-path /home/spark/postgresql-9.4.1209.jar --jars /home/spark/postgresql-9.4.1209.jar
dataframeでcsvを読み込んでみる。
sparkで用意されているdataframeという仕組みを使ってCSVを読み込んで見ましょう。
csvを読み込みdfオブジェクト作成
scala> val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("user_list.csv")
<<中略>>
16/07/22 04:24:40 INFO DAGScheduler: Job 1 finished: aggregate at InferSchema.scala:41, took 1.572468 s
df: org.apache.spark.sql.DataFrame = [REG_DATE: timestamp, SEX_ID: string, AGE: int, WITHDRAW_DATE: string, PREF_NAME: string, USER_ID_hash: string]
dfオブジェクトの中身を5件だけ見てみる
scala> df.take(5)
16/07/22 04:25:08 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 314.3 KB, free 1013.4 KB)
16/07/22 04:25:08 INFO MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 26.5 KB, free 1039.9 KB)
16/07/22 04:25:08 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on 172.168.1.11:54982 (size: 26.5 KB, free: 511.0 MB)
<<中略>>
16/07/22 04:25:08 INFO DAGScheduler: Job 2 finished: take at <console>:28, took 0.245868 s
res0: Array[org.apache.spark.sql.Row] = Array([2012-03-28 14:14:18.0,f,25,NA,,d9dca3cb44bab12ba313eaa681f663eb], [2011-05-18 00:41:48.0,f,34,NA,東京都,560574a339f1b25e57b0221e486907ed], [2011-06-13 16:36:58.0,m,41,NA,愛知県,e66ae91b978b3229f8fd858c80615b73], [2012-02-08 12:56:15.0,m,25,NA,,43fc18f32eafb05713ec02935e2c2825], [2011-05-22 23:43:56.0,m,62,NA,神奈川県,dc6df8aa860f8db0d710ce9d4839840f])
dfオブジェクトをpostgresqlに格納
user_listというテーブルに読みこんだCSVの中身を格納してみましょう。
import java.util.Properties
import org.apache.spark.sql.SaveMode
lazy val dbAuth = {
val prop = new Properties()
prop.setProperty ("user","spark")
prop.setProperty ("password","spark")
prop.setProperty ("driver","org.postgresql.Driver")
prop
}
df.write.mode(SaveMode.Overwrite).jdbc("jdbc:postgresql://172.168.1.11:5432/spark", "user_list", dbAuth)
以下のコマンドでspark-shellを抜けましょう。
scala> exit
psqlで確認
では実際にpostgresに登録されているか確認して見ましょう。
一度rootに上がってからpostgresユーザーになってからpsqlで確認してみます。
$ exit
# sudo su - postgres
-bash-4.2$ psql -U spark spark
Password for user spark:
spark=> \d
List of relations
Schema | Name | Type | Owner
--------+-----------+-------+-------
public | user_list | table | spark
(1 row)
spark=> select count(*) from user_list;
count
-------
22873
(1 row)
無事に登録されていますね。
今回は以上です。postgresやhdfsやたくさん新しいのが出てきて
おえってなったと思いますが、懲りずに勉強しましょう。
※ただのSparkのstandalone構成だったら簡単なんだけどね。。。
編集後記
第2章の後にどういう方向でネタ作るかなぁと思ったんですが、鉄板なCSVなどのファイルから読み込んで、加工や集計を行った結果をDBに格納するみたいな流れが実務的で良いかなと思ってこんな感じにしてみました。
実際にSparkに触れてみて、遅いじゃんって思った人も多いと思います。
vagrant上でCluster組んでるっていうのもあるし、まだYarnのClusterを引き出すような内容をやっているのでそう感じるだけです。
RMDBだけでも出来るよってお思いのあなた。はい、やろうと思えば出来ます。
ただ、本番環境でRMDBだけで、ブンブンと集計処理を回したりして、フロント側のサービスが遅くなったりしたら、意味がないですよね。適材適所でアーキテクチャ考えて、処理させるのがいいと思います。
次回は第4章 〜Apache Hadoop,YARN,Sparkで集計してサマリーをpostgresqlに格納してみる編を公開する予定です。