目的
昨日PBFからParquetに変換したOpen Street Mapのデータを、AWS EMRでspark-sqlを使って触ってみる。
全球データ=大きい → 分散処理したい → AWS EMR
今回は、こちらとこちらを参考にして、データ処理していきます。
準備
S3にParquetのデータをアップロード
前回作ったParquetのデータ3ファイルをS3のバケットにアップロードします。僕は簡単にAWSコンソールからアップロードしましたが、aws-cliを使うなり方法は他にもありますね。
キーペアの用意
EMRクラスタ用のキーペアを作って、ダウンロードしておきます。使うOSやSSHクライアントに合わせて鍵ファイルを選んでください。
クラスタの立ち上げ
AWSコンソールからEMRクラスタを立ち上げます。
ログ記録はなしで、Sparkクラスタを指定します。クラウド破産も怖いので、インスタンス数も2個くらいにしておきましょう。インスタンスタイプももっと小さいインスタンスでいい気がしますが、デフォルトで指定されているm5.xlargeをそのまま使っておきます。
「クラスターを作成」すると、しばらくしてクラスターが準備完了になります。
セキュリティグループを設定してsshログイン
セキュリティグループがデフォルトで作成されますが、SSHが開いていないので、セキュリティグループを編集してmasterの方だけ外からのSSHを開けます。これで、先ほどのキーペアを使って手元からSSHログインできるようになります。
spark-shellを使って処理してみる
では、立ち上がったEMRクラスタを使って、Parquetに書き出したOSMデータを処理してみます。spark-shellを立ち上げます。
$ spark-shell
データファイルを読み込みます。
scala> import org.apache.spark.sql._
scala> import org.apache.spark.sql.types._
scala> val sqlContext = new SQLContext(sc)
scala> val df = sqlContext.read.parquet("s3://bucket-name/Tokyo.osm.pbf.node.parquet")
スキーマを確認します。
scala> df.printSchema()
root
|-- id: long (nullable = true)
|-- version: integer (nullable = true)
|-- timestamp: long (nullable = true)
|-- changeset: long (nullable = true)
|-- uid: integer (nullable = true)
|-- user_sid: binary (nullable = true)
|-- tags: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- key: binary (nullable = true)
| | |-- value: binary (nullable = true)
|-- latitude: double (nullable = true)
|-- longitude: double (nullable = true)
緯度と経度だけを抜き出してみます。
scala> df.registerTempTable("Node")
scala> val df3 = sqlContext.sql("SELECT id,latitude,longitude FROM Node")
scala> df3.show
+--------+------------------+------------------+
| id| latitude| longitude|
+--------+------------------+------------------+
|31236558| 35.6351506|139.76784320000002|
|31236561|35.635041900000004|139.76785900000002|
|31236562| 35.6379133| 139.7591729|
|31236563|35.638182900000004| 139.7585998|
|31236564|35.638516100000004|139.75823490000002|
|31236565|35.638767200000004| 139.7580599|
|31236566| 35.6390562| 139.7579335|
|31236567|35.639158200000004| 139.7579045|
|31236568| 35.6393004| 139.7578741|
|31236569| 35.6378794| 139.7589461|
|31236570|35.637997500000004| 139.7586995|
|31236571| 35.6382569| 139.7583386|
|31236572| 35.6384817|139.75812480000002|
|31236573|35.638980700000005| 139.7578382|
|31236574|35.639336400000005| 139.7577578|
|31236575| 35.6395142| 139.757745|
|31236576| 35.6347773| 139.7699528|
|31236579|35.634913100000006| 139.7705506|
|31236580| 35.6350156| 139.7709312|
|31236581|35.635654800000005| 139.7727102|
+--------+------------------+------------------+
only showing top 20 rows
結果をS3に保存します。
scala> df3.rdd.saveAsTextFile("s3://bucket-name/Tokyo.osm.pbf.node.txt")
まとめ
なんかいけそう!