いつか Dispatch から移行する日に備えて、試行錯誤したメモを残しておく。
目標
サンプルとして、複数のサイトからフィードを並列ダウンロードし、それぞれの記事のタイトルをひとつの Seq
にまとめてみる。今回は最も高レベルな Request-Level Client-Side API を使う。
サンプルプロジェクトは github に上げてあるのでどうぞ。
準備
まずプロジェクトを作成する。新しめの sbt を使う。
sbt new scala/scala-seed.g8
適当なプロジェクト名を入れる。
name [Scala Seed Project]: Akka-HTTP Client Sample Project
cd ./akka-http-client-sample-project
vi build.sbt
必要なライブラリを追記する。
libraryDependencies ++= Seq(
"org.scala-lang.modules" %% "scala-xml" % "1.0.6",
"com.typesafe.akka" %% "akka-http" % "10.0.11",
"com.typesafe.akka" %% "akka-stream" % "2.5.8",
"com.typesafe.akka" %% "akka-actor" % "2.5.8"
)
ライブラリをインストールする。
sbt update
ライブラリと環境の準備
大枠のコードは次のようになる。
vi src/main/scala/Main.scala
import scala.concurrent._
import scala.concurrent.duration._
import scala.xml._
import java.util.concurrent.TimeoutException
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers._
import akka.http.scaladsl.model.HttpMethods._
import akka.http.scaladsl.coding.{ Gzip, Deflate, NoCoding }
import akka.http.scaladsl.unmarshalling.Unmarshal
object Main {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val executionContext = system.dispatcher
val uriList = Seq(
"https://news.yahoo.co.jp/pickup/rss.xml",
"http://rss.asahi.com/rss/asahi/national.rdf",
"https://www3.nhk.or.jp/rss/news/cat0.xml"
)
def main(args: Array[String]): Unit = {
???
}
}
以下、内容を実装していく。
HTTP リクエストを生成する
HTTP リクエスト部分。
def request(uri: String): Future[HttpResponse] = {
val req = HttpRequest(GET, uri = Uri(uri))
.withHeaders(
RawHeader("Accept-Encoding", "gzip,deflate")
)
Http().singleRequest(req)
}
実践に即して Accept-Encoding
を指定した。メジャーなヘッダエレメントについては RawHeader
を使わなくても指定できるようになっている。
ベーシック認証を使う例
val req = HttpRequest(GET, uri = Uri(uri))
.withHeaders(
Authorization(BasicHttpCredentials("admin", "password"))
)
POST 等でデータを指定する例
こちらは未検証。
import akka.http.scaladsl.model.HttpCharsets._
import akka.http.scaladsl.model.MediaTypes._
import akka.util.ByteString
val userData = ByteString("abc")
val req = HttpRequest(POST, uri = Uri(uri))
.withEntity(`text/plain` withCharset `UTF-8`, userData)
RequestEntity
を作成する方法やファイルアップロードをする例もあるようだ。
プロトコルを指定する例
import akka.http.scaladsl.model.HttpProtocols._
val req = HttpRequest(GET, uri = Uri(uri))
.withProtocol(`HTTP/1.1`)
gzip/deflate デコーダを用意する
現在のところ組み込みのデコーダは用意されていないので自前で処理する必要があるようだ。
def decodeResponse(res: HttpResponse): HttpResponse = {
val decoder = res.encoding match {
case HttpEncodings.gzip ⇒
Gzip
case HttpEncodings.deflate ⇒
Deflate
case HttpEncodings.identity ⇒
NoCoding
case _ ⇒
NoCoding
}
decoder.decodeMessage(res)
}
レスポンスから body 部を抜き出す
HttpResponse
から、肝心の body 部を抜き出すコード。この情報がなかなか見つからなかった。Future
で返るが今回はさっさと文字列にする。
def extractBody(res: HttpResponse): String = {
val body: Future[String] = Unmarshal(res.entity).to[String]
Await.result(body, Duration.Inf)
}
次のような回りくどい例がネットで見つかるが、上記のように素直に Unmarshal
を使えばよいと思う。
import akka.util.ByteString
val body: Future[String] = res.entity.dataBytes.runFold(ByteString.empty)(_ ++ _).map(_.utf8String)
フィードからタイトル部分を抜き出す
さくっと抜き出すだけのコード。
def postProcess(res: HttpResponse): Seq[String] = {
val content = extractBody(res)
val xml = XML.loadString(content)
val titles = for (t <- xml \\ "title") yield t.text
println("success: %d titles found (%d bytes)".format(titles.size, content.length))
titles.toSeq
}
メインの処理を実装する
メインのコード、前半。
def main(args: Array[String]): Unit = {
val timeout = 10.0
val reqList: Seq[Future[HttpResponse]] = for (uri <- uriList) yield request(uri)
val futureSeq = reqList.map { req =>
req.map(decodeResponse(_)).map(postProcess(_)).recover {
case e =>
println(e.getMessage)
Seq()
}
}
???
}
結果を decodeResponse
に渡してから postProcess
に放り込む。いちおう recover()
でエラーを拾っている。
メインのコード、後半。
val result: Seq[String] = try {
Await.result(scala.concurrent.Future.sequence(futureSeq).mapTo[Seq[String]], Duration(timeout, SECONDS))
} catch {
case e: TimeoutException => {
println("timed out !: %s".format(e))
Seq()
}
}
println("result: total %d entries found.".format(result.size))
Await.result
でそれぞれの Future
の実行結果を待ち合わせる。mapTo
で Seq[String]
に変換して終了。
sbt> run
[info] Running Main
success: 13 titles found (10089 bytes)
success: 43 titles found (21760 bytes)
success: 9 titles found (2852 bytes)
result: total 65 entries found.
[success] Total time: 2 s, completed Dec 30, 2017 12:58:00 PM
これで、複数のサイトの情報をパラレルに取得して結果を統合することができた。
プロキシを使う
10.0.9 以降は https プロキシを使えるようだが unstable とのこと。私の環境ではうまく動かせなかったが参考までに。
def request(uri: String): Future[HttpResponse] = {
...
Http().singleRequest(req, settings = connectionPoolSettings)
}
lazy val connectionPoolSettings: ConnectionPoolSettings = {
import akka.http.scaladsl.ClientTransport
import java.net.InetSocketAddress
case class ProxyConfig(host: String, port: Int)
val proxyConfig = Some(ProxyConfig("localhost", 8888))
val clientTransport =
proxyConfig.map(p => ClientTransport.httpsProxy(InetSocketAddress.createUnresolved(p.host, p.port)))
.getOrElse(ClientTransport.TCP)
ConnectionPoolSettings(system).withTransport(clientTransport)
}
まとめ
- 今回のような単純なユースケースだとわりとカジュアルに使えそう
- 公式ドキュメントは読みにくいが探せばネットに情報はある
これからは Dispatch
を使う場面で Akka HTTP Client
を採用することを積極的に検討してよいと思う。