Edited at

Highlandで流れるようにスクレーピングする話

More than 1 year has passed since last update.

JavaScriptでストリームを扱うときの定番「Highland」ですが、日本語の記事を見かけないので、使い方とサンプルをメモしておきます。何かの際の参考まで。


Highlandとは?

公式サイトに書かれているように(↓)、ストリームを扱う際のツールベルトです。


The high-level streams library for Node.js and the browser.


Underscoreを意識していて、APIなども似ています。2014年のはじめに最初のリリース、同年中にv2.0、2017年1月時点でv3.0beta3という感じの開発状況。GitHub上のスターは2.6Kくらい。(RxJSと比較するとあれですが、この手のライブラリとしては人気)


あらゆるものをストリームに

Highlandの特徴は、なんでもかんでも「ストリーム」にして統一的に扱えるところです。


  • 配列

  • Nodeのストリーム

  • EventEmitter

  • コールバック

  • generator

  • プロミス (あるいは、async関数)

といった、一通りのオブジェクトがHighland仕様の「ストリーム」になります。

highland([1, 2, 3, 4]) // 配列から

highland(request('https://someapi.dom/get/something')) // Nodeのストリームから

JavaScriptの通常の配列では、次のようにmap()や、filter()というメソッドが使えました。Highlandでは同様のことがストリームに対してできます。

// JavaScriptの配列

[1, 2, 3, 4].map(x => x * x) // [1, 4, 9, 16]

// Highlandのストリーム
highland([1, 2, 3, 4]).map(x => x * x).toArray() // [1, 4, 9, 16]

上記のように配列を扱うだけだとまどろっこしいだけですが、request()の戻り値のストリームに対してmap()filter()が使えるというのは非常に便利です。


  • map

  • filter

  • reduce

  • batch

  • take

  • sort...

など40ほどの基本メソッドのほか、20ほどのメタな操作のメソッド(並列化など)が用意されています。メソッド数の多さに最初圧倒されますが、必要になるまではほっとけばOK。


スクレーピングのサンプル

この手のライブラリは、ライブラリ自体の説明を聞いてもなかなかイメージがつかないものです。ここでは実際の使用例を見ながら説明してみようと思います。なお、細かいドキュメントは公式サイトを見た方が早いので、そちらをどうぞ。

例として、先週リリースされたばかりの書誌情報API「openBD」から、データを取得するためのスクリプトを考えます。

上記の一つ目のAPIから、書籍の全ID(ISBN)が取れるので、あとは一つずつ2つ目のAPIをたたけば、全情報が取得できることになります。ただ、一つずつ取るのは非効率なので、1万件ごと取得してJSONデータを解析しつつ、ファイルに保存するという方針で進めます。(スクレーピングというよりは、単に「ダウンロードして整形する話」ですね)

ステップを書き出すと次のようになりますが、Highlandならひとつなぎのスクリプトで書けます。


  1. APIに全ID(ISBN)の問い合わせ

  2. jsonをパースしてISBNのリストに

  3. 1万件ごとにまとめる

  4. APIコールを1分間に4回までに制限

  5. requestに渡すオプションの作成

  6. APIに書誌情報の問い合わせ

  7. jsonをパースして詳細をオブジェクトに

  8. 書誌情報を整形

  9. json文字列に変換

  10. ファイルに保存する

これが、次のようになります。(該当箇所のみ)

// 1. APIに全ID(ISBN)の問い合わせ

highland(request(`${apiRoot}/coverage`))
// 2. jsonをパースしてISBNのリストに
.through(parse('*'))
// 3. 1万件ごとにまとめる
.batch(10000)
// 4. APIコールを1分間に4回までに制限
.ratelimit(4, 60 * 1000)
// 5. requestに渡すオプションの作成
.map(isbns => ({method: 'POST', url: `${apiRoot}/get`, form: {isbn: isbns.join(',')}}))
// 6. APIに書誌情報の問い合わせ
.flatMap(opts => highland(request(opts)))
// 7. jsonをパースして詳細をオブジェクトに
.through(parse('*'))
// 8. 書誌情報を整形
.map(book => Object.assign({}, book.summary, {
description: find(book, 'onix.CollateralDetail.TextContent[0].Text', ''),
price: find(book, 'onix.ProductSupply.SupplyDetail.Price[0].PriceAmount', '')
}))
// 9. json文字列に変換
.through(stringify())
// 10. ファイルに保存する
.pipe(createWriteStream(dist))

以下、ステップごとに簡単に説明します。


1. APIに全ID(ISBN)の問い合わせ

requestはストリームを返します。これをhighland()で包んで、highland仕様の「ストリーム」に変えます。

highland(request(`${apiRoot}/coverage`))


2. jsonをパースしてISBNのリストに

JSONStreamparse()を使い、jsonストリームからオブジェクトを取り出します。parse()はHighland用に作られているわけではないので、.through()というメソッドを通します。このメソッドは入出力のあるストリーム(要はduplex)であれば引数にとることができ、通常のNodeであれば.pipe()したのと同じことになります。

.through(parse('*'))

メモ: duplexのほか、highlandストリームを返す関数でもOK。


3. 1万件ごとにまとめる

.batch()メソッドで、ストリームに流れてきたオブジェクト(ISBN)を10000個ずつグルーピングします。

.batch(10000)


4. APIコールを1分間に4回までに制限

あんまり一気にAPIを叩くと迷惑なので、ある程度自粛します。

.ratelimit(4, 60 * 1000)


5. requestに渡すオプションの作成

GETで渡すには長いので、POSTメソッドを指定します。

.map(isbns => ({

method: 'POST',
url: `${apiRoot}/get`,
form: {isbn: isbns.join(',')}
}))


6. APIに書誌情報の問い合わせ

最初と同じ形が出て来たことに注意してください。ここで、requestはreadableなので、.through()は使えません。.through()に渡せるのはduplexだけです。ですが、次のようにすれば、前のストリームから渡されたオプションを元にrequestを呼び出すことが可能です。(すごい)

.flatMap(opts => highland(request(opts)))

メモ: 少し(未来の自分のために)補足をすると、.flatMap()は「ストリームのストリーム」を「シリアライズされたストリーム」に変形します。なので、上記の書き方で、ストリームがネストされることなく、次の処理に繋げられるのです。


7. jsonをパースして詳細をオブジェクトに

先ほどと同じ。

.through(parse('*'))


8. 書誌情報を整形

API特有の話なので略。

.map(book => Object.assign({}, book.summary, {

description: find(book, 'onix.CollateralDetail.TextContent[0].Text', ''),
price: find(book, 'onix.ProductSupply.SupplyDetail.Price[0].PriceAmount', '')
}))


9. json文字列に変換

さっきのparse()関数の逆で、文字列に変換します。JSONStreamstringify()を渡します。

.through(stringify())


10. ファイルに保存する

最後は、Nodeのfs.createWriteStream()ストリームに渡してファイルに保存します。.pipe()はHighlandストリームを通常のNodeのストリームに変換してくれるため、次のようにつなぐことができます。

.pipe(createWriteStream(dist))


全体像

全体としては、次のようになります。

import {join} from 'path'

import {createWriteStream} from 'fs'
import request from 'request'
import {parse, stringify} from 'JSONStream'
import find from 'lodash.get'
import highland from 'highland'

const apiRoot = 'https://api.openbd.jp/v1'
const dist = join(process.cwd(), 'all.json')

highland(request(`${apiRoot}/coverage`))
.through(parse('*'))
.batch(10000)
.ratelimit(4, 60 * 1000)
.map(isbns => ({
method: 'POST', url: `${apiRoot}/get`, form: {isbn: isbns.join(',')}
}))
.flatMap(opts => highland(request(opts)))
.through(parse('*'))
.map(book => Object.assign({}, book.summary, {
description: find(book, 'onix.CollateralDetail.TextContent[0].Text', ''),
price: find(book, 'onix.ProductSupply.SupplyDetail.Price[0].PriceAmount', '')
}))
.through(stringify())
.pipe(createWriteStream(dist))

このスクリプトを実行するには、事前に次のライブラリをインストールしておきます。

$ npm init -y

$ npm i JSONStream highland lodash.get request reify

次のコマンドでダウンロード開始。

$ node --require reify download.js

メモ: reifyimport文の変換のために入れています。require文で書き直せば不要です。


まとめ

ストリームの便利なところは、途中にAPIの問い合わせや、ファイルの保存など時間のかかる処理があっても、スピードを全体でうまいこと調整して、よしなにしてくれることです。上記のサンプルをストリームなしで書こうとすると、プロミスかコールバックで一つ一つの処理を順番にこなすことになり、時間的にもメモリ的にも非効率になってしまいます。

一方で、ストリームを直接扱おうとすると、上記のmapbatchといった部分まで自前でコントロール必要があり、ちょいちょい細かいスクリプトを書く必要が生じて面倒です。Highlandはその手の「必要になりそうな」メソッドが全て事前に用意されているので、処理の「流れ」を書けば、基本OK。この点、非常に楽です。特に、接続するストリームの数が増えて来て、複雑になってくるとありがたいですね。

途中にも書きましたが、メソッド数が多いので、事前にドキュメントを流し読みしておくと何ができるかあたりをつけやすくなると思います。細かく読むのは実際に必要になったときで遅くないので。