API Gatewayでリクエストして、Lambdaで処理させて、AppSyncで受け取る
この記事はサーバーレスWebアプリ Mosaicを開発して得た知見を振り返り定着させるためのハンズオン記事の1つです。
以下を見てからこの記事をみるといい感じです。
はじめに
検出した顔を並べて表示しました。次はいよいよ顔にモザイクをかけてゆきます。
処理をキックするためにAPI GatewayとLambdaを利用します。また、実行結果はAppSyncのSubscriptionで受け取ります。
顔の位置情報を渡す
前回の記事(Lambda(Python) + Rekognition で顔検出)では顔を検出して切抜き画像を作って表示するだけでした。この時に顔の位置情報(座標)が分かってます。モザイク処理する時にこの座標が必要になりますのでAppSync経由で渡してあげましょう。
(キックした後にまた検出し直すこともできますが、それだと無駄が多いですよね。)
以前書いた顔検出するサンプルコードにpointsという位置情報を入れておくための変数を追加します。
def uploadImage(image, localFilePath, bucket, s3Key, group, points):
logger.info("start uploadImage({0}, {1}, {2}, {3})".format(localFilePath, bucket, s3Key, group))
try:
cv2.imwrite(localFilePath, image)
s3.upload_file(Filename=localFilePath, Bucket=bucket, Key=s3Key)
apiCreateTable(group, s3Key, points)
except Exception as e:
logger.exception(e)
raise e
finally:
if os.path.exists(localFilePath):
os.remove(localFilePath)
def apiCreateTable(group, path, points):
logger.info("start apiCreateTable({0}, {1}, {2})".format(group, path, points))
try:
query = gql("""
mutation create {{
createSampleAppsyncTable(input:{{
group: \"{0}\"
path: \"{1}\"
points: \"{2}\"
}}){{
group path points
}}
}}
""".format(group, path, points))
_client.execute(query)
except Exception as e:
logger.exception(e)
raise e
def detectFaces(bucket, key, fileName, image, group, dirPathOut):
logger.info("start detectFaces ({0}, {1}, {2}, {3}, {4})".format(bucket, key, fileName, group, dirPathOut))
try:
response = rekognition.detect_faces(
Image={
"S3Object": {
"Bucket": bucket,
"Name": key,
}
},
Attributes=[
"DEFAULT",
]
)
name, ext = os.path.splitext(fileName)
imgHeight = image.shape[0]
imgWidth = image.shape[1]
index = 0
for faceDetail in response["FaceDetails"]:
index += 1
faceFileName = "face_{0:03d}".format(index) + ext
box = faceDetail["BoundingBox"]
x = max(int(imgWidth * box["Left"]), 0)
y = max(int(imgHeight * box["Top"]), 0)
w = int(imgWidth * box["Width"])
h = int(imgHeight * box["Height"])
points = "{0},{1}|{2},{3}|{4},{5}|{6},{7}".format(x, y, x+w, y, x+w, y+h, x, y+h)
logger.info("BoundingBox({0},{1},{2},{3})".format(x, y, w, h))
faceImage = image[y:min(y+h, imgHeight-1), x:min(x+w, imgWidth)]
localFaceFilePath = os.path.join("/tmp/", faceFileName)
uploadImage(faceImage, localFaceFilePath, bucket, os.path.join(dirPathOut, faceFileName), group, points)
cv2.rectangle(image, (x, y), (x+w, y+h), (0, 0, 255), 3)
processedFileName = "faces-" + fileName
processedFilePath = "/tmp/" + processedFileName
uploadImage(image, processedFilePath, bucket, os.path.join(dirPathOut, processedFileName), group, points)
except Exception as e:
logger.exception(e)
raise e
AppSyncのスキーマも更新しましょう。
AWSコンソール > AppSync > 目的のAPI > スキーマ
input(引数)とtype(戻り値)のデータに points: String を加えておきます。
input CreateSampleAppsyncTableInput {
group: String!
path: String!
points: String
}
type SampleAppsyncTable {
group: String!
path: String!
points: String
}
フロントのWebアプリ側も更新します。
getListやsubscriptionでpointsを受け取るため、graphqlファイルを更新します。
export const listSampleAppsyncTables = `query listSampleAppsyncTables($group: String) {
listSampleAppsyncTables(
limit: 1000000
filter: {
group: {eq:$group}
}
)
{
items
{
group
path
points
}
}
}
`;
export const onCreateSampleAppsyncTable = `subscription OnCreateSampleAppsyncTable($group: String) {
onCreateSampleAppsyncTable(group : $group) {
group
path
points
}
}
`;
さぁこれで、検出した顔の座標をクライアント側に渡すことができるようになりました。
受け取ったpointsをメンバ変数にセットしたり、ログに出したりして確認してみましょう。
:
<script>
:
async getList() {
this.group = this.$route.query.group;
console.log("group : " + this.group);
if(!this.group){
return;
}
let apiResult = await API.graphql(graphqlOperation(listSampleAppsyncTables, { group : this.group }));
let listAll = apiResult.data.listSampleAppsyncTables.items;
for(let data of listAll) {
let tmp = { path : data.path, image : "", points : data.points };
let list = [...this.dataList, tmp];
this.dataList = list;
console.log("path : " + data.path);
console.log("points : " + data.points);
Storage.get(data.path.replace('public/', ''),
{ level: 'public', expires: dataExpireSeconds }).then(result => {
tmp.image = result;
console.log("image : " + result);
}).catch(err => console.log(err));
}
API.graphql(
graphqlOperation(onCreateSampleAppsyncTable, { group : this.group } )
).subscribe({
next: (eventData) => {
let data = eventData.value.data.onCreateSampleAppsyncTable;
let tmp = { path : data.path, image : "", points : data.points };
let list = [...this.dataList, tmp];
this.dataList = list;
console.log("path : " + data.path);
console.log("points : " + data.points);
Storage.get(data.path.replace('public/', ''),
{ level: 'public', expires: dataExpireSeconds }).then(result => {
tmp.image = result;
console.log("image : " + result);
}).catch(err => console.log(err));
}
});
},
:
顔を選択するための実装は今回は割愛し、検出された全ての顔にモザイクをかけるようにしたいと思います。
全ての顔にモザイクをかける処理を実行するためのボタンを設置し、押下したらAPI Gatewayに対して処理をリクエストします。モザイク処理された画像は他の画像と同様にS3にアップロードしたらAppSync経由でパスを通知し、クライアント側でそれを受け取るような流れです。
API GatewayとLambdaのセットアップ
以下のような流れでAPI GatewayとLambdaをセットアップしてゆきます。
Lambdaの作成
先にLambdaの関数を作成しておきます。
AWSコンソール > Lambda > 関数 > 関数の作成
一から作成, 任意の関数名, ランタイムにはPython3.6, 実行ロールは「基本的な Lambda アクセス権限で新しいロールを作成」を選択して作成します。
関数名は「sample_lambda_apply」としておきました。
関数コードはひとまずインラインで以下のようにしておきます。
import json
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
logger.info("Hello from lambda! - sample_lambda_apply")
return {
'statusCode': 200,
'body': json.dumps('Hello from Lambda! - sample_lambda_apply')
}
LambdaのIAMロール編集
LambdaがS3操作やAppSyncアクセスできるようにします。
AWSコンソール > IAM > ロール
Lambdaと一緒に作成された sample_lambda_apply-role-xxxxxxxx というロールを表示します。
アクセス権限タブの「+インラインポリシーの追加」から追加してください。
ポリシーのJSONは以下のような感じになります。
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"s3:PutObject",
"s3:GetObject",
"appsync:GraphQL"
],
"Resource": [
"arn:aws:s3:::sample-vue-project-bucket-work/*",
"arn:aws:appsync:ap-northeast-1:888888888888:apis/xxxxxxxxxxxxxxxxxxxxxxxxxx/*"
],
"Effect": "Allow"
}
]
}
API Gatewayの作成
続いて、API Gatewayを作成します。
AWSコンソール > API Gateway
APIを作成ボタンを押下し、APIタイプを選択画面でREST APIを選択します。
REST, 新しいAPI, 任意の名前を設定し, エンドポイントタイプはリージョンとし、APIの作成ボタンを押下します。
processという名前の子リソースを作成し、そこにPOSTメソッドを作成してゆこうと思います。リソースを作成する際、CORSを有効にするチェックはONにしておきましょう。
POSTのセットアップでは、統合タイプはLambda関数とし、Lambda関数は先程作成した「sample_lambda_apply」を選択し、保存ボタンを押下します。
CORSの有効化をしないと、クライアント側からリクエストした際に以下のような例外が発生すると思います。
Access to XMLHttpRequest at 'https://j2byqj306a.execute-api.ap-northeast-1.amazonaws.com/work/process' from origin 'https://fed9513d88324171b593944f5acca30f.vfs.cloud9.ap-northeast-1.amazonaws.com' has been blocked by CORS policy: No 'Access-Control-Allow-Origin' header is present on the requested resource.
Error: Network Error
at createError (createError.js?2d83:16)
at XMLHttpRequest.handleError (xhr.js?b50d:81)
CORSの有効化をしたら、APIのデプロイも再実施しましょう。
念の為テストを実行してstatusCode=200が返ってくることも確認しておきましょう。
確認できたら、APIのデプロイから任意のステージ名を設定し、デプロイを行っておいてください。
VueのWebアプリからAPI Gatewayに対してリクエストする
WebアプリからAPI Gatewayを呼べるようにします。
:
<v-list>
<v-list-item v-for="data in this.dataList" :key="data.path">
<v-list-item-content>
<a :href="data.image" target=”_blank”>
<v-list-item-title v-text="data.path"></v-list-item-title>
</a>
</v-list-item-content>
<v-list-item-avatar>
<v-img :src="data.image"></v-img>
</v-list-item-avatar>
</v-list-item>
</v-list>
<v-btn v-if="dataList.length > 0" @click="processMosaic">
PROCESS MOSAIC
</v-btn>
:
<script>
import { API, graphqlOperation, Storage } from 'aws-amplify';
import { listSampleAppsyncTables } from "../graphql/queries";
import { onCreateSampleAppsyncTable } from "../graphql/subscriptions";
import axios from 'axios';
const apiUrl = "https://j2byqj306a.execute-api.ap-northeast-1.amazonaws.com/work/process";
const config = {headers: {
'Content-Type': 'application/json'
}}
: ![Screenshot 2020-03-09 at 23.33.49.png](https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/394775/db920be5-a8e0-8f5c-55bf-b5e0590994f5.png)
processMosaic() {
let pointsList = [];
let orgKey = "";
for(let index = 0; index < this.dataList.length; index++) {
let data = this.dataList[index];
if(data.points != "-"){
pointsList.push(data.points);
}else if(data.path.startsWith("processed") == false){
orgKey = data.path;
}
}
this.myGuid = this.getGUIDString(new Date());
let requestData = { guid: this.myGuid, orgKey: orgKey, pointsList: pointsList };
console.log(requestData);
axios
.post(apiUrl, requestData, config)
.then(response => {
let result = response.data
console.log(result)
})
.catch(error => console.log(error))
},
getGUIDString(date){
let random = date.getTime() + Math.floor(100000 * Math.random());
random = Math.random() * random;
random = Math.floor(random).toString(16);
return random;
},
:
PROCESS MOSAICボタンを押したらstatusCode=200が返ってくることを確認しておきましょう。
Lambdaの実装
Lambdaファンクションの実装をしてゆきます。
以前の記事「Lambda + OpenCVで画像処理 (グレー画像作成)」と同じ要領で必要なライブラリをインストールし、lambdafunction.pyを実装し、zip圧縮して、Lambdaにデプロイします。
必要なライブラリのインストール
$ pip install opencv-python -t .
$ pip install gql -t .
実装
# coding: UTF-8
import json
import boto3
import os
import datetime
import numpy as np
import cv2
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
s3 = boto3.client("s3")
BUCKET_INPUT = "sample-vue-project-bucket-work"
BUCKET_OUTPUT = "sample-vue-project-bucket-work"
from gql import gql, Client
from gql.transport.requests import RequestsHTTPTransport
ENDPOINT = "https://xxxxxxxxxxxxxxxxxxxxxxxxxx.appsync-api.ap-northeast-1.amazonaws.com/graphql"
API_KEY = "da2-XXXXXXXXXXXXXXXXXXXXXXXXXX"
_headers = {
"Content-Type": "application/graphql",
"x-api-key": API_KEY,
}
_transport = RequestsHTTPTransport(
headers = _headers,
url = ENDPOINT,
use_json = True,
)
_client = Client(
transport = _transport,
fetch_schema_from_transport = True,
)
def lambda_handler(event, context):
try:
logger.info(event)
guid = event["guid"]
orgKey = event["orgKey"]
pointsList = event["pointsList"]
pathList = orgKey.split("/")
name, ext = os.path.splitext(os.path.basename(orgKey))
fileName = "mosaic.w2or3w.com." + guid + ext
dirPath = os.path.dirname(orgKey)
dirName = os.path.basename(dirPath)
rootDirName = pathList[0]
localTmpPath = u'/tmp/' + os.path.basename(orgKey)
s3.download_file(Bucket = BUCKET_INPUT, Key = orgKey, Filename = localTmpPath)
before = cv2.imread(localTmpPath)
after = cv2.imread(localTmpPath)
after = mosaicFromPointsList(pointsList, before, after)
uploadAppliedImage(after, BUCKET_OUTPUT, os.path.join(rootDirName, "processed", dirName), fileName)
return {
'statusCode': 200,
'body': json.dumps("completed")
}
except Exception as e:
logger.exception(e)
return {
"statusCode": 500,
"body": json.dumps("failed")
}
finally:
if os.path.exists(localTmpPath):
os.remove(localTmpPath)
def mosaicFromPointsList(pointsList, before, after):
try:
height = before.shape[0]
width = before.shape[1]
mosaicImg = mosaic(before, 0.08)
mask = np.tile(np.uint8(0), (height, width, 1))
for points in pointsList:
pointList = points.split('|')
lt = Point(pointList[0])
rt = Point(pointList[1])
rb = Point(pointList[2])
lb = Point(pointList[3])
contours = np.array(
[
[lt.x, lt.y],
[rt.x, rt.y],
[rb.x, rb.y],
[lb.x, lb.y],
]
)
cv2.fillConvexPoly(mask, contours, color=(255, 255, 255))
after = np.where(mask != 0, mosaicImg, after)
return after
except Exception as e:
logger.exception(e)
raise e
def mosaic(src, ratio=0.1):
try:
small = cv2.resize(src, None, fx=ratio, fy=ratio, interpolation=cv2.INTER_NEAREST)
return cv2.resize(small, src.shape[:2][::-1], interpolation=cv2.INTER_NEAREST)
except Exception as e:
logger.exception(e)
raise e
def uploadAppliedImage(img, bucket, dirPath, name):
tmp = "/tmp/" + name
guid = os.path.basename(dirPath)
s3key = dirPath + "/" + name
try:
cv2.imwrite(tmp, img)
s3.upload_file(Filename=tmp, Bucket=bucket, Key=s3key)
apiCreateMosaicTable(guid, s3key)
except Exception as e:
logger.exception(e)
raise e
finally:
if os.path.exists(tmp):
os.remove(tmp)
def apiCreateMosaicTable(guid, s3key):
logger.info("apiCreateMosaicTable : guid={0}, s3key={1}".format(guid, s3key))
time = datetime.datetime.now()
time = time + datetime.timedelta(minutes=30)
epocTime = int(time.timestamp())
try:
query = gql("""
mutation create {{
createSampleAppsyncTable(input:{{
group: \"{0}\"
path: \"{1}\"
points: \"-\"
deleteTime: {2}
}}){{
group path points
}}
}}
""".format(guid, s3key, epocTime))
_client.execute(query)
except Exception as e:
logger.exception(e)
raise e
class Point:
def __init__(self, text):
tmp = text.strip("("")")
tmpList = tmp.split(',')
self.x = int(tmpList[0])
self.y = int(tmpList[1])
モザイク処理の詳細についてはこちらの記事(画像に様々な形のモザイクをかける(Python, OpenCV))も見てみてください。
Lambdaにデプロイしたら、Webアプリに追加したPROCESS MOSAICボタンを押下しましょう。顔にモザイク処理された画像が追加されましたね。
API GatewayにCognito認証を設定する
現状、API Gatewayには認証制限を設けてません。呼び放題です。
フロント側はCognito認証を利用してますので、このCognito認証をAPI Gatewayにも適用させましょう。
AWSコンソール > API Gateway > 作成したAPI > オーソライザー
「+オーソライザーの作成」ボタンを押下。Cognitoユーザープールのオーソライザーを作成します。
POSTメソッドリクエストを編集します。
認可に対して作成したCognitoユーザープールオーソライザーを指定、HTTPリクエストヘッダに「Authorization」を必須にして追加します。
そして、APIをデプロイしましょう。
WebアプリのPROCESS MOSAICボタンを押すと、以下のような例外が帰ってくるようになりました。
Access to XMLHttpRequest at 'https://j2byqj306a.execute-api.ap-northeast-1.amazonaws.com/work/process' from origin 'https://fed9513d88324171b593944f5acca30f.vfs.cloud9.ap-northeast-1.amazonaws.com' has been blocked by CORS policy: No 'Access-Control-Allow-Origin' header is present on the requested resource.
List.vue?9185:114 Error: Network Error
at createError (createError.js?2d83:16)
at XMLHttpRequest.handleError (xhr.js?b50d:81)
CORSの有効化をしない時の例外と同じ感じでイマイチ判断つかないですが、とにかく失敗し、Cognito認証が効いてるらしいことが確認できました。
それでは、Webアプリの実装を更新してゆきましょう。
API GatewayにリクエストするHeaderに、Cognito認証でLocal Storageに保存されているidTokenの値を設定してあげます。
<script>
:
const currSession = await Auth.currentSession();
config.headers["Authorization"] = currSession.getIdToken().getJwtToken();
axios
.post(apiUrl, requestData, config)
.then(response => {
let result = response.data
console.log(result)
})
.catch(error => console.log(error))
:
WebアプリのPROCESS MOSAICボタンを押すと、成功するようになりましたね。
あとがき
久しぶりにこのシリーズの記事を追加しました。
薄々分かっていましたが、やっぱりこの記事は内容が盛りだくさんになってしまいました。
最近AppSync関係の記事にお熱で、本当はAppSyncのデータソースを複数登録するヤツをやりたいと思っているのです。モザイク処理をキックする手段として、この記事でも書いた通りAPI Gatewayを利用しているのですが、AppSyncだけでイケるんじゃね?って思いまして。
それを実際にやってみて記事を書くにあたり、API Gatewayでやってるこの記事が必要だなと思い、超重たい腰を上げて書いた次第です。
後回しにした上にこんな不純な動機で、ごめんね、API Gateway。