やりたいこと
踏み台サーバー経由 (ssh) でプライベートサブネット内の RDS (MySQL) へ接続するのと同様にして、VPC エンドポイントの Elasticsearch Service にも踏み台サーバー経由で接続することが目標です。
ssh コマンドと curl コマンドを利用すれば次のように簡単にローカルから接続できますが、今回は Go プログラムで接続することを目標とします。
ssh -i <path/to/private-key> <username>@<hostname> curl -s '<ES_ENDPOINT>/_cat/indices?format=json&pretty'
前提
次のようなアーキテクチャを想定します。セキュリティグループはいい感じに設定されているものとします。
実装
Elasticsearch のクライアント用ライブラリとしては Elastic 社公式の elastic/go-elasticsearch を利用します。
実装のポイントは http.RoundTripper (http.Transport)
の Dial に SSH Client の Dial を利用することです。
package main
import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
"os"
"time"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
"golang.org/x/crypto/ssh"
)
func main() {
log.SetFlags(log.LstdFlags | log.Lshortfile)
var (
sshUser = os.Getenv("SSH_USER")
sshHost = os.Getenv("SSH_HOST")
sshPort = os.Getenv("SSH_PORT")
sshPrivateKey = os.Getenv("SSH_PRIVATE_KEY")
esEndpoint = os.Getenv("ES_ENDPOINT")
)
// ------------------------------
// 秘密鍵ファイルの読み込み
// ------------------------------
b, err := ioutil.ReadFile(sshPrivateKey)
if err != nil {
log.Fatal(err)
}
signer, err := ssh.ParsePrivateKey(b)
if err != nil {
log.Fatal(err)
}
// ------------------------------
// SSH クライアントの生成
// ------------------------------
sshConf := ssh.ClientConfig{
User: sshUser,
Auth: []ssh.AuthMethod{
ssh.PublicKeys(signer),
},
HostKeyCallback: func(hostname string, remote net.Addr, key ssh.PublicKey) error {
return nil
},
Timeout: 10 * time.Second,
}
sshClient, err := ssh.Dial("tcp", net.JoinHostPort(sshHost, sshPort), &sshConf)
if err != nil {
log.Fatal(err)
}
defer sshClient.Close()
// ------------------------------
// Elasticsearch クライアントの生成
// ------------------------------
esConf := elasticsearch.Config{
Addresses: []string{esEndpoint},
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: sshClient.Dial, // ここで SSH Client を利用
TLSHandshakeTimeout: 10 * time.Second,
},
}
es, err := elasticsearch.NewClient(esConf)
if err != nil {
log.Fatal(err)
}
// ------------------------------
// リクエストを実行 (/_cat/indices)
// ------------------------------
req := esapi.CatIndicesRequest{
Format: "json",
Pretty: true,
}
ctx := context.Background()
resp, err := req.Do(ctx, es)
if err != nil {
log.Fatal(err)
}
defer resp.Body.Close()
// ------------------------------
// レスポンスを解析
// ------------------------------
if resp.IsError() {
log.Fatal(resp.String())
}
body := io.TeeReader(resp.Body, os.Stdout) // debug
var r []map[string]interface{}
if err := json.NewDecoder(body).Decode(&r); err != nil {
log.Fatal(err)
}
for i, obj := range r {
fmt.Printf("\n[#%d]\n", i)
for k, v := range obj {
fmt.Println(k, v)
}
}
}