10
6

More than 5 years have passed since last update.

Serverless Framework で Kinesis のプロデューサーを作る

Last updated at Posted at 2016-11-04

Serverless Frameworkで Kinesis を使います。
プロデューサー側を作るのがメインで、コンシューマー側は簡単なログ出力のみです。

Serverless Framework で DynamoDB を使う」を参考にすれば、DynamoDB へ反映するコンシューマーを作れると思います。

イベントソーシングをどう使っていくかは、サービス特性を考える必要がありますが、イベントソーシングを使うっと決まったときにすぐに構築できるように準備をしておきます。

ソースリポジトリ

ソースは以下のリポジトリを参照してください。

バージョン

version
node 6.9.1
npm 3.10.8
serverless framework 1.1.0

Lambda が Node.js V4.3 なので、合わせるべきかもしれません。今はローカルでは動かしていないので合わしていません。

Kinesis プロデューサー

todos.js

Serverless Framework で DynamoDB を使う」の内容を流用しています。

create / update / delete すべて、Kinesis Stream へ putRecord しているのみです。

Kinesis の Stream名にステージを prefix として追加しています。やり方は他にもあるかもしれませんが、各ステージごとに Kinesis Stream を作る仕組みを盛り込まないと実際に利用するときに困ると思います。今回は、ステージを prefix として Stream名につけることで対応しました。

'use strict';

const uuid = require('uuid'),
      moment = require('moment'),
      streamName = `${process.env.STAGE}-todos`;

module.exports.create = (stream, item, callback) => {
  item.id = uuid.v1();
  item.updatedUtc = moment().utc().toISOString();

  const params = {
    StreamName: streamName,
    PartitionKey: uuid.v1(),
    Data: JSON.stringify({ method: 'CREATE', item: item })
  };

  return stream.putRecord(params, (err, data) => {
    if (err) {
      callback(err);
    } else {
      callback(err, item);
    }
  });
};

module.exports.update = (stream, id, item, callback) => {
  item.id = id;
  item.updatedAt = moment().utc().toISOString();

  const params = {
    StreamName: streamName,
    PartitionKey: uuid.v1(),
    Data: JSON.stringify({ method: 'UPDATE', item: item })
  };

  return stream.putRecord(params, (err, data) => {
    if (err) {
      callback(err);
    } else {
      callback(err, item);
    }
  });
};

module.exports.delete = (stream, id, callback) => {
  const item = { id: id };

  const params = {
    StreamName: streamName,
    PartitionKey: uuid.v1(),
    Data: JSON.stringify({ method: 'DELETE', item: item })
  };

  return stream.putRecord(params, (err, data) => {
    if (err) {
      callback(err);
    } else {
      callback(err, item);
    }
  });
};

handler.js

Lambda Proxy に対応した返却を行っています。context ではなく callback  を使っています。しっかりした情報を得ることはできませんでしたが、callback を使うのが良いのではないかと思います。

DynamoDB を利用したときとほぼ一緒です。違いは、AWS.DynamoDB.DocumentClient のかわりに AWS.Kinesis を使ってる点です。

'use strict';

const AWS = require('aws-sdk'),
      kinesis = new AWS.Kinesis(),
      env = require('dotenv').config(),
      todos = require('./todos.js');

const createResponse = (statusCode, body) => (
  {
    statusCode,
    headers: {
      'Access-Control-Allow-Origin': '*', // Required for CORS support to work
    },
    body: JSON.stringify(body),
  }
);

module.exports.todosCreate = (event, context, callback) => {
  const item = JSON.parse(event.body);

  todos.create(kinesis, item, (err, result) => {
    if (err) {
      callback(createResponse(500, { message: err.message }));
    } else {
      callback(null, createResponse(201, result));
    }
  });
};

module.exports.todosUpdate = (event, context, callback) => {
  const id = event.pathParameters.id,
        item = JSON.parse(event.body);

  todos.update(kinesis, id, item, (err, result) => {
    if (err) {
      callback(createResponse(500, { message: err.message }));
    } else {
      callback(null, createResponse(200, result));
    }
  });
};

module.exports.todosDelete = (event, context, callback) => {
  const id = event.pathParameters.id;

  todos.delete(kinesis, id, (err, result) => {
    if (err) {
      callback(createResponse(500, { message: err.message }));
    } else {
      callback(null, createResponse(204));
    }
  });
};

serverless.yml

iamRoleStatementsResources を使って Kinesis へアクセスする権限と Stream 作成を行っています。

この設定で作られた Kinesis Stream は DeletionPolicy: Retain を指定しているので、sls remove を実行しても削除されません。
不要な場合、AWS Console 等を使って別途削除する必要があります。

service: sls-kinesis

provider:
  name: aws
  runtime: nodejs4.3
  stage: ${opt:stage, self:custom.defaultStage}
  region: ${opt:region, self:custom.defaultRegion}
  profile: ${self:custom.profiles.${self:provider.stage}}

  iamRoleStatements:
    - Effect: Allow
      Action:
        - kinesis:DescribeStream
        - kinesis:PutRecord
        - kinesis:ListStreams
      Resource: "arn:aws:kinesis:${self:provider.region}:*:stream/${self:provider.stage}-todos"

custom:
  defaultStage: dev
  defaultRegion: ap-northeast-1
  profiles:
    dev: devSls
    prod: prodSls
  writeEnvVars:
    STAGE: ${self:provider.stage}

package:
  exclude:
    - .git/**
    - README.md
    - node_modules/serverless-plugin-write-env-vars/**

plugins:
  - serverless-plugin-write-env-vars

functions:
  todosCreate:
    handler: handler.todosCreate
    events:
      - http:
          path: todos
          method: post
          cors: true

  todosUpdate:
    handler: handler.todosUpdate
    events:
      - http:
          path: todos/{id}
          method: patch
          cors: true

  todosDelete:
    handler: handler.todosDelete
    events:
      - http:
          path: todos/{id}
          method: delete
          cors: true

resources:
  Resources:
    TodosKinesisStream:
      Type: 'AWS::Kinesis::Stream'
      DeletionPolicy: Retain
      Properties:
        Name: "${self:provider.stage}-todos"
        ShardCount: 1

Kinesis コンシューマー

ログ出力(console.log)するのみの簡単なコンシューマーです。

handler.js

複数のレコードを Kinesis から取得できるので forEach しています。何件取得するかは serverless.yml で指定できます。

実際に出力される内容を確認すると分かりますが base64 されているので、デコードしています。この後 DynamoDB 等で処理する場合、JSON.pase() を使って JSON 形式にする必要があります。

'use strict';

module.exports.todosConsumer = (event, context, callback) => {
  console.log('Event:', JSON.stringify(event));

  event.Records.forEach(function(record) {
    const payload = new Buffer(record.kinesis.data, 'base64').toString('ascii');
    console.log('Decoded payload:', payload);
  });

  callback(null, "message");
};

serverless.yml

iamRoleStatements の設定は不要です。events.stream を利用すると Serverless Framework が必要な IAM Policy を追加してくれます。と思ったのですが、初回のデプロイ時のみ必要になるようです。いずれいらなくなると思います。

Kinesis Stream の arn ですがFn::Joinを使って、アカウントIDを埋め込みたかったのですが、エラーが出て動かなかったので環境変数でアカウントIDを渡すようにしています。

この Issue かなっとは思うのですが、なんとも。Kinesis Stream の arn でFn::Joinを使う方法がわかれば修正します。ただ、今回に限っては環境変数を serverless.yml に書く方法がわかったので勉強になりました。

batchSizeが一度の処理で読み込むレコード数です。

startingPositionTRIM_HORIZONLATESTを指定することになりますが、どちらが良いか悩ましいところです。
冪等性を考慮して TRIM_HORIZON が良いのではないかと思っていますが、考えが変わるかもしれません。

service: sls-log-from-kinesis

provider:
  name: aws
  runtime: nodejs4.3
  stage: ${opt:stage, self:custom.defaultStage}
  region: ${opt:region, self:custom.defaultRegion}
  profile: ${self:custom.profiles.${self:provider.stage}}

  iamRoleStatements:
    - Effect: Allow
      Action:
        - kinesis:DescribeStream
        - kinesis:GetRecords
        - kinesis:GetShardIterator
        - kinesis:ListStreams
      Resource: "arn:aws:kinesis:${self:provider.region}:*:stream/${self:provider.stage}-todos"

custom:
  defaultStage: dev
  defaultRegion: ap-northeast-1
  profiles:
    dev: devSls
    prod: prodSls
  writeEnvVars:
    STAGE: ${self:provider.stage}

package:
  exclude:
    - .git/**
    - README.md

functions:
  todosConsumer:
    handler: handler.todosConsumer
    events:
      - stream:
          arn: "arn:aws:kinesis:${self:provider.region}:${env:ACCOUNT_ID}:stream/${self:provider.stage}-todos"
          #arn:
          #  Fn::Join:
          #    - ""
          #    - - "arn:aws:kinesis:${self:provider.region}:"
          #      - Ref: "AWS::AccountId"
          #      - ":stream/${self:provider.stage}-todos"
          batchSize: 1
          startingPosition: TRIM_HORIZON

サンプルを動かしてみる

Serverless Framework のインストール等については「Serverless Framework で Hello World を作る」を参照してください。

サンプルのインストール/デプロイ

Serverless Framework のインストールと AWS への接続ができていたら、以下のコマンドでサンプルをインストール/デプロイできます。

Kinesis プロデューサーのインストール/デプロイ

serverless install --url https://github.com/katsuhiko/sls-kinesis
cd sls-kinesis
npm install
serverless deploy -v

Kinesis コンシューマーのインストール/デプロイ

serverless cli を実行するときに環境変数「ACCOUNT_ID」を指定するようにしてください。

serverless install --url https://github.com/katsuhiko/sls-log-from-kinesis
cd sls-log-from-kinesis
npm install
ACCOUNT_ID=9999 serverless deploy -v

実行

Kinesis プロデューサーの実行

curl を使って Kinesis プロデューサーを実行します。「XXXX」部分は各環境に合わして変更してください。

Create

curl -X POST https://XXXX.execute-api.ap-northeast-1.amazonaws.com/dev/todos --data '{ "content" : "Learn Serverless" }'

Update

curl -X PATCH https://XXXX.execute-api.ap-northeast-1.amazonaws.com/dev/todos/<id> --data '{ "content" : "Understand Serverless" }'

Delete

curl -X DELETE https://XXXX.execute-api.ap-northeast-1.amazonaws.com/dev/todos/<id>

Kinesis コンシューマーの実行

デプロイした時点で、コンシューマーは動いています。ログを参照して本当に動いているかを確認します。

cd sls-log-from-kinesis
ACCOUNT_ID=9999 serverless logs -f todosConsumer -t

あと片付け

cd sls-kinesis
serverless remove -v
cd sls-log-from-kinesis
ACCOUNT_ID=9999 sls-log-from-kinesis remove -v

上記でも Kinesis Stream が残っているので、AWS Console 等から Kinesis Stream を削除します。

感想

serverless.yml の Resources は CloudFormation の設定をそのまま記述できる便利さを理解することができました。
Serverless Framework は aws cli がなくても動く点も良いです。

AWS Console で画面からポチポチリソース作りをしている場合、Serverless Framework を使って yaml でリソース管理をするっという利用方法もあるのではないかと思いました。

Serverless Framework + API Gateway + Lambda + Kinesis + DynamoDB をこんなに簡単に使うことができると、イベントソーシングして CQRS も楽勝です。

イベントソーシングして CQRS が楽勝だと、システムをピタゴラスイッチ化して、新たな混沌を生み出すプロジェクトが、たくさんできそうな気がします。

そうならないように使いどころを見極めていきたいです。

10
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
10
6