Help us understand the problem. What is going on with this article?

Lambda@javaでS3オブジェクトをEC2のDBにINSERTするまで:Java編[後編]

More than 1 year has passed since last update.

はじめに

本記事はAWS、java(ほぼ)初心者がLambda@javaでS3オブジェクトをEC2のDB(SQL Server)にINSERTするまでにやったことの備忘録となります。
初投稿三部作の最終盤です。
ここはこうしたほうがいい、などご指摘あればお願いします!!

1 : AWS編
2 : Java編[前編]
3 : Java編[後編] <-- 本編
3.5 : Java編[続]

前回はS3バケットにアップロードしたファイルの名前を取得できることを確認しました。
それでは、前回取得したファイル名をメールで送信してみましょう。

ちなみに、今回はJavaxのMailパッケージを使用してみました。

pom.xmlの編集

pom.xmlのdependenciesに新たに下の記述を追記します。

<dependency>
  <groupId>com.sun.mail</groupId>
  <artifactId>javax.mail</artifactId>
  <version>1.5.4</version>
</dependency>

メール送信

ReadS3ObjectクラスのlistingNamesメソッドを以下のように変更します。

ReadS3Object.java
@SuppressWarnings("deprecation")
public void listingNames(Context context )
{
    AmazonS3 client = new AmazonS3Client(
            new BasicAWSCredentials(
                    "<accessKey>",
                    "<secretKey>"));

    ListObjectsRequest request = new ListObjectsRequest()
            .withBucketName("test-bucket-yut0201");
    ObjectListing objectList = client.listObjects(request);

    // オブジェクト一覧を取得し、オブジェクト名をコンソールに出力
    List<S3ObjectSummary> objects = objectList.getObjectSummaries();
    //System.out.println("objectList:");
    //objects.forEach(object -> System.out.println(object.getKey()));

    List<String> objectNameList = new ArrayList<String>();
    objects.forEach(object -> objectNameList.add(object.getKey()));
    sendMailTest(objectNameList);
}

コンソールへの出力は不要になるのでコメントアウトし、代わりにString型のリストに格納しています。
sendMailTest()メソッドでメール送信を行うので、早速中身を実装していきましょう。

ReadS3Object.java
public void sendMailTest(List<String> objectNames) {
        final String fromAddr = "<fromMailAddress>";
        final String toAddr   = "<toMailaddress>";
        final String subject  = "test Mail Send";
        final String charset  = "UTF-8";
        String content  = "S3 object List : ";
        content += String.join(",", objectNames);
        final String encoding = "base64";

        final Properties properties = new Properties();

        // 基本情報。ここでは niftyへの接続例を示します。
        properties.setProperty("mail.smtp.host", "smtp.gmail.com");
        properties.setProperty("mail.smtp.port", "587");

        // タイムアウト設定
        properties.setProperty("mail.smtp.connectiontimeout", "60000");
        properties.setProperty("mail.smtp.timeout", "60000");

        // 認証
        properties.setProperty("mail.smtp.auth", "true");
        properties.setProperty("mail.smtp.starttls.enable", "true");

        final Session session = Session.getInstance(properties, new Authenticator() {
            protected PasswordAuthentication getPasswordAuthentication() {
                return new PasswordAuthentication("<fromMailAddress>", "<password>");
            }
        });

        try {
            MimeMessage message = new MimeMessage(session);

            // Set From:
            message.setFrom(new InternetAddress(fromAddr, "<userName>"));
            // Set ReplyTo:
            message.setReplyTo(new Address[]{new InternetAddress(fromAddr)});
            // Set To:
            message.setRecipient(Message.RecipientType.TO, new InternetAddress(toAddr));

            message.setSubject(subject, charset);
            message.setText(content, charset);

            message.setHeader("Content-Transfer-Encoding", encoding);

            Transport.send(message);

          } catch (MessagingException e) {
            throw new RuntimeException(e);
          } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
          }
    }

fromAddr, toAddr には任意の送信元、送信先のアドレスを設定してください。
今回はGmailのアドレスを使用しているため、SMTPサーバにはGmailのアドレスを設定しています。
content ではオブジェクトの数に応じて、カンマ区切りで文字列の後ろにオブジェクト名を連結しています。

タイムアウトは接続、送信ともに60000ミリ秒ですが、0~2147483647の範囲で任意の値を設定可能です。

また、Gmailでは第三者中継を防ぐためにSMTP認証を行う必要があるため、認証情報をpropertiesに追加しています。
PasswordAuthentication()の引数はGmailのアカウントのID(メールアドレス)とパスワードになります。

では、この状態で再度[実行]->[S3toLambda]によりjarへパッケージングし直します。

注意)S3toLambdaは前回Mavenビルド時に構成した実行構成のことであり、ゴールは"package shade:shade"となっています。

Lambdaにjarのアップロードが完了したら、[テスト]をクリックします。
宛先アドレスを持つGmailアカウントにログインし、受信トレイにメールが届いていることを確認できれば、成功です。

Lambdaロールのポリシー変更

Lambdaファンクションを実行するにあたって、実行結果の正常/異常終了をログで確認できるようにしたいので、
Lambdaの実行ロールである lambda_s3_exec_role に CloudWatch Logs の書き込み権限を与えておきましょう。

まず、IAMマネジメントコンソールに接続します。

画面左のナビゲーションバーから[ロール]を選択し、前回作成した lambda_s3_exec_role をクリックします。
AWSがデフォルトで用意しているポリシーもありますが、今回はインラインポリシーを作成してみます(カスタマイズできるので覚えると便利)。
画面右側の "インラインポリシーの作成" をクリックすると次のような画面が表示されます。

02_create_policy.png

[JSON]タブを選択して、次の記述をコピペします。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": "logs:*",
            "Resource": "*"
        }
    ]
}

注意) "Version"に関してはAWS側の書式バージョンを示すものらしいので、変更せずそのままにします。

ちなみに、上の記述は「CloudWatch Logs の全てのリソース(log-group,log-streamなど)に対し、全てのアクション(Read/Write)を許可する」というフルアクセス状態です。
権限を制限したい場合は、色々編集して試してみてください。

ログの確認

ここまで来たら、ログを出力するためのコードを書いてみましょう。
前回作成したパッケージ配下に、新しいクラスを作成します。

DetectS3Event.java
package S3test.S3toLambda;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.S3Event;
import com.amazonaws.services.s3.event.S3EventNotification.S3EventNotificationRecord;

public class DetectS3Event implements RequestHandler<S3Event, Object>
{
    @Override
    public Object handleRequest(S3Event event, Context context) {
        context.getLogger().log("Input: " + event);
        LambdaLogger lambdaLogger = context.getLogger();

        S3EventNotificationRecord record = event.getRecords().get(0);
        lambdaLogger.log(record.getEventName());                //イベント名
        lambdaLogger.log(record.getS3().getBucket().getName()); //バケット名
        lambdaLogger.log(record.getS3().getObject().getKey());  //オブジェクトのキー(オブジェクト名)

        return null;
    }
}

S3上の何らかのイベントを検知し、当該イベント名および対象バケット名、対象オブジェクト名を取得して
LambdaLoggerによりログとして出力してみます。

Mavanビルドでjarへパッケージングできたら、Lambdaへのアップロードを行いましょう。
ハンドラは適宜変更します。
[保存]をクリックした後、[テスト]の左のプルダウンリストからテストイベントの設定を行います。

"新しいテストイベントの作成" を選択した状態で、イベントテンプレートから "S3 Put" もしくは "S3 Delete" を選び、
任意のテストイベント名をつけて保存します。
03_S3_testEvent.png
この状態で[テスト]を実行し、実行結果が成功になったことを確認したら、ログを見てみましょう。
04_lambda_result.png
05_cloudwatch_logs_01.png
ログストリームにログが作成されていることが確認できるので、展開してみます。
06_cloudwatch_logs_02.png
これで、テストイベントの情報が無事に取得できることがわかりました。

イベントトリガーの設定

テストデータに関する情報のログへの出力を確認できたので、実際に "ファイルのアップロード" というイベントをトリガーにして
CloudWatch上にログを出力してみます。

今回はS3マネジメントコンソールから、イベントの設定を行います(Lambda側からの設定も可能)。

対象のバケットを選択し、[プロパティ]タブをクリックします。
[Events]ブロックにイベントが登録されていないことを確認し、[+ 通知の追加]で下記のイベントを作成しましょう。
07_S3_create_event.png

イベントログの確認

本記事では
・バケット内にサブフォルダを作成しない
・CSVファイルをアップロードする
という制約のため、プレフィックスは空白、サフィックスは".csv"としています。

イベントの設定ができたら、先ほどのhandleRequest(S3Event event, Context context)を編集します。

DetectS3Event.java
package S3test.S3toLambda;

// import 省略

public class DetectS3Event implements RequestHandler<S3Event, Object>
{
    @Override
    public Object handleRequest(S3Event event, Context context) {
        context.getLogger().log("Input: " + event);
        LambdaLogger lambdaLogger = context.getLogger();

        S3EventNotificationRecord record = event.getRecords().get(0);

        String bucketName = record.getS3().getBucket().getName();
        String key = record.getS3().getObject().getKey();

        try {
            @SuppressWarnings("deprecation")
            AmazonS3 client = new AmazonS3Client(
            new BasicAWSCredentials("<accessKey>","<secretKey>"));

            GetObjectRequest request = new GetObjectRequest(bucketName, key);
            S3Object object = client.getObject(request);

            BufferedInputStream bis = new BufferedInputStream(object.getObjectContent());
            BufferedReader br = new BufferedReader(new InputStreamReader(bis));

            String line = "";
            while ((line = br.readLine()) != null) {
                String[] data = line.split(",", 0); // 行をカンマ区切りで配列に変換

                for (String elem : data) {
                    System.out.println(elem);
                }
            }
            br.close();
        } catch (IOException e) {
            System.out.println(e);
        }
        return null;
    }
}

試しに、上記のソースの状態でパッケージングしたjarをLambdaにアップロードした状態で、
次のような、あらかじめ用意しておいたCSVをアップロードします。
08_create_csv.png

アップロード。
09_csv_upload.png

すると、このアップロードをトリガーに、
・S3がLambdaファンクションをキック -> Lambdaファンクションが実行ロールにしたがってCloudWatch Logsへ実行結果を書き込む
という流れが生まれます。

全てのログが出力されるまで、最大数分程度のタイムラグがあるようです(今回は正確に検証しておりません)が、CloudWatchのログをみてみます。
10_logs.png
ログの6行目〜23行目までの間に、アップロードしたCSVがカンマ区切りで1行ずつ出力されているのが分かりました。

最後に、再び上のCSVを使って、EC2上のDB(SQL Server)にINSERTしてみます。

データベースおよびユーザの作成

注意) SQL Server Express Edition はインストール済みのものとします。
公式を参照すればインストールできます。

まず、次のコマンドで作成済みのEC2インスタンスに接続しましょう。

ssh -i "<keyPair>" <userName>@<IPaddress>

キーペアはEC2インスタンス作成時に、同時に作成されたものを利用します。既存のキーペアを使用している場合はそちらでも問題ありません。
IPアドレスはEC2インスタンスに割り当てたElastic IPになります。接続コマンド自体は、EC2マネジメントコンソールにある[接続]ボタンで確認できます。

インスタンスに接続できたら、SQL Server に接続します。

sqlcmd -S localhost -U SA

プロンプトが 1> のような表記に変わったらログイン成功です。
次に、データベースと管理ユーザ、テーブルを作成しましょう。

データベースの作成

以下のコマンドを実行します。
今回は SQL Operations Studio を使用しています。

USE master
GO
IF NOT EXISTS (
   SELECT name
   FROM sys.databases
   WHERE name = N'LambdaTestDB'
)
CREATE DATABASE [LambdaTestDB]
GO

ALTER DATABASE [LambdaTestDB] SET QUERY_STORE=ON
GO

正常にDBが作成されたことを確認します。

select name, dbid, mode, status from sysdatabases;
go
name                                                                                                                             dbid   mode   status     
-------------------------------------------------------------------------------------------------------------------------------- ------ ------ -----------
master                                                                                                                                1      0       65544
tempdb                                                                                                                                2      0       65544
model                                                                                                                                 3      0       65536
msdb                                                                                                                                  4      0       65544
LambdaTestDB                                                                                                                          5      0       65537

(5 rows affected)

続いて、作成したLambdaTestDB内にテーブルを作成します。

USE LambdaTestDB
GO
CREATE TABLE employee (
  emp_id INT NOT NULL PRIMARY KEY,
  emp_name VARCHAR(20) NOT NULL,
  age    INT)
GO

注意) エラーが表示された場合は適宜トラブルシューティングしてください。

テーブルが正常に作成されたことを確認します。

select name, object_id, type_desc from sys.objects where type = 'U'
go
name                                                                                    object_id   type_desc                                                   
--------------------------------------------------------------------------------------- ----------- ------------------------------------------------------------
employee                                                                                  885578193 USER_TABLE                                                  

(1 rows affected)

上記のような結果になればテーブルの作成は成功です。

INSERT対象となるテーブルの作成が完了したので、ソースの方も併せて再編集します。

まとめ!?

本記事は、3部構成の3作目として執筆しておりましたが、当初、構成などを特に考えていなかったため、3作目のボリュームが大変なことに。。。(謝罪)
という訳で、3.5作目として、また後ほど続きを公開させていただきます。

よろしくお願いします。

yut0201
毎月お茶が届く会員です。モルモットを2匹飼育中。 情報発信はAWSメインです。
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away