はじめに
分散型データベースのMacrometaに関する記事"AkamaiとMacrometaを活用したアクセストークン不正共有の検知と無効化"を参考に、同じ仕組みを別の用途にも応用しようと考えました。今回は、MacrometaとEdgeWorkersを使用して、お問い合わせフォームのアクセス数制限を実現してみました。
実現したい内容
一般的な「お問い合わせフォーム」は誰でも気軽に書き込みができます。しかし、いたずらやスパムの書き込みが多い場合、それが担当者の工数を奪い、迷惑になるだけでなく、大量の通知メールによってメールサーバーがダウンする可能性もあります。
そこで、今回実現したいのは、問い合わせページに対して、例えば同じIPからは1日あたり10件など、一定期間内の書き込み数を制限することです。
設定内容
- Macrometaの設定
MacrometaにLoginすると以下のURLにリダイレクトされ、MacrometaのGlobal Data Network (GDN) のWebコンソールが利用できます。
最初の「treefish-xxxxxxxxxx.paas」に「api-」を付けると外部からCallするAPIのエンドポイントになります。
使用例)
curl -H "Authorization: apikey [API_KEY]" "https://{api-treefish-xxxxxxxxxx.paas}.macrometa.io/_fabric/_system/_api/database"
1.1 2つのDocument Storeの作成(JSON形式でデータを保存するNoSQLデータベース)
- InboundTraffic: クライアント情報(IP、パスなど)を格納、<Stream型>
- blockList: Block対象のIPアドレスを格納
1.1.1 Document Store Streamの作成(Stream形式)
Collection Name: InboundTraffic
Type:Document Store
Collection Streamを有効化 <ー InboundTrafficだけStreamを有効化
Index TTL追加Field:updatedTimestamp
Expireafter: 10
1.1.2 Document Store Dataの作成(Stream有効化は不要)
Name: blockList
Type:Document Store
Index TTL追加
Field:creationDate
Expireafter: 10
1.2 Query Workersの作成(EdgeWorkersからCallするEndPoint)
1.2.1 クライアントリクエストのパラメーターをInboundTrafficへ書き込むためのQueryを作成
Name: updatIP
*役割り:ip, path, updatedTimestamp を 1.1.1で作成した「InboundTraffic」へ記録
Query WorkersをSaveすると外部で利用できるAPI Endpointが自動生成されます。
https://{api-treefish-xxxxxxx.paas.macrometa.io}/_fabric/_system/_api/restql/execute/updatIP
UPSERT{ ip: @ip, path: @path}
INSERT { ip: @ip, path: @path,
updatedTimestamp: ROUND(DATE_NOW() / 1000) }
UPDATE { ip: @ip, path: @path,
updatedTimestamp: ROUND(DATE_NOW() / 1000) }
IN InboundTraffic OPTIONS { exclusive: true }
1.2.2 クライアントIPが「blockList」に登録されているかどうかを照会するQueryを作成
Name: checkBlocked
*役割り:クライアントIPがblockListに入っているかを確認
FOR doc IN blockList
FILTER doc._key == TO_STRING(@ip)
RETURN doc.value
Query WorkersをSaveすると外部で利用できるAPI Endpointが自動生成されます。
https://{api-treefish-xxxxxxx.paas.macrometa.io}/_fabric/_system/_api/restql/execute/checkBlocked
1.3 Stream Workersの作成
設定したスライディングウィンドウ内で一定数以上リクエストがあった場合、BlockListへそのIPアドレスを記録する。
@App:name("RateLimit")
@App:qlVersion("2")
CREATE SOURCE InboundTraffic WITH (source.type = 'database', collection='InboundTraffic', map.type='json') (ip string);
CREATE STORE BlockListStore WITH (type='database', collection='blockList', map.type='json') (_key string, value long, creationDate string);
INSERT INTO BlockListStore
SELECT ip as _key, Count(ip) as value, time:currentTimestamp() as creationDate
FROM InboundTraffic WINDOW sliding_time(600 sec)
GROUP BY ip
HAVING value >= 1;
Saveした後、Publishをします。
Publish状態になるとInboundTrafficのデータを一定時間「Sliding_time(10Sec)」カウントし「Count(ip)」、1回以上アクセスしたIP(value>=1)をblockListに記録します。
Publish/UnpublishはStream Workersタブからも制御できます。
1.4 API Keyの取得(外部からMacrometaのAPI EndpointをCallする際に利用)
2 アカマイの設定
2.1 CDN Propertyの設定
EdgeWorkersで参照するための変数追加
リクエストヘッダーに「Macrometa-Origin」が付いている場合、MacrometaのQueryWorkersのAPI EndPointをオリジンに設定する
client.js
import { httpRequest } from 'http-request';
import {
C8_URL,
FABRIC,
MACROMETA_ORIGIN_NAME
} from './config.js';
import { logger } from 'log';
export const client = {
executeQueryWorker: (queryWorkerName, bindVars, apiKey) => {
return httpRequest(
`${C8_URL}/_fabric/${FABRIC}/_api/restql/execute/${queryWorkerName}`,
{
method: 'POST',
headers: {
authorization: apiKey,
'Macrometa-Origin': MACROMETA_ORIGIN_NAME,
},
body: JSON.stringify({
queryWorkerName,
bindVars,
batchSize: 500,
}),
},
);
},
getNextBatch: (cursorId, apiKey) => {
return httpRequest(
`${C8_URL}/_fabric/${FABRIC}/_api/restql/fetch/${cursorId}`,
{
method: 'PUT',
headers: {
authorization: apiKey,
'Macrometa-Origin': MACROMETA_ORIGIN_NAME,
},
body: JSON.stringify({}),
},
);
},
};
config.js
export const C8_URL = "https://{www.example.com}"; //配信URLを記入
export const FABRIC = "_system";
export const MACROMETA_ORIGIN_NAME = "treefish-xxxxxxxxx.paas"; //API Endpoint
export const API_KEY = "{xxxxxxxxxxxxxxxxxxxxxxxxxxxxx}"; //API Key
main.js
import URLSearchParams from 'url-search-params';
import { logger } from 'log';
import { executeRestQL, executeRestQLawait } from './service.js';
import { API_KEY } from './config.js';
export async function onClientResponse(request, response) {
if (response.status == 200 || response.status == 206) {
const params = new URLSearchParams(request.query);
const clientip = request.getVariable('PMUSER_CLIENT_IP');
const rpath = request.path
let result = {};
try {
//MacrometaのQuery Workers(checkBlocked)をCallし、clientIPがBlock対象かを確認し、Block対象の場合403を返す
const validity = await executeRestQLawait(
'checkBlocked',
'apikey ' + API_KEY,
{
"ip": clientip
}
);
if (validity.length > 0) {
request.respondWith(403, { 'Content-Type': ['application/json;charset=utf-8'] }, 'ただいまシステムメンテナンス中のため、弊社ホームページのお問い合わせなど一部サービスがご利用頂けません。');
return;
}
// MacrometaのQuery Workers(updateIP)をCallし、ClientIPとパスをInboundTrafficに記録する
executeRestQL(
'updateIP',
'apikey ' + API_KEY,
{
"ip": clientip,
"path" : rpath
}
);
logger.log(
'Success'
);
} catch (error) {
logger.log(
'Error occurred while executing edgeWorker',
JSON.stringify(error),
);
result.error = true;
result.errorMessage =
error.errorMessage ||
error.message ||
JSON.stringify(error) ||
'Something went wrong';
}
}
}
service.js
import { client } from './client.js';
import { logger } from 'log';
export const fetchNextBatch = async (
cursorId,
authorizationToken,
previousResult = [],
) => {
try {
const fetchPromise = client.getNextBatch(
cursorId,
authorizationToken,
);
const { result, hasMore, id } = fetchPromise.json();
previousResult = previousResult.concat(result);
if (!hasMore) {
return previousResult;
}
return fetchNextBatch(id, authorizationToken, previousResult);
} catch (error) {
logger.log(`Failed to fetch ${cursorId}`, JSON.stringify(error));
throw error;
}
};
export const executeRestQL = async (name, authorizationToken, params = {}) => {
let restQLResponse = [];
try {
const response = client.executeQueryWorker(
name,
params,
authorizationToken,
);
const { result, id, hasMore } = response.json();
if (!hasMore) {
return result;
}
restQLResponse = fetchNextBatch(id, authorizationToken, result);
return restQLResponse;
} catch (error) {
logger.log(`Failed to fetch ${name}`, JSON.stringify(error));
throw error;
}
};
export const executeRestQLawait = async (name, authorizationToken, params = {}) => {
let restQLResponse = [];
try {
const response = await client.executeQueryWorker(
name,
params,
authorizationToken,
);
const { result, id, hasMore } = await response.json();
if (!hasMore) {
return result;
}
restQLResponse = await fetchNextBatch(id, authorizationToken, result);
return restQLResponse;
} catch (error) {
logger.log(`Failed to fetch ${name}`, JSON.stringify(error));
throw error;
}
};
今回の内容では、1つのIP から、10秒間に1つのリクエストのみ許可するように設定しましたが、Document StoreのIndex TTLやStream Workers(RateLimit)のSLIDING_TIME及びStream Workers(RateLimit)のValueを変更することで、時間と許可されるクライアントの数を調整できます。
例)5分に2回のリクエストのみ許可したい場合、
Collection(InboundTraffic、BlockList)のIndex TTLのexpireAfterを600に設定し、
Stream Workers(RalteLimit)のsliding_time(600 sec)」、value >= 2に設定します。
おわりに
今回、EdgeWorkersとMacrometaのGlobal Data Network(GDN)を連携させ、リクエスト数を制限する構成を実現しました。動作およびパフォーマンスのテストはまだ行っていないので、MacrometaのGDNプラットフォームを理解するためのデモ環境として活用したいと考えています。