LoginSignup
1
0

AWS Lambdaでpython-gnupgでPGP暗号化を実装するサンプルコード

Last updated at Posted at 2023-12-24

はじめに

PGP暗号化をLambdaで実装する際に、少しだけコードで苦戦したのでメモしておく。
あくまでもサンプルコード(参考)としてみてください。

使用したAWSサービス

  • Lambda
  • SecretsManager
  • S3

前提

  • 以下リンクの「AWS Lambdaでpython-gnupgのライブラリ実装方法」が完了していること。

  • ランタイムpython3.9で実装
  • S3にファイルがputされたらLambdaが起動する(別途設定が必要)
  • SecretsManagerにテキスト形式で鍵を登録しておく
  • SecretsManagerのReginonとLambdaのRegionは同一Regionであること
  • Lambdaのロールには使用するサービスが許可されていること(とりあえずフルアクセス付与)

アーキ図

pgp3.png

コード

lambda_function.py
import os
import gnupg
import boto3
import glob
from logging import getLogger

logger = getLogger(__name__)


def get_file_name(file_key):
    """ファイル名取得

    キーの右端のファイル名を取得.

    Args:
        file_key (String): ファイルキー.
    
    Return:
        ファイル名 (String).
    """

    if not "/" in file_key:
        return file_key

    return file_key.rsplit("/", 1)[1]

def get_secret(secret_name, secret_region):
    """SecretsManagerよりsecret情報取得
        
     接続情報をsecretmanagerより取得する.

    Args:
        secret_name    (String): secret名.
        secret_region  (String): secretリージョン名.
        
    Return:
        secret (Object)
    """

    session = boto3.session.Session()
    client = session.client(service_name="secretsmanager", region_name=secret_region)

    try:
        get_secret_value_response = client.get_secret_value(SecretId=secret_name)

    except Exception as e:
        logger.error("SecretsManager get error:" + str(e) + " SECRET_NAME:" + secret_name + " SECRET_REGION:" + secret_region)
        raise

    else:
        if "SecretString" in get_secret_value_response:
            secret = get_secret_value_response["SecretString"]

    return secret


def s3_put_file(s3, bucket_name, upload_contents, key):
    """ファイルS3保存
     
     処理した内容をS3に保存.

    Args:
        s3                  (Object): オブジェクト.
        bucket_name         (String): バケット名.
        upload_contents     (String): アップロード内容.
        key                 (String): キー名.
    """

    try:
        s3.Bucket(bucket_name).upload_file(upload_contents, key)
        logger.info("File upload success!! Key:" + bucket_name + "/" + key)
    
    except Exception as e:
        logger.error("File upload error:" + str(e) + " Key:" + bucket_name + "/" + key)
        raise


def delete_file(path):
    """ファイルの削除

     所定の場所のファイルを削除する.

    Args:
        path (String): パス.
    """

    try:
        os.remove(path)
        logger.info("File delete success:" + path)

    except Exception as e:
        logger.info("File delete error:" + str(e) + " path:" + path)


def lambda_handler(event, context):
    try:
        logger.setLevel(os.getenv("LOG_LEVEL", "INFO"))
        logger.info(str(event))
        get_bucket_name         = event['Records'][0]['s3']['bucket']['name']
        file_object_key         = event['Records'][0]['s3']['object']['key']
        SECRET_NAME             = os.environ["SECRET_NAME"]
        SECRET_REGION           = os.environ["SECRET_REGION"]
        EMAIL_ADDRESS           = os.environ["EMAIL_ADDRESS"]
        PUT_BUCKET_NAME         = os.environ["PUT_BUCKET_NAME"]

        put_key                 = file_object_key + ".pgp"
        file_object_name        = get_file_name(file_object_key)
        tmp_dl_file_key         = "/tmp/" + file_object_name
        tmp_encrypted_file_key  = "/tmp/" + file_object_name + ".pgp"
        s3_resource             = boto3.resource("s3")
    
        gpg = gnupg.GPG(gnupghome="/tmp", gpgbinary="./gpg")
        key_data = get_secret(SECRET_NAME, SECRET_REGION)
        logger.debug(key_data)
        gpg.import_keys(key_data)
        logger.info("Key import success!")

        bucket = s3_resource.Bucket(get_bucket_name)
        bucket.download_file(file_object_key, tmp_dl_file_key)        
        logger.info("File download success!:" + file_object_key + "" + tmp_dl_file_key)

        with open(tmp_dl_file_key, "rb") as f:
            gpg.encrypt_file(f, recipients=[EMAIL_ADDRESS], armor=False, always_trust=True, extra_args=["--yes"], output=tmp_encrypted_file_key)
            logger.info("Encrypt success!")
            
        s3_put_file(s3_resource, PUT_BUCKET_NAME, tmp_encrypted_file_key, put_key)
        delete_file(tmp_encrypted_file_key)
        delete_file(tmp_dl_file_key)

    except Exception as e:
        logger.error("Error:" + str(e))
        raise

解説

S3にファイルがPUTされた時に、Lambdaが駆動するので、eventパラメータより、バケット名とPUTされたファイルパスを受け取っています。

get_bucket_name = event['Records'][0]['s3']['bucket']['name']
file_object_key = event['Records'][0]['s3']['object']['key']

環境変数に値を設定している値を読み込みます。
SECRET_NAMEはSecretsManagerに登録しているシークレット名
SECRET_REGIONはSecretsManagerに登録しているシークレットのリージョン名
EMAIL_ADDRESSはPGP暗号化鍵生成時のメールアドレス
PUT_BUCKET_NAMEは、暗号化ファイル作成後PUTするバケット名

SECRET_NAME     = os.environ["SECRET_NAME"]
SECRET_REGION   = os.environ["SECRET_REGION"]
EMAIL_ADDRESS   = os.environ["EMAIL_ADDRESS"]
PUT_BUCKET_NAME = os.environ["PUT_BUCKET_NAME"]

PGP暗号化する際に必要なオブジェクトを生成

gpg = gnupg.GPG(gnupghome="/tmp", gpgbinary="./gpg")

SecretsManagerに登録していた鍵情報を取得

key_data = get_secret(SECRET_NAME, SECRET_REGION)

鍵情報をインポート

gpg.import_keys(key_data)

S3から対象のファイルをLambdaの一時領域にダウンロード

bucket = s3_resource.Bucket(get_bucket_name)
bucket.download_file(file_object_key, tmp_dl_file_key)  

暗号化したいファイルを開きPGP暗号化

armor=False

暗号化ファイルをバイナリで出力する場合は、armor=Falseとする必要がある。デフォルトはarmor=Trueである。今回はバイナリファイルで出力することにする。

always_trust=True, extra_args=["--yes"]

これがないとエラーが出でる。信頼されてない鍵だけどいいの?のようなメッセージ返ってくるので、このメッセージを出さないようにするために引数で与える必要がある。

with open(tmp_dl_file_key, "rb") as f:
    gpg.encrypt_file(f, recipients=[EMAIL_ADDRESS], armor=False, always_trust=True, extra_args=["--yes"], output=tmp_encrypted_file_key)

S3に暗号化したファイルをPUT

s3_put_file(s3_resource, PUT_BUCKET_NAME, tmp_encrypted_file_key, put_key)

一時領域に置いたファイルを削除。(Lambdaを連続起動した場合に前回のデータが残ってしまう場合があるのでそれを回避するため)

delete_file(tmp_encrypted_file_key)
delete_file(tmp_dl_file_key)

まとめ

以上がAWS Lambdaでpython-gnupgでPGP暗号化を実装するサンプルコードです。
今回はS3ファイルPUT契機で動くようなコードにしましたが、定刻起動させたい場合はEventBridgeなどをご検討下さい。SecretsManagerへの鍵ファイル登録方法、Lambdaのロールの権限付与などは割愛しているので、別途調べてご自身で登録をお願いします。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0