PM2 であるプロセスが別のプロセスの挙動に依存している際に、依存するプロセスの特定のイベントを待ってから次のプロセスを動かすようにしたくなった。
やり方としてはそもそものスクリプトを全て 1 つにまとめるという方法もあるだろうが、自分の要件には見合わなかったのでいい感じに動くようなスクリプトを組んだ。
PM2 のデフォルトの機能ではプロセス間の依存関係などはカバーできなさそうなので、PM2 API を使用していい感じに作った。
前準備 - pm2 関連メソッドの Promise 化
Node の util.promisify
を使えばサクサク終わるかと思ったがそれだとエラーに見舞われたので自前で書いておく。
import path from 'path';
import pm2Base from 'pm2';
const pm2 = {
connect: () =>
new Promise<void>((resolve, reject) => {
pm2Base.connect(err => {
if (err) {
reject(err);
}
resolve();
});
}),
disconnect: () =>
new Promise<void>((resolve, reject) => {
try {
pm2Base.disconnect();
resolve();
} catch (e) {
reject(e);
}
}),
start: (config: pm2Base.StartOptions) =>
new Promise<pm2Base.Proc>((resolve, reject) =>
pm2Base.start(config, (err, proc) => {
if (err) reject(err);
resolve(proc);
})
),
describe: (jobName: string) =>
new Promise<pm2Base.ProcessDescription | undefined>((resolve, reject) =>
pm2Base.describe(jobName, (err, descs) => {
if (err) reject(err);
resolve(descs[0]);
})
),
launchBus: () =>
new Promise<{ on: (eventName: string, func: (packet: unknown) => unknown) => void }>((resolve, reject) => {
pm2Base.launchBus((err, bus) => {
if (err) reject(err);
resolve(bus);
});
}),
};
自分が使ったのはこれらだけだったが、restart
とか欲しければ必要に応じて同じように定義すれば良い。
PM2 のプロセス同士でデータをやり取りする
プロセスの実行順序だけが問題の場合であれば順番に pm2.start
していくだけで良いと思うが、今回はあるプロセスが生成した情報を次のプロセスで使いたいみたいなニーズがあったため、pm2.launchBus
を使ってプロセス間通信によって情報をやり取りできるようにした。
以下は実装例
-
initialScript
がデータを生成する -
secondScript
はinitialScript
が生成したデータを引数に必要とするスクリプトである
(本筋と関係ないが zod での型チェックも入れている)
import z from 'zod';
const cwd = process.cwd();
const initialScriptPath = path.join(cwd, './path/to/initialScript.js');
const secondScriptPath = path.join(cwd, './path/to/secondScript.js');
const run = async () => {
console.log(`Start connecting PM2`);
await pm2.connect();
console.log(`PM2 connected!`);
try {
const processBus = await pm2.launchBus();
const allJobPromises = new Promise<void>((resolve, reject) => {
processBus.on('pm2Master:fireSecondScript', async packet => {
console.log(`processBus on fireSecondScript`);
// しれっと zod で型チェックをしている
const packetParsedResult = z
.object({
data: z.object({
testData: z.string(),
}),
})
.safeParse(packet);
if (!packetParsedResult.success) {
console.log(`error: ${packaet}`);
reject();
return;
}
const testData = packetParsedResult.data.data.testData;
console.log(`testData: ${testData}`);
await pm2
.start({
name: 'secondScript',
script: secondScriptPath,
args: `${testData}`,
autorestart: false,
time: true,
})
.catch(reject);
resolve();
});
});
await pm2.start({
name: 'initialScript',
script: initialScriptPath,
autorestart: false,
time: true,
});
console.log(`Started initialScript job`);
await Promise.race([
allJobPromises,
new Promise<void>((_, reject) => {
const timeout = setInterval(() => {
void pm2.describe('initialScript').then(d => {
if (d?.pm2_env?.status === 'stopped') {
console.log(`initialScript job is stopped... so end up whole job itself`);
clearInterval(timeout);
reject(new Error(`initialScript job is stopped`));
}
});
}, 1000);
}),
]);
console.log(`Successfully finish job`);
} finally {
console.log(`Finally, disconnecting PM2`);
await pm2.disconnect();
}
};
void run();
initialScript から、この親プロセスにデータを送るスクリプトはこんな感じ
// initialScript.ts
const run = () => {
console.log(`init start`);
process.send?.({
type: 'pm2Master:fireSecondScript',
data: {
testData: 'helloWorld',
},
});
console.log(`init end`);
};
void run();
initialScript
が異常終了した場合には親プロセスも終了するようにする
pm2.launchBus
で initialScript
からメッセージが送られてくるのを待っているため、もし initialScript
がメッセージを送信しないまま終了してしまった場合には油断していると永遠に終わらないプロセスとなってしまう。
そのため、メッセージが受信されて正しく secondScript
が発火する以外にも、initialScript
のステータスが stopped
になった場合にはこの親スクリプトもきちんと終了するように仕込んでおく必要がある。(今回の自分の要件では autorestart: false
なので stopped
を見ているが、そうでなければいい感じに別プロセスのエラー情報をキャッチして再度実行し直すような機構を作る必要があるだろう)
await Promise.race([
allJobPromises,
new Promise<void>((_, reject) => {
const timeout = setInterval(() => {
void pm2.describe('initialScript').then(d => {
if (d?.pm2_env?.status === 'stopped') {
console.log(`initialScript job is stopped... so end up whole job itself`);
clearInterval(timeout);
reject(new Error(`initialScript job is stopped`));
}
});
}, 1000);
}),
]);