Glueを使ってVPC内のRDS(JDBC繋がればEC2でもOK)からVPC内のAmazon Elasticsearch Serviceにロードを行います。
作成される構成は以下です。
①: GlueがVPC内にENIを生やす
②: ①のENIからRDSへ接続
③,④: RDSのデータをGlueにロードし、Glueで変換処理
⑤,⑥: GlueからENIを介してAmazon ESに書き込み
前提
前提となる構成は以下です。
GlueからRDSへのロード部分は以下の㉟記事の内容をベースにしています。この記事内で作成したリソースもそのまま使うので、必要な方は㉟の記事をご参照ください
- Glue接続: se2-35-connect (GlueとRDSの接続)
- RDS: se2-35-rds
- VPC: demo1-vpc
- セキュリティグループ(Glueが使うやつ): se2-35-glue
- セキュリティグループ(RDS): se2-35-rds
- 必要なら確認用のEC2も作っておく
前提となる構成(㉟の記事の構成)
今回作るリソース
- セキュリティグループ: se2-39-es-sg2 (AmazonES用のSG)
- Amazon ES: se2-39-es2
- Glueクローラー: se2_in14 (RDSをクローリングしてテーブル作る)
- Glueジョブ: se2_job23 (RDS->AmazonESのGlueジョブ)
- Glue接続: se2-39-connect (GlueからRDSやAmazonESへの接続)
この辺が複雑なので少し意識してもらえたらと
自己参照セキュリティグループ
自己参照セキュリティグループは自分自身のセキュリティグループから通信を許可したセキュリティグループです。Glue自体はパブリックなAWSサービスなので、Glue接続にこの自己参照セキュリティグループをアタッチすることで、セキュリティグループを起点としたENIが生え、GlueがVPC内のリソース(RDS,Redshift)にアクセスすることができます。GlueがVPC内へのアクセスを行う際に使います(ドキュメントにも書かれてます)
Glueのダミー接続
Glue接続は接続タイプにRedshift,RDS,JDBCから選べますが、JDBCを選択し適当なJDBC URL,ユーザー,パスワードを入れることでダミーのGlue接続を作り、"VPCエンドポイント経由でのS3"や今回の"VPC内のAmazonES"へのGlueからの接続に使うことができます。ちょっとハックっぽい(ドキュメントに書かれてないかも)
全体の流れ
- Amazon ES作成
- Glueクローラーでテーブル作成
- Glueの"接続"のテスト
- Glueの"ジョブ"の作成と実行
AmazonES 作成
セキュリティグループ作成
以下のパラメータでSGを作成
- セキュリティグループ名: se2-39-es-sg2
- 説明: 任意
- VPC: demo1-vpc(RDSがあるVPC)
- インバウンドルール:GlueのSGから全てを許可、確認用EC2のSGから全てを許可
以下のパラメータでAmazonESを作成(指定がないパラメータはデフォルト)
Elasticsearchドメイン名: se2-39-es2
デプロイタイプ: 開発およびテスト
Elasticsearchバージョン: 6.7
VPC: demo1-vpc(RDSがあるVPC)
サブネット: PrivateSubnet(demo1-vpcのプライベートサブネット)
セキュリティグループ: se2-39-es-sg2
ドメインアクセスポリシー設定: "IAM認証情報を使用した署名リクエストを要求しない"
Glueクローラーでテーブル作成
Glueの画面で、左側メニューの"クローラ"をクリックし、[クローラの追加]をクリック
以下を入れ[次へ]
クローラーの名前: se2_in14
以下を入れ[次へ]
Crawler source type: "Data stores"
以下を入れ[次へ]
Choose a data store: JDBC
接続: "se2-35-connect"
インクルードパス: db/cvlog
[次へ]、[次へ]、IAMロールを"test-glue"を選び[次へ]、[次へ]をクリックし
以下を入力し[次へ]、[完了]、
データベース: se2
テーブルに追加されたプロフィックス(省略可能): se2_39_
"今すぐ実行しますか?"の緑色の文字をクリック
Glueの左側メニューの、テーブルをクリックし、"se2_39_db_cvlog"のテーブルが作成されていることを確認
Glueの"接続"のテスト
ここでの通信フローと構成は以下です。
自己参照セキュリティグループを使ったフロー
自己参照セキュリティグループを使ったより正確なGlueの通信フローは以下のようになります。
Glue自体はパブリックなAWSサービスで、Glueに自己参照セキュリティグループをアタッチすることでVPC内のリソース(RDS,Redshift)にアクセスすることができます
Glue接続はGlueからJDBCでの接続のための定義で、今回は㉟で作成済の"se2-35-connect"を使う。
接続状況確認のため、Glueの画面を開き、左側メニューの"接続"をクリックし、"se2-35-connect"にチェックを入れ、[接続のテスト]をクリックする
IAMロールに"test-glue"を選択し、[接続のテスト]をクリックする。IAMロールはクローラーやジョブを実行しているものと同じ内容で構わない
こうなればOK
Glueの"ジョブ"の作成と実行
今回使うJARファイルをS3にアップロードしておく
- JARファイル: elasticsearch-hadoop-7.1.0.jar
Sparkからelasticsearchへデータの読み書きをするためにElastic社が提供しているライブラリ
https://qiita.com/48hands/items/a8136eef0841a71dec43
http://www.intellilink.co.jp/article/column/bigdata-kk02.html
以下のリンクなどから"elasticsearch-hadoop-7.1.0.jar"をダウンロードし、S3にアップロードしておく
https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-hadoop/7.1.0
ここでの通信フローと構成は以下です。
Glueのメニューからジョブをクリックし、[ジョブの追加]をクリックする
以下を入力し、[次へ]をクリックする
名前:se2-job23
IAMロール名:test-glue
"セキュリティ設定、スクリプトライブラリおよびジョブパラメータ(省略可能)"をクリックし、"依存JARSパス"に、elasticsearch-hadoop-7.1.0.jarをアップロードしたS3パスを入力する
データソースで、"se2_39_db_cvlog"にチェックを入れ、[次へ]をクリックする
以下を入力し、[次へ]をクリックする (ターゲットはAmazonESなので、ここの値はあとで修正する)
データストア:JDBC
接続:se2-35-connect
データベース名:db
ここでJDBCをターゲットに指定しているが、今回のAmazonESへの出力に使う。ここで指定しているGlue接続の"se2-35-connect"はRDSへのGlue接続だが、AmazonESへの書き込みにも使える。AmazonESへの接続という視点だとこれはダミー接続とも言える。のちほどコードのwriteの箇所はAmazonESへの出力に書き換える。
このまま[次へ]をクリックし、次の画面で[ジョブを保存してスクリプトを編集する]をクリック
コードの一部を修正する。作成されたコードのwriteの箇所をコメントアウトし、toDF()して、elasticsearch-hadoopを使ったWriteに書き換える
変更箇所は下から2行目
修正前
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields3, catalog_connection = "se2-39-connect", connection_options = {"dbtable": "se2_39_db_cvlog", "database": "db"}, transformation_ctx = "datasink4")
job.commit()
修正後
#datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields3, catalog_connection = "se2-39-connect", connection_options = {"dbtable": "se2_39_db_cvlog", "database": "db"}, transformation_ctx = "datasink4")
df0 = dropnullfields3.toDF()
df0.write.format("org.elasticsearch.spark.sql").option("es.nodes.wan.only","true").option("es.port","443").option("es.net.ssl","true").option("es.nodes", "vpc-se2-39-es2-xxxxx.ap-northeast-1.es.amazonaws.com").mode("Overwrite").save("test/cvlog")
job.commit()
画面左上の[ジョブの実行]をクリックする
今回はGlueジョブによるAmazonESへの書き込みのタイミングでIndexを作っているが、実際に使う場合は事前にElasticsearchのIndexを作成しておいた方がいい
確認用EC2から確認
ここでの通信フローと構成は以下です。
ログインし、curlコマンドでElasticsearchのSearch APIで確認(見やすいように改行入れてます)
$ curl -XGET https://vpc-se2-39-es2-xxxx.ap-northeast-1.es.amazonaws.com/test/_search/
{"took":2,"timed_out":false,"_shards":{"total":5,"successful":5,"skipped":0,"failed":0},"hits":{"total":19,"max_score":1.0,"hits":[
{"_index":"test","_type":"cvlog","_id":"CbPrFmsBfzGjTnMwbRNs","_score":1.0,"_source":{"country":"FR","month":12,"hour":18,"year":2017,"appid":9,"deviceid":"pc","uuid":11117,"day":2}},
{"_index":"test","_type":"cvlog","_id":"CrPrFmsBfzGjTnMwbRNx","_score":1.0,"_source":{"country":"JP","month":12,"hour":11,"year":2017,"appid":1,"deviceid":"iphon","uuid":11116,"day":15}},
{"_index":"test","_type":"cvlog","_id":"DrPrFmsBfzGjTnMwcxPC","_score":1.0,"_source":{"country":"JP","month":11,"hour":15,"year":2017,"appid":5,"deviceid":"other","uuid":11110,"day":29}},
{"_index":"test","_type":"cvlog","_id":"ELPrFmsBfzGjTnMwcxPZ","_score":1.0,"_source":{"country":"FR","month":11,"hour":20,"year":2017,"appid":1,"deviceid":"andro","uuid":11122,"day":30}},
{"_index":"test","_type":"cvlog","_id":"ErPrFmsBfzGjTnMwdBNW","_score":1.0,"_source":{"country":"AUS","month":12,"hour":14,"year":2017,"appid":7,"deviceid":"iphon","uuid":11124,"day":17}},
{"_index":"test","_type":"cvlog","_id":"FLPrFmsBfzGjTnMwdBN9","_score":1.0,"_source":{"country":"JP","month":12,"hour":8,"year":2017,"appid":1,"deviceid":"iphon","uuid":11126,"day":19}},
{"_index":"test","_type":"cvlog","_id":"BbPrFmsBfzGjTnMwaxO_","_score":1.0,"_source":{"country":"AUS","month":12,"hour":18,"year":2017,"appid":7,"deviceid":"iphon","uuid":11114,"day":17}},
{"_index":"test","_type":"cvlog","_id":"BrPrFmsBfzGjTnMwaxO_","_score":1.0,"_source":{"country":"JP","month":12,"hour":12,"year":2017,"appid":1,"deviceid":"iphon","uuid":11111,"day":14}},
{"_index":"test","_type":"cvlog","_id":"D7PrFmsBfzGjTnMwcxPU","_score":1.0,"_source":{"country":"JP","month":11,"hour":12,"year":2017,"appid":1,"deviceid":"iphon","uuid":11121,"day":11}},
{"_index":"test","_type":"cvlog","_id":"FrPrFmsBfzGjTnMwdRM1","_score":1.0,"_source":{"country":"FR","month":12,"hour":4,"year":2017,"appid":9,"deviceid":"iphon","uuid":11128,"day":9}}]}}
ログインし、curlコマンドでOpenDistroのSQLを使って、ElasticsearchにSQLを実行し確認(見やすいように改行入れてます)
OpenDistroのSQLについて
https://opendistro.github.io/for-elasticsearch-docs/docs/sql/
$ curl -XPOST https://vpc-se2-39-es2-tlv3sd7uwrosmj42k6adzdczky.ap-northeast-1.es.amazonaws.com/_opendistro/_sql -k -d '{"query": "SELECT * FROM test LIMIT 10"}' -H 'Content-Type: application/json'
{"took":1,"timed_out":false,"_shards":{"total":5,"successful":5,"skipped":0,"failed":0},"hits":{"total":19,"max_score":1.0,"hits":[
{"_index":"test","_type":"cvlog","_id":"CbPrFmsBfzGjTnMwbRNs","_score":1.0,"_source":{"country":"FR","month":12,"hour":18,"year":2017,"appid":9,"deviceid":"pc","uuid":11117,"day":2}},
{"_index":"test","_type":"cvlog","_id":"CrPrFmsBfzGjTnMwbRNx","_score":1.0,"_source":{"country":"JP","month":12,"hour":11,"year":2017,"appid":1,"deviceid":"iphon","uuid":11116,"day":15}},
{"_index":"test","_type":"cvlog","_id":"DrPrFmsBfzGjTnMwcxPC","_score":1.0,"_source":{"country":"JP","month":11,"hour":15,"year":2017,"appid":5,"deviceid":"other","uuid":11110,"day":29}},
{"_index":"test","_type":"cvlog","_id":"ELPrFmsBfzGjTnMwcxPZ","_score":1.0,"_source":{"country":"FR","month":11,"hour":20,"year":2017,"appid":1,"deviceid":"andro","uuid":11122,"day":30}},
{"_index":"test","_type":"cvlog","_id":"ErPrFmsBfzGjTnMwdBNW","_score":1.0,"_source":{"country":"AUS","month":12,"hour":14,"year":2017,"appid":7,"deviceid":"iphon","uuid":11124,"day":17}},
{"_index":"test","_type":"cvlog","_id":"FLPrFmsBfzGjTnMwdBN9","_score":1.0,"_source":{"country":"JP","month":12,"hour":8,"year":2017,"appid":1,"deviceid":"iphon","uuid":11126,"day":19}},
{"_index":"test","_type":"cvlog","_id":"BbPrFmsBfzGjTnMwaxO_","_score":1.0,"_source":{"country":"AUS","month":12,"hour":18,"year":2017,"appid":7,"deviceid":"iphon","uuid":11114,"day":17}},
{"_index":"test","_type":"cvlog","_id":"BrPrFmsBfzGjTnMwaxO_","_score":1.0,"_source":{"country":"JP","month":12,"hour":12,"year":2017,"appid":1,"deviceid":"iphon","uuid":11111,"day":14}},
{"_index":"test","_type":"cvlog","_id":"D7PrFmsBfzGjTnMwcxPU","_score":1.0,"_source":{"country":"JP","month":11,"hour":12,"year":2017,"appid":1,"deviceid":"iphon","uuid":11121,"day":11}},
{"_index":"test","_type":"cvlog","_id":"FrPrFmsBfzGjTnMwdRM1","_score":1.0,"_source":{"country":"FR","month":12,"hour":4,"year":2017,"appid":9,"deviceid":"iphon","uuid":11128,"day":9}}]}}
Glueにおける自己参照ルールの補足
こちらも是非
Glueの使い方的な㉟(RDBにwhereでロードするデータを絞る)
https://qiita.com/pioho07/items/1e52672fb58ee88e9aa7
Glueの使い方まとめ
https://qiita.com/pioho07/items/32f76a16cbf49f9f712f