1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Step FunctionsでRDS×ElastiCacheを統合してみた!

Last updated at Posted at 2024-10-19

はじめに ~RDS × ElastiCache って?~

概要

Amazon ElastiCache は、フルマネージドのインメモリデータストアおよびキャッシュサービスです。
ざっくりいうと、RDSと比べ、複雑なデータ構造は持てないが、処理が超早いデータベースです。

主な用途はこんな感じですね。

  • データのキャッシュ保存
  • セッションや重要なデータの高速読み取りがあります

RDS × ElastiCache

以下の記事で紹介されているように、ElastiCacheにメインのDBのキャッシュを任せることで、処理速度の向上や、RDSのリードレプリカ数の削減によるコスト割引などのメリットを享受することができます。

RDS × ElastiCacheの注意点

ここまで聞くと最高じゃん!となるのですが、ちょっとした注意点があります。
それは、RDS × ElastiCache の統合は、AWSがサービスの機能として提供してくれるわけではないということです。
実現しようと思うと、自前での実装が必要になります。

トランザクションやエラー時の制御などを色々考えると、若干ハードルが高いわけですね。

今回のソリューション

今回はこのハードルを少しでも下げられないか?ということで、 AWS Step Functions を使用して、キャッシュ読み取り機能を実装してみようと思います!

つづいて ~AWS Step Functions って?~

概要

ざっくりいうと、サービス間(または組み込み関数)を用いた複雑なフローを実行・制御してくれるサービスです!
AWS Batchをよりフレンドリーにした感じで、GUIで楽しく設計できるのが大きなウリです。

cap0.PNG

こんな感じのGUIでフローを設定できます。
画像はS3オブジェクトをforeachしてDynamoDBに保存し、エラーがあればSNS通知するフローです。適当に置いたので動くのかわかりませんが
このように、各サービスの連携やloop、ifなどをグラフィカルに設定できるサービスです。
異常系の処理も簡単に設定できるのがすごいですよね。

こんかいのアーキテクチャ図

arch_a.png

主な処理フローです
① キャッシュ取得lambdaを実行
② elasticacheへクエリ
③ 結果を返却
④ キャッシュが存在しない場合オリジンへクエリ
⑤ RDSへクエリ
⑥ 結果を返却
⑦ elasticacheへキャッシュをインサート
⑧ 結果を返却

※本来はキャッシュの更新は非同期でやるべしだと思うのですが、簡単アーキにしてます
※本来はRDS Proxyなどを使用するべしなのですが、簡単アーキにしています。あくまでsfnの検証がスコープ

今回はこのフローをAWS Step Functions で構築しようと思います。

下準備

主役以外のDB、lambda、AWS Step Functions用のサービスロールはTerraformでサクッと作ります。

PJTコード(長いのでおりたたみ)
SFN-MANAGE-CACHE/
│
├── lambda/
│   ├── operate-postgresql/
│   │   ├── node_modules/
│   │   ├── index.mjs
│   │   └── package.json             // postgresql操作用にpgライブラリを使用する
│   │
│   ├── operate-redis_get/           // キャッシュget用lambda
│   │   ├── node_modules/
│   │   ├── index.mjs
│   │   └── package.json             // redis操作用にioredisライブラリを使用する
│   │
│   ├── operate-redis_set/           // キャッシュset用lambda
│   │   ├── node_modules/
│   │   ├── index.mjs
│   │   └── package.json             // redis操作用にioredisライブラリを使用する
│   │
│   ├────── operate-postgresql.zip   // lambda用zip
│   ├────── operate-redis_get.zip    // lambda用zip
│   └────── operate-redis_set.zip    // lambda用zip
│
└── terraform/
    └── main.tf

zipの作成から、リソースの作成まで全てterraformに任せています。

terraform/main.tf
############################################################################
## terraformブロック
############################################################################
terraform {
  # Terraformのバージョン指定
  # TODO:その日が来たらアプデする
  required_version = "~> 1.7.0"

  # Terraformのaws用ライブラリのバージョン指定
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.33.0"
    }
  }

  # 簡単な検証なので、ローカルにtfstateを保持する
  backend "local" {
    path = "local.tfstate"
  }
}

############################################################################
## providerブロック
############################################################################
provider "aws" {
  # リージョンを指定
  region = "ap-northeast-1"
}

############################################################################
## localsブロック
############################################################################
locals {
  project = "stepfunction-manage-cache"

  lambda_base_path  = "../lambda"

  lambda_redis_functions = ["get", "set"]
  lambda_redis      = "operate-redis"
  lambda_redis_path = "${local.lambda_base_path}/${local.lambda_redis}"

  lambda_postgresql      = "operate-postgresql"
  lambda_postgresql_path = "${local.lambda_base_path}/${local.lambda_postgresql}"

  db_count = 1

  db_username = "testuser"
  db_password = "password"
  db_name     = "test"
}

############################################################################
## VPC
## 特に理由もないので、公式modulesを使用する
## natも不要なので、intra subnetのみ作成する
############################################################################
module "intra_vpc" {
  source  = "terraform-aws-modules/vpc/aws"
  version = "5.9.0" # TODO: providerと一緒に更新する

  name = "${local.project}-intra-vpc"
  cidr = "10.0.1.0/24"

  azs                = ["ap-northeast-1a", "ap-northeast-1c"]
  intra_subnets      = ["10.0.1.0/25", "10.0.1.128/25"]
  enable_nat_gateway = false
}

############################################################################
## Lambda 2つ作る
## 共通で参照するリソース群を定義
############################################################################
# lambda用AWSロール
data "aws_iam_policy" "vpc_access_execution" {
  arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole"
}

data "aws_iam_policy_document" "assume_lambda" {
  statement {
    effect = "Allow"

    principals {
      type        = "Service"
      identifiers = ["lambda.amazonaws.com"]
    }

    actions = ["sts:AssumeRole"]
  }
}

resource "aws_iam_role" "lambda_vpc" {
  name               = "${local.project}-lambda-vpc-role"
  assume_role_policy = data.aws_iam_policy_document.assume_lambda.json
}

resource "aws_iam_role_policy_attachment" "lambda_vpc" {
  role       = aws_iam_role.lambda_vpc.name
  policy_arn = data.aws_iam_policy.vpc_access_execution.arn
}

# lambda用セキュリティグループ
# VPC内のみ通信できればよい
resource "aws_security_group" "lambda_vpc" {
  name   = "${local.project}-lambda-vpc-sg"
  vpc_id = module.intra_vpc.vpc_id

  tags = {
    Name = "${local.project}-lambda-vpc-sg"
  }
}

resource "aws_security_group_rule" "lambda_vpc" {
  security_group_id = aws_security_group.lambda_vpc.id
  type              = "egress"
  from_port         = 0
  to_port           = 0
  protocol          = "-1"
  cidr_blocks       = [module.intra_vpc.vpc_cidr_block]
}

############################################################################
## Lambda in VPC その1
## redisを操作するlambda関数
## GetとSet用に個別作成する
## 忘れてた。特に理由がないなら性能の高いARM64にすべき。デフォはx86_64
## https://qiita.com/hats_yaki/items/9d7e3522b1158f099645
## 検証後気づいたので、今後気を付ける
############################################################################
# ログ残す
resource "aws_cloudwatch_log_group" "redis" {
  for_each = toset(local.lambda_redis_functions)

  name              = "/aws/lambda/${local.lambda_redis}_${each.key}"
  retention_in_days = 14
}

# zipを作成
data "archive_file" "redis" {
  for_each = toset(local.lambda_redis_functions)

  type             = "zip"
  output_file_mode = "0666"
  source_dir       = "${local.lambda_redis_path}_${each.key}"
  output_path      = "${local.lambda_base_path}/${local.lambda_redis}_${each.key}.zip"
}

resource "aws_lambda_function" "redis" {
  for_each = toset(local.lambda_redis_functions)

  function_name = "${local.lambda_redis}_${each.key}"
  role          = aws_iam_role.lambda_vpc.arn

  runtime  = "nodejs18.x"
  filename = data.archive_file.redis[each.key].output_path
  handler  = "index.handler"

  source_code_hash = filebase64sha256(data.archive_file.redis[each.key].output_path)

  logging_config {
    log_format = "Text"
    log_group  = aws_cloudwatch_log_group.redis[each.key].name
  }

  vpc_config {
    subnet_ids         = module.intra_vpc.intra_subnets
    security_group_ids = [aws_security_group.lambda_vpc.id]
  }

  environment {
    variables = {
      # redisのaddressが評価できた場合、設定。countが0の場合はエラーになるので、tryでcatchしてnullを返却する。
      # coalesceはnullの場合第二引数をとる。冗長なcatchな気がする
      REDIS_HOST = coalesce(try(aws_elasticache_cluster.redis[0].cache_nodes[0].address, null), "DUMMY_REDIS_HOST")
    }
  }

  # zip内の変更は無視する
  lifecycle {
    ignore_changes = [filename, source_code_hash]
  }
}

############################################################################
## Lambda in VPC その2
## postgresqlを操作するlambda関数
############################################################################
# ログ残す
resource "aws_cloudwatch_log_group" "postgresql" {
  name              = "/aws/lambda/${local.lambda_postgresql}"
  retention_in_days = 14
}

# zipを作成
data "archive_file" "postgresql" {
  type             = "zip"
  output_file_mode = "0666"
  source_dir       = local.lambda_postgresql_path
  output_path      = "${local.lambda_base_path}/${local.lambda_postgresql}.zip"
}

resource "aws_lambda_function" "postgresql" {
  function_name = local.lambda_postgresql
  role          = aws_iam_role.lambda_vpc.arn

  runtime  = "nodejs18.x"
  filename = data.archive_file.postgresql.output_path
  handler  = "index.handler"

  source_code_hash = filebase64sha256(data.archive_file.postgresql.output_path)

  logging_config {
    log_format = "Text"
    log_group  = aws_cloudwatch_log_group.postgresql.name
  }

  vpc_config {
    subnet_ids         = module.intra_vpc.intra_subnets
    security_group_ids = [aws_security_group.lambda_vpc.id]
  }

  environment {
    variables = {
      # rdsのaddressが評価できた場合、設定。countが0の場合はエラーになるので、tryでcatchしてnullを返却する。
      # coalesceはnullの場合第二引数をとる。冗長なcatchな気がする
      DB_HOST     = coalesce(try(aws_db_instance.postgresql[0].address, null), "DUMMY_HOST") # port不要なのでaddressを取得する。endpointだとportもついてくる
      DB_USER     = coalesce(try(aws_db_instance.postgresql[0].username, null), "DUMMY_DB_USER")
      DB_PASSWORD = coalesce(try(aws_db_instance.postgresql[0].password, null), "DUMMY_DB_PASSWORD") # 簡単のため今回はこの方法で参照するが、tfstateに記載されるので、本来はsecret managerから取得するべき
      DB_DATABASE = coalesce(try(aws_db_instance.postgresql[0].db_name, null), "DUMMY_DB_DATABASE")
    }
  }

  # zip内の変更は無視する
  lifecycle {
    ignore_changes = [filename, source_code_hash]
  }
}

############################################################################
## ElastiCache SSO Redis
## valkeyはドキュメント少なそうだったので、とりあえずRedisを選択
## クラスタなどは不要なので最小構成で作成する
############################################################################
resource "aws_elasticache_subnet_group" "redis" {
  name       = "redis-cache-intra-subnet"
  subnet_ids = module.intra_vpc.intra_subnets
}

resource "aws_security_group" "redis" {
  name   = "${local.project}-redis-sg"
  vpc_id = module.intra_vpc.vpc_id

  tags = {
    Name = "${local.project}-redis-sg"
  }
}

resource "aws_security_group_rule" "redis" {
  security_group_id        = aws_security_group.redis.id
  type                     = "ingress"
  from_port                = 6379
  to_port                  = 6379
  protocol                 = "TCP"
  source_security_group_id = aws_security_group.lambda_vpc.id
}

resource "aws_elasticache_cluster" "redis" {
  # 節約のため不要時は削除
  count = local.db_count

  cluster_id           = "${local.project}-cluster-example"
  engine               = "redis"
  node_type            = "cache.t2.medium" # aws_elasticache_cluster.redis: Creation complete after 5m17s
  num_cache_nodes      = 1
  parameter_group_name = "default.redis7"
  engine_version       = "7.1"
  port                 = 6379
  subnet_group_name    = aws_elasticache_subnet_group.redis.name
  security_group_ids   = [aws_security_group.redis.id]
}

############################################################################
## RDS postgresql
## エンジンはなんでもいいので、なんとなくpostgresql
## クラスタなどは不要なので最小構成で作成する
############################################################################
resource "aws_db_subnet_group" "postgresql" {
  name       = "${local.project}-intra-subnet"
  subnet_ids = module.intra_vpc.intra_subnets

  tags = {
    Name = "${local.project}-intra-subnet"
  }
}

resource "aws_security_group" "postgresql" {
  name   = "${local.project}-postgresql-sg"
  vpc_id = module.intra_vpc.vpc_id

  tags = {
    Name = "${local.project}-postgresql-sg"
  }
}

resource "aws_security_group_rule" "postgresql" {
  security_group_id        = aws_security_group.postgresql.id
  type                     = "ingress"
  from_port                = 5432
  to_port                  = 5432
  protocol                 = "TCP"
  source_security_group_id = aws_security_group.lambda_vpc.id
}

resource "aws_db_instance" "postgresql" {
  # 節約のため不要時は削除
  count = local.db_count

  #################################################
  ## インスタンス基本設定
  #################################################
  identifier             = "${local.project}-rds-postgresql"
  engine                 = "postgres"
  engine_version         = "16.3"        # https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/PostgreSQL.Concepts.General.DBVersions.html
  instance_class         = "db.t3.micro" # aws_db_instance.postgresql: Creation complete after 4m47s 
  vpc_security_group_ids = [aws_security_group.postgresql.id]

  #################################################
  ## DBアプリ設定
  #################################################
  db_name = local.db_name

  #################################################
  ## ストレージ設定
  #################################################
  storage_type      = "gp2"
  storage_encrypted = false
  allocated_storage = 10
  # ストレージ自動スケーリング上限(GB)
  max_allocated_storage = 30

  #################################################
  ## ログイン情報
  ## adminは基本的にアプリに使用しないが、簡単のため
  #################################################
  username = local.db_username
  password = local.db_password
  port     = 5432

  #################################################
  ## ネットワーク
  #################################################
  publicly_accessible  = false
  db_subnet_group_name = aws_db_subnet_group.postgresql.name
  multi_az             = false

  #################################################
  ## DBインスタンス管理
  #################################################
  backup_window = "09:10-09:40"
  # アップデートの実行を次のメンテナンスウィンドウまで待機
  apply_immediately          = false
  maintenance_window         = "mon:10:10-mon:10:40"
  auto_minor_version_upgrade = false

  #################################################
  ## 削除保護
  #################################################
  deletion_protection      = false
  skip_final_snapshot      = true
  delete_automated_backups = false
  backup_retention_period  = 0
}

############################################################################
## Step Functions
## 本体はGUIで作成するので、ロールだけ用意
## 作業証跡を残すため、sfn本体も一時的に定義
############################################################################
# Step Functions用のIAMロールを作成
data "aws_iam_policy_document" "assume_sfn" {
  statement {
    effect = "Allow"

    principals {
      type        = "Service"
      identifiers = ["states.amazonaws.com"]
    }

    actions = ["sts:AssumeRole"]
  }
}

resource "aws_iam_role" "sfn" {
  name               = "${local.project}-sfn-role"
  assume_role_policy = data.aws_iam_policy_document.assume_sfn.json
}

data "aws_iam_policy_document" "sfn" {
  statement {
    effect = "Allow"
    actions = [
      "logs:CreateLogGroup",
      "logs:CreateLogStream",
      "logs:PutLogEvents",
      "logs:DescribeLogGroups",
      "logs:DescribeLogStreams",
    ]
    resources = ["*"]
  }
  statement {
    effect = "Allow"
    actions = [
      "lambda:InvokeFunction",
    ]
    resources = concat(
      [for lambda in aws_lambda_function.redis : "${lambda.arn}:*"],
      ["${aws_lambda_function.postgresql.arn}:*"]
    )
  }
}

resource "aws_iam_policy" "sfn" {
  name        = "${local.project}-sfn-policy"
  description = "IAM policy for Step Functions"
  policy      = data.aws_iam_policy_document.sfn.json
}

resource "aws_iam_role_policy_attachment" "sfn" {
  role       = aws_iam_role.sfn.name
  policy_arn = aws_iam_policy.sfn.arn
}
lambda/operate-postgresql/package.json
{
  "name": "operate-postgres",
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "author": "",
  "license": "ISC",
  "dependencies": {
    "pg": "^8.13.0"
  }
}
// lambda/operate-postgresql/index.mjs
export const handler = async (event, context) => {
    console.log("========================================");
    console.log(event);
    console.log("========================================");

    return {
        "statusCode": "200",
        "body": JSON.stringify({ "test": "value" })
    };
};

getもsetも同一の内容。あとでいろいろコードは変更します。

lambda/operate-redis_get/package.json
{
  "name": "operate-redis",
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "author": "",
  "license": "ISC",
  "dependencies": {
    "ioredis": "^5.4.1"
  }
}
// lambda/operate-redis_get/index.mjs
export const handler = async (event, context) => {
    console.log("========================================");
    console.log(event);
    console.log("========================================");

    return {
        "statusCode": "200",
        "body": JSON.stringify({ "test": "value" })
    };
};

リソースの確認

terraformを実行し、リソースを作成します

RDS

cap4.PNG

元気そうですね

Elasticache

cap3.PNG

大丈夫そうですね

lambdaたち

cap72.PNG

しっかり作成されています。
コードを編集し、DBを操作してみます。

redisを操作する

index.mjsを以下のように編集します。

import { Redis } from "ioredis"

// ElastiCacheの接続情報
const redisConfig = {
    host: process.env.REDIS_HOST,
    port: 6379,
};

const redisClient = new Redis(redisConfig);

export const handler = async (event, context) => {
    const res = await redisClient.set("test", "asfdaaaaa")
    console.log(res)
    console.log(111, await redisClient.get("test"));
    console.log(222, await redisClient.get("testaaaa")); // 存在しない場合、nullが返却される

    return {
        "statusCode": "200",
        "body": JSON.stringify({ "test": "value" })
    };
};

実行してみます。

cap5.PNG

cap6.PNG

ちゃんと疎通していることと、
データが存在しない場合、nullが返却されることがわかりました。

動作確認が終わったので、以下のように編集し、準備完了です。

// operate-redis_get/index.mjs
import { Redis } from "ioredis"

// ElastiCacheの接続情報
const redisConfig = {
    host: process.env.REDIS_HOST,
    port: 6379,
};

// ウォームスタート時にコネクションを再利用する
const redisClient = new Redis(redisConfig);

export const handler = async (event, context) => {
    const key = event.email
    console.log(`key is ${key}`)
    const result = await redisClient.get(key) // 存在しない場合、nullが返却される

    return {
        "cache": JSON.parse(result),
    };
};
// operate-redis_set/index.mjs
import { Redis } from "ioredis"

// ElastiCacheの接続情報
const redisConfig = {
    host: process.env.REDIS_HOST,
    port: 6379,
};

// ウォームスタート時にコネクションを再利用する
const redisClient = new Redis(redisConfig);

export const handler = async (event, context) => {
    const user = event.user
    console.log(`user is`)
    console.log('%o', user)

    const result = await redisClient.set(user.email, JSON.stringify(user))

    return {
        user
    };
};

postgresqlを操作する

index.mjsを以下のように編集します。

import pg from 'pg';
const { Client } = pg;

const client = new Client({
    host: process.env.DB_HOST,
    user: process.env.DB_USER,
    database: process.env.DB_DATABASE,
    password: process.env.DB_PASSWORD,
    port: 5432,
    ssl: {
        rejectUnauthorized: false // 注意: 本番環境では適切な証明書を使用すること
    }
});

const setup = async () => {
    await client.connect();

    // テーブルの作成 (既に存在する場合はスキップ)
    await client.query(`
        CREATE TABLE IF NOT EXISTS users (
            id SERIAL PRIMARY KEY,
            name VARCHAR(100) NOT NULL,
            email VARCHAR(100) UNIQUE NOT NULL
        )
    `);

    // 3件のデータを挿入
    const users = [
        ['John Doe', 'john@example.com'],
        ['Jane Smith', 'jane@example.com'],
        ['Bob Johnson', 'bob@example.com']
    ];

    for (const [name, email] of users) {
        const insertResult = await client.query(
            'INSERT INTO users (name, email) VALUES ($1, $2) ON CONFLICT (email) DO NOTHING RETURNING id',
            [name, email]
        );
        if (insertResult.rows.length > 0) {
            console.log('Inserted user ID:', insertResult.rows[0].id);
        } else {
            console.log('User already exists:', email);
        }
    }
}

export const handler = async (event, context) => {
    try {
        await setup();

        // データの選択
        const selectResult = await client.query('SELECT * FROM users');
        console.log('Current users in the table:', selectResult.rows);

        return {
            statusCode: 200,
            body: JSON.stringify({ 
                message: "Operation completed successfully",
                users: selectResult.rows
            })
        };
    } catch (error) {
        console.error('Error:', error);
        return {
            statusCode: 500,
            body: JSON.stringify({ error: "An error occurred during execution" })
        };
    } finally {
        await client.end();
    }
};

実行してみます。

cap7.PNG

cap8.PNG

users テーブル:
+----+-------------+--------------------+
| id | name        | email              |
+----+-------------+--------------------+
|  1 | John Doe    | john@example.com   |
|  2 | Jane Smith  | jane@example.com   |
|  3 | Bob Johnson | bob@example.com    |
+----+-------------+--------------------+
(3 行)

ちゃんと疎通していることがわかりました。

動作確認が終わったので、以下のように編集し、準備完了です。

import pg from 'pg';
const { Client } = pg;

const client = new Client({
    host: process.env.DB_HOST,
    user: process.env.DB_USER,
    database: process.env.DB_DATABASE,
    password: process.env.DB_PASSWORD,
    port: 5432,
    ssl: {
        rejectUnauthorized: false // 注意: 本番環境では適切な証明書を使用すること
    }
});

// ウォームスタート時にコネクションを再利用する
await client.connect()

export const handler = async (event, context) => {
    const email = event.email
    console.log(`email is ${email}`)
    
    // event.emailをキーにして1件のユーザー情報を取得
    const selectResult = await client.query('SELECT * FROM users WHERE email = $1', [email]);
    console.log(selectResult)

    return {
        user: selectResult.rows[0]
    };
};

今回、try-catch句はあえてjs側では実装しませんでした。
そこらへんは全てAWS Step Functionsに任せる方が、シンプルかなーと思いました。
意見あったらコメント欲しいです!

AWS Step Functions用のIamロール

ロールの存在を確認します。

cap2.PNG

良い感じですね。
(出力しませんが)logsとlambda発火の権限を持っていることを確認します。

AWS Step Functions

では、ここからが本番です。AWS Step Functionsを作成します。

ステートマシンを作成する

cap9.PNG

コンソールから作成していきます。
今すぐ始める をクリックします。

cap10.PNG

テンプレートを選べます。
今回は 自分で作成する をクリックします。

ステートマシンをいったん保存する

cap71.PNG

作成されたステートマシンが表示されます。このままでは保存できないので、編集します。

cap16.PNG

フロータブへ移動します。

cap17.PNG

Success を追加します。これで保存可能ですが、ステートマシンの名前や、ロールなどを設定します。

cap11.PNG

設定タブを選択します。
ステートマシンの名前や、ロールなど、その他の設定を編集します。

cap12.PNG

ステートマシン名を入力します。任意の名前で良いので、適当。

cap13.PNG

ロールを選択します。Terraformで用意したものを選択します。

cap14.PNG

その他設定はいったんなしにしておきます。X-Rayと統合されてるの良いですね。

cap15.PNG

作成 をクリックします。

cap18.PNG

保存されました!!

ステートマシンを実行してみる

実行してどんな感じか体験してみます。

cap19.PNG

右上の 実行を開始 をクリックします。

cap20.PNG

input Jsonが要求されます。何も設定せず、 実行を開始 をクリックします。

cap22.PNG

フローが実行されました!
こんな感じで、フローをGUIで編集、Inputを指定して実行、を繰り返して開発していきます。

ではさっそくlambdaとつないでいきましょう!!

キャッシュを取得する

まずはキャッシュ取得処理を追加します!

cap23.PNG

アクションタブから、 AWS Lambda Invoke アクションを選択します。
見ての通り、Lambdaを実行するアクションですね。
配置したら、アクションの設定を行っていきます。

cap24.PNG

ラベルと実行するlambda関数を設定します。
キャッシュを取得するので、「Query Cache」、関数は作成した redis_get を指定します。

cap25.PNG

保存してテストしましょう。

export const handler = async (event, context) => {
    const key = event.email
    console.log(`key is ${key}`)
    const result = await redisClient.get(key) // 存在しない場合、nullが返却される

    return {
        "cache": JSON.parse(result),
    };
};

redisからキャッシュをgetするlambdaのコードの一部を再掲します。
event.emailをキーにしているので、Input Jsonにemailを含めてあげます。

cap26.PNG

こんな感じです!
先ほどlambdaとredisの疎通確認でtestをキーにvalueを保存していたので、取得されてほしいところです。
実行してみます。

cap27.PNG

lambdaの実行が成功しました。ロールは正しく設定されていそうです。
気になる出力を確認します。

cap28.PNG

cacheに先ほどテスト用にsetした値が返ってきてますね!!
では、存在しないキーでも検索してもらいましょう。

cap30.PNG

Input Jsonでemailに存在しないキーを設定してみます。
実行を開始!!

cap31.PNG

lambdaが実行され、nullが返却されました。
いいですね!

では次は、lambdaの実行結果でif分岐するようにフローを編集します。

Choiceフローを追加する

Step Functionsでは、Choiceという部品で分岐ロジックを実装できます。

cap32.PNG

フローからChoice を選択し、lambda関数につなげます。

cap33.PNG

Choice Rulesを編集します。if文の評価式をいじるイメージです。

cap34.PNG

Add Conditionsをクリックします。

cap35.PNG

$はinputを意味します。この場合はlambda関数のアウトプットですね。
画像の設定では、lambda関数のアウトプットのcacheが、Nullでなければ、という条件になります。
条件を保存します。

cap40.PNG

その後、next state を 新しい状態を追加 に設定します。

cap38.PNG

Choiceの下に、新しくスペースがうまれました。

cap39.PNG

まずは分岐するかどうかを確認したいので、lambdaを設定せず、Passというデータを変換するフローを置いてみます。

cap41.PNG

Passの名前を変更して、どっちに分岐したかわかりやすくします。
Nullじゃなかった場合に通るフローなので、 Cache Hit と設定します。

Nullだった場合の分岐も作ります。
Choiceを再度編集します。

cap36.PNG

Add new choice rule をクリックします。elseの役割をもつDefault ruleでも大丈夫です。

cap37.PNG

2つ目のRuleを指定します。
cacheが存在しない場合(is present === false)または、Nullの場合、こちらに分岐させます。

cap42.PNG

同様に後続にPassを追加し、分岐ロジックを設定できました。

cap43.PNG

テストします!!

input.json
{
	"email" : "test"
}

で実行しました。

cap44.PNG

cap45.PNG

いい感じです。

input.json
{
	"email" : "sonzai shinai key"
}

で実行しました。

cap46.PNG

こっちもいい感じ!

if分岐が実装できたので、次は、キャッシュが存在しない場合、実際にRDSへクエリするようフローを編集していきます!

RDSをクエリするlambdaを追加する

cap47.PNG

キャッシュが存在しなかった場合のPassの後続に、Invoke Lambdaを追加します。
わかりやすい名前と、関数の選択をします。

cap48.PNG

lambdaのevent引数に渡す値を設定します。
Step Functionsはデフォルトでは一つ前のアクションのアウトプットが次のアクションのインプットになります。
このlambdaの場合はPassのアウトプットの { "cache" : null} がインプットですね。
ただ、これをインプットにされても、クエリするべきemailが分かりません。
そこで、payloadを入力し、Passのアウトプットではない値をlambdaに渡してあげます。

cap49.PNG

Input Jsonの中身は、$$.Execution.Input で参照できます。
今回はemailを参照したいので、$$.Execution.Input.email としました。
キーが「.$」で終わる必要があると記載のあるので、キーは user.$ とします。

では実行してみましょう!!

cap53.PNG

RDSにレコードが存在するemailを指定します。

cap50.PNG

きたーーーちゃんとクエリできてますね。
これでキャッシュミス時のオリジンRDSへのクエリ処理が実装できました。
次は、オリジンへのクエリ後、キャッシュを更新する処理を実装しましょう。
lambdaつなぐだけなので、Step Functionsなら簡単です!!!

キャッシュを追加するlambdaを追加する

cap52.PNG

Incoke Lambdaを追加します。
今回はひとつ前のlambdaのアウトプットをそのまま渡せばオッケーなので、他の操作は行いません。
テストします!!!!!

cap53.PNG

レコードの存在するemailを設定し、実行します。

cap54.PNG

RDSの検索結果が返却されました!!!
lambdaが落ちていないので、キャッシュは追加されたように見えます。

もう一度同じemailでステートマシンを実行します。

cap55.PNG

きたーーーーーーーーキャッシュから取得し、RDSは触ってないですね!!
ただ。キー名がcache になってますね。オリジンからクエリしたときと同様の形式にしましょう。

cap57.PNG

Cache Hit時のPassを編集します。
Passは入力値を整形してアウトプットする使い方もできます。

入力の一部を使用して新しいJSONペイロードを構築、と書かれてますね。
Parametersを使用して入力を変換 にチェックをいれます。

cap58.PNG

{ "user" : { ... }} の形式になるよう設定します。

再度同じemailでステートマシンを実行してみます。

cap59.PNG

おっけーですね!!!
RDSへクエリした時と同じ形式で返却されました。

cap61.PNG

最後に、実行時間の差も見てみましょう。
set処理が同期的に実行されるため正しい比較ではないですが、結構違いますね。redisはやい!

以上で、今回作りたかったRDS × ElastiCache のキャッシュ処理フローは完成です!

Step Functionsでフローを構築するメリットとデメリット

最後に、Step Functionsでフローを構築するメリットとデメリットをまとめてみます。

メリット

  • フローの流れを一望できる
    Step FunctionsのGUIのおかげで、
    各サービスがどのように処理されるか/エラーハンドリングされるか?はかなり分かりやすいと思います。
    SNSやSQSで繋がってるフローであれば比較的楽に把握できるかもしれませんが、
    eventbridgeが待機しているフローなどの全容の把握ってかなり難しいですよね。

cap60.PNG

Step Functionsなら視覚的に把握できます!!

  • フロー全体のエラーハンドリングを簡単に実装できる
    記事引用で恐縮なのですが・・・。

上記ブログ記事の通り、step functions内のフロー全体をまるまるキャッチする仕組みを構築できます。
とりあえずSNSに送る!みたいな運用が可能で、どんなエラーが起こるかわからないローンチ初期などは、こういったキャッチ機構が存在すると嬉しいのではと思います。

デメリット

  • JSON pathの勉強が必要
    理解してるような、してないような。自信がない・・・。

  • Step Functionsに多くの権限が付与される
    ゴッドクラス・ゴッドメソッドのような問題が発生するのでは、と思っています。
    最小権限の法則とは若干喧嘩してしまうなーと感じます。

総論

デメリットはありつつ、いろんなサービスを統合するフローをGUIで作成できるのはかなり良いと感じます。
様々な使い方ができるサービスだと思うので、慣れていきたいですねー

おまけ

今回の作業リポジトリ

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?