中編ではAuditログのidを復号化するexpandaudit
機能について紹介しましたが、この機能を使った場合、ファイル単位で処理済みかどうかを判断します。
ファイルのローテーションはMFSの再起動時かdailyかで発生するので、Auditログにデータが随時追加され、かつ、ローテーションが発生しない状況では、expandaudit
をインターバルを空けて複数回実行した場合、前回の実行で読み込んだ内容も再度読み込んで処理することになります。
そのため、通常はexpandaudit
コマンドをdailyのバッチジョブとして実施します。
このときの問題点としては、まず複合元のAuditログサイズが大きくなった場合処理が非常に重くなることと、バッチ処理になりますのでリアルタイムでの処理が出来ません。
この問題に対して、MapR 6.0からはAudit Stream機能が追加され、AuditログをリアルタイムにStream処理することが出来るようになりました。
後編の今回はこのAudit Stream機能について、使い方の一例を紹介します。
Audit Streamについて
Audit StreamはAuditログをStream処理するための機能となります。
Auditログは通常 /var/mapr/local/<ホスト名>/audit/5660/
以下に累積されていきますが、Audit Stream機能を有効にするとMapR-ES1にログがpublishされます。
そのため、ユーザが実行したいconsumerを実装することで、Auditログをリアルタイムで処理することが可能となります。
上図はMapRの公式Document内の実行例となりますが、リアルタイムレポートや異常検知、その他のタスクのトリガーなど様々な用途に利用可能となります。
デフォルトの設定ではMapR-ESには7日間保持されますが、もちろんこの期間を編集することも可能です。
データをMapR-ESにproduceするのはhoststats
プロセスで、consumeはmaprユーザのみ可能となります。
Audit Streamの起動
それでは早速、Audit Stream機能を起動してみましょう。
Audit Streamの有効化には"mfs.enable.audit.as.stream"パラメタを1に設定してください。
$ maprcli config save -values '{"mfs.enable.audit.as.stream":"1"}'
$ maprcli config load -json | grep audit.as.stream
"mfs.enable.audit.as.stream":"1",
あとはAudit対象のVolumeに対しアクセスし、Auditログを発生させてください。
Auditログが発生した際に、produce先のストリームが自動で作成されます。
ストリームのパスは/var/mapr/auditstream/auditlogstream
になります。
$ maprcli stream info -path /var/mapr/auditstream/auditlogstream -json
{
"timestamp":1544532065489,
"timeofday":"2018-12-11 07:41:05.489 GMT-0500 AM",
"status":"OK",
"total":1,
"data":[
{
"path":"/var/mapr/auditstream/auditlogstream",
"physicalsize":2031616,
"logicalsize":1269760,
"numtopics":12,
"defaultpartitions":1,
"ttl":604800,
"compression":"lz4",
"autocreate":true,
"produceperm":"u:mapr",
"consumeperm":"u:mapr",
"topicperm":"u:mapr",
"copyperm":"u:mapr",
"adminperm":"u:mapr",
"ischangelog":false,
"defaulttimestamptype":"CreateTime"
}
]
}
$ maprcli stream topic list -path '/var/mapr/auditstream/auditlogstream' -json
{
"timestamp":1543589952706,
"timeofday":"2018-11-30 09:59:12.706 GMT-0500 AM",
"status":"OK",
"total":12,
"data":[
{
"topic":"ov_auth_ov2",
"partitions":1,
"consumers":0,
"physicalsize":0,
"logicalsize":0,
"maxlag":0
},
{
"topic":"ov_auth_ov0",
"partitions":1,
"consumers":0,
"physicalsize":49152,
"logicalsize":24576,
"maxlag":0
},
{
"topic":"ov_fs_ov1",
"partitions":1,
"consumers":0,
"physicalsize":49152,
"logicalsize":24576,
"maxlag":0
},
{
"topic":"ov_auth_ov1",
"partitions":1,
"consumers":0,
"physicalsize":0,
"logicalsize":0,
"maxlag":0
},
{
"topic":"ov_fs_ov2",
"partitions":1,
"consumers":0,
"physicalsize":65536,
"logicalsize":32768,
"maxlag":0
},
{
"topic":"ov_fs_ov0",
"partitions":1,
"consumers":0,
"physicalsize":1875968,
"logicalsize":1589248,
"maxlag":0
},
{
"topic":"ov_db_ov1",
"partitions":1,
"consumers":0,
"physicalsize":49152,
"logicalsize":24576,
"maxlag":0
},
{
"topic":"ov_cldb_ov0",
"partitions":1,
"consumers":0,
"physicalsize":172032,
"logicalsize":122880,
"maxlag":0
},
{
"topic":"ov_db_ov0",
"partitions":1,
"consumers":0,
"physicalsize":827392,
"logicalsize":688128,
"maxlag":0
},
{
"topic":"ov_cldb_ov1",
"partitions":1,
"consumers":0,
"physicalsize":49152,
"logicalsize":24576,
"maxlag":0
},
},
{
"topic":"ov_cldb_ov2",
"partitions":1,
"consumers":0,
"physicalsize":3055616,
"logicalsize":2498560,
"maxlag":0
},
{
"topic":"ov_db_ov2",
"partitions":1,
"consumers":0,
"physicalsize":49152,
"logicalsize":24576,
"maxlag":0
}
]
}
上記のようにトピックが作成されていることが確認できました。
トピック名は<clustername>_<logType>_<nodeName>
というシンタックスで作成されます。
<logType>
はcldb, auth, fs, dbがあり、dbにはMapR-DBとMapR-ESのログがpublishされます。
Audit StreamのConsume
Audit StreamはConsumerRecordsクラスのオブジェクトとしてMapR-ESにpublishされています。
そのため、consumeした際にはこちらのオブジェクトとしてレコードを読み込む必要があります。
さらに、データはidで復号化されていますので、expandaudit
コマンドのようにconsumer内部でパスに復号化する必要があります。
復号化のAPIについてはこちらのページに詳細があるので興味がある方はどうぞ。
さて、ConsumerのサンプルについてはMapRの公式ドキュメントでも実装例が紹介されています。
ここでは公式ドキュメントの実装例を以下のように少し変更したものを使って、Consumerの挙動を確認してみましょう
公式ドキュメントからの変更点は以下の点になります。
- Consumeしたレコードは標準出力に出すのではなく、MapR-DBに格納(デバッグ機能として標準出力機能も維持)
- クラスタと出力先のMapR-DBのパスをコマンドライン引数から指定可能
- systemdを使ってデーモン化
さて、実際に使ってみます。以下の作業をクラスタノード上で実施してください。
$ git clone https://github.com/tgib23/AuditConsumer
$ cd AuditConsumer
$ javac -cp .:`mapr classpath` AuditConsumer.java -Xlint:deprecation
ビルド出来たらまずは手動で実行してみます
$ java -cp .:`mapr classpath` AuditConsumer -cluster ov -output_db_path /tmp/testdb -debug 1
このあと適当にAudit対象のVolumeにアクセスしてみましょう。
標準出力に以下のようなログが出てくるのが確認できたでしょうか?
{"timestamp":{"$date":"2018-12-11T14:04:19.148Z"},"operation":"DB_REGIONLOOKUP","uid":5000,"ipAddress":"10.10.75.127","volumeId":1662517,"VolumeName":"force_audit","tableFid":"2304.168.266532","FidPath":"/force_audit/db/46","status":0}
{"timestamp":{"$date":"2018-12-11T14:04:19.318Z"},"operation":"DB_PUT","uid":5000,"ipAddress":"10.10.75.81","volumeId":1662517,"VolumeName":"force_audit","columnFamily":"default","columnQualifier":"total_score","tableFid":"2304.168.266532","FidPath":"/force_audit/db/46","status":0}
{"timestamp":{"$date":"2018-12-11T14:04:19.318Z"},"operation":"DB_PUT","uid":5000,"ipAddress":"10.10.75.81","volumeId":1662517,"VolumeName":"force_audit","columnFamily":"default","columnQualifier":"PROP.power","tableFid":"2304.168.266532","FidPath":"/force_audit/db/46","status":0}
{"timestamp":{"$date":"2018-12-11T14:04:19.318Z"},"operation":"DB_PUT","uid":5000,"ipAddress":"10.10.75.81","volumeId":1662517,"VolumeName":"force_audit","columnFamily":"default","columnQualifier":"PROP.lead","tableFid":"2304.168.266532","FidPath":"/force_audit/db/46","status":0}
{"timestamp":{"$date":"2018-12-11T14:04:19.318Z"},"operation":"DB_PUT","uid":5000,"ipAddress":"10.10.75.81","volumeId":1662517,"VolumeName":"force_audit","columnFamily":"default","columnQualifier":"PROP.intellligence","tableFid":"2304.168.266532","FidPath":"/force_audit/db/46","status":0}
{"timestamp":{"$date":"2018-12-11T14:04:19.318Z"},"operation":"DB_PUT","uid":5000,"ipAddress":"10.10.75.81","volumeId":1662517,"VolumeName":"force_audit","columnFamily":"default","columnQualifier":"USERNAME","tableFid":"2304.168.266532","FidPath":"/force_audit/db/46","status":0}
{"timestamp":{"$date":"2018-12-11T14:04:19.000Z"},"operation":"LOOKUP","uid":5000,"ipAddress":"10.10.75.127","srcFid":"2304.32.266260","FidPath":"/force_audit/db/46","dstFid":"2304.168.266532","FidPath":"/force_audit/db/46","srcName":"46","volumeId":1662517,"VolumeName":"force_audit","status":0}
上記のコマンドでは/tmp/testdb
パスにテーブルを作成して、そのテーブルにも同様のデータを格納します。
Drillで確認してみましょう
$ sqlline -u jdbc:drill:drillbit=ov0 -n mapr -p mapr
0: jdbc:drill:drillbit=ov0> select * from dfs.`/tmp/testdb`;
+-------------------------------+--------------+---------------+------------------+---------+---------------------+-------+---------------+---------------------+---------------------+---------------------+----------+
| _id | VolumeName | ipAddress | operation | status | tableFidPath | uid | columnFamily | columnQualifier | dstFidPath | srcFidPath | srcName |
+-------------------------------+--------------+---------------+------------------+---------+---------------------+-------+---------------+---------------------+---------------------+---------------------+----------+
| 2018-12-11T13:50:30.916Z-821 | force_audit | 10.10.75.81 | DB_TABLEINFO | 0 | /force_audit/db/50 | 5000 | null | null | null | null | null |
| 2018-12-11T13:50:30.920Z-148 | force_audit | 10.10.75.81 | DB_PUT | 0 | /force_audit/db/50 | 5000 | default | PROP.lead | null | null | null |
| 2018-12-11T13:50:30.920Z-288 | force_audit | 10.10.75.81 | DB_PUT | 0 | /force_audit/db/50 | 5000 | default | total_score | null | null | null |
| 2018-12-11T13:50:30.920Z-36 | force_audit | 10.10.75.81 | DB_PUT | 0 | /force_audit/db/50 | 5000 | default | PROP.power | null | null | null |
| 2018-12-11T13:50:30.920Z-664 | force_audit | 10.10.75.81 | DB_PUT | 0 | /force_audit/db/50 | 5000 | default | PROP.intellligence | null | null | null |
| 2018-12-11T13:50:30.920Z-973 | force_audit | 10.10.75.81 | DB_PUT | 0 | /force_audit/db/50 | 5000 | default | USERNAME | null | null | null |
| 2018-12-11T13:50:36.559Z-331 | force_audit | 10.10.75.81 | DB_TABLEINFO | 0 | /force_audit/db/49 | 5000 | null | null | null | null | null |
| 2018-12-11T13:50:36.574Z-699 | force_audit | 10.10.75.81 | DB_PUT | 0 | /force_audit/db/49 | 5000 | default | USERNAME | null | null | null |
| 2018-12-11T13:50:36.574Z-884 | force_audit | 10.10.75.81 | DB_PUT | 0 | /force_audit/db/49 | 5000 | default | total_score | null | null | null |
| 2018-12-11T13:51:23.190Z-432 | force_audit | 10.10.75.81 | DB_PUT | 0 | /force_audit/db/49 | 5000 | default | PROP.lead | null | null | null |
| 2018-12-11T13:51:23.190Z-56 | force_audit | 10.10.75.81 | DB_PUT | 0 | /force_audit/db/49 | 5000 | default | PROP.intellligence | null | null | null |
| 2018-12-11T13:51:23.190Z-572 | force_audit | 10.10.75.81 | DB_PUT | 0 | /force_audit/db/49 | 5000 | default | PROP.power | null | null | null |
| 2018-12-11T13:51:23.190Z-813 | force_audit | 10.10.75.81 | DB_PUT | 0 | /force_audit/db/49 | 5000 | default | total_score | null | null | null |
格納されていますね。
ちなみにこちらのプログラムでは"_id"に指定するキーを、時間+3桁のランダムIDという形で指定しています。
簡単ですがデモなので良しとします。
次にこちらのプログラムをデーモン化してみましょう
$ sudo mkdir /opt/mapr/audit_consumer
$ sudo cp AuditConsumer* consumer.props /opt/mapr/audit_consumer/
AuditConsumer.service内のExecStart
をクラスパス(mapr classpath
を展開した文字列), クラスタ名, 出力dbパスを指定して書き換えてください。
$ sudo cp AuditConsumer.service /etc/systemd/system/
$ sudo systemctl enable AuditConsumer
$ sudo systemctl start AuditConsumer
ストリーム処理なのでこのように何らかの形でデーモン化しておくといいでしょう
まとめ
後編ではAuditStream機能について紹介しました。
この機能を使うことでリアルタイムのidの復号化だけでなく、不正アクセス検出などのリアルタイム分析が可能になりました。
実際にアクセス状況をモデル化し、不正アクセス検出を行うサンプルなども、今後公開してみたいと思います。
-
MapR 6.0.1から(?)はMapR-ESをMapR Event Store For Apache Kafkaと呼んでいるようです。Kafkaの代替機能となることを強調しているようですが、個人的にコロコロ名前変えられると怪しいプロダクト感が増すので止めてほしいです笑 ↩