LoginSignup
2
1

More than 5 years have passed since last update.

Apex GoでELBのアクセスログをAWS LambdaでElasticsearchに取り込む

Posted at

すでに何人かの方が記事を書かれていますが、Go言語の情報が少ないので書いてみました。
基本的にはDevelopers.IOの藤本さんの記事を元に、Apex Goで実装してみました。

AWS Lambda Funcion作成

完成版のLambdaコードはGistを見てください。以下、要点のみ説明します。

全体の流れ

ELBがアクセスログをS3に書き込むと、メタ情報がS3イベントとしてLambdaに渡され、該当のLambdaファンクションを実行します。
以下、処理詳細になります。

  1. LambdaハンドラーがS3イベントを引数として受け取る。
  2. S3イベントにはスライスで複数のアクセスログファイル名が格納されているのでループで処理する。
  3. S3に接続し、アクセスログファイルのコンテンツを取得する。
  4. 取得したファイルを1行ごとループで処理する。(1行が1HTTPリクエスト)
  5. ログをJSON形式に変換する。(Goのstructタグを使用して。)
  6. ElasticsearchへPOSTする。(Indexing。ドキュメントの追加。)

Apex Lambdaハンドラー

コールバックでapexS3.Event.Recordsに、S3バケットにアップロードされたファイル名のリストが渡されます。

main.go
func main() {
    apexS3.HandleFunc(func(event *apexS3.Event, ctx *apex.Context) error {
        for _, rec := range event.Records {
                // ファイルの処理。Recordsには、ファイルではなくファイル名のリストが入っている。

注意: ファイルが渡されるわけではないです。

S3バケットからファイルを取得

rec.S3.Object.Keyに、S3のファイル名がセットされてます。キーにしてS3からファイルをダウンロードします。

main.go
            svc := s3.New(session.New(&aws.Config{Region: aws.String(rec.AWSRegion)}))

            // S3バケットからオブジェクトを取得する。
            s3out, err := svc.GetObject(&s3.GetObjectInput{
                Bucket: aws.String(rec.S3.Bucket.Name),
                Key:    aws.String(rec.S3.Object.Key),
            })
                        // バイト配列に一気に読み込む。
            bytes, err := ioutil.ReadAll(s3out.Body)
                        // 改行で分割する。
            for _, line := range strings.Split(string(bytes), "\n") {
                        // JSON形式に変換。
                        // Elasticsearchへ登録。

JSON形式に変換

Elasticsearchに登録するドキュメントのスキーマになります。Structタグを使用してJSONにしています。データ型は、Elasticsearch側でよしなに変換してくれます。timestampは、date型に変換され、Kibanaでタイムフレームでグラフを描画するのに使用するフィールドになります。

main.go
type ElbAccessLog struct {
    Timestamp              string `json:"timestamp"`
    Elb                    string `json:"elb"`
    ClientIpAddress        string `json:"client_ip_address"`
    BackendIpAddress       string `json:"backend_ip_address"`
    RequestProcessingTime  string `json:"request_processing_time"`
    BackendProcessingTime  string `json:"backend_processing_time"`
    ResponseProcessingTime string `json:"response_processing_time"`
    ElbStatusCode          string `json:"elb_status_code"`
    BackendStatusCode      string `json:"backend_status_code"`
    ReceivedBytes          string `json:"received_bytes"`
    SentBytes              string `json:"sent_bytes"`
    Request                string `json:"request"`
}

ElasticsearchにIndexing

AWS版のElasticsearchサービスに接続するのは少し面倒です。
olivere/elasticedoardo89RoundTripperを使用してます。試してませんがデバッグログが出力できるようです。

ライブラリを取得してください。

$ go get gopkg.in/olivere/elastic.v3
$ go get github.com/edoardo849/apex-aws-signer

Elasticsearchに接続します。

main.go
            transport := signer.NewTransport(session.New(&aws.Config{Region: aws.String(rec.AWSRegion)}), elasticsearchservice.ServiceName)

            httpClient := &http.Client{
                Transport: transport,
            }

            // Elasticsearchに接続
            client, err := elastic.NewClient(
                elastic.SetSniff(false),
                elastic.SetURL(esUrl), // AWS Elasticsearchのエンドポイント
                elastic.SetScheme("https"),
                elastic.SetHttpClient(httpClient),
            )

Documentを登録(Indexing)します。

main.go
    put1, err := client.Index().
        Index(indexName). // インデックス名
        Type(esType).     // タイプ名
        BodyJson(doc).    // ドキュメントのコンテンツ
        Do()

完成したコードをApexを使用して、Lambdaに登録します。dora63さんの記事が詳しいです。

AWS Lambdaコンソールでトリガーの設定

AWS Lambdaコンソールにログインし、Lambda関数のトリガーにS3バケット(ELBのアクセスログがはかれるとこ)をひも付けます。
Lambda_Management_Console.png

適当にELBにアクセスしてみます。Lambdaが実行されElasticsearchにアクセスログがインデックスされます。ChromeのSenseプラグインで検索します。
sense.png

ローカルで実行してみる

Apexの素晴らしさは、Lambdaプログラムをそのままローカルで実行できることです。イベントは標準入力で渡します。

go run main.go < s3event.json

s3event.jsonは実際にAWS上のLambdaが受け取ったS3のイベントapexS3.Eventです。

s3event.json
{
  "event": {
    "Records": [
      {
        "eventVersion": "2.0",
        "eventSource": "aws:s3",
        "awsRegion": "ap-northeast-1",
        "eventTime": "2016-09-15T23:56:16.822Z",
        "eventName": "ObjectCreated:CompleteMultipartUpload",
        "userIdentity": {
          "principalId": "AWS:Axxxx"
        },
        "requestParameters": {
          "sourceIPAddress": "52.x.x.x"
        },
        "S3": {
          "s3SchemaVersion": "1.0",
          "configurationId": "xxx",
          "bucket": {
            "name": "xxx",
            "ownerIdentity": {
              "principalId": "xxx"
            },
            "arn": "arn:aws:s3:::xxx"
          },
          "object": {
            "key": "AWSLogs/xxx/elasticloadbalancing/ap-northeast-1/2016/09/15/xxx_elasticloadbalancing_ap-northeast-1_elb1_20160915T2355Z_xxx_19usc64f.log",
            "size": 1212,
            "eTag": "8b0eb26c9b3b0fb86b5b7c9cafbc7331-1",
            "versionId": "",
            "sequencer": "0057DB3520885F756B"
          }
        }
      }
    ]
  }
}

スタンドアローンで動かせるのはかなりデバッグが楽になります。実際、このプログラムはほとんどこのダミーイベントを使用して開発できました。

ハマったところ

AWS付属のKibanaですが、小数点は3桁で丸められます。レスポンスタイムなど、ミリ秒で丸められます。Number型の精度をSettingsformat:numberPrecisionで変更できるのですが、AWS ESで使用してるKibana 4.1.2はバグってるようです。

AWSのロール・ポリシー設定はなかなかなれませんね。。。Lambdaプログラムから、S3バケット、Elasticsearch、CloudWatchへのアクセス権限が必要です。

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1