すでに何人かの方が記事を書かれていますが、Go言語の情報が少ないので書いてみました。
基本的にはDevelopers.IOの藤本さんの記事を元に、Apex Goで実装してみました。
AWS Lambda Funcion作成
完成版のLambdaコードはGistを見てください。以下、要点のみ説明します。
全体の流れ
ELBがアクセスログをS3に書き込むと、メタ情報がS3イベントとしてLambdaに渡され、該当のLambdaファンクションを実行します。
以下、処理詳細になります。
- LambdaハンドラーがS3イベントを引数として受け取る。
- S3イベントにはスライスで複数のアクセスログファイル名が格納されているのでループで処理する。
- S3に接続し、アクセスログファイルのコンテンツを取得する。
- 取得したファイルを1行ごとループで処理する。(1行が1HTTPリクエスト)
- ログをJSON形式に変換する。(Goのstructタグを使用して。)
- ElasticsearchへPOSTする。(Indexing。ドキュメントの追加。)
Apex Lambdaハンドラー
コールバックでapexS3.Event.Records
に、S3バケットにアップロードされたファイル名のリストが渡されます。
func main() {
apexS3.HandleFunc(func(event *apexS3.Event, ctx *apex.Context) error {
for _, rec := range event.Records {
// ファイルの処理。Recordsには、ファイルではなくファイル名のリストが入っている。
注意: ファイルが渡されるわけではないです。
S3バケットからファイルを取得
rec.S3.Object.Key
に、S3のファイル名がセットされてます。キーにしてS3からファイルをダウンロードします。
svc := s3.New(session.New(&aws.Config{Region: aws.String(rec.AWSRegion)}))
// S3バケットからオブジェクトを取得する。
s3out, err := svc.GetObject(&s3.GetObjectInput{
Bucket: aws.String(rec.S3.Bucket.Name),
Key: aws.String(rec.S3.Object.Key),
})
// バイト配列に一気に読み込む。
bytes, err := ioutil.ReadAll(s3out.Body)
// 改行で分割する。
for _, line := range strings.Split(string(bytes), "\n") {
// JSON形式に変換。
// Elasticsearchへ登録。
JSON形式に変換
Elasticsearchに登録するドキュメントのスキーマになります。Structタグを使用してJSONにしています。データ型は、Elasticsearch側でよしなに変換してくれます。timestamp
は、date型に変換され、Kibanaでタイムフレームでグラフを描画するのに使用するフィールドになります。
type ElbAccessLog struct {
Timestamp string `json:"timestamp"`
Elb string `json:"elb"`
ClientIpAddress string `json:"client_ip_address"`
BackendIpAddress string `json:"backend_ip_address"`
RequestProcessingTime string `json:"request_processing_time"`
BackendProcessingTime string `json:"backend_processing_time"`
ResponseProcessingTime string `json:"response_processing_time"`
ElbStatusCode string `json:"elb_status_code"`
BackendStatusCode string `json:"backend_status_code"`
ReceivedBytes string `json:"received_bytes"`
SentBytes string `json:"sent_bytes"`
Request string `json:"request"`
}
ElasticsearchにIndexing
AWS版のElasticsearchサービスに接続するのは少し面倒です。
olivere/elasticとedoardo89のRoundTripper
を使用してます。試してませんがデバッグログが出力できるようです。
ライブラリを取得してください。
$ go get gopkg.in/olivere/elastic.v3
$ go get github.com/edoardo849/apex-aws-signer
Elasticsearchに接続します。
transport := signer.NewTransport(session.New(&aws.Config{Region: aws.String(rec.AWSRegion)}), elasticsearchservice.ServiceName)
httpClient := &http.Client{
Transport: transport,
}
// Elasticsearchに接続
client, err := elastic.NewClient(
elastic.SetSniff(false),
elastic.SetURL(esUrl), // AWS Elasticsearchのエンドポイント
elastic.SetScheme("https"),
elastic.SetHttpClient(httpClient),
)
Documentを登録(Indexing)します。
put1, err := client.Index().
Index(indexName). // インデックス名
Type(esType). // タイプ名
BodyJson(doc). // ドキュメントのコンテンツ
Do()
完成したコードをApexを使用して、Lambdaに登録します。dora63さんの記事が詳しいです。
AWS Lambdaコンソールでトリガーの設定
AWS Lambdaコンソールにログインし、Lambda関数のトリガーにS3バケット(ELBのアクセスログがはかれるとこ)をひも付けます。
適当にELBにアクセスしてみます。Lambdaが実行されElasticsearchにアクセスログがインデックスされます。ChromeのSenseプラグインで検索します。
ローカルで実行してみる
Apexの素晴らしさは、Lambdaプログラムをそのままローカルで実行できることです。イベントは標準入力で渡します。
go run main.go < s3event.json
s3event.json
は実際にAWS上のLambdaが受け取ったS3のイベントapexS3.Event
です。
{
"event": {
"Records": [
{
"eventVersion": "2.0",
"eventSource": "aws:s3",
"awsRegion": "ap-northeast-1",
"eventTime": "2016-09-15T23:56:16.822Z",
"eventName": "ObjectCreated:CompleteMultipartUpload",
"userIdentity": {
"principalId": "AWS:Axxxx"
},
"requestParameters": {
"sourceIPAddress": "52.x.x.x"
},
"S3": {
"s3SchemaVersion": "1.0",
"configurationId": "xxx",
"bucket": {
"name": "xxx",
"ownerIdentity": {
"principalId": "xxx"
},
"arn": "arn:aws:s3:::xxx"
},
"object": {
"key": "AWSLogs/xxx/elasticloadbalancing/ap-northeast-1/2016/09/15/xxx_elasticloadbalancing_ap-northeast-1_elb1_20160915T2355Z_xxx_19usc64f.log",
"size": 1212,
"eTag": "8b0eb26c9b3b0fb86b5b7c9cafbc7331-1",
"versionId": "",
"sequencer": "0057DB3520885F756B"
}
}
}
]
}
}
スタンドアローンで動かせるのはかなりデバッグが楽になります。実際、このプログラムはほとんどこのダミーイベントを使用して開発できました。
ハマったところ
AWS付属のKibanaですが、小数点は3桁で丸められます。レスポンスタイムなど、ミリ秒で丸められます。Number
型の精度をSettings
のformat:numberPrecision
で変更できるのですが、AWS ESで使用してるKibana 4.1.2はバグってるようです。
AWSのロール・ポリシー設定はなかなかなれませんね。。。Lambdaプログラムから、S3バケット、Elasticsearch、CloudWatchへのアクセス権限が必要です。