Help us understand the problem. What is going on with this article?

ClojureでKinesisとAmazon Lambdaを使う

More than 3 years have passed since last update.

書くこと

Clojureを使ってAWS Lambdaを使ったことを諸々と備忘録にします。

なぜClojureなのか

AWSのLambdaでは使えるのが

  • nodejs
  • Java8
  • Python

が使用できるようです。
今回はDBの書き込み処理があり、nodejsを使用しても良かったんですが、コールバック地獄も嫌だし、promissとかわざわざ使うのも嫌だったので、Java8を選択。
普段Vimで物を書いているので、Eclipse等のIDEの設定が面倒で、簡単にかける言語で行いたかったので、Clojureを選択しました。

KinesisからLambdaへ

fluentdで雑に送っておきます。

fluent.conf
type kinesis

stream_name **************

aws_key_id  **************
aws_sec_key **************
region ap-northeast-1
partition_key status
debug true

Clojureのプロジェクトを作る

雑にawstestっていう名前でプロジェクトを作ります。Leiningenは入っているものとします。

$ lein new awstest

とりあえずは何も内容は書かないので、この状態でコンパイルできるはず。

$ lein uberjar

LambdaからClojureへ

AWS CLIから実行が可能です。別にWeb画面から作っても良いです。

aws_deploy.sh
aws lambda delete-function --function-name awstest;
aws lambda create-function --function-name awstest --handler awstest.core::handleRequest --runtime java8 --memory 512 --timeout 60 --role arn:aws:iam::********:role/*********** --zip-file fileb://./target/awstest-0.1.0-SNAPSHOT-standalone.jar --vpc-config SubnetIds=subnet-********,subnet-********,SecurityGroupIds=sg-********

'''

VPCのサポートがつい最近行われたため、コード内にはないのでそのうち追記します。 => した
また、Kinesisをソースにしていするコードもないため、後で追記します。
roll部分はAWSのIAMでロールを作ります。
必要なのは

* kinesisにアクセスするロール
* 今回はVPC内のRDSにアクセスするのでec2系のネットワークインターフェース系
です。

それぞれ

```json:kinesis.json

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "lambda:InvokeFunction"
      ],
      "Resource": [
        "*"
   /   ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:GetRecords",
        "kinesis:GetShardIterator",
        "kinesis:DescribeStream",
        "kinesis:ListStreams",
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "*"
    }
  ]
}
vpc.json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Stmt***********",
            "Effect": "Allow",
            "Action": [
                "ec2:CreateNetworkInterface",
                "ec2:DeleteNetworkInterface",
                "ec2:DescribeNetworkInterfaces",
                "ec2:DetachNetworkInterface"
            ],
            "Resource": [
                "*"
            ]
        },
        {
            "Sid": "Stmt***********",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": [
                "arn:aws:logs:*:*:*"
            ]
        }
    ]
}

Clojureでのコードの書き方

先ほどのdeploy.shでは--handler awstest.core::handleRequestとしています。
awstest.core/-handleRequest関数がKinesisからデータがLambdaに流れたのちに呼び出されます。
また、送られてくるデータは下記のような形で送られてきます。

input.json

{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "200",
                "sequenceNumber": "***********",
                "data": "*******"
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000000:*******",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::************:role/*****************",
            "awsRegion": "ap-northeast-1",
            "eventSourceARN": "arn:aws:kinesis:ap-northeast-1:************:stream/************"
        }
    ]
}

"data": "*******"部分がKinesisから入力をしたデータの内容です。
Base64でエンコードされているので取り出すコードとしては下記のような形。

core.clj

(defn -handleRequest [this is os context]
  (let
    [
      values (map #(json/read-str (String. (codec/decode (.getBytes (str (:data (:kinesis %1)))))) :key-fn keyword) records)
    ]
  )
)

で取り出せます。

Kakuni
PerlとTypescriptとLua書いてる
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした