1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

OhoAdvent Calendar 2019

Day 6

ParquetにしたOSMのデータをEMRでいじってみる

Last updated at Posted at 2019-12-05

目的

昨日PBFからParquetに変換したOpen Street Mapのデータを、AWS EMRでspark-sqlを使って触ってみる。
全球データ=大きい → 分散処理したい → AWS EMR

今回は、こちらこちらを参考にして、データ処理していきます。

準備

S3にParquetのデータをアップロード

前回作ったParquetのデータ3ファイルをS3のバケットにアップロードします。僕は簡単にAWSコンソールからアップロードしましたが、aws-cliを使うなり方法は他にもありますね。

キーペアの用意

EMRクラスタ用のキーペアを作って、ダウンロードしておきます。使うOSやSSHクライアントに合わせて鍵ファイルを選んでください。

クラスタの立ち上げ

AWSコンソールからEMRクラスタを立ち上げます。
ログ記録はなしで、Sparkクラスタを指定します。クラウド破産も怖いので、インスタンス数も2個くらいにしておきましょう。インスタンスタイプももっと小さいインスタンスでいい気がしますが、デフォルトで指定されているm5.xlargeをそのまま使っておきます。
「クラスターを作成」すると、しばらくしてクラスターが準備完了になります。

セキュリティグループを設定してsshログイン

セキュリティグループがデフォルトで作成されますが、SSHが開いていないので、セキュリティグループを編集してmasterの方だけ外からのSSHを開けます。これで、先ほどのキーペアを使って手元からSSHログインできるようになります。

spark-shellを使って処理してみる

では、立ち上がったEMRクラスタを使って、Parquetに書き出したOSMデータを処理してみます。spark-shellを立ち上げます。

$ spark-shell

データファイルを読み込みます。

scala> import org.apache.spark.sql._

scala> import org.apache.spark.sql.types._

scala> val sqlContext = new SQLContext(sc)

scala> val df = sqlContext.read.parquet("s3://bucket-name/Tokyo.osm.pbf.node.parquet")

スキーマを確認します。

scala> df.printSchema()
root
 |-- id: long (nullable = true)
 |-- version: integer (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- changeset: long (nullable = true)
 |-- uid: integer (nullable = true)
 |-- user_sid: binary (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: binary (nullable = true)
 |    |    |-- value: binary (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)

緯度と経度だけを抜き出してみます。

scala> df.registerTempTable("Node")

scala> val df3 = sqlContext.sql("SELECT id,latitude,longitude FROM Node")

scala> df3.show
+--------+------------------+------------------+
|      id|          latitude|         longitude|
+--------+------------------+------------------+
|31236558|        35.6351506|139.76784320000002|
|31236561|35.635041900000004|139.76785900000002|
|31236562|        35.6379133|       139.7591729|
|31236563|35.638182900000004|       139.7585998|
|31236564|35.638516100000004|139.75823490000002|
|31236565|35.638767200000004|       139.7580599|
|31236566|        35.6390562|       139.7579335|
|31236567|35.639158200000004|       139.7579045|
|31236568|        35.6393004|       139.7578741|
|31236569|        35.6378794|       139.7589461|
|31236570|35.637997500000004|       139.7586995|
|31236571|        35.6382569|       139.7583386|
|31236572|        35.6384817|139.75812480000002|
|31236573|35.638980700000005|       139.7578382|
|31236574|35.639336400000005|       139.7577578|
|31236575|        35.6395142|        139.757745|
|31236576|        35.6347773|       139.7699528|
|31236579|35.634913100000006|       139.7705506|
|31236580|        35.6350156|       139.7709312|
|31236581|35.635654800000005|       139.7727102|
+--------+------------------+------------------+
only showing top 20 rows

結果をS3に保存します。

scala> df3.rdd.saveAsTextFile("s3://bucket-name/Tokyo.osm.pbf.node.txt")

まとめ

なんかいけそう!

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?