Help us understand the problem. What is going on with this article?

メールのログを集約して追跡を楽ちんにするスクリプト

More than 3 years have passed since last update.

メールログを集約して、「いつ」「誰から」「誰宛の」メールが「どうなったか」をメール毎に1行で出力するスクリプト maillog-digest.pl を作成しました。

以下のように、メールログを標準入力から流し込んで使います。

$ cat /var/log/maillog | ./maillog-digest.pl
Apr 30 15:12:16 mail    QID001:     from=<FROM001@example.com>  to=<TO001@example.com>      status=sent
Apr 30 15:12:16 mail    QID002:     from=<FROM002@example.com>  to=<TO002@example.com>      status=sent
Apr 30 15:12:17 mail    QID003:     from=<FROM003@example.com>  to=<TO003@example.com>      status=sent
Apr 30 15:12:19 mail    QID004:     from=<FROM004@example.com>  to=<TO004@example.com>      status=sent
Apr 30 15:12:21 mail    QID005:     from=<FROM005@example.com>  to=<TO005@example.com>      status=sent

    ...(以降略)...

$

どういうこと?

「メールが届いてないんだけど?」メールサーバを運用していると、一般利用者からしばしばそういう質問が発せられます。
メールサーバの管理担当者は、その質問に答えるためにメールログを解析して、届くはずだったメールがどうなったのか調べることにになります。

ところが、メールログの調査は意外と厄介です。複数のメールデータを同時並行で処理するため、1件のメールが「誰から」「誰宛に」送られたか、は別の行に書かれる(ときには離れた場所に!)ので、結構な大仕事になります。対象の件数が多ければ尚更です。

なので、メールログ解析の手間を省くため、「誰から」「誰宛に」を1行にまとめて出力するスクリプトにまとめました。こんなスクリプトです。

  • メールサーバあるいは syslog サーバに集約されたメールログを読み込む。
  • メール1件につき、以下の情報を1行にまとめて出力する。

    • 日時 ※配送処理が行われた日時
    • from (=誰から)
    • to (=誰宛の)
    • status (=メールがどうなったか)

処理は、ざっくり言えば「メールログを個別のメールごとに仕分けしながら読んで行き、必要な情報が揃ったら出力する」となります。

環境

  • postfix ※sendmail でも応用が利くはずです1
  • syslog
  • Perl v5.10.1

手法

スクリプト maillog-digest.pl

以下のようなスクリプトにまとめました。
可搬性のため、ライブラリも込み込みで1ファイルにまとめてあります。

メールログを標準入力から流し込むと、メールごとにログを集約して出力します。
手元の試験環境では、postfix ログ 50万行(約54MB)を、30秒程度で処理を完了、メモリ使用量は 30.5MBでした。
ログ0行の場合のメモリ使用量は 26.8MB なので、ログ用に使用したメモリは 3.7MB という計算です。

maillog-digest.pl
#!/usr/bin/perl

#   (c) 2017 Ikeda Tomoyuki

use strict;
use warnings;
use utf8;

$| = 1;

my $buffer = OrderedHash->new();

while(<>)
{
    chomp();

    #    0     1    2     3      4      5     6
    my ($mmm, $dd, $hms, $host, $proc, $qid, $msg) = split(/\s+/, $_, 7);

    # or, adjust according to maillog format
    #    0     1    2     3      4      5     6       7
    #my ($mmm, $dd, $hms, $host, $proc, $qid, $level, $msg) = split(/\s+/, $_, 8);

    if($qid !~ /:$/) { next; }  # not like queue-id

    # ID: 'hostname|queue-id'
    my $id = sprintf('%s|%s', $host, $qid); 

    # to=
    if($msg =~ /^to=/)
    {
        my $dt = sprintf("%s %2d %s", $mmm, $dd, $hms); # day, time
        my $log = join(" ", $dt, $host, $proc, $qid, $buffer->get($id));
        $log .= $msg;

        # printf(STDOUT "%s\n", $log);
        print_digest($log);
        next;
    }

    # client=; SMTP session starts
    if($msg =~ /^client=/)
    {
        $buffer->set($msg . ", ", $id);
        next;
    }

    # from=
    if($msg =~ /^from=/)
    {
        if($buffer->get($id) =~ /, from=</) # already set 'from=', must be re-run
        {
            next;
        }
    }

    # other; message-id=, etc...
    $buffer->append($msg . ", ", $id);
}

#======================================================================

sub print_digest
{
    my ($line) = @_;

    # my @fields = split(/\s+/, $line, 7);
    my ($mmm, $dd, $hms, $host, $proc, $qid, $logmsg) = split(/\s+/, $line, 7);

    my ($msgid, $from, $to, $status) = ("", "", "", "");
    $logmsg = ' ' . $logmsg . ' ';  # add a sentinel at first and last

    if($logmsg =~ /\s+(message-id=\S+),\s/)
    {
        $msgid = $1;
    }
    if($logmsg =~ /\s+(from=\S+),\s/)
    {
        $from = $1;
    }
    if($logmsg =~ /\s+(to=\S+),\s/)
    {
        $to = $1;
    }
    if($logmsg =~ /\s+(status=\S+)\s/)
    {
        $status = $1;
    }

    my $dth = sprintf("%s %2d %s %s", $mmm, $dd, $hms, $host);
    printf(STDOUT "%s\n", join("\t", $dth, $qid, $from, $to, $status));
}

#======================================================================

#
# OrderedHash.pm
#
#   (c) 2017 Ikeda Tomoyuki
#
package OrderedHash;

use strict;
use warnings;
use utf8;

use constant DEBUG => 0;


#-----------------------------------------------------------------------
# new() ... create a new OrderedHash instance
#
#   arg:
#       $size   : Buffer Size
#
#   returns: OBJECT (blessed hash reference)
#
# Data Structure:
#
# + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
# ' ring: ARRAY, size = n                                         '
# '                                                               '
# '   +--------------------------------------------------+        '
# '   v                                                  |        '
# ' +------+     +----+     +----+      +-------+     +--------+  '
# ' |  k0  | --> | k1 | --> | k2 | ---> | (...) | --> | k(n-1) |  '
# ' +------+     +----+     +----+      +-------+     +--------+  '
# '   |            |          |                                   '
# + - | - - - - - -|- - - - - | - - - - - - - - - - - - - - - - - +
#     |            |          |
#     |            |          |
# + - | - - - - - -|- - - - - | - - - - - -+
# '   v            v          v    buffer: '
# ' +------+     +----+     +----+    HASH '
# ' |  k0  |     | k1 |     | k2 |         '
# ' +------+     +----+     +----+         '
# '   |            |          |            '
# '   |            |          |            '
# '   v            v          v            '
# ' +------+     +----+     +----+         '
# ' |  v0  |     | v1 |     | v2 |         '
# ' +------+     +----+     +----+         '
# '                                        '
# + - - - - - - - - - - - - - - - - - - - -+
#
sub new
{
    my ($class, $size) = @_;

    if(! defined($size) || $size =~ /[^0-9]/ || $size <= 0)
    {
        $size = 10000;  # default
    }

    my $self = {

        # a hash 'buffer'; KEY-VALUE pair
        # store values with each key (specified, or automatically assigned).
        'buffer'    => {},

        # an array of KEYs in a hash 'buffer'
        # ring[i] = key, i: 0..(size-1)
        'ring'  => [],  

        # size of 'buffer' = (max index for an array 'ring') + 1
        'size'  => $size,

        # write cursor, index for an array 'ring' to write next
        'write_next'    => 0,

    };
    bless $self, $class;

    return $self;
}

#-----------------------------------------------------------------------
# set() ... set a value into OrderedHash
#
#   see append()
#
#   arg:
#       $value  : a value to set, or overwrite OLDEST key-value pair.
#                 Existing value for the key is CLEARed.
#       $key    : a key for value, automatically generated if omitted
#
#   returns:
#       $key    : String
#
sub set
{
    my ($self, $value, $key) = @_;

    if(defined($key) && $self->_isExistKey($key))
    {
        $self->delete($key);
    }

    return $self->append($value, $key);
}

#-----------------------------------------------------------------------
# append() ... append a specified value to existing key-value pair
#
#   set KEY into $self->{ring}->[write_next]
#   set KEY-VALUE pair into $self->{buffer}->{KEY} = VALUE
#
#   arg:
#       $value  : a value to append to existing value for the key,
#                 or overwrite OLDEST key-value pair.
#                 Existing value for the key is NOT cleared and appended.
#       $key    : a key for value, automatically generated if omitted.
#
#   returns:
#       $key    : String
#
#
sub append
{
    my ($self, $value, $key) = @_;

    if(! $self->_isExistKey($key))
    {
        # BRANDNEW KEY! -> into next area in 'ring'
        my $cur = $self->{write_next};

        # if the OLDEST key exists in next area ->delete the oldest data
        if($self->{ring}->[$cur])   
        {
            # delete the corresponding key-value in the 'buffer'.
            $self->delete($self->{ring}->[$cur]);
        }

        # set the key into 'ring', or overwrite the oldest key
        $self->{ring}->[$cur] = $key;
        $self->{buffer}->{$key} = '';

        # write cursor steps ahead.
        $self->{write_next} = ($self->{write_next} + 1) % $self->{size};
    }

    $self->{buffer}->{$key} .= $value;

    return $key;
}


#-----------------------------------------------------------------------
# get() ... get a value at read cursor, or for a specified key
#
#   arg:
#       $key    : a key for the value
#
#   returns
#       $value  : String, maybe null string ''
#
sub get
{
    my ($self, $key) = @_;

    if(! defined($key))
    {
        return '';
    }

    my $value = $self->{buffer}->{$key};
    if(! defined($value))
    {
        $value = '';
    }
    return $value;
}

#-----------------------------------------------------------------------
# delete() ... delete a key-value pair
#
#   arg:
#       $key    : a key for the value to delete
#
#   returns:
#       boolean : 1 ->success, 0 ->FAIL
#
sub delete
{
    my ($self, $key) = @_;

    if(!defined($key)){
        return 0;   # false
    }
    delete($self->{buffer}->{$key});
    return 1;
}

#-----------------------------------------------------------------------
# _isExistKey() ... check wheather a specified key is exists or not.
#
#   arg:
#       $key    : a key for the value to search
#
#   returns:
#       boolean : EXIST -> true / NOT exist -> false
#
sub _isExistKey
{
    my ($self, $key) = @_;

    if(!defined($key)){
        return 0;   # false
    }
    return exists($self->{buffer}->{$key});
}

1;

メールログの書式を確認する

メールログは概ね以下のような形式になります。これは postfix のメールログです。

postfix-log.png

ただし、形式は syslog サービスの設定によって変わります。[mail.info]等のログレベルをプロセス名Queue IDの間に出力していることもあるでしょう。

ログの個々のエントリと意味については記載を省略します。

メール1件分のログを特定するキー

Queue ID は、SMTP server が把握できる範囲内で unique な値が振られます。

つまり別ホストのメールサーバが同時に同じ Queue ID を振ることも考えられます。複数のメールサーバのログを syslog サーバに集約している場合に、Queue ID がかち合う可能性があります。
ですので、メールログ上では「ホスト名 + Queue ID」でメール1件を特定することにします2

メールの情報をまとめる仕組み

スクリプトの処理は「メールログを【ホスト名 + Queue ID】で仕分けしながら読んで行き、必要な情報がそろったら出力する」となります。

つまり必要な情報が揃うまでは、先行して得た情報を覚えていなければなりません。

また、配送が終わってメール queue から消えたメールについては、それ以降はログに出て来ません。なのでそのメールについて覚えた情報は忘れて構いません。忘れた方がメモリ使用量の節約になります。

この「覚えて、忘れる」動作を実現するために、リングバッファを使うことにしました3。実装では、hash と配列とを組み合わせ、

  • hashに、「ホスト名 + Queue ID」を key にして、ログメッセージが現れるたびに追記する。
  • 配列に、「ホスト名 + Queue ID」が現れた順(古→新)で記録する。
  • データが一定数溜まったら配列の添え字を0に戻し、新しい「ホスト名 + Queue ID」で上書きする。
  • 上書きする前に記録されていた「ホスト名 + Queue ID」で、hash 内の古いデータを削除する。

としています。
リングバッファを簡略化した実装なので、スクリプト内では OrderedHash と名付けています。

注意点

from 情報が抜けることがあります

古いデータは忘れる、という仕様のため、配送できずに queue 内に長く残って再送を繰り替えしているようなメールについては、先行取得したデータを忘れてしまい、from の情報が抜けてしまう恐れがあります。
あらかじめ buffer サイズを多めに指定しておくことで予防できますが、その見積もりは容易ではありません。
今までの経験上、何万件のオーダーで用意しておけば実用上充分かな、という実感を持っています。

負荷

このスクリプトは、メールログを食わせる都合上、ログサーバの上、あるいはメールサーバの上で(=SMTP サーバが動いている横で)動かしたいでしょう。
過去ログの解析なら、作業用の別ホストにメールログをコピーして解析するのも良いですが、以下のように今現在新たに発生するログをその場で解析することも想定しています。

$ tail -f /var/log/maillog | ./maillog-digest.pl

そのとき、スクリプトの動作で Load Average が上がると、メールサーバの設定によっては、メールの受信を中止して負荷のさらなる上昇を抑制する、といった制御がはたらく恐れがあります。
tail 等でゆっくり maillog を流し込むなら Load Average の上昇はほぼありません。
cat 等でいっぺんに maillog を流し込む場合は、過負荷に注意してください。

もっともこれは環境依存なので、利用にあたっては事前にそれぞれの環境で検証されることをお勧めします。

調整が要るかもしれない箇所

メールログの形式が合わない場合は、スクリプト冒頭のこの部分を調整します。split() の第3引数も併せて調整してください。

    #    0     1    2     3      4      5     6
    my ($mmm, $dd, $hms, $host, $proc, $qid, $msg) = split(/\s+/, $_, 7);

    # or, adjust according to maillog format
    #    0     1    2     3      4      5     6       7
    #my ($mmm, $dd, $hms, $host, $proc, $qid, $level, $msg) = split(/\s+/, $_, 8);

Message-Id も出力したい、といった場合は、sub print_digest の中を調整してください。
sub print_digestは、手あたり次第に集約した後のデータから必要な項目を抜き出して、標準出力に出力する処理を担っています。
集約した後のデータをそのまま出力したい場合は、例えば以下のようにします。

sub print_digest
{
    my ($line) = @_;    
    printf(STDOUT "%s\n", $line);
}

バッファサイズは、スクリプト冒頭のこの部分で指定できます。new()の引数に以下のように指定します。省略時は10000です。

my $buffer = OrderedHash->new(20000);

結論

postfix のメールログを集約して、「いつ」「誰から」「誰宛の」「メールがどうなったか」を出力するスクリプトを作成した。
このスクリプトは sendmail のメールログにも適用できる。

このスクリプトは、postfix ログ 50万行(約54MB)を、30秒程度で処理した。

参考文献



  1. このスクリプトは元々 sendmail 用に書いて使っていたので、アルゴリズムが通用するのは確認済みです。今回で 3~4度目くらいの再実装です。 

  2. 1台のメールサーバで複数の SMTP server を動かすこともあり得ますが、ここでは考えません。その場合は、ログに出力するホスト名を SMTP server ごとに変えることで区別できるでしょう。それは SMTP server や syslog の設定で実現してください。 

  3. リングバッファの利用は、RRDtool(=Round Robin Database tool) から思いつきました。データベース容量の増加を抑制するため、データ領域をリング状に構成し、古いデータは上書きして捨てる設計になっています。cf. rrdtutorial 

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした