Edited at

Lambda@javaでS3オブジェクトをEC2のDBにINSERTするまで:Java編[後編]

More than 1 year has passed since last update.


はじめに

本記事はAWS、java(ほぼ)初心者がLambda@javaでS3オブジェクトをEC2のDB(SQL Server)にINSERTするまでにやったことの備忘録となります。

初投稿三部作の最終盤です。

ここはこうしたほうがいい、などご指摘あればお願いします!!

1 : AWS編

2 : Java編[前編]

3 : Java編[後編] <-- 本編

3.5 : Java編[続]

前回はS3バケットにアップロードしたファイルの名前を取得できることを確認しました。

それでは、前回取得したファイル名をメールで送信してみましょう。

ちなみに、今回はJavaxのMailパッケージを使用してみました。


pom.xmlの編集

pom.xmlのdependenciesに新たに下の記述を追記します。

<dependency>

<groupId>com.sun.mail</groupId>
<artifactId>javax.mail</artifactId>
<version>1.5.4</version>
</dependency>


メール送信

ReadS3ObjectクラスのlistingNamesメソッドを以下のように変更します。


ReadS3Object.java

@SuppressWarnings("deprecation")

public void listingNames(Context context )
{
AmazonS3 client = new AmazonS3Client(
new BasicAWSCredentials(
"<accessKey>",
"<secretKey>"));

ListObjectsRequest request = new ListObjectsRequest()
.withBucketName("test-bucket-yut0201");
ObjectListing objectList = client.listObjects(request);

// オブジェクト一覧を取得し、オブジェクト名をコンソールに出力
List<S3ObjectSummary> objects = objectList.getObjectSummaries();
//System.out.println("objectList:");
//objects.forEach(object -> System.out.println(object.getKey()));

List<String> objectNameList = new ArrayList<String>();
objects.forEach(object -> objectNameList.add(object.getKey()));
sendMailTest(objectNameList);
}


コンソールへの出力は不要になるのでコメントアウトし、代わりにString型のリストに格納しています。

sendMailTest()メソッドでメール送信を行うので、早速中身を実装していきましょう。


ReadS3Object.java

public void sendMailTest(List<String> objectNames) {

final String fromAddr = "<fromMailAddress>";
final String toAddr = "<toMailaddress>";
final String subject = "test Mail Send";
final String charset = "UTF-8";
String content = "S3 object List : ";
content += String.join(",", objectNames);
final String encoding = "base64";

final Properties properties = new Properties();

// 基本情報。ここでは niftyへの接続例を示します。
properties.setProperty("mail.smtp.host", "smtp.gmail.com");
properties.setProperty("mail.smtp.port", "587");

// タイムアウト設定
properties.setProperty("mail.smtp.connectiontimeout", "60000");
properties.setProperty("mail.smtp.timeout", "60000");

// 認証
properties.setProperty("mail.smtp.auth", "true");
properties.setProperty("mail.smtp.starttls.enable", "true");

final Session session = Session.getInstance(properties, new Authenticator() {
protected PasswordAuthentication getPasswordAuthentication() {
return new PasswordAuthentication("<fromMailAddress>", "<password>");
}
});

try {
MimeMessage message = new MimeMessage(session);

// Set From:
message.setFrom(new InternetAddress(fromAddr, "<userName>"));
// Set ReplyTo:
message.setReplyTo(new Address[]{new InternetAddress(fromAddr)});
// Set To:
message.setRecipient(Message.RecipientType.TO, new InternetAddress(toAddr));

message.setSubject(subject, charset);
message.setText(content, charset);

message.setHeader("Content-Transfer-Encoding", encoding);

Transport.send(message);

} catch (MessagingException e) {
throw new RuntimeException(e);
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
}


fromAddr, toAddr には任意の送信元、送信先のアドレスを設定してください。

今回はGmailのアドレスを使用しているため、SMTPサーバにはGmailのアドレスを設定しています。

content ではオブジェクトの数に応じて、カンマ区切りで文字列の後ろにオブジェクト名を連結しています。

タイムアウトは接続、送信ともに60000ミリ秒ですが、0~2147483647の範囲で任意の値を設定可能です。

また、Gmailでは第三者中継を防ぐためにSMTP認証を行う必要があるため、認証情報をpropertiesに追加しています。

PasswordAuthentication()の引数はGmailのアカウントのID(メールアドレス)とパスワードになります。

では、この状態で再度[実行]->[S3toLambda]によりjarへパッケージングし直します。

注意)S3toLambdaは前回Mavenビルド時に構成した実行構成のことであり、ゴールは"package shade:shade"となっています。

Lambdaにjarのアップロードが完了したら、[テスト]をクリックします。

宛先アドレスを持つGmailアカウントにログインし、受信トレイにメールが届いていることを確認できれば、成功です。


Lambdaロールのポリシー変更

Lambdaファンクションを実行するにあたって、実行結果の正常/異常終了をログで確認できるようにしたいので、

Lambdaの実行ロールである lambda_s3_exec_role に CloudWatch Logs の書き込み権限を与えておきましょう。

まず、IAMマネジメントコンソールに接続します。

画面左のナビゲーションバーから[ロール]を選択し、前回作成した lambda_s3_exec_role をクリックします。

AWSがデフォルトで用意しているポリシーもありますが、今回はインラインポリシーを作成してみます(カスタマイズできるので覚えると便利)。

画面右側の "インラインポリシーの作成" をクリックすると次のような画面が表示されます。

02_create_policy.png

[JSON]タブを選択して、次の記述をコピペします。

{

"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": "logs:*",
"Resource": "*"
}
]
}

注意) "Version"に関してはAWS側の書式バージョンを示すものらしいので、変更せずそのままにします。

ちなみに、上の記述は「CloudWatch Logs の全てのリソース(log-group,log-streamなど)に対し、全てのアクション(Read/Write)を許可する」というフルアクセス状態です。

権限を制限したい場合は、色々編集して試してみてください。


ログの確認

ここまで来たら、ログを出力するためのコードを書いてみましょう。

前回作成したパッケージ配下に、新しいクラスを作成します。


DetectS3Event.java

package S3test.S3toLambda;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.S3Event;
import com.amazonaws.services.s3.event.S3EventNotification.S3EventNotificationRecord;

public class DetectS3Event implements RequestHandler<S3Event, Object>
{
@Override
public Object handleRequest(S3Event event, Context context) {
context.getLogger().log("Input: " + event);
LambdaLogger lambdaLogger = context.getLogger();

S3EventNotificationRecord record = event.getRecords().get(0);
lambdaLogger.log(record.getEventName()); //イベント名
lambdaLogger.log(record.getS3().getBucket().getName()); //バケット名
lambdaLogger.log(record.getS3().getObject().getKey()); //オブジェクトのキー(オブジェクト名)

return null;
}
}


S3上の何らかのイベントを検知し、当該イベント名および対象バケット名、対象オブジェクト名を取得して

LambdaLoggerによりログとして出力してみます。

Mavanビルドでjarへパッケージングできたら、Lambdaへのアップロードを行いましょう。

ハンドラは適宜変更します。

[保存]をクリックした後、[テスト]の左のプルダウンリストからテストイベントの設定を行います。

"新しいテストイベントの作成" を選択した状態で、イベントテンプレートから "S3 Put" もしくは "S3 Delete" を選び、

任意のテストイベント名をつけて保存します。

03_S3_testEvent.png

この状態で[テスト]を実行し、実行結果が成功になったことを確認したら、ログを見てみましょう。

04_lambda_result.png

05_cloudwatch_logs_01.png

ログストリームにログが作成されていることが確認できるので、展開してみます。

06_cloudwatch_logs_02.png

これで、テストイベントの情報が無事に取得できることがわかりました。


イベントトリガーの設定

テストデータに関する情報のログへの出力を確認できたので、実際に "ファイルのアップロード" というイベントをトリガーにして

CloudWatch上にログを出力してみます。

今回はS3マネジメントコンソールから、イベントの設定を行います(Lambda側からの設定も可能)。

対象のバケットを選択し、[プロパティ]タブをクリックします。

[Events]ブロックにイベントが登録されていないことを確認し、[+ 通知の追加]で下記のイベントを作成しましょう。

07_S3_create_event.png


イベントログの確認

本記事では

・バケット内にサブフォルダを作成しない

・CSVファイルをアップロードする

という制約のため、プレフィックスは空白、サフィックスは".csv"としています。

イベントの設定ができたら、先ほどのhandleRequest(S3Event event, Context context)を編集します。


DetectS3Event.java

package S3test.S3toLambda;

// import 省略

public class DetectS3Event implements RequestHandler<S3Event, Object>
{
@Override
public Object handleRequest(S3Event event, Context context) {
context.getLogger().log("Input: " + event);
LambdaLogger lambdaLogger = context.getLogger();

S3EventNotificationRecord record = event.getRecords().get(0);

String bucketName = record.getS3().getBucket().getName();
String key = record.getS3().getObject().getKey();

try {
@SuppressWarnings("deprecation")
AmazonS3 client = new AmazonS3Client(
new BasicAWSCredentials("<accessKey>","<secretKey>"));

GetObjectRequest request = new GetObjectRequest(bucketName, key);
S3Object object = client.getObject(request);

BufferedInputStream bis = new BufferedInputStream(object.getObjectContent());
BufferedReader br = new BufferedReader(new InputStreamReader(bis));

String line = "";
while ((line = br.readLine()) != null) {
String[] data = line.split(",", 0); // 行をカンマ区切りで配列に変換

for (String elem : data) {
System.out.println(elem);
}
}
br.close();
} catch (IOException e) {
System.out.println(e);
}
return null;
}
}


試しに、上記のソースの状態でパッケージングしたjarをLambdaにアップロードした状態で、

次のような、あらかじめ用意しておいたCSVをアップロードします。

08_create_csv.png

アップロード。

09_csv_upload.png

すると、このアップロードをトリガーに、

・S3がLambdaファンクションをキック -> Lambdaファンクションが実行ロールにしたがってCloudWatch Logsへ実行結果を書き込む

という流れが生まれます。

全てのログが出力されるまで、最大数分程度のタイムラグがあるようです(今回は正確に検証しておりません)が、CloudWatchのログをみてみます。

10_logs.png

ログの6行目〜23行目までの間に、アップロードしたCSVがカンマ区切りで1行ずつ出力されているのが分かりました。

最後に、再び上のCSVを使って、EC2上のDB(SQL Server)にINSERTしてみます。


データベースおよびユーザの作成

注意) SQL Server Express Edition はインストール済みのものとします。

公式を参照すればインストールできます。

まず、次のコマンドで作成済みのEC2インスタンスに接続しましょう。

ssh -i "<keyPair>" <userName>@<IPaddress>

キーペアはEC2インスタンス作成時に、同時に作成されたものを利用します。既存のキーペアを使用している場合はそちらでも問題ありません。

IPアドレスはEC2インスタンスに割り当てたElastic IPになります。接続コマンド自体は、EC2マネジメントコンソールにある[接続]ボタンで確認できます。

インスタンスに接続できたら、SQL Server に接続します。

sqlcmd -S localhost -U SA

プロンプトが 1> のような表記に変わったらログイン成功です。

次に、データベースと管理ユーザ、テーブルを作成しましょう。


データベースの作成

以下のコマンドを実行します。

今回は SQL Operations Studio を使用しています。

USE master

GO
IF NOT EXISTS (
SELECT name
FROM sys.databases
WHERE name = N'LambdaTestDB'
)
CREATE DATABASE [LambdaTestDB]
GO

ALTER DATABASE [LambdaTestDB] SET QUERY_STORE=ON
GO

正常にDBが作成されたことを確認します。

select name, dbid, mode, status from sysdatabases;

go
name dbid mode status
-------------------------------------------------------------------------------------------------------------------------------- ------ ------ -----------
master 1 0 65544
tempdb 2 0 65544
model 3 0 65536
msdb 4 0 65544
LambdaTestDB 5 0 65537

(5 rows affected)

続いて、作成したLambdaTestDB内にテーブルを作成します。

USE LambdaTestDB

GO
CREATE TABLE employee (
emp_id INT NOT NULL PRIMARY KEY,
emp_name VARCHAR(20) NOT NULL,
age INT)
GO

注意) エラーが表示された場合は適宜トラブルシューティングしてください。

テーブルが正常に作成されたことを確認します。

select name, object_id, type_desc from sys.objects where type = 'U'

go
name object_id type_desc
--------------------------------------------------------------------------------------- ----------- ------------------------------------------------------------
employee 885578193 USER_TABLE

(1 rows affected)

上記のような結果になればテーブルの作成は成功です。

INSERT対象となるテーブルの作成が完了したので、ソースの方も併せて再編集します。


まとめ!?

本記事は、3部構成の3作目として執筆しておりましたが、当初、構成などを特に考えていなかったため、3作目のボリュームが大変なことに。。。(謝罪)

という訳で、3.5作目として、また後ほど続きを公開させていただきます。

よろしくお願いします。