Qiita Teams that are logged in
You are not logged in to any team

Log in to Qiita Team
Community
OrganizationEventAdvent CalendarQiitadon (β)
Service
Qiita JobsQiita ZineQiita Blog
3
Help us understand the problem. What are the problem?

More than 5 years have passed since last update.

ClojureでKinesisとAmazon Lambdaを使う

書くこと

Clojureを使ってAWS Lambdaを使ったことを諸々と備忘録にします。

なぜClojureなのか

AWSのLambdaでは使えるのが

  • nodejs
  • Java8
  • Python

が使用できるようです。
今回はDBの書き込み処理があり、nodejsを使用しても良かったんですが、コールバック地獄も嫌だし、promissとかわざわざ使うのも嫌だったので、Java8を選択。
普段Vimで物を書いているので、Eclipse等のIDEの設定が面倒で、簡単にかける言語で行いたかったので、Clojureを選択しました。

KinesisからLambdaへ

fluentdで雑に送っておきます。

fluent.conf

type kinesis

stream_name **************

aws_key_id  **************
aws_sec_key **************
region ap-northeast-1
partition_key status
debug true

Clojureのプロジェクトを作る

雑にawstestっていう名前でプロジェクトを作ります。Leiningenは入っているものとします。

$ lein new awstest

とりあえずは何も内容は書かないので、この状態でコンパイルできるはず。

$ lein uberjar

LambdaからClojureへ

AWS CLIから実行が可能です。別にWeb画面から作っても良いです。

aws_deploy.sh

aws lambda delete-function --function-name awstest;
aws lambda create-function --function-name awstest --handler awstest.core::handleRequest --runtime java8 --memory 512 --timeout 60 --role arn:aws:iam::********:role/*********** --zip-file fileb://./target/awstest-0.1.0-SNAPSHOT-standalone.jar --vpc-config SubnetIds=subnet-********,subnet-********,SecurityGroupIds=sg-********

'''

VPCのサポートがつい最近行われたため、コード内にはないのでそのうち追記します。 => した
また、Kinesisをソースにしていするコードもないため、後で追記します。
roll部分はAWSのIAMでロールを作ります。
必要なのは

* kinesisにアクセスするロール
* 今回はVPC内のRDSにアクセスするのでec2系のネットワークインターフェース系
です。

それぞれ

```json:kinesis.json

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "lambda:InvokeFunction"
      ],
      "Resource": [
        "*"
   /   ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:GetRecords",
        "kinesis:GetShardIterator",
        "kinesis:DescribeStream",
        "kinesis:ListStreams",
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "*"
    }
  ]
}
vpc.json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Stmt***********",
            "Effect": "Allow",
            "Action": [
                "ec2:CreateNetworkInterface",
                "ec2:DeleteNetworkInterface",
                "ec2:DescribeNetworkInterfaces",
                "ec2:DetachNetworkInterface"
            ],
            "Resource": [
                "*"
            ]
        },
        {
            "Sid": "Stmt***********",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": [
                "arn:aws:logs:*:*:*"
            ]
        }
    ]
}

Clojureでのコードの書き方

先ほどのdeploy.shでは--handler awstest.core::handleRequestとしています。
awstest.core/-handleRequest関数がKinesisからデータがLambdaに流れたのちに呼び出されます。
また、送られてくるデータは下記のような形で送られてきます。

input.json

{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "200",
                "sequenceNumber": "***********",
                "data": "*******"
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000000:*******",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::************:role/*****************",
            "awsRegion": "ap-northeast-1",
            "eventSourceARN": "arn:aws:kinesis:ap-northeast-1:************:stream/************"
        }
    ]
}

"data": "*******"部分がKinesisから入力をしたデータの内容です。
Base64でエンコードされているので取り出すコードとしては下記のような形。

core.clj

(defn -handleRequest [this is os context]
  (let
    [
      values (map #(json/read-str (String. (codec/decode (.getBytes (str (:data (:kinesis %1)))))) :key-fn keyword) records)
    ]
  )
)

で取り出せます。

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
3
Help us understand the problem. What are the problem?