SlackのAPIは、時間当たりの呼び出し回数制限がある
この間、社内のSlackアカウントを一括更新するスクリプトに一つ当たった問題が、このリストです。
例えば、プロフィールの更新は Tier 3
なので、1分間に(最小で)50人分しか一括更新ができません。
実際には、部署ごとにMAX50人程度になるように小分けにして実行していったのですが、これをプログラム的に解決するにはどのようにすれば良いでしょう?
Stream を制するものは、 Node.js を制す
最近、Node.jsを使うことが多いです。
- jsは書いたことある人が多いので、人に見せやすい
- ライブラリが豊富でいろんなものがサクッと作れる
が主な理由です。逆に言うと、Node.jsらしい書き方、というのは特に考えてきませんでした。
ただ、Node.jsには Stream を制するものは、 Node.js を制す という格言があるそうです。
参考: highWaterMarkから探るNode.jsのStreamの仕組み
Node.js 最大の特徴は何と言っても非同期 I/O です。
そしてその非同期 I/O の実装こそが stream モジュールなのです。
Slackの呼び出し制限の問題がまさにこの Stream
で解決できるので、手を動かしながら「で、非同期 I/Oってなにが美味しいん?」というのを学んでいこうとしています。
二つの蛇口とバケツ
Streamの概念的に理解するのは、上記記事のこの絵がわかりやすいです。(highWaterMarkのことは気にせずに)
右側=下流の蛇口=「Slack APIの呼び出し」
今回のSlackの例を当てはめるとこうなります。
つまり、「この蛇口から水を出してもいいけど、1分間に50滴を超えないようにしてね」という状態です。
左側=上流の蛇口=社員リストのCSV
ここまで触れてきませんでしたが、Slackプロフィールの元となるデータは、CSV形式で用意してそれを読み込むようにしています。
"email", "display_name"
"kota_futami@mediado.jp","Futami Kota"
下流側に制限があったように、上流側にも蛇口のゆるさを調節したい場面があります。
例えば、CSVを読み込むロジックは、実際にはこう書きました
const csvRaw = fs.readFileSync(csvPath);
この一行でなにが起きるかというと、CSVファイルの中身がメモリ上に全て展開されます。
上記の例なら、二行しかありませんが、トヨタなら30万人社員がいるわけで、カラムも多ければ、下手したら100MBとかいくかもしれません。
ということは、このプログラムの実行にそれだけのメモリが必要になってしまいます。
そこから「じゃあ、100行ずつ読み込むようにしたい」という需要が生まれてきます。
実装例
Slackで実際に試す前に簡単なサンプルで理解していきました。
- 0から9までの数字を流す 蛇口 を作る
- 数字が「1秒間に3個しか流れないようにする」
- これを繋げて
- 0, 1, 2 が最初に出力される
- 1秒待って3,4,5と出力される
普通にやると、
待ちがなく、一気に9まで出力される
new MyReadable(10) // 0から9までの数字を順番に流す蛇口
.pipe(new MyWritable()) // 流れてきたものをコンソールログに出力する蛇口
0
1
2
3
4
5
6
7
8
9
流量を制限すると、
new MyReadable(10) // 0から9までの数字を順番に流す蛇口
.pipe(new Delayer(100, 3)) // 100msに3回だけ数字が流れるようにするフィルター的な役目
.pipe(new MyWritable()) // 流れてきたものをコンソールログに出力する蛇口
3個出力したら、100ms弱経つまで、出力を中断する
0
1
2
delay 99 ms
3
4
5
delay 99 ms
6
7
8
delay 100 ms
9
Delayerの実装
以下の「全実装」のこの部分で「1分以内に実行された回数」をチェックし、制限を超えていたら setTimeout
で指定されたミリ秒待つようになっています。
if (dealyMillsecs > 0 && unoutdatedCount >= this.maxCount) {
全実装
const { Writable, Readable, Transform } = require('stream');
class MyWritable extends Writable {
constructor(opt) {
super(Object.assign({ objectMode : true }, opt));
}
_write(chunk, encoding, done) {
console.log(chunk);
done();
}
}
class MyReadable extends Readable {
constructor(maxCount, opt) {
super(Object.assign({ objectMode : true }, opt));
this.count = 0;
this.maxCount = maxCount;
}
_read(n) {
if (this.count >= this.maxCount) {
this.push(null);
return;
}
this.push(this.count.toString());
this.count++;
}
}
class Delayer extends Transform {
constructor(durationMillsecs, maxCount) {
super({ objectMode : true });
this.durationMillsecs = durationMillsecs;
this.maxCount = maxCount;
this.transformTimestamps = [];
}
_getOldestTimestamp() {
return this.transformTimestamps.length > 0 ? this.transformTimestamps[0] : null;
}
_transform(chunk, encoding, callback) {
const now = Date.now();
const tsArr = this.transformTimestamps;
let oldest = this._getOldestTimestamp();
while (oldest != null && now - oldest > this.durationMillsecs) {
tsArr.shift();
oldest = this._getOldestTimestamp();
}
const dealyMillsecs = oldest != null ? this.durationMillsecs - (now - oldest) + 1 : 0;
const unoutdatedCount = this.transformTimestamps.length;
if (dealyMillsecs > 0 && unoutdatedCount >= this.maxCount) {
console.debug(`delay ${dealyMillsecs} ms`);
setTimeout(() => {
this.push(chunk);
tsArr.push(Date.now());
callback();
}, dealyMillsecs);
} else {
this.push(chunk);
tsArr.push(now);
callback();
}
}
}
(() => {
new MyReadable(10) // 0から9までの数字を順番に流す蛇口
.pipe(new Delayer(100, 3)) // 100msに3回だけ数字が流れるようにするフィルター的な役目
.pipe(new MyWritable()) // 流れてきたものをコンソールログに出力する蛇口
})()