LoginSignup
3
3

More than 5 years have passed since last update.

ClojureでKinesisとAmazon Lambdaを使う

Last updated at Posted at 2016-02-27

書くこと

Clojureを使ってAWS Lambdaを使ったことを諸々と備忘録にします。

なぜClojureなのか

AWSのLambdaでは使えるのが

  • nodejs
  • Java8
  • Python

が使用できるようです。
今回はDBの書き込み処理があり、nodejsを使用しても良かったんですが、コールバック地獄も嫌だし、promissとかわざわざ使うのも嫌だったので、Java8を選択。
普段Vimで物を書いているので、Eclipse等のIDEの設定が面倒で、簡単にかける言語で行いたかったので、Clojureを選択しました。

KinesisからLambdaへ

fluentdで雑に送っておきます。

fluent.conf

type kinesis

stream_name **************

aws_key_id  **************
aws_sec_key **************
region ap-northeast-1
partition_key status
debug true

Clojureのプロジェクトを作る

雑にawstestっていう名前でプロジェクトを作ります。Leiningenは入っているものとします。

$ lein new awstest

とりあえずは何も内容は書かないので、この状態でコンパイルできるはず。

$ lein uberjar

LambdaからClojureへ

AWS CLIから実行が可能です。別にWeb画面から作っても良いです。

aws_deploy.sh

aws lambda delete-function --function-name awstest;
aws lambda create-function --function-name awstest --handler awstest.core::handleRequest --runtime java8 --memory 512 --timeout 60 --role arn:aws:iam::********:role/*********** --zip-file fileb://./target/awstest-0.1.0-SNAPSHOT-standalone.jar --vpc-config SubnetIds=subnet-********,subnet-********,SecurityGroupIds=sg-********

'''

VPCのサポートがつい最近行われたため、コード内にはないのでそのうち追記します。 => した
また、Kinesisをソースにしていするコードもないため、後で追記します。
roll部分はAWSのIAMでロールを作ります。
必要なのは

* kinesisにアクセスするロール
* 今回はVPC内のRDSにアクセスするのでec2系のネットワークインターフェース系
です。

それぞれ

```json:kinesis.json

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "lambda:InvokeFunction"
      ],
      "Resource": [
        "*"
   /   ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:GetRecords",
        "kinesis:GetShardIterator",
        "kinesis:DescribeStream",
        "kinesis:ListStreams",
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "*"
    }
  ]
}
vpc.json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Stmt***********",
            "Effect": "Allow",
            "Action": [
                "ec2:CreateNetworkInterface",
                "ec2:DeleteNetworkInterface",
                "ec2:DescribeNetworkInterfaces",
                "ec2:DetachNetworkInterface"
            ],
            "Resource": [
                "*"
            ]
        },
        {
            "Sid": "Stmt***********",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": [
                "arn:aws:logs:*:*:*"
            ]
        }
    ]
}

Clojureでのコードの書き方

先ほどのdeploy.shでは--handler awstest.core::handleRequestとしています。
awstest.core/-handleRequest関数がKinesisからデータがLambdaに流れたのちに呼び出されます。
また、送られてくるデータは下記のような形で送られてきます。

input.json

{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "200",
                "sequenceNumber": "***********",
                "data": "*******"
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000000:*******",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::************:role/*****************",
            "awsRegion": "ap-northeast-1",
            "eventSourceARN": "arn:aws:kinesis:ap-northeast-1:************:stream/************"
        }
    ]
}

"data": "*******"部分がKinesisから入力をしたデータの内容です。
Base64でエンコードされているので取り出すコードとしては下記のような形。

core.clj

(defn -handleRequest [this is os context]
  (let
    [
      values (map #(json/read-str (String. (codec/decode (.getBytes (str (:data (:kinesis %1)))))) :key-fn keyword) records)
    ]
  )
)

で取り出せます。

3
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
3