Edited at

ClojureでKinesisとAmazon Lambdaを使う

More than 3 years have passed since last update.


書くこと

Clojureを使ってAWS Lambdaを使ったことを諸々と備忘録にします。


なぜClojureなのか

AWSのLambdaでは使えるのが


  • nodejs

  • Java8

  • Python

が使用できるようです。

今回はDBの書き込み処理があり、nodejsを使用しても良かったんですが、コールバック地獄も嫌だし、promissとかわざわざ使うのも嫌だったので、Java8を選択。

普段Vimで物を書いているので、Eclipse等のIDEの設定が面倒で、簡単にかける言語で行いたかったので、Clojureを選択しました。


KinesisからLambdaへ

fluentdで雑に送っておきます。


fluent.conf


type kinesis

stream_name **************

aws_key_id **************
aws_sec_key **************
region ap-northeast-1
partition_key status
debug true



Clojureのプロジェクトを作る

雑にawstestっていう名前でプロジェクトを作ります。Leiningenは入っているものとします。



$ lein new awstest

とりあえずは何も内容は書かないので、この状態でコンパイルできるはず。



$ lein uberjar


LambdaからClojureへ

AWS CLIから実行が可能です。別にWeb画面から作っても良いです。


aws_deploy.sh


aws lambda delete-function --function-name awstest;
aws lambda create-function --function-name awstest --handler awstest.core::handleRequest --runtime java8 --memory 512 --timeout 60 --role arn:aws:iam::********:role/*********** --zip-file fileb://./target/awstest-0.1.0-SNAPSHOT-standalone.jar --vpc-config SubnetIds=subnet-********,subnet-********,SecurityGroupIds=sg-********

'''

VPCのサポートがつい最近行われたため、コード内にはないのでそのうち追記します。 => した
また、Kinesisをソースにしていするコードもないため、後で追記します。
roll部分はAWSのIAMでロールを作ります。
必要なのは

* kinesisにアクセスするロール
* 今回はVPC内のRDSにアクセスするのでec2系のネットワークインターフェース系
です。

それぞれ

```json:kinesis.json

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"lambda:InvokeFunction"
],
"Resource": [
"*"
/ ]
},
{
"Effect": "Allow",
"Action": [
"kinesis:GetRecords",
"kinesis:GetShardIterator",
"kinesis:DescribeStream",
"kinesis:ListStreams",
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "*"
}
]
}



vpc.json

{

"Version": "2012-10-17",
"Statement": [
{
"Sid": "Stmt***********",
"Effect": "Allow",
"Action": [
"ec2:CreateNetworkInterface",
"ec2:DeleteNetworkInterface",
"ec2:DescribeNetworkInterfaces",
"ec2:DetachNetworkInterface"
],
"Resource": [
"*"
]
},
{
"Sid": "Stmt***********",
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": [
"arn:aws:logs:*:*:*"
]
}
]
}


Clojureでのコードの書き方

先ほどのdeploy.shでは--handler awstest.core::handleRequestとしています。

awstest.core/-handleRequest関数がKinesisからデータがLambdaに流れたのちに呼び出されます。

また、送られてくるデータは下記のような形で送られてきます。


input.json



{
"Records": [
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "200",
"sequenceNumber": "***********",
"data": "*******"
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000000:*******",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::************:role/*****************",
"awsRegion": "ap-northeast-1",
"eventSourceARN": "arn:aws:kinesis:ap-northeast-1:************:stream/************"
}
]
}

"data": "*******"部分がKinesisから入力をしたデータの内容です。

Base64でエンコードされているので取り出すコードとしては下記のような形。


core.clj



(defn -handleRequest [this is os context]
(let
[
values (map #(json/read-str (String. (codec/decode (.getBytes (str (:data (:kinesis %1)))))) :key-fn keyword) records)
]
)
)


で取り出せます。