いつの間にやら12月も終わりに近づきました。一年が長いやら短いやらですが、あんまりインプットもアウトプットもない年だった年だったように思います。
そんなことは置いておいて、唐突ではありますが、AWS Batchが発表されたのに合わせて(ちと遅いですが)、今回は Azure Batch を使ってみようかと思います。
Azure Batch
2016年の Re:invent で、 AWS Batch というサービスがリリースされましたが 、このサービスと同じような、マネージド型のバッチコンピューティングサービスです。
話は逸れますが、日本のIT業界にいると、 バッチ=夜間処理 とか バッチ=シーケンシャル とか バッチ=JP1 という印象を抱きがちです。(個人的な感想です)しかし、AWS BatchもAzure Batchも、時間のかかる処理を分散並列で処理する、分散コンピューティングを管理するサービスです。この辺、誤解されそうですね。
Azure Batchの特徴は、公式ページを見ると書いていますが、以下のようになっています。
- コンピューティングノードとしてWindows/Linuxを選択可能
- 簡単なスケジュールや順序制御が可能
- JP1とかDataSpiderとかそういったものと同じ、というわけではなく、あくまで分散処理の1タスクにおける処理の順序。
- スケーリングとかデータのダウンロードとかまでを管理
- スケーリングでは仮想マシンのプールとかも必要なので、ここをちゃんとやらないと支払金額 UP は必至。
AWS Batchと同様、プールされた仮想マシン内で動作するような形式となっています。自動でスケーリングするような形というのも同様ですが、AWS Batchでは Spot Instance を利用するとか、そういった設定も可能です。ただし、実行形式が異なり、AWS Batchでは基本的にElastic Container Serviceをバックエンドとして利用しているので、Dockerコンテナを利用していますが、Azure Batchでは、任意のプログラムを実行させることが出来ます。
最近、こういったバッチコンピューティングのためのサービスが各クラウドプロバイダから出ていますが、どっちかというとエンタープライズからの要望でできているサービス、という印象です。それだけエンタープライズでもクラウドが広く利用されてきているということでしょう。
Azure Batchで動かすプログラム
Azure Batchでは、以下のようなプログラムを実行できます。
- Cloud Service (.NET/Windows限定)
- コマンドライン (Windows/Linux)
Azureの基本として、ここでも .NET が推奨されています。コマンドライン経由であれば、好きなプログラムを実行することが出来ます。当然終了ステータスの管理とか事前準備とかは自分でやる必要がありますので、その辺はトレードオフです。
Azure Batchのサンプルとしては、Python/Node.jsとAzure Batchのライブラリを利用したサンプルが用意されています。このままPythonでやってもいいんですが、今回はワンバイナリという特徴を活かしやすいと思うので、Golangで簡単に作ってやってみたいと思います。
Azure Batchの構造
Azure Batchには、以下のような構造があります。
- Pool
- Node
- Job
- Task
上から下の順に、階層構造になっています。NodeはあくまでTaskを実行するための仮想マシンなので、正確にはPool/Job/Taskの構造です。Taskには実際のタスクが実行する処理以外に、開始前後の処理を追加することが出来ます。具体的な利用例としては、処理後に結果をアップロードするとかですね。Nodeには起動時に実行するためのTaskを指定することができ、これを利用して共通の初期処理を行ったり、アプリケーションインストールを行ったり出来ます。
並列で実行する場合は、基本的にはNodeを複数立てる、つまり1Node:1Taskという前提となります。(実際には、一つのNodeの中で複数のTaskが動き得ます)あるPoolに含まれるNodeは一種類だけなので、その辺はワークロード次第というところでしょうか。
Node.jsで作るBatchクライアント
さて、チュートリアルでは、ローカルから実行するための仕組みとして、PythonでAzure Batchを操作するためのクライアントを作成しています。おんなじ物を使ってもいいんですが、ここではNode.js版のSDKを利用して、同様の動きをするBatchクライアントを作ってみましょう。
まずはSDK for Nodejsをインストールします。といっても普通にnpmでインストールするだけです。
$ npm install --save azure
チュートリアルでは、Batchで利用するアプリケーションやinput/output用のコンテナやBatchJob/Poolなどを全て作成しています。その流れを崩す理由もないので、同じ感じで行ってみましょう。全体がちょっと長いですが、貼り付けてみます。Azure SDK for Nodeがコールバック関数を多用する形になっていたのと、async/awaitを使ってみたかったので、全体がasync/await構成になっています。Node.js v7.2.0でフラグ付きで動作します。
const azure = require('azure');
const path = require('path');
const moment = require('moment');
/**
* Create blob containers
*/
async function createContainers(blobClient, containers = []) {
let promises = containers.map((container) => new Promise((resolve, reject) => {
const option = {publicAccessLevel: 'blob'};
blobClient.createContainerIfNotExists(container, option, (err, result) => {
if (err) {
console.error(err);
reject(err);
} else {
resolve(result);
}
});
}));
await Promise.all(promises);
}
/**
* Upload a local file to specified container.
*/
async function uploadResourceFile(blobClient, container, filePath) {
const blobName = path.basename(filePath);
const absolutePath = path.resolve(filePath);
const promise = new Promise((resolve, reject) => {
blobClient.createBlockBlobFromLocalFile(container, blobName, filePath, (err) => {
if (err) {
console.error(err);
reject(err);
} else {
resolve(filePath);
}
});
});
await promise;
const startDate = new Date();
const expiryDate = new Date(startDate);
expiryDate.setMinutes(startDate.getMinutes() + 100);
startDate.setMinutes(startDate.getMinutes() - 100);
const sharedAccessPolicy = {
AccessPolicy: {
Permissions: azure.Constants.BlobConstants.SharedAccessPermissions.READ,
Start: startDate,
Expiry: expiryDate
},
};
const token = blobClient.generateSharedAccessSignature(container, blobName, sharedAccessPolicy);
const sasUrl = blobClient.getUrl(container, blobName, token);
return {
blobSource: sasUrl,
filePath: blobName
};
}
async function createPool(batchClient, poolId, resourceFiles, publisher, offer, sku) {
console.log('Creating pool ' + poolId);
const promise = new Promise((resolve, reject) => {
const commandLine = [
'cp -r $AZ_BATCH_TASK_WORKING_DIR/* $AZ_BATCH_NODE_SHARED_DIR',
].join(' && ');
batchClient.pool.add({
id: poolId,
displayName: poolId,
vmSize: 'STANDARD_A1',
virtualMachineConfiguration: {
imageReference: {
publisher,
offer,
sku,
},
nodeAgentSKUId: 'batch.node.ubuntu 16.04',
},
targetDedicated: 3,
startTask: {
waitForSuccess: true,
runElevated: true,
commandLine: `/bin/sh -c "${commandLine}"`,
resourceFiles,
}
}, (err, result) => {
if (err) {
console.error(err);
reject(err);
} else {
resolve(poolId);
}
});
});
await promise;
}
async function createJob(batchClient, poolId, jobId) {
console.log('creating job');
const promise = new Promise((resolve, reject) => {
batchClient.job.add({
id: jobId,
poolInfo: {poolId},
}, (err, result) => {
if (err) {
console.error(err);
reject(err);
} else {
resolve(jobId);
}
});
});
await promise;
}
async function addTasks(batchClient, jobId, inputFiles = []) {
console.log('Adding tasks');
const promise = new Promise((resolve, reject) => {
const tasks = inputFiles.map((file, idx) => {
const commandLine = [
'chmod +x word-count',
`word-count $AZ_BATCH_NODE_SHARED_DIR/${file.filePath}`
].join(' && ');
return {id: `task${idx}`, commandLine: `bin/sh -c "${commandLine}"`, resourceFiles: [file]};
});
batchClient.task.addCollection(jobId, tasks, (err, result) => {
if (err) {
console.error(err);
reject(err);
} else {
resolve(result);
}
});
});
await promise;
}
async function waitForTasksToComplete(batchClient, jobId, timeout) {
console.log('Waiting for tasks to complete...');
const expire = moment();
expire.add(timeout, 's');
async function loop(batchClient, jobId) {
return new Promise((resolve, reject) => {
batchClient.task.list(jobId, {
taskListOptions:{
maxResults: 10
},
}, (err, result) => {
if (err) {
console.error(err);
reject(err);
} else {
if (result.filter(({state}) => state !== 'completed').length === 0) {
resolve('completed');
} else if (moment().isBefore(expire)) {
resolve('wait');
} else {
reject('timeout');
}
}
});
});
}
let completed = false;
let sleep = async function sleep(ms) {
return new Promise(r => setTimeout(r, ms));
}
while (!completed) {
let result = await loop(batchClient, jobId );
switch (result) {
case 'completed':
completed = true;
break;
case 'wait':
await sleep(1000);
break;
default:
throw new Error(result);
}
}
}
async function deleteContainer(blobClient, containerName) {
const promise = new Promise((resolve, reject) => {
blobClient.deleteContainer(containerName, (err, result) => {
if (err) {
reject(err);
} else {
resolve(result);
}
});
});
await promise;
}
async function deleteJob(batchClient, jobId) {
const promise = new Promise((resolve, reject) => {
batchClient.job.deleteMethod(jobId, (err, result) => {
if (err) {
reject(err);
} else {
resolve(result);
}
});
});
await promise;
}
async function deletePool(batchClient, poolId) {
const promise = new Promise((resolve, reject) => {
batchClient.pool.deleteMethod(poolId, (err, result) => {
if (err) {
reject(err);
} else {
resolve(result);
}
});
});
await promise;
}
(async function() {
const blobClient = azure.createBlobService();
const credential = new azure.BatchServiceClient.SharedKeyCredentials(process.env['AZURE_BATCH_ACCOUNT'],
process.env['AZURE_BATCH_ACCOUNT_KEY']);
const batchClient = new azure.BatchServiceClient.ServiceClient(credential, process.env['AZURE_BATCH_ACCOUNT_URL']);
// 利用するContainer類を作成する
await createContainers(blobClient, ['application', 'input']);
// 各ファイルをアップロードする
const applicationFiles = [await uploadResourceFile(blobClient, 'application', './word-count')];
const inputFiles = [await uploadResourceFile(blobClient, 'input', './input1.txt'),
await uploadResourceFile(blobClient, 'input', './input1.txt'),
await uploadResourceFile(blobClient, 'input', './input1.txt'),
];
const poolId = 'pool';
const jobId = 'job';
await createPool(batchClient, poolId, applicationFiles, 'Canonical', 'UbuntuServer', '16.04.0-LTS');
await createJob(batchClient, poolId, jobId);
await addTasks(batchClient, jobId, inputFiles);
await waitForTasksToComplete(batchClient, jobId, 1000);
console.log('Deleting resources');
await deleteContainer(blobClient, 'application');
await deleteContainer(blobClient, 'input');
await deleteJob(batchClient, jobId);
await deletePool(batchClient, poolId);
console.log('finish!');
})();
これを実行するために、次のような環境変数を設定する必要があります。
export AZURE_BATCH_ACCOUNT=""
export AZURE_BATCH_ACCOUNT_KEY=""
export AZURE_BATCH_ACCOUNT_URL=""
export AZURE_STORAGE_ACCOUNT=""
export AZURE_STORAGE_ACCESS_KEY=""
これを実行すると、Azure Batch公式のチュートリアルと同様に、
- VM Poolの作成
- Jobの作成
- Taskの実行
- 後始末
という流れになります。
Taskで実行するプログラム
さて、今回実行するタスクは、バッチコンピューティングとかでは定番となっているWord countです。あんまりGolangは使ったことがないので、効率的な実装になってませんが、今回は実験なのでまぁ問題ないでしょう。
package main
import (
"io/ioutil"
"flag"
"os"
"strings"
"fmt"
)
func main() {
flag.Parse()
var fp string = flag.Arg(0)
if fp == "" {
os.Exit(1)
}
if data, err := ioutil.ReadFile(fp); err != nil {
os.Exit(1)
} else {
words := strings.Fields(string(data))
wordCount := map[string]int{}
for _, v := range(words) {
wordCount[v] += 1
}
for k, v := range(wordCount) {
fmt.Printf("word '%s' displayed %d\n", k, v)
}
}
}
これをビルドした実行ファイルは、各Nodeの開始Taskで共通領域に置かれるようになっています。他に依存するものがないワンバイナリなので、インストールも置くだけです。
このプログラムを実行した結果は標準出力に出ますが、今回結果のアップロードまではやってないので、タスクが削除された時点で無かったことになります。GolangにはまだAzure SDKがないので、実際にアップロードする場合は別のプログラムで行うか、Azure CLIを利用する必要があるでしょう。
終わりに
今回はチュートリアルをなぞる形で、Azure Batchを試してみました。同じだと面白くないのでJavaScriptとGolangで作ってみましたが、一番時間がかかったのがasync/awaitの使い方に慣れる、という点でした。Azure SDKは、asm/armがまだまだ混在していますが、十分利用できるものになっているんじゃないかと。まだまだドキュメントの充実度とか言語の広がりではAWS SDKに敵いませんが。
チュートリアルではやってませんでしたが、Jobのスケジューリングとか、リトライ設定とか、優先順位なども設定できます。俗に言う業務システムでは、随時入ってくる内容を順番に実行していく、ということをよくやりますが、そういった用途でも利用できるように感じます。というよりもそういうものユースケースに入っているようですね。
Azure FunctionやAWS Lambdaといったサーバーレスアーキテクチャが盛り上がっていますが、Azure BatchやAWS Batchは、それらを補完するような形になって行くんじゃないでしょうか。