LoginSignup
12
15

More than 3 years have passed since last update.

Glueの使い方的な㊴(RDSからAmazon Elasticsearch Serviceにロード)

Last updated at Posted at 2019-06-02

Glueを使ってVPC内のRDS(JDBC繋がればEC2でもOK)からVPC内のAmazon Elasticsearch Serviceにロードを行います。

作成される構成は以下です。

se2-39-Page-1 (1).png

①: GlueがVPC内にENIを生やす
②: ①のENIからRDSへ接続
③,④: RDSのデータをGlueにロードし、Glueで変換処理
⑤,⑥: GlueからENIを介してAmazon ESに書き込み

前提

前提となる構成は以下です。

GlueからRDSへのロード部分は以下の㉟記事の内容をベースにしています。この記事内で作成したリソースもそのまま使うので、必要な方は㉟の記事をご参照ください

  • Glue接続: se2-35-connect (GlueとRDSの接続)
  • RDS: se2-35-rds
  • VPC: demo1-vpc
  • セキュリティグループ(Glueが使うやつ): se2-35-glue
  • セキュリティグループ(RDS): se2-35-rds
  • 必要なら確認用のEC2も作っておく

前提となる構成(㉟の記事の構成)

se2-39-Page-2.png

今回作るリソース

  • セキュリティグループ: se2-39-es-sg2 (AmazonES用のSG)
  • Amazon ES: se2-39-es2
  • Glueクローラー: se2_in14 (RDSをクローリングしてテーブル作る)
  • Glueジョブ: se2_job23 (RDS->AmazonESのGlueジョブ)
  • Glue接続: se2-39-connect (GlueからRDSやAmazonESへの接続)

この辺が複雑なので少し意識してもらえたらと

自己参照セキュリティグループ

自己参照セキュリティグループは自分自身のセキュリティグループから通信を許可したセキュリティグループです。Glue自体はパブリックなAWSサービスなので、Glue接続にこの自己参照セキュリティグループをアタッチすることで、セキュリティグループを起点としたENIが生え、GlueがVPC内のリソース(RDS,Redshift)にアクセスすることができます。GlueがVPC内へのアクセスを行う際に使います(ドキュメントにも書かれてます)

Glueのダミー接続

Glue接続は接続タイプにRedshift,RDS,JDBCから選べますが、JDBCを選択し適当なJDBC URL,ユーザー,パスワードを入れることでダミーのGlue接続を作り、"VPCエンドポイント経由でのS3"や今回の"VPC内のAmazonES"へのGlueからの接続に使うことができます。ちょっとハックっぽい(ドキュメントに書かれてないかも)

全体の流れ

  • Amazon ES作成
  • Glueクローラーでテーブル作成
  • Glueの"接続"のテスト
  • Glueの"ジョブ"の作成と実行

AmazonES 作成

セキュリティグループ作成

以下のパラメータでSGを作成

  • セキュリティグループ名: se2-39-es-sg2
  • 説明: 任意
  • VPC: demo1-vpc(RDSがあるVPC)
  • インバウンドルール:GlueのSGから全てを許可、確認用EC2のSGから全てを許可

以下のパラメータでAmazonESを作成(指定がないパラメータはデフォルト)

Elasticsearchドメイン名: se2-39-es2
デプロイタイプ: 開発およびテスト
Elasticsearchバージョン: 6.7
VPC: demo1-vpc(RDSがあるVPC)
サブネット: PrivateSubnet(demo1-vpcのプライベートサブネット)
セキュリティグループ: se2-39-es-sg2
ドメインアクセスポリシー設定: "IAM認証情報を使用した署名リクエストを要求しない"

Glueクローラーでテーブル作成

Glueの画面で、左側メニューの"クローラ"をクリックし、[クローラの追加]をクリック

スクリーンショット 0031-06-02 19.12.51.png

以下を入れ[次へ]
クローラーの名前: se2_in14

以下を入れ[次へ]
Crawler source type: "Data stores"

以下を入れ[次へ]

Choose a data store: JDBC
接続: "se2-35-connect"
インクルードパス: db/cvlog

スクリーンショット 0031-06-02 19.15.16.png

[次へ]、[次へ]、IAMロールを"test-glue"を選び[次へ]、[次へ]をクリックし
以下を入力し[次へ]、[完了]、

データベース: se2
テーブルに追加されたプロフィックス(省略可能): se2_39_

スクリーンショット 0031-06-02 19.17.28.png

"今すぐ実行しますか?"の緑色の文字をクリック

スクリーンショット 0031-06-02 19.18.54.png

Glueの左側メニューの、テーブルをクリックし、"se2_39_db_cvlog"のテーブルが作成されていることを確認

スクリーンショット 0031-06-02 19.22.08.png

スクリーンショット 0031-06-02 19.22.23.png

Glueの"接続"のテスト

ここでの通信フローと構成は以下です。

自己参照セキュリティグループを使ったフロー

自己参照セキュリティグループを使ったより正確なGlueの通信フローは以下のようになります。
Glue自体はパブリックなAWSサービスで、Glueに自己参照セキュリティグループをアタッチすることでVPC内のリソース(RDS,Redshift)にアクセスすることができます

se2-39-Page-4.png

Glue接続はGlueからJDBCでの接続のための定義で、今回は㉟で作成済の"se2-35-connect"を使う。
接続状況確認のため、Glueの画面を開き、左側メニューの"接続"をクリックし、"se2-35-connect"にチェックを入れ、[接続のテスト]をクリックする

スクリーンショット 0031-06-02 17.52.25.png

IAMロールに"test-glue"を選択し、[接続のテスト]をクリックする。IAMロールはクローラーやジョブを実行しているものと同じ内容で構わない

スクリーンショット 0031-06-02 17.56.54.png

こうなればOK

スクリーンショット 0031-06-02 17.55.49.png

Glueの"ジョブ"の作成と実行

今回使うJARファイルをS3にアップロードしておく

  • JARファイル: elasticsearch-hadoop-7.1.0.jar

Sparkからelasticsearchへデータの読み書きをするためにElastic社が提供しているライブラリ
https://qiita.com/48hands/items/a8136eef0841a71dec43
http://www.intellilink.co.jp/article/column/bigdata-kk02.html

以下のリンクなどから"elasticsearch-hadoop-7.1.0.jar"をダウンロードし、S3にアップロードしておく
https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-hadoop/7.1.0

ここでの通信フローと構成は以下です。

se2-39-Page-1 (1).png

Glueのメニューからジョブをクリックし、[ジョブの追加]をクリックする

スクリーンショット 0030-09-09 16.41.32.png

以下を入力し、[次へ]をクリックする
名前:se2-job23
IAMロール名:test-glue

スクリーンショット 0031-06-02 18.51.55.png

"セキュリティ設定、スクリプトライブラリおよびジョブパラメータ(省略可能)"をクリックし、"依存JARSパス"に、elasticsearch-hadoop-7.1.0.jarをアップロードしたS3パスを入力する

スクリーンショット 0031-06-02 18.52.35.png

データソースで、"se2_39_db_cvlog"にチェックを入れ、[次へ]をクリックする

スクリーンショット 0031-06-02 18.55.53 1.png

以下を入力し、[次へ]をクリックする (ターゲットはAmazonESなので、ここの値はあとで修正する)
データストア:JDBC
接続:se2-35-connect
データベース名:db

ここでJDBCをターゲットに指定しているが、今回のAmazonESへの出力に使う。ここで指定しているGlue接続の"se2-35-connect"はRDSへのGlue接続だが、AmazonESへの書き込みにも使える。AmazonESへの接続という視点だとこれはダミー接続とも言える。のちほどコードのwriteの箇所はAmazonESへの出力に書き換える。

スクリーンショット 0031-06-02 18.58.30.png

このまま[次へ]をクリックし、次の画面で[ジョブを保存してスクリプトを編集する]をクリック

スクリーンショット 0030-09-09 16.43.27.png

コードの一部を修正する。作成されたコードのwriteの箇所をコメントアウトし、toDF()して、elasticsearch-hadoopを使ったWriteに書き換える
変更箇所は下から2行目

修正前

datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields3, catalog_connection = "se2-39-connect", connection_options = {"dbtable": "se2_39_db_cvlog", "database": "db"}, transformation_ctx = "datasink4")

job.commit()

修正後

#datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields3, catalog_connection = "se2-39-connect", connection_options = {"dbtable": "se2_39_db_cvlog", "database": "db"}, transformation_ctx = "datasink4")

df0 = dropnullfields3.toDF()

df0.write.format("org.elasticsearch.spark.sql").option("es.nodes.wan.only","true").option("es.port","443").option("es.net.ssl","true").option("es.nodes", "vpc-se2-39-es2-xxxxx.ap-northeast-1.es.amazonaws.com").mode("Overwrite").save("test/cvlog")

job.commit()

画面左上の[ジョブの実行]をクリックする

今回はGlueジョブによるAmazonESへの書き込みのタイミングでIndexを作っているが、実際に使う場合は事前にElasticsearchのIndexを作成しておいた方がいい

確認用EC2から確認

ここでの通信フローと構成は以下です。

se2-39-Page-3 (1).png

ログインし、curlコマンドでElasticsearchのSearch APIで確認(見やすいように改行入れてます)

$ curl -XGET https://vpc-se2-39-es2-xxxx.ap-northeast-1.es.amazonaws.com/test/_search/
{"took":2,"timed_out":false,"_shards":{"total":5,"successful":5,"skipped":0,"failed":0},"hits":{"total":19,"max_score":1.0,"hits":[
{"_index":"test","_type":"cvlog","_id":"CbPrFmsBfzGjTnMwbRNs","_score":1.0,"_source":{"country":"FR","month":12,"hour":18,"year":2017,"appid":9,"deviceid":"pc","uuid":11117,"day":2}},
{"_index":"test","_type":"cvlog","_id":"CrPrFmsBfzGjTnMwbRNx","_score":1.0,"_source":{"country":"JP","month":12,"hour":11,"year":2017,"appid":1,"deviceid":"iphon","uuid":11116,"day":15}},
{"_index":"test","_type":"cvlog","_id":"DrPrFmsBfzGjTnMwcxPC","_score":1.0,"_source":{"country":"JP","month":11,"hour":15,"year":2017,"appid":5,"deviceid":"other","uuid":11110,"day":29}},
{"_index":"test","_type":"cvlog","_id":"ELPrFmsBfzGjTnMwcxPZ","_score":1.0,"_source":{"country":"FR","month":11,"hour":20,"year":2017,"appid":1,"deviceid":"andro","uuid":11122,"day":30}},
{"_index":"test","_type":"cvlog","_id":"ErPrFmsBfzGjTnMwdBNW","_score":1.0,"_source":{"country":"AUS","month":12,"hour":14,"year":2017,"appid":7,"deviceid":"iphon","uuid":11124,"day":17}},
{"_index":"test","_type":"cvlog","_id":"FLPrFmsBfzGjTnMwdBN9","_score":1.0,"_source":{"country":"JP","month":12,"hour":8,"year":2017,"appid":1,"deviceid":"iphon","uuid":11126,"day":19}},
{"_index":"test","_type":"cvlog","_id":"BbPrFmsBfzGjTnMwaxO_","_score":1.0,"_source":{"country":"AUS","month":12,"hour":18,"year":2017,"appid":7,"deviceid":"iphon","uuid":11114,"day":17}},
{"_index":"test","_type":"cvlog","_id":"BrPrFmsBfzGjTnMwaxO_","_score":1.0,"_source":{"country":"JP","month":12,"hour":12,"year":2017,"appid":1,"deviceid":"iphon","uuid":11111,"day":14}},
{"_index":"test","_type":"cvlog","_id":"D7PrFmsBfzGjTnMwcxPU","_score":1.0,"_source":{"country":"JP","month":11,"hour":12,"year":2017,"appid":1,"deviceid":"iphon","uuid":11121,"day":11}},
{"_index":"test","_type":"cvlog","_id":"FrPrFmsBfzGjTnMwdRM1","_score":1.0,"_source":{"country":"FR","month":12,"hour":4,"year":2017,"appid":9,"deviceid":"iphon","uuid":11128,"day":9}}]}}

ログインし、curlコマンドでOpenDistroのSQLを使って、ElasticsearchにSQLを実行し確認(見やすいように改行入れてます)

OpenDistroのSQLについて
https://opendistro.github.io/for-elasticsearch-docs/docs/sql/

$ curl -XPOST https://vpc-se2-39-es2-tlv3sd7uwrosmj42k6adzdczky.ap-northeast-1.es.amazonaws.com/_opendistro/_sql -k -d '{"query": "SELECT * FROM test LIMIT 10"}' -H 'Content-Type: application/json'
{"took":1,"timed_out":false,"_shards":{"total":5,"successful":5,"skipped":0,"failed":0},"hits":{"total":19,"max_score":1.0,"hits":[
{"_index":"test","_type":"cvlog","_id":"CbPrFmsBfzGjTnMwbRNs","_score":1.0,"_source":{"country":"FR","month":12,"hour":18,"year":2017,"appid":9,"deviceid":"pc","uuid":11117,"day":2}},
{"_index":"test","_type":"cvlog","_id":"CrPrFmsBfzGjTnMwbRNx","_score":1.0,"_source":{"country":"JP","month":12,"hour":11,"year":2017,"appid":1,"deviceid":"iphon","uuid":11116,"day":15}},
{"_index":"test","_type":"cvlog","_id":"DrPrFmsBfzGjTnMwcxPC","_score":1.0,"_source":{"country":"JP","month":11,"hour":15,"year":2017,"appid":5,"deviceid":"other","uuid":11110,"day":29}},
{"_index":"test","_type":"cvlog","_id":"ELPrFmsBfzGjTnMwcxPZ","_score":1.0,"_source":{"country":"FR","month":11,"hour":20,"year":2017,"appid":1,"deviceid":"andro","uuid":11122,"day":30}},
{"_index":"test","_type":"cvlog","_id":"ErPrFmsBfzGjTnMwdBNW","_score":1.0,"_source":{"country":"AUS","month":12,"hour":14,"year":2017,"appid":7,"deviceid":"iphon","uuid":11124,"day":17}},
{"_index":"test","_type":"cvlog","_id":"FLPrFmsBfzGjTnMwdBN9","_score":1.0,"_source":{"country":"JP","month":12,"hour":8,"year":2017,"appid":1,"deviceid":"iphon","uuid":11126,"day":19}},
{"_index":"test","_type":"cvlog","_id":"BbPrFmsBfzGjTnMwaxO_","_score":1.0,"_source":{"country":"AUS","month":12,"hour":18,"year":2017,"appid":7,"deviceid":"iphon","uuid":11114,"day":17}},
{"_index":"test","_type":"cvlog","_id":"BrPrFmsBfzGjTnMwaxO_","_score":1.0,"_source":{"country":"JP","month":12,"hour":12,"year":2017,"appid":1,"deviceid":"iphon","uuid":11111,"day":14}},
{"_index":"test","_type":"cvlog","_id":"D7PrFmsBfzGjTnMwcxPU","_score":1.0,"_source":{"country":"JP","month":11,"hour":12,"year":2017,"appid":1,"deviceid":"iphon","uuid":11121,"day":11}},
{"_index":"test","_type":"cvlog","_id":"FrPrFmsBfzGjTnMwdRM1","_score":1.0,"_source":{"country":"FR","month":12,"hour":4,"year":2017,"appid":9,"deviceid":"iphon","uuid":11128,"day":9}}]}}

Glueにおける自己参照ルールの補足

こちらも是非

Glueの使い方的な㉟(RDBにwhereでロードするデータを絞る)
https://qiita.com/pioho07/items/1e52672fb58ee88e9aa7

Glueの使い方まとめ
https://qiita.com/pioho07/items/32f76a16cbf49f9f712f

12
15
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
12
15