概要
- YAFの入力に名前付きパイプを指定し、そのパイプにpcapファイル名の一覧を渡す
- YAFからDPIありのIPFIX形式でsuper_mediatorに渡す
- super_mediatorの出力にJSONのファイルを指定
- JSONファイルをlogstashに渡す
- Elasticsearch経由でKibanaで可視化
super_mediatorの改造
super_mediator-1.3.0 が出力するJSONファイルの時刻をELK側で正しく扱えるよう、ISO 8601形式（日付と時刻を`T`で区切り、末尾に`Z`を付ける）にする下記の改造を行った。末尾の`Z`はUTCを意味するので、super_mediatorが時刻をUTC（ローカル時刻ではない）で出力していることを確認してから使うこと。
改造しなくてもlogstashのフィルタで対応できると思う。
super_mediatorを改造せずにlogstashのフィルタで@timestampの値をflows.flowStartMillisecondsにする
(2018/02/04追記)
https://qiita.com/t_umeno/items/5ae7f57f2388b49d5ecd
diff -urN super_mediator-1.3.0.orig/include/mediator/mediator_ctx.h super_mediator-1.3.0/include/mediator/mediator_ctx.h
--- super_mediator-1.3.0.orig/include/mediator/mediator_ctx.h 2016-01-06 04:31:41.000000000 +0900
+++ super_mediator-1.3.0/include/mediator/mediator_ctx.h 2016-09-25 19:32:06.593349847 +0900
@@ -97,7 +97,7 @@
#define MAX_LIST 10
/* 30 sec */
#define MD_RESTART_MS 30000
-#define PRINT_TIME_FMT "%04u-%02u-%02u %02u:%02u:%02u"
+#define PRINT_TIME_FMT "%04u-%02u-%02uT%02u:%02u:%02u"
#define MD_MSGLEN_STD 65535
typedef enum mdTransportType_en {
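Review bot: The UTC designator that completes this ISO 8601 change is not defined here but hard-coded as a literal `Z` in roughly twenty snprintf/g_string format strings across mediator_export.c, mediator_json.c and mediator_print.c, while the date/time half is centralized in PRINT_TIME_FMT in this hunk. A companion macro beside it, `#define PRINT_TIME_MS_FMT ".%03uZ" /* milliseconds + UTC designator */`, would keep both halves of the timestamp format in one place, so the delimited variants become `PRINT_TIME_MS_FMT "%c"` and a later change (emitting a numeric offset instead of `Z`, for example) touches one line. Note that `Z` is also a claim about the time base: it is only correct if md_util_time_buf_append formats with gmtime rather than localtime, which this header does not show.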
diff -urN super_mediator-1.3.0.orig/src/mediator_export.c super_mediator-1.3.0/src/mediator_export.c
--- super_mediator-1.3.0.orig/src/mediator_export.c 2016-03-08 23:03:46.000000000 +0900
+++ super_mediator-1.3.0/src/mediator_export.c 2016-09-25 19:36:19.858386172 +0900
@@ -7228,7 +7228,7 @@
md_util_time_buf_append(exporter->buf, &brem, start_secs,
PRINT_TIME_FMT);
- ret = snprintf(exporter->buf->cp, brem, ".%03u\"}}\n", start_rem);
+ ret = snprintf(exporter->buf->cp, brem, ".%03uZ\"}}\n", start_rem);
MD_CHECK_RET(exporter->buf, ret, brem);
} else {
char *bufstart = exporter->buf->cp;
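Review bot: Appending `Z` on the changed snprintf line asserts that the preceding md_util_time_buf_append call rendered start_secs in UTC, but that helper is outside the hunk and nothing here shows whether it uses gmtime_r or localtime_r. If it is local time, Logstash will parse the value as UTC and every flow will be shifted by the host offset (the build host appears to sit at +0900, judging by the diff file timestamps), which silently breaks time correlation with other sources in Elasticsearch. Confirm the helper before relying on the suffix; if it is local, either switch the helper to gmtime_r or emit the real offset via strftime `%z` instead of a literal `Z`. The enclosing function starts before this hunk.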
@@ -7270,7 +7270,7 @@
return FALSE;
}
- ret = snprintf(exporter->buf->cp, brem, ".%03u%c",
+ ret = snprintf(exporter->buf->cp, brem, ".%03uZ%c",
start_rem, exporter->delimiter);
MD_CHECK_RET(exporter->buf, ret, brem);
afterlen = exporter->buf->cp - bufstart;
diff -urN super_mediator-1.3.0.orig/src/mediator_json.c super_mediator-1.3.0/src/mediator_json.c
--- super_mediator-1.3.0.orig/src/mediator_json.c 2016-03-09 00:38:27.000000000 +0900
+++ super_mediator-1.3.0/src/mediator_json.c 2016-09-25 19:36:06.030766449 +0900
@@ -156,7 +156,7 @@
return 0;
}
- ret = snprintf(buf->cp, brem, ".%03u\",", start_rem);
+ ret = snprintf(buf->cp, brem, ".%03uZ\",", start_rem);
MD_CHECK_RET0(buf, ret, brem);
@@ -578,7 +578,7 @@
return 0;
}
- ret = snprintf(buf->cp, brem, ".%03u\",", start_rem);
+ ret = snprintf(buf->cp, brem, ".%03uZ\",", start_rem);
MD_CHECK_RET0(buf, ret, brem);
@@ -590,7 +590,7 @@
return 0;
}
- ret = snprintf(buf->cp, brem, ".%03u\",", end_rem);
+ ret = snprintf(buf->cp, brem, ".%03uZ\",", end_rem);
MD_CHECK_RET0(buf, ret, brem);
}
@@ -779,14 +779,14 @@
if (!md_util_time_buf_append(buf, &brem, start_secs, PRINT_TIME_FMT)) {
return 0;
}
- ret = snprintf(buf->cp, brem, ".%03u\",\"lastSeen\":\"", start_rem);
+ ret = snprintf(buf->cp, brem, ".%03uZ\",\"lastSeen\":\"", start_rem);
MD_CHECK_RET0(buf, ret, brem);
if (!md_util_time_buf_append(buf, &brem, end_secs, PRINT_TIME_FMT)) {
return 0;
}
- ret = snprintf(buf->cp, brem, ".%03u\",\"sslCertSerialNumber\":\"",
+ ret = snprintf(buf->cp, brem, ".%03uZ\",\"sslCertSerialNumber\":\"",
end_rem);
MD_CHECK_RET0(buf, ret, brem);
@@ -843,7 +843,7 @@
if (!md_util_time_buf_append(buf, &brem, start_secs, PRINT_TIME_FMT)) {
return 0;
}
- ret = snprintf(buf->cp, brem, ".%03u\",\"lastSeen\":\"", start_rem);
+ ret = snprintf(buf->cp, brem, ".%03uZ\",\"lastSeen\":\"", start_rem);
MD_CHECK_RET0(buf, ret, brem);
if (!md_util_time_buf_append(buf, &brem, end_secs, PRINT_TIME_FMT)) {
@@ -852,12 +852,12 @@
if (rec->sip != rec->hash) {
if (rec->sip == 0) {
- ret = snprintf(buf->cp, brem, ".%03u\",\"sourceIPv6Address\":\"",
+ ret = snprintf(buf->cp, brem, ".%03uZ\",\"sourceIPv6Address\":\"",
end_rem);
MD_CHECK_RET0(buf, ret, brem);
md_util_print_ip6_addr(sabuf, rec->sip6);
} else {
- ret = snprintf(buf->cp, brem, ".%03u\",\"sourceIPv4Address\":\"",
+ ret = snprintf(buf->cp, brem, ".%03uZ\",\"sourceIPv4Address\":\"",
end_rem);
MD_CHECK_RET0(buf, ret, brem);
md_util_print_ip4_addr(sabuf, rec->sip);
@@ -867,7 +867,7 @@
sabuf, rec->hash, rec->count);
} else {
/* deduped on hash, not IP so don't print IP */
- ret = snprintf(buf->cp, brem, ".%03u\",\"yafFlowKeyHash\":%u,"
+ ret = snprintf(buf->cp, brem, ".%03uZ\",\"yafFlowKeyHash\":%u,"
"\"observedDataTotalCount\":%"PRIu64",",
end_rem, rec->hash, rec->count);
}
diff -urN super_mediator-1.3.0.orig/src/mediator_print.c super_mediator-1.3.0/src/mediator_print.c
--- super_mediator-1.3.0.orig/src/mediator_print.c 2016-03-08 05:41:41.000000000 +0900
+++ super_mediator-1.3.0/src/mediator_print.c 2016-09-25 19:37:18.868763206 +0900
@@ -270,7 +270,7 @@
md_util_time_g_string_append(tmp, start_secs, PRINT_TIME_FMT);
- g_string_append_printf(tmp, ".%03u", start_rem);
+ g_string_append_printf(tmp, ".%03uZ", start_rem);
ret = snprintf(buf->cp, *bufsize, decorator, tmp->str);
@@ -296,7 +296,7 @@
int ret;
md_util_time_g_string_append(tmp, end_secs, PRINT_TIME_FMT);
- g_string_append_printf(tmp, ".%03u", end_rem);
+ g_string_append_printf(tmp, ".%03uZ", end_rem);
ret = snprintf(buf->cp, *bufsize, decorator, tmp->str);
@@ -2098,11 +2098,11 @@
md_util_time_g_string_append(str, start_secs, PRINT_TIME_FMT);
- g_string_append_printf(str, ".%03u", start_rem);
+ g_string_append_printf(str, ".%03uZ", start_rem);
g_string_append_printf(str, "%c", delimiter);
md_util_time_g_string_append(str, end_secs, PRINT_TIME_FMT);
- g_string_append_printf(str, ".%03u", end_rem);
+ g_string_append_printf(str, ".%03uZ", end_rem);
g_string_append_printf(str, "%c%8.3f", delimiter,
(flow->flowEndMilliseconds -
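Review bot: This hunk, like the other mediator_print.c hunks, changes the delimited text output and not only the JSON output the post is targeting, so any existing consumer that splits on `delimiter` and parses the timestamp field with the old `YYYY-MM-DD HH:MM:SS.mmm` layout now receives `T` and a trailing `Z` and will fail or mis-parse. Unless that is intended, gate the suffix behind an exporter option or restrict it to the JSON paths in mediator_json.c; at minimum, document the text-format change alongside the JSON one. The enclosing function starts and ends outside the hunk.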
@@ -2411,7 +2411,7 @@
if (!md_util_time_buf_append(buf, &brem, start_secs, PRINT_TIME_FMT)) {
return 0;
}
- ret = snprintf(buf->cp, brem, ".%03u%c", start_rem, delimiter);
+ ret = snprintf(buf->cp, brem, ".%03uZ%c", start_rem, delimiter);
MD_WR_BDC0(ret, brem);
buf->cp += ret;
@@ -2419,7 +2419,7 @@
if (!md_util_time_buf_append(buf, &brem, end_secs, PRINT_TIME_FMT)) {
return 0;
}
- ret = snprintf(buf->cp, brem, ".%03u%c", end_rem, delimiter);
+ ret = snprintf(buf->cp, brem, ".%03uZ%c", end_rem, delimiter);
MD_WR_BDC0(ret, brem);
buf->cp += ret;
}
@@ -2537,7 +2537,7 @@
return 0;
}
- ret = snprintf(buf->cp, brem,".%03u%c%u%c%u%c", start_rem, delimiter,
+ ret = snprintf(buf->cp, brem,".%03uZ%c%u%c%u%c", start_rem, delimiter,
record->hash, delimiter, record->obid, delimiter);
MD_WR_BDC0(ret, brem);
@@ -2820,7 +2820,7 @@
return 0;
}
- ret = snprintf(buf->cp, brem, ".%03u%c", start_rem, delimiter);
+ ret = snprintf(buf->cp, brem, ".%03uZ%c", start_rem, delimiter);
MD_WR_BDC0(ret, brem);
buf->cp += ret;
@@ -2828,7 +2828,7 @@
return 0;
}
- ret = snprintf(buf->cp, brem, ".%03u%c", end_rem, delimiter);
+ ret = snprintf(buf->cp, brem, ".%03uZ%c", end_rem, delimiter);
MD_WR_BDC0(ret, brem);
buf->cp += ret;
@@ -2935,14 +2935,14 @@
if (!md_util_time_buf_append(buf, &brem, start_secs, PRINT_TIME_FMT)) {
return 0;
}
- ret = snprintf(buf->cp, brem, ".%03u%c", start_rem, delimiter);
+ ret = snprintf(buf->cp, brem, ".%03uZ%c", start_rem, delimiter);
MD_WR_BDC0(ret, brem);
buf->cp += ret;
if (!md_util_time_buf_append(buf, &brem, end_secs, PRINT_TIME_FMT)) {
return 0;
}
- ret = snprintf(buf->cp, brem, ".%03u%c", end_rem, delimiter);
+ ret = snprintf(buf->cp, brem, ".%03uZ%c", end_rem, delimiter);
MD_WR_BDC0(ret, brem);
buf->cp += ret;
メリット
- 一度ファイルにするのでlogstashに何度でもデータを渡せる
- logstashやfluentdのnetflow/ipfixプラグインを使うとUDPパケットの紛失や受信バッファあふれでフローデータを失うことがあるが、一旦ファイルに変換するのでフローデータの紛失を抑えられる。
デメリット
- JSONファイルを置くディスクが圧迫される（ローテーションや古いファイルの削除の仕組みが別途必要）
参考
- YAF https://tools.netsa.cert.org/yaf/
- super_mediator http://tools.netsa.cert.org/super_mediator/