0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Raspberry Pi × AWS(Cognito, API Gateway, Lambda, S3)で構築する「セキュア」な syslog 管理

Last updated at Posted at 2025-01-09

はじめに

Raspberry Pi で収集したログを定期的に AWS S3 へ保存しつつ, エラー以上の重大なログが出た際に Slack に通知するシステムを構築します.

この記事の概要

  • Raspberry Pi (以下, ラズパイ) 上の rsyslog でログを収集
  • 5 分ごと (crontab) にまとめてAPI Gateway へ送信
  • Amazon Cognito の認証を通じてセキュアに通信
  • AWS Lambda でログを受信・解析し, 重大なログがあればSlack へ通知
  • 受信したログは Amazon S3 に保存

この実装の特徴

  1. Cognito を使うことによるセキュリティ向上

    • ラズパイ上に長期間の AWS アクセスキーを保持しなくてもよい (トークンの短期化)
    • ユーザプール & ID プールで認可を分離しやすい
  2. ログは S3 に保管しつつ, Lambda が監視・通知

    • 5 分おきバッチ送信なので, リアルタイム性は不要なケースにマッチ
    • S3 にあるログは離れたところから確認可能
    • 必要なら Athena などによる高度な分析も可能だが, S3 だけなら安価
  3. コストを抑えつつシンプルに構成

    • EC2 や ECS を立てる必要なし
    • Lambda の無料枠内で十分対応可能 (低負荷の場合)

システム構成図

下記のようなデータフローを想定しています.

ラズパイ -> (Cognito 認証) -> API Gateway -> Lambda -> (Slack 通知, S3 保存) という一連のパイプラインで, ラズパイは常時起動するサーバーを持たずに安全・簡単にログ管理ができます.

log.drawio.png

使用環境

  • Raspberry Pi 4-B (OS Lite 64-bit)
  • Python 3.11

目次(本記事の流れ)

とても長い実装になりますが, お付き合いください.


前提: syslog におけるログレベル

syslog は UNIX 系システムで広く使われるログ記録システムです.

システムやアプリケーションの状態を記録する際に, 8段階のログレベルを利用します.

各レベルは, システムの重要度に応じて分類されており, 運用上の対応の優先度を示します.

レベル 重要度 説明
Emergency 0 (最重要) システムが利用不能. 完全な停止状態や壊滅的な障害
Alert 1 即時対応が必要な状態. 例: 重要なセキュリティ違反
Critical 2 クリティカルな問題. 例: ディスク障害や重要なサービスのクラッシュ
Error 3 一般的なエラー. 例: ファイルやサービスの失敗, リソース不足など
Warning 4 警告レベルの状態. 例: 使用量が閾値に近い, 将来的に問題になる可能性
Notice 5 通常動作中の重要な状態. 例: 設定変更やサービスの再起動
Informational 6 一般的な動作情報. 例: 通常のサービス起動や接続ログ
Debug 7 デバッグ用の詳細な情報. 例: 開発者向けのトラブルシューティングデータ

参考

今回通知するログレベルは, Emergency, Alert, Critical, Errorです.

これらは, システムの運用に直接影響を与える問題と検知し, Slackに通知します.

1. Raspberry Pi 環境構築

1-1. パッケージ更新

sudo apt-get update
sudo apt-get upgrade

1-2. 必要パッケージのインストール

sudo apt-get -y install python3-pip vim rsyslog

1-3. 仮想環境(venv)の作成

python3 -m venv .venv
source .venv/bin/activate
pip install --upgrade pip
pip install boto3 warrant requests requests-aws4auth python-dotenv

warrant ライブラリが Python3.11 以降で collections.Mapping エラーを起こす場合があります.
該当箇所を collections.abc へ修正する必要があります(2024/12/29 現在)

解決法
/home/pi/.venv/lib/python3.11/site-packages/jose/jws.py

vim /home/pi/.venv/lib/python3.11/site-packages/jose/jwt.py
を以下のように手動で変更します.

- from collections import Mapping

+ from collections.abc import Mapping

もし, それでも解決しない場合は以下を実行したら解決するかもしれません.

pip install --upgrade pycryptodome

1-4. rsyslog の設定(ログフォーマットの変更)

rsyslogは設定ファイルで, ログフォーマットを変更することができます.
従来のフォーマットだと情報が不足しているので, カスタムします.

vim /etc/rsyslog.conf

以下を追記します.

今回は/var/log/syslogをログファイルとして使用します.

# カスタムテンプレートの定義
template(name="CustomLogFormat" type="string" string="%timegenerated:1:19:date-rfc3339% %HOSTNAME% %syslogfacility-text%.%syslogseverity-text%: %syslogtag%%msg%\n")

# syslog 設定例
*.*;auth,authpriv.none    -/var/log/syslog;CustomLogFormat

これで, /var/log/syslogには以下のような形式で出力されます.

2024-12-30 22:50:33 raspberrypi daemon.info: dhcpcd[727]: ...

2. Amazon Cognito の作成

今回使用する認可の方法は, AWS の一時クレデンシャルを用いる方法です.

こちらの記事が詳しく書いています.

2-1. ユーザプール(User Pool)

以下の手順で作成します.

  1. Machine to Machine でアプリケーションを作成
  2. サインイン識別子: 「ユーザー名」をチェック
  3. 必須属性: メールアドレス (指定しなければならない仕様のため)
  4. パスワードポリシーや最低文字数の設定を変更
    • デフォルト -> カスタム
    • パスワードの最小文字数: 12
    • パスワード要件: 少なくとも1つの数字を含む
  5. "ユーザー名とパスワード(ALLOW_USER_PASSWORD_AUTH)を使用してサインインします"を有効化します.

ユーザ作成

ユーザ名は, ラズパイのシリアルアドレスを使います.
以下で取得できます.

grep Serial /proc/cpuinfo | awk '{print substr($3,9,8)}'


仮パスワードは, パスワード要件を満たす任意の12文字を指定します.
例) hogehoge1234など


正規のパスワードは, ラズパイのMACアドレスを使います.
以下で取得できます.

ifconfig eth0|grep ether|awk '{print $2}'|sed 's/://g'

メールアドレスは, メールアドレスの形を満たす任意の文字列から適当に使います. ここで, ユニークなものにするのにシリアルアドレスなどを使うこともできます.

例) rasp-<serial-address>@hogehoge.com

さらに, 検証済みとチェックします.

仮パスワードの変更

仮パスワードを正規のパスワードに変更します.

ローカルpcで以下のスクリプトに必要情報を追記し, 実行して, Secret Hashを得ます.

import hmac
import hashlib
import base64

def calculate_secret_hash(client_id, client_secret, username):
    message = username + client_id
    dig = hmac.new(
        client_secret.encode('utf-8'),
        msg=message.encode('utf-8'),
        digestmod=hashlib.sha256
    ).digest()
    return base64.b64encode(dig).decode()

client_id = "<client-id>"           # アプリクライアントID
client_secret = "<client-secret>"   # アプリクライアントシークレット
username = "<user_name>"             # サインインに使用するユーザー名

secret_hash = calculate_secret_hash(client_id, client_secret, username)
print(f"SECRET_HASH: {secret_hash}")

AWS CLIで以下を実行します.

# 仮パスワードでサインイン (USER_PASSWORD_AUTH)
aws cognito-idp initiate-auth \
    --client-id <client-id> \
    --auth-flow USER_PASSWORD_AUTH \
    --auth-parameters USERNAME="<user_name>",PASSWORD="<password>",SECRET_HASH="<secret_hash>" \
    --region ap-northeast-1

出力例:

{
    "ChallengeName": "NEW_PASSWORD_REQUIRED",
    "Session": "<session>",
    "ChallengeParameters": {
        "USER_ID_FOR_SRP": "<user_name>",
        "requiredAttributes": "[]",
        "userAttributes": "{\"email_verified\":\"true\",\"email\":\"<email>\"}"
    }
}

レスポンス内に NEW_PASSWORD_REQUIRED が含まれたら, その Session を使って新パスワードを設定します.

aws cognito-idp respond-to-auth-challenge \
    --client-id <client-id> \
    --challenge-name NEW_PASSWORD_REQUIRED \
    --challenge-responses USERNAME="<user_name>",NEW_PASSWORD="<new_password>",SECRET_HASH="<secret_hash>" \
    --session "<session>" \
    --region ap-northeast-1

これでユーザは「確認済み」となり, 次回以降は新パスワードでログインできます.

救世主

2-2. ID プール(Identity Pool)

  1. 認証されたアクセス -> Amazon Cognito ユーザプールを有効化
  2. 新規 IAM ロール を作成し, 上記ユーザプール ID とクライアント ID を紐づけ
  3. その他は, デフォルト設定で OK

3. AWS S3 のバケット作成

Lambdaがログを保存するためのバケットを作成します.

  • バケット名は, 任意
  • バケットポリシーは特に不要

4. Slack Webhook 準備

以下の手順で, Slack に通知するために Webhook URLを取得します.

  1. Slack APIから, Incoming Webhookを有効化
  2. Add new Webhookを選択し, Slack のチャンネルを選ぶ
  3. Webhook URLを取得

5. AWS Lambdaの作成

以下の手順で作成します.

  1. 1から作成を選択
  2. ランタイム -> Python 3.13
  3. アーキテクチャ -> arm64

5-1. 環境変数の設定

設定タブから, S3のバケット名と先ほど取得した Webhook URLを環境変数に指定します.

キーはそれぞれ, BUCKET_NAME, WEBHOOK_URLとしました.

5-2. コードの作成

全体の流れ

  1. API Gateway からリクエストを受け取り (body 内に device_id, timestamp, logs) が含まれる
  2. ログを 1 行ずつ解析 -> 重大なログ (err 以上) を検知したら Slack に通知
  3. ログは S3 に JSON Lines 形式で保存
  4. 成功レスポンスを返す

コード全体

import os
import json
import boto3
import urllib.request
import urllib.error
from datetime import datetime

BUCKET_NAME = os.environ['BUCKET_NAME']
WEBHOOK_URL = os.environ['WEBHOOK_URL']
S3 = boto3.resource('s3')

def make_data(device_id, timestamp):
    """
    先頭: 要約文, ログ送信時間(timestamp), device_id,
    一行毎:
    loglevel, process, facility, timestamp,
    message
    """
    send_data = {
        "blocks": [
            {
                "type": "rich_text",
                "elements": [
                    {
                        "type": "rich_text_section",
                        "elements": [
                            {
                                "type": "emoji",
                                "name": "rotating_light",
                                "unicode": "1f6a8"
                            },
                            {
                                "type": "text",
                                "text": "以下のログを検知しました.\n"
                            }
                        ]
                    },
                    {
                        "type": "rich_text_quote",
                        "elements": [
                            {
                                "type": "text",
                                "text": "デバイス名:",
                                "style": {
                                    "bold": True
                                }
                            },
                            {
                                "type": "text",
                                "text": f'{device_id}\n'
                            },
                            {
                                "type": "text",
                                "text": "ログ送信日時:",
                                "style": {
                                    "bold": True
                                }
                            },
                            {
                                "type": "text",
                                "text": timestamp.strftime("%m/%d/%Y, %H:%M:%S")
                            }
                        ]
                    }
                ]
            },
        ]
    }

    for i in UNUSUAL_LOGS:
        parts = i.split(None, 5)
        time = f'{parts[0]} {parts[1]}'
        facility = parts[3].split(".")[0]
        logLevel = parts[3].split(".")[1].split(':')[0]
        process = parts[4].rstrip(':')
        message = parts[5]

        section = {
            "type": "section",
            "fields": [
                {"type": "mrkdwn", "text": f"*Log Level:*\n{logLevel}"},
                {"type": "mrkdwn", "text": f"*Process:*\n{process}"},
                {"type": "mrkdwn", "text": f"*facility:*\n{facility}"},
                {"type": "mrkdwn", "text": f"*Timestamp:*\n{time}"},
                {"type": "mrkdwn", "text": f"*message:*\n{message}"}
            ]
        }
        divider = {"type": "divider"}
        send_data["blocks"].append(divider)
        send_data["blocks"].append(section)
        send_data["blocks"].append(divider)

    return send_data

def send_message_to_slack(device_id, timestamp):
    send_data = make_data(device_id, timestamp)
    request_url = WEBHOOK_URL
    send_text = json.dumps(send_data).encode('utf-8')

    try:
        request_post = urllib.request.Request(url=request_url, method="POST", data=send_text)
        with urllib.request.urlopen(request_post) as res:
            body = res.read().decode()
            print(body)
    except urllib.error.HTTPError as error:
        status_code = error.code
        print("エラーログなし %s\n URL: %s" % (status_code, request_url))
    except urllib.error.URLError as error:
        status_code = "HTTP通信の不可"
        print(status_code)

def parse(log, device_id):
    """
    ログを分解してする関数.
    emergency, alert, critical, error
    を検知したら, エラーログ専用配列に追加する.

    Args:
        log (string): ログ本文
        device_id (string): デバイス識別子(MACアドレス)

    Returns:
        json: ログの分解後
    """
    parts = log.split(None, 5)
    logLevel = parts[3].split(".")[1].split(':')[0]
    if logLevel in ['emerg', 'alert', 'crit', 'err']:
        UNUSUAL_LOGS.append(log)

    return json.dumps({
        "timestamp": f'{parts[0]} {parts[1]}',
        "hostname": parts[2],
        "component": parts[3].rstrip(':'),
        "process": parts[4].rstrip(':'),
        "message": parts[5]
    })

def lambda_handler(event, context):
    global UNUSUAL_LOGS
    UNUSUAL_LOGS = []

    try:
        # イベントの構造を確認
        if isinstance(event.get('body'), str):
            body = json.loads(event['body'])
        elif isinstance(event.get('body'), dict):
            body = event['body']
        else:
            body = event
    except json.JSONDecodeError:
        return {
            'statusCode': 400,
            'body': json.dumps('Invalid JSON in request body')
        }

    logs = body['logs']
    device_id = body['device_id']
    timestamp = datetime.fromisoformat(body['timestamp'])
    file_key = f"logs/{timestamp.year}/{timestamp.month:02d}/{timestamp.day:02d}/{device_id}_logs{timestamp.strftime('%Y%m%d_%H%M%S')}.jsonl"

    log_lines = '\n'.join(parse(log, device_id) for log in logs)
    print(log_lines)

    if UNUSUAL_LOGS:
        send_message_to_slack(device_id, timestamp)

    try:
        obj = S3.Object(BUCKET_NAME, file_key)
        obj.put(Body=log_lines)
        print(f"File uploaded successfully: {file_key}")
    except Exception as e:
        print(f"Error uploading file: {str(e)}")

    return {
        'statusCode': 200,
        'body': json.dumps(f'This is {device_id}. We accepted your logs')
    }

(a) Slack 通知用メッセージの組み立て (make_data 関数)

  • Slack のメッセージをBlock Kit形式の JSON として組み立てる関数
  • 先頭で「デバイス名」「ログ送信日時」の要約文を作成
  • UNUSUAL_LOGS(重大ログリスト) の各行をループして、ログレベル・プロセス名・タイムスタンプ・メッセージなどをSlackのブロックに追加

どのデバイスから送られたのかを示すために, device_idとしてSerialアドレスを採用しています.

def make_data(device_id, timestamp):
    """
    先頭: 要約文, ログ送信時間(timestamp), device_id,
    一行毎:
    loglevel, process, facility, timestamp,
    message
    """
    send_data = {
        "blocks": [
            {
                "type": "rich_text",
                "elements": [
                    {
                        "type": "rich_text_section",
                        "elements": [
                            {
                                "type": "emoji",
                                "name": "rotating_light",
                                "unicode": "1f6a8"
                            },
                            {
                                "type": "text",
                                "text": "以下のログを検知しました.\n"
                            }
                        ]
                    },
                    {
                        "type": "rich_text_quote",
                        "elements": [
                            {
                                "type": "text",
                                "text": "デバイス名:",
                                "style": {
                                    "bold": True
                                }
                            },
                            {
                                "type": "text",
                                "text": f'{device_id}\n'
                            },
                            {
                                "type": "text",
                                "text": "ログ送信日時:",
                                "style": {
                                    "bold": True
                                }
                            },
                            {
                                "type": "text",
                                "text": timestamp.strftime("%m/%d/%Y, %H:%M:%S")
                            }
                        ]
                    }
                ]
            },
        ]
    }

    for i in UNUSUAL_LOGS:
        ...
        # 各行のログ情報をSlackブロックに組み込む

    return send_data

(b) Slack Webhook へ通知する (send_message_to_slack)

  • make_data() で準備した Slack 用のメッセージ JSON を、Slack の Incoming Webhook URL に HTTP POST 送信
def send_message_to_slack(device_id, timestamp):
    send_data = make_data(device_id, timestamp)
    request_url = WEBHOOK_URL
    send_text = json.dumps(send_data).encode('utf-8')

    try:
        request_post = urllib.request.Request(url=request_url, method="POST", data=send_text)
        with urllib.request.urlopen(request_post) as res:
            body = res.read().decode()
            print(body)
    ...

(c) ログの解析(parse)

  • 1 行のログテキスト(例: "2024-12-31 13:50:33 raspberrypi daemon.err: dhcpcd[727]: some message") を分割し、重要ログ (err 以上) なら UNUSUAL_LOGS に追加
  • 後にS3に保存するため, 戻り値として、そのログを JSON 形式に変換した文字列を返す
def parse(log, device_id):
    parts = log.split(None, 5)
    logLevel = parts[3].split(".")[1].split(':')[0]
    if logLevel in ['emerg', 'alert', 'crit', 'err']:
        UNUSUAL_LOGS.append(log)

    return json.dumps({
        "timestamp": f'{parts[0]} {parts[1]}',
        "hostname": parts[2],
        "component": parts[3].rstrip(':'),
        "process": parts[4].rstrip(':'),
        "message": parts[5]
    })

(d) メイン処理 (lambda_handler)

  • API Gateway から受け取ったリクエスト (event) を処理するエントリポイント
  • ラズパイから送信された logs, device_id, timestamp などを使い,
    • ログ解析 (parse)
    • 重大ログあれば Slack 通知(send_message_to_slack)
    • JSON Lines 形式で S3 に保管
  • 最終的に 200 OK のレスポンスを返す

S3では, 取得した全ログを JSON Lines 形式で1ファイルにまとめ, logs/YYYY/MM/DD/deviceID_logsYYYYMMDD_HHMMSS.jsonl のように日付・デバイス別に保管します.

def lambda_handler(event, context):
    global UNUSUAL_LOGS
    UNUSUAL_LOGS = []

    # 1) イベントの構造を確認し、JSONデコード
    if isinstance(event.get('body'), str):
        body = json.loads(event['body'])
    elif isinstance(event.get('body'), dict):
        body = event['body']
    else:
        body = event

    logs = body['logs']
    device_id = body['device_id']
    timestamp = datetime.fromisoformat(body['timestamp'])

    # 2) S3に保存するファイルキーを組み立て
    file_key = f"logs/{timestamp.year}/{timestamp.month:02d}/{timestamp.day:02d}/{device_id}_logs{timestamp.strftime('%Y%m%d_%H%M%S')}.jsonl"

    # 3) ログを一行ずつparseし、JSON Linesを作る
    log_lines = '\n'.join(parse(log, device_id) for log in logs)
    print(log_lines)

    # 4) もし重大ログがあればSlackへ通知
    if UNUSUAL_LOGS:
        send_message_to_slack(device_id, timestamp)

    # 5) S3にアップロード
    try:
        obj = S3.Object(BUCKET_NAME, file_key)
        obj.put(Body=log_lines)
        print(f"File uploaded successfully: {file_key}")
    except Exception as e:
        print(f"Error uploading file: {str(e)}")

    return {
        'statusCode': 200,
        'body': json.dumps(f'This is {device_id}. We accepted your logs')
    }

5-3. IAM ポリシー設定

Lambda が S3 に PutObject できるように, 以下のポリシーをロールに付与します.

Deployを忘れずに.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Statement1",
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:PutObject"
            ],
            "Resource": [
                "arn:aws:s3:::<バケット名>",
                "arn:aws:s3:::<バケット名>/*"
            ]
        }
    ]
}

5-4. テスト

テストイベントを作成して, ログがS3にアップされるか確認します.

テストタブからテストを作成します.
今回は以下を使います.

{
        "body": "{\"device_id\": \"THISISTEST\", \"timestamp\": \"2024-07-13T15:40:03.587907\", \"logs\": [\"2024-07-06 13:50:33 raspberrypi daemon.error: dhcpcd[727]: wlan0: Router Advertisement from 1234abcdeaaaaaaaaaa\", \"2024-07-06 13:50:33 raspberrypi daemon.critical: dhcpcd[727]: wlan0: adding address 1234:a73:a3e0:100:a24a:9ee:5bv1:1ta2/84\"]}"
}

テストボタンを押し,

{
  "statusCode": 200,
  "body": "\"This is THISISTEST. We accepted your logs\""
}

と出力されば正常に動作しています.

また, S3のバケットに以下のように追加されていたら, S3に追加するのも正常です.

image.png

6. API Gateway の作成

以下の手順で API を作成します.

  1. REST API を選択し、新規作成
  2. メソッド: POST を作って, 統合先に Lambda を選択
  3. メソッドリクエスト: I AM
  4. リクエストパラメータ: 本文、クエリ文字列パラメータ、及びヘッダーを検証
  5. Deploy して新しいステージ (dev 等) を作成

URLは, https://<api_id>.execute-api.us-east-1.amazonaws.com/devこのような形になります.

6-1. Cognito オーソライザーの設定

  1. オーソライザー名: 任意
  2. タイプ: Cognito
  3. ユーザープール: 作成したユーザプール
  4. トークンのソース: Authorization

これにより, Authorization: <JWT> ヘッダーが正しい場合のみ API を通す設定が可能になります.

6-2. ID プールと API Gateway Invokeの関連付け

ID プール側のデフォルトロールに, 下記のようなポリシーをアタッチし、API Gateway へのアクセス権を付与します.

まず, 以下のようなポリシーを任意の名前で作成します.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "execute-api:Invoke",
      "Resource": "arn:aws:execute-api:<region>:<account-id>:<api-id>/*/POST/*"
    }
  ]
}

デフォルトロールに作成したポリシーをアタッチします.

このロールが見つけにくいので,
ID プール -> ユーザプール -> ID プロパイダーに進むと, デフォルトロールが見つけられます.

6-3. マッピングテンプレートの作成

統合リクエスト -> マッピングテンプレートと進みます.

コンテンツタイプをapplication/jsonに指定します.

本文は以下のようにします.

{
  "body": $input.json('$')
}

6-4. ここまでのテスト

以下を実行し, テストします.

logger -p info "Hi"
python send_log.py

S3にログのファイルが作成されていることを確認し, 以下のように返ってきていれば完了です.

{"statusCode": 200, "body": "\"This is <device_id>. We accepted your logs\""}

7. ラズパイ側でログ送信スクリプトを実装

7-1. .envを作成

ラズパイ側に環境変数を保持する.envファイルを作って行きます.

vim /home/pi/.env

以下のような必要情報を記載します.

AWS_REGION=<region>
AWS_USERPOOLID=<userpool_id>
AWS_CLIENTID=<client_id>
AWS_IDPOOLID=<idpool_id>
AWS_USERNAME="<user_name>"
AWS_PASSWORD="<password>"
AWS_CLIENTSECRET=<client_secret>
AWS_ENDPOINTURL="https://<api_id>.execute-api.us-east-1.amazonaws.com/dev
"

7-2. ログ送信スクリプト

今回は5分毎に送信するので, 最後にログを送信した時刻を保持するファイルを作ります.

touch /home/pi/timestamp.txt

ログ送信スクリプトを作成します.

vim /home/pi/send_log.py

コード全文

import os
import boto3
import hmac
import hashlib
import base64
import requests
import subprocess
from datetime import datetime, timedelta
from dotenv import load_dotenv
from warrant.aws_srp import AWSSRP
from requests_aws4auth import AWS4Auth

SYSLOG_PATH='/var/log/syslog'
TIMESTAMP_FILE = "/home/pi/timestamp.txt"
load_dotenv()  # take environment variables from .env.
aws_userPoolId = os.environ['AWS_USERPOOLID']
aws_identityPoolId = os.environ['AWS_IDPOOLID']
aws_region = os.environ['AWS_REGION']
aws_clientId = os.environ['AWS_CLIENTID']
aws_userName = os.environ['AWS_USERNAME']
aws_password = os.environ['AWS_PASSWORD']
aws_clientSecret = os.environ['AWS_CLIENTSECRET']
aws_endPointUrl = os.environ['AWS_ENDPOINTURL']

def calculate_secret_hash(client_id, client_secret, username):
    message = username + client_id
    dig = hmac.new(
        client_secret.encode('utf-8'),
        msg=message.encode('utf-8'),
        digestmod=hashlib.sha256
    ).digest()
    return base64.b64encode(dig).decode()

def get_cognito_credentials():
    idp = boto3.client('cognito-idp', region_name=aws_region)
    identity = boto3.client('cognito-identity', region_name=aws_region)
    response = idp.initiate_auth(
        AuthFlow='USER_PASSWORD_AUTH',
        AuthParameters={
            'USERNAME': aws_userName,
            'PASSWORD': aws_password,
            'SECRET_HASH': calculate_secret_hash(aws_clientId, aws_clientSecret, aws_userName)
        },
        ClientId=aws_clientId
    )

    id_token = response['AuthenticationResult']['IdToken']

    logins = {f'cognito-idp.{aws_region}.amazonaws.com/{aws_userPoolId}': id_token}
    identity_response = identity.get_id(
        IdentityPoolId=aws_identityPoolId,
        Logins=logins
    )

    credentials = identity.get_credentials_for_identity(
        IdentityId=identity_response['IdentityId'],
        Logins=logins
    )

    return credentials['Credentials']


def write_last_time(time):
    with open(TIMESTAMP_FILE, "w") as f:
        f.write(str(time))

def read_last_time():
    try:
        with open(TIMESTAMP_FILE, "r") as f:
            time_str = f.readline().strip()
            if time_str:
                return datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S')
    except (FileNotFoundError, ValueError):
        pass

    return datetime.strptime((datetime.now() - timedelta(minutes=5)).strftime('%Y-%m-%d %H:%M:%S'), '%Y-%m-%d %H:%M:%S')


def read_log():
    last_time = read_last_time()
    new_logs = []
    latest_time = last_time
    with open(SYSLOG_PATH, "r") as f:
        lines = [line.strip() for line in f if line.strip()]
        for line in lines:
            date_part = line[:19].replace('T', ' ')
            log_time = datetime.strptime(date_part, '%Y-%m-%d %H:%M:%S')
            updated_line = date_part + line[19:]
            if log_time > last_time:
                new_logs.append(updated_line.strip())
                if log_time > latest_time:
                    latest_time = log_time

    if new_logs:
        write_last_time(latest_time)
    return new_logs

def post_log(endpoint_url, logs):
    load_dotenv()
    region = os.environ['AWS_REGION']
    credentials = get_cognito_credentials()
    access_key_id = credentials['AccessKeyId']
    secret_key = credentials['SecretKey']
    session_token = credentials['SessionToken']
    output = subprocess.check_output("grep Serial /proc/cpuinfo | awk '{print substr($3,9,8)}'", shell=True)
    device_id = output.strip().decode('utf-8')

    auth = AWS4Auth(access_key_id, secret_key, region, 'execute-api', session_token=session_token)
    payload = {
        "device_id": device_id,
        "timestamp": datetime.now().isoformat(),
        "logs": logs
    }
    headers = {"Content-Type": "application/json"}
    try:
        responce = requests.post(endpoint_url, auth=auth, headers=headers, json=payload)
        responce.raise_for_status()
        print(responce.text)
    except requests.exceptions.RequestException as e:
        print("Log post error: ", e)
        print(responce.text)

if __name__ == '__main__':
    endpoint_url = aws_endPointUrl
    logs = read_log()
    print(logs)
    post_log(endpoint_url, logs)

(a) 環境変数の読み込み (dotenv & os.environ)

.env に書かれた AWS 関連の設定 (ユーザプール ID, ID プール ID, リージョン, アプリクライアント ID, ユーザ名・パスワード, エンドポイント URL) を環境変数として取得します.

コード内で直接キーを書き込まないので, セキュリティや環境の切り替えが簡単です.

load_dotenv()  # .envファイルを読み込む

aws_userPoolId = os.environ['AWS_USERPOOLID']
aws_identityPoolId = os.environ['AWS_IDPOOLID']
aws_region = os.environ['AWS_REGION']
aws_clientId = os.environ['AWS_CLIENTID']
aws_userName = os.environ['AWS_USERNAME']
aws_password = os.environ['AWS_PASSWORD']
aws_clientSecret = os.environ['AWS_CLIENTSECRET']
aws_endPointUrl = os.environ['AWS_ENDPOINTURL']

(b) Cognito 認証用のハッシュ計算 (calculate_secret_hash)

  • Cognito の「アプリクライアントシークレット」を使ってSECRET_HASH を生成
  • USER_PASSWORD_AUTH で認証する際の, SECRET_HASH パラメータを送るのに使用
  • HMAC-SHA256 -> Base64 エンコードした文字列
def calculate_secret_hash(client_id, client_secret, username):
    message = username + client_id
    dig = hmac.new(
        client_secret.encode('utf-8'),
        msg=message.encode('utf-8'),
        digestmod=hashlib.sha256
    ).digest()
    return base64.b64encode(dig).decode()

(c) Cognito で ID トークン&一時的な AWS クレデンシャルを取得 (get_cognito_credentials)

ラズパイ上で長期的な IAM ユーザーのアクセスキーを持たないための鍵となる仕組みです.

Cognito により, 短期認証 & 一時的クレデンシャル が得られるのでセキュアです.

  • Cognito ユーザプール (cognito-idp) で USER_PASSWORD_AUTH を実行し、IdTokenを取得
  • Cognito ID プール (cognito-identity) に対し, この IdToken を渡して一時的な AWS クレデンシャル (AccessKey / SecretKey / SessionToken) を入手
  • 返り値は credentials['Credentials'] にまとめられており, AccessKeyId, SecretKey, SessionToken, Expirationがある
def get_cognito_credentials():
    idp = boto3.client('cognito-idp', region_name=aws_region)
    identity = boto3.client('cognito-identity', region_name=aws_region)
    response = idp.initiate_auth(
        AuthFlow='USER_PASSWORD_AUTH',
        AuthParameters={
            'USERNAME': aws_userName,
            'PASSWORD': aws_password,
            'SECRET_HASH': calculate_secret_hash(aws_clientId, aws_clientSecret, aws_userName)
        },
        ClientId=aws_clientId
    )

    id_token = response['AuthenticationResult']['IdToken']

    logins = {f'cognito-idp.{aws_region}.amazonaws.com/{aws_userPoolId}': id_token}
    identity_response = identity.get_id(
        IdentityPoolId=aws_identityPoolId,
        Logins=logins
    )

    credentials = identity.get_credentials_for_identity(
        IdentityId=identity_response['IdentityId'],
        Logins=logins
    )

    return credentials['Credentials']

(d) ログ取得時間の管理 (write_last_time / read_last_time)

  • 直近ログを取得した日時をファイル (timestamp.txt) に書き込み・読み込みする仕組み
  • read_last_time():前回処理した最後の時刻を読み込み, もしファイルがなければ現在時刻の 5 分前をデフォルトに設定
  • write_last_time():今回取得した最新ログのタイムスタンプを記録し, 次回はそこから後のログだけを送る
def write_last_time(time):
    with open(TIMESTAMP_FILE, "w") as f:
        f.write(str(time))

def read_last_time():
    try:
        with open(TIMESTAMP_FILE, "r") as f:
            time_str = f.readline().strip()
            if time_str:
                return datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S')
    except (FileNotFoundError, ValueError):
        pass

    return datetime.strptime(
        (datetime.now() - timedelta(minutes=5)).strftime('%Y-%m-%d %H:%M:%S'),
        '%Y-%m-%d %H:%M:%S'
    )

(e) syslog を読み, 前回以降のログだけ抽出 (read_log)

  1. read_last_time() で前回取得した最後のタイムスタンプを参照
  2. /var/log/syslog を一行ずつ読み込み, 行頭 19 文字を日時 (YYYY-MM-DD HH:MM:SS) として解析
  3. もしそのログの時刻が last_time より新しければ new_logs に追加
  4. 最終的に発見した最新のログ時刻を write_last_time() で更新
def read_log():
    last_time = read_last_time()
    new_logs = []
    latest_time = last_time
    with open(SYSLOG_PATH, "r") as f:
        lines = [line.strip() for line in f if line.strip()]
        for line in lines:
            date_part = line[:19].replace('T', ' ')
            log_time = datetime.strptime(date_part, '%Y-%m-%d %H:%M:%S')
            updated_line = date_part + line[19:]
            if log_time > last_time:
                new_logs.append(updated_line.strip())
                if log_time > latest_time:
                    latest_time = log_time

    if new_logs:
        write_last_time(latest_time)
    return new_logs

(f) ログを API Gateway に POST (post_log)

  • AWS4Auth により, IAM 認証が必要な API Gateway のエンドポイントにアクセスできる
  • 短期クレデンシャルが有効期限切れになるたびに再取得され, セキュリティを保ちながらログ送信
  • device_id がユニークであれば, サーバー側 (Lambda) でどのラズパイから送られたログか区別できる

動作

  1. Cognito で取得した一時的認証情報 (AccessKey, SecretKey, SessionToken) を使って AWS4Auth を作成

  2. Raspberry Pi のシリアル番号を grep Serial /proc/cpuinfo で抽出し, device_id として識別子に使う

  3. requests.post() で API Gateway (endpoint_url) に対して JSON を送信

def post_log(endpoint_url, logs):
    load_dotenv()
    region = os.environ['AWS_REGION']
    credentials = get_cognito_credentials()
    access_key_id = credentials['AccessKeyId']
    secret_key = credentials['SecretKey']
    session_token = credentials['SessionToken']

    output = subprocess.check_output("grep Serial /proc/cpuinfo | awk '{print substr($3,9,8)}'", shell=True)
    device_id = output.strip().decode('utf-8')

    auth = AWS4Auth(access_key_id, secret_key, region, 'execute-api', session_token=session_token)

    payload = {
        "device_id": device_id,
        "timestamp": datetime.now().isoformat(),
        "logs": logs
    }
    headers = {"Content-Type": "application/json"}

    try:
        responce = requests.post(endpoint_url, auth=auth, headers=headers, json=payload)
        responce.raise_for_status()
        print(responce.text)
    except requests.exceptions.RequestException as e:
        print("Log post error: ", e)
        print(responce.text)

(g) メイン処理 (if name == 'main':)

  1. .env から読み込んだ aws_endPointUrl を使って 送信先 API Gateway を決定
  2. read_log() で新規ログを取得し, 内容を print で確認 (デバッグ)
  3. post_log(endpoint_url, logs) で API Gateway に送信
if __name__ == '__main__':
    endpoint_url = aws_endPointUrl
    logs = read_log()
    print(logs)
    post_log(endpoint_url, logs)

8. crontab で5分ごとに実行

crontab を使っていきます.

crontab -e

以下のように書き込みます.

*/5 * * * * /home/pi/.venv/bin/python /home/pi/send_log.py

仮想環境のPythonを直接設定することで, 仮想環境を直接使うことが出来ます.

9. 動作確認

以下の手順で動作を確認します.

  1. logger -p err "This is error" などでエラーログを投入
  2. 5 分後, Slack に通知が来ているか & S3 にログファイルが増えているかを確認
  3. logger -p info "This is info" などで普通のログが通知されないことも確認

10. まとめ

  • Cognito を使うメリット: Raspberry Pi 側に長期的な AWS 認証情報を置かないため安全
  • S3 + Lambda のシンプルな構成: リアルタイム性は不要だが、定期的にまとめてログを送る場合に最適
  • コスト: Lambda 無料枠内であればほぼ無償、S3 保存料金 (数 GB 程度であれば格安), Cognito ID プールも無料枠が広い

以上で, Raspberry Pi × AWS で構築するセキュアな syslog 管理システム の紹介でした.

必要に応じて通知の仕組みを拡張し, Slack だけでなくメール・SNS などにも通知できるようにしてみてください.

ご質問やご要望などあればコメントいただけると幸いです.

コード全文

参考文献

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?