この投稿では、Fluendのhttpアウトプットプラグインについて調査した結果についてお伝えします。
Fluentdのhttpアウトプットプラグインとは?
Fluentdではアウトプットプラグインを設定することで、ログの送信先をファイルやElasticsearchなどさまざまなストレージに送信できます。httpアウトプットプラグインもその一種ですが、送信先のミドルウェアが決まっているものではなく、HTTPリクエストを受け取れるミドルウェアならどんなものに送信できる万能選手です。httpアウトプットを使うと、Webhook的なことができます。受け取る側のHTTPサーバーはどんな言語で実装しても構わないので、ログを受け取ったら任意の処理を行うことができます。
- 公式ドキュメント: https://docs.fluentd.org/output/http
out_httpがビルトインになったのはv1.7.0
Fluentdでout_httpがビルトイン、つまり、プラグインをインストールしなくても使えるようになったのはv1.7.0からです。
Docker Hubで公開されているfluentd/fluentd:latestイメージはv1.3なので、それを使うと動作しませんでした。おそらくout_httpプラグインをインストールすれば使えると思います。
今回は、fluentd/fluentd:v1.11-1を使うことにしました。
out_http導入に必要な設定はいたってシンプル
次に、Fluentdでout_httpを導入するに当たって必要な設定を調べました。多くの設定項目があることは、公式ドキュメントを分かりますが、最低限の設定は次のとおりでした。
<match nginx.access>
@type http
endpoint http://go:8000/
</match>
この例では、ログをhttp://go:8000/
に送信します。さまざまな設定がデフォルト値で動作するので、次のような振る舞いになります。
- メソッド:
POST
- Content-Type:
application/x-ndjson
- 送信されるタイミング: 60秒後
ndjsonという見慣れないデータ形式になりますが、これは改行文字でJSON値を区切ったデータ形式です。ログ界隈ではデファクト・スタンダードなデータ型らしいです。
ログ送信が60秒後なのは遅すぎるのでは?
ログをできるだけリアルタイムで処理したいと考えているので、デフォルトの60秒後送信は遅すぎると思いました。これは<buffer>
ディレクティブのflush_interval
設定を変えることで短縮できます。
<match nginx.access>
@type http
endpoint http://go:8000/
<buffer>
flush_interval 3s
</buffer>
</match>
この設定では、3秒後にログが送信されるようになります。
out_httpが送信するHTTPリクエスト
out_httpが送信するHTTPリクエストは次のようなものになります。次の例は、nginxのアクセスログです。
POST / HTTP/1.1
Host: go:8000
Accept: */*
Accept-Encoding: gzip;q=1.0,deflate;q=0.6,identity;q=0.3
Content-Length: 648
Content-Type: application/x-ndjson
User-Agent: Ruby
{"remote":"172.21.0.1","host":"-","user":"-","method":"GET","path":"/","code":"200","size":"612","referer":"-","agent":"HTTPie/2.2.0","http_x_forwarded_for":"-"}
{"remote":"172.21.0.1","host":"-","user":"-","method":"GET","path":"/","code":"200","size":"612","referer":"-","agent":"HTTPie/2.2.0","http_x_forwarded_for":"-"}
{"remote":"172.21.0.1","host":"-","user":"-","method":"GET","path":"/","code":"200","size":"612","referer":"-","agent":"HTTPie/2.2.0","http_x_forwarded_for":"-"}
各行の内容は、Fluentdのnginxパーサーが処理したJSONデータを思われます。
送信するログのフィルタリング
out_httpは全てのログを送信するようになっています。では、特定のログだけ送信するにはどうしたらいいのでしょうか。
特定のログだけ送信するには、Filterを用います。特定のフィールドにパターンマッチさせて絞り込むには、grepプラグインを使います。次の設定例は、POSTメソッドのログのみをout_httpで送信する例です。
<filter nginx.access>
@type grep
<regexp>
key method
pattern /^POST$/
</regexp>
</filter>
<match nginx.access>
@type http
endpoint http://go:8000/
<buffer>
flush_interval 3s
</buffer>
</match>
送信されるログデータの課題
ログのアイデンティティ
out_httpが送信するログにはUUIDが振られていません。そのため、受信側はログが新規なのか再送されたものなのか判断できません。
この問題は、add-uuidプラグインを使うことで解決できそうです。
Nginxでは$request_id
をログ書式に加えることができますが、これはリクエスト単位でユニークなIDです。ログごとのIDではありません。
日時
ログに日時が含まれていないと、out_httpは日時情報を伝えません。ログを書き出す側で日時を出す必要があります。
受信側エンドポイントの実装例
ここではFluentdが送信してきたログをどのように処理するか考えていきます。ログを処理するHTTPサーバーはGoで実装します。
Fluentdが送信するリクエストは普通のHTTPリクエストなので、Goのnet/http
モジュールでHTTPサーバーを実装すれば処理できてしまいます。次のサンプルコードは、リクエストをダンプするだけのものです。
package main
import (
"fmt"
"log"
"net/http"
"net/http/httputil"
)
func handleRequest(res http.ResponseWriter, req *http.Request) {
dump, _ := httputil.DumpRequest(req, true)
fmt.Printf("%s\n\n", dump)
fmt.Fprintf(res, "OK")
}
func main() {
http.HandleFunc("/", handleRequest)
log.Fatal(http.ListenAndServe(":8000", nil))
}
この実装でリクエストを待ち受けていると、次のような実行結果を見ることができます。
$ go run main.go
POST / HTTP/1.1
Host: go:8000
Accept: */*
Accept-Encoding: gzip;q=1.0,deflate;q=0.6,identity;q=0.3
Content-Length: 648
Content-Type: application/x-ndjson
User-Agent: Ruby
{"remote":"172.21.0.1","host":"-","user":"-","method":"GET","path":"/","code":"200","size":"612","referer":"-","agent":"HTTPie/2.2.0","http_x_forwarded_for":"-"}
{"remote":"172.21.0.1","host":"-","user":"-","method":"GET","path":"/","code":"200","size":"612","referer":"-","agent":"HTTPie/2.2.0","http_x_forwarded_for":"-"}
{"remote":"172.21.0.1","host":"-","user":"-","method":"GET","path":"/","code":"200","size":"612","referer":"-","agent":"HTTPie/2.2.0","http_x_forwarded_for":"-"}
まず、最低限のチェックとして、メソッドがPOST
なのかとContent-Typeがndjsonなのかをチェックするようにします。
if req.Method != http.MethodPost {
res.WriteHeader(http.StatusMethodNotAllowed)
res.Write([]byte("Method not allowed"))
return
}
if req.Header.Get("Content-type") != "application/x-ndjson" {
res.WriteHeader(http.StatusBadRequest)
res.Write([]byte("Only application/x-ndjson content is allowed"))
return
}
次にリクエストボディのndjsonをパースする必要がありますが、これはencoding/json
モジュールだけで行なえます。json.Decoder
のMore
メソッドを叩くと、行ごとにデコードできます。次のサンプルコードはjson.Decoder
を使ったndjsonのパース例です。
package main
import (
"encoding/json"
"fmt"
"strings"
)
func main() {
data := `{"base": "white rice", "proteins": ["tofu"]}
{"base": "salad", "proteins": ["tuna", "salmon"]}
`
decoder := json.NewDecoder(strings.NewReader(data))
for decoder.More() {
var value interface{}
if err := decoder.Decode(&value); err != nil {
fmt.Errorf("parse error: %w", err)
return
}
fmt.Printf("value: %#v\n", value)
}
}
ndjsonのパース処理をサーバー実装に加えたものが次のサンプルコードです。
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
)
func handleRequest(res http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodPost {
res.WriteHeader(http.StatusMethodNotAllowed)
res.Write([]byte("Method not allowed"))
return
}
if req.Header.Get("Content-type") != "application/x-ndjson" {
res.WriteHeader(http.StatusBadRequest)
res.Write([]byte("Only application/x-ndjson content is allowed"))
return
}
decoder := json.NewDecoder(req.Body)
for decoder.More() {
var value interface{}
if err := decoder.Decode(&value); err != nil {
fmt.Errorf("parse error: %w\n", err)
} else {
fmt.Printf("value: %#v\n", value)
}
}
fmt.Fprintf(res, "OK")
}
func main() {
http.HandleFunc("/", handleRequest)
log.Fatal(http.ListenAndServe(":8000", nil))
}
このサーバーを起動して、Fluentdがこのサーバーにログを送信すると次のような出力が確認できます。
value: map[string]interface {}{"agent":"HTTPie/2.2.0", "code":"200", "host":"-", "http_x_forwarded_for":"-", "method":"GET", "path":"/", "referer":"-", "remote":"172.21.0.1", "size":"612", "user":"-"}
value: map[string]interface {}{"agent":"HTTPie/2.2.0", "code":"200", "host":"-", "http_x_forwarded_for":"-", "method":"GET", "path":"/", "referer":"-", "remote":"172.21.0.1", "size":"612", "user":"-"}
value: map[string]interface {}{"agent":"HTTPie/2.2.0", "code":"200", "host":"-", "http_x_forwarded_for":"-", "method":"GET", "path":"/", "referer":"-", "remote":"172.21.0.1", "size":"612", "user":"-"}
この出力結果からndjsonの行ごとにJSONがパースできているのが分かります。
もしログ受信側サーバーがダウンしていたら?
もしもFluentdからログを受け取るエンドポイントがダウンしていたら、その間に送信されたログはどうなるのでしょうか。
これを確認するためにGoサーバーを停止した上で、Fluentdにログを送信させてみます。
すると、Fluentdのログには次の警告が表示されました。これは読む限り、go:8000のTCP接続が開けなかったことについての警告のようです。なお、このログはretry_time=7になるまで1秒後、2秒後、4秒後、6秒後と指数関数秒後にリトライ送信がかかっていました。
2020-11-02 07:19:39 +0000 [warn]: #0 failed to flush the buffer. retry_time=1 next_retry_seconds=2020-11-02 07:19:40 +0000 chunk="5b31a91682a46b9ed00331d272b9caf7" error_class=SocketError error="Failed to open TCP connection to go:8000 (getaddrinfo: Name does not resolve)"
警告を確認後しばらくしてから、Goサーバーを起動してみます。どうなるでしょうか。
Goサーバーが起動して数秒すると、Fluentdからダウン時に作られたログが送信されてきました。Fluentdのリトライはretry_limitで設定した回数だけ繰り返されるようです。今回は8回目のリトライで送信に成功したようです。
18ぐらいにしておけば1日以上はretryしてくれます。ただし、retry間隔がどんどん大きくなっていくので、送信先の復旧後もなかなか送信されない状態が続きます。ですので、他のoptionと組み合わせてretry間隔が大きくならないよう調整するか、USR1シグナルを送り強制的にflushさせるなどの処置が必要になるかと思います。
BufferedOutput pluginの代表的なoptionについて - Qiita
もしログ受信側が500系のエラーを吐いたら?
上ではログ受信側が完全に停止しているケースを検証しました。では、もしもログ受信側が不安定になったケースではどうなるでしょうか。たとえば、500系のレスポンスコードを返し続けるようなケースです。
ためしに、Goサーバの実装を書き換えて、常に500ステータスを返すようにしてみます。その上で、Fluentdにログを送信させてみましょう。
func handleRequest(res http.ResponseWriter, req *http.Request) {
res.WriteHeader(http.StatusInternalServerError)
return
// ...
}
Fluentdのログには次のような警告が表示されました。この場合はサーバダウンと異なり、指数関数秒後の再送は発生しないようです。
2020-11-02 07:27:25 +0000 [warn]: #0 got unrecoverable error in primary and no secondary error_class=Fluent::UnrecoverableError error="500 Internal Server Error "
Goサーバーのコードを直してGoサーバーを再起動してみます。どうなるでしょうか。
しばらく経ってもログは再送されませんでした。
ドキュメントを読む限り、Fluentdのout_httpのretryable_response_codes
の設定が必要のようです。これを設定しておくと、所定のステータスコードのときログの再送を試みるようです。この設定は次のようにセットします。
<match nginx.access>
@type http
endpoint http://go:8000/
retryable_response_codes [500, 503]
<buffer>
flush_interval 3s
</buffer>
</match>
この設定を加えた上で、再び同様の検証をしてみます。すると、Goサーバーが500レスポンスを返したときのFluentdのログ内容が次のように変わりました。リトライが行われるようになったことが分かります。
2020-11-02 07:33:31 +0000 [warn]: #0 failed to flush the buffer. retry_time=1 next_retry_seconds=2020-11-02 07:33:32 +0000 chunk="5b31ac31236dc81f666960c6649cbfdc" error_class=Fluent::Plugin::HTTPOutput::RetryableResponse error="500 Internal Server Error "
しばらくしてから、Goサーバーのコードを直してGoサーバーを再起動してみました。すると、ログの再送がなされGoサーバーに届きました。
検証コード
この調査に用いた検証コードはGitHubで確認できます: https://github.com/suin/golang_playground/tree/7623374f54c509e9e02360184f0c196183fefb28/fluentd_http_out