書くこと
Clojureを使ってAWS Lambdaを使ったことを諸々と備忘録にします。
なぜClojureなのか
AWSのLambdaでは使えるのが
- nodejs
- Java8
- Python
が使用できるようです。
今回はDBの書き込み処理があり、nodejsを使用しても良かったんですが、コールバック地獄も嫌だし、promissとかわざわざ使うのも嫌だったので、Java8を選択。
普段Vimで物を書いているので、Eclipse等のIDEの設定が面倒で、簡単にかける言語で行いたかったので、Clojureを選択しました。
KinesisからLambdaへ
fluentdで雑に送っておきます。
fluent.conf
type kinesis
stream_name **************
aws_key_id **************
aws_sec_key **************
region ap-northeast-1
partition_key status
debug true
Clojureのプロジェクトを作る
雑にawstestっていう名前でプロジェクトを作ります。Leiningenは入っているものとします。
$ lein new awstest
とりあえずは何も内容は書かないので、この状態でコンパイルできるはず。
$ lein uberjar
LambdaからClojureへ
AWS CLIから実行が可能です。別にWeb画面から作っても良いです。
aws_deploy.sh
aws lambda delete-function --function-name awstest;
aws lambda create-function --function-name awstest --handler awstest.core::handleRequest --runtime java8 --memory 512 --timeout 60 --role arn:aws:iam::********:role/*********** --zip-file fileb://./target/awstest-0.1.0-SNAPSHOT-standalone.jar --vpc-config SubnetIds=subnet-********,subnet-********,SecurityGroupIds=sg-********
'''
VPCのサポートがつい最近行われたため、コード内にはないのでそのうち追記します。 => した
また、Kinesisをソースにしていするコードもないため、後で追記します。
roll部分はAWSのIAMでロールを作ります。
必要なのは
* kinesisにアクセスするロール
* 今回はVPC内のRDSにアクセスするのでec2系のネットワークインターフェース系
です。
それぞれ
```json:kinesis.json
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"lambda:InvokeFunction"
],
"Resource": [
"*"
/ ]
},
{
"Effect": "Allow",
"Action": [
"kinesis:GetRecords",
"kinesis:GetShardIterator",
"kinesis:DescribeStream",
"kinesis:ListStreams",
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "*"
}
]
}
vpc.json
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "Stmt***********",
"Effect": "Allow",
"Action": [
"ec2:CreateNetworkInterface",
"ec2:DeleteNetworkInterface",
"ec2:DescribeNetworkInterfaces",
"ec2:DetachNetworkInterface"
],
"Resource": [
"*"
]
},
{
"Sid": "Stmt***********",
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": [
"arn:aws:logs:*:*:*"
]
}
]
}
Clojureでのコードの書き方
先ほどのdeploy.shでは--handler awstest.core::handleRequestとしています。
awstest.core/-handleRequest関数がKinesisからデータがLambdaに流れたのちに呼び出されます。
また、送られてくるデータは下記のような形で送られてきます。
input.json
{
"Records": [
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "200",
"sequenceNumber": "***********",
"data": "*******"
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000000:*******",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::************:role/*****************",
"awsRegion": "ap-northeast-1",
"eventSourceARN": "arn:aws:kinesis:ap-northeast-1:************:stream/************"
}
]
}
"data": "*******"部分がKinesisから入力をしたデータの内容です。
Base64でエンコードされているので取り出すコードとしては下記のような形。
core.clj
(defn -handleRequest [this is os context]
(let
[
values (map #(json/read-str (String. (codec/decode (.getBytes (str (:data (:kinesis %1)))))) :key-fn keyword) records)
]
)
)
で取り出せます。