Scala
Client
Akka-HTTP

Akka HTTP Client の基本メモ

いつか Dispatch から移行する日に備えて、試行錯誤したメモを残しておく。

目標

サンプルとして、複数のサイトからフィードを並列ダウンロードし、それぞれの記事のタイトルをひとつの Seq にまとめてみる。今回は最も高レベルな Request-Level Client-Side API を使う。

サンプルプロジェクトは github に上げてあるのでどうぞ。

準備

まずプロジェクトを作成する。新しめの sbt を使う。

sbt new scala/scala-seed.g8

適当なプロジェクト名を入れる。

name [Scala Seed Project]: Akka-HTTP Client Sample Project
cd ./akka-http-client-sample-project
vi build.sbt

必要なライブラリを追記する。

    libraryDependencies ++= Seq(
      "org.scala-lang.modules" %% "scala-xml" % "1.0.6",
      "com.typesafe.akka" %% "akka-http" % "10.0.11",
      "com.typesafe.akka" %% "akka-stream" % "2.5.8",
      "com.typesafe.akka" %% "akka-actor" % "2.5.8"
    )

ライブラリをインストールする。

sbt update

ライブラリと環境の準備

大枠のコードは次のようになる。

vi src/main/scala/Main.scala
import scala.concurrent._
import scala.concurrent.duration._
import scala.xml._
import java.util.concurrent.TimeoutException

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers._
import akka.http.scaladsl.model.HttpMethods._
import akka.http.scaladsl.coding.{ Gzip, Deflate, NoCoding }
import akka.http.scaladsl.unmarshalling.Unmarshal

object Main {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = system.dispatcher

  val uriList = Seq(
    "https://news.yahoo.co.jp/pickup/rss.xml",
    "http://rss.asahi.com/rss/asahi/national.rdf",
    "http://www.sankeibiz.jp/rss/news/points.xml"
  )

  def main(args: Array[String]): Unit = {
    ???
  }
}

以下、内容を実装していく。

HTTP リクエストを生成する

HTTP リクエスト部分。

  def request(uri: String): Future[HttpResponse] = {
    val req = HttpRequest(GET, uri = Uri(uri))
      .withHeaders(
        RawHeader("Accept-Encoding", "gzip,deflate")
      )

    Http().singleRequest(req)
  }

実践に即して Accept-Encoding を指定した。メジャーなヘッダエレメントについては RawHeader を使わなくても指定できるようになっている

ベーシック認証を使う例

    val req = HttpRequest(GET, uri = Uri(uri))
      .withHeaders(
        Authorization(BasicHttpCredentials("admin", "password"))
      )

POST 等でデータを指定する例

こちらは未検証。

    import akka.http.scaladsl.model.HttpCharsets._
    import akka.http.scaladsl.model.MediaTypes._
    import akka.util.ByteString

    val userData = ByteString("abc")
    val req = HttpRequest(POST, uri = Uri(uri))
      .withEntity(`text/plain` withCharset `UTF-8`, userData)

RequestEntity作成する方法ファイルアップロードをする例もあるようだ。

プロトコルを指定する例

    import akka.http.scaladsl.model.HttpProtocols._

    val req = HttpRequest(GET, uri = Uri(uri))
      .withProtocol(`HTTP/1.1`)

gzip/deflate デコーダを用意する

現在のところ組み込みのデコーダは用意されていないので自前で処理する必要があるようだ。

  def decodeResponse(res: HttpResponse): HttpResponse = {
    val decoder = res.encoding match {
      case HttpEncodings.gzip 
        Gzip
      case HttpEncodings.deflate 
        Deflate
      case HttpEncodings.identity 
        NoCoding
      case _ 
        NoCoding
    }

    decoder.decodeMessage(res)
  }

レスポンスから body 部を抜き出す

HttpResponse から、肝心の body 部を抜き出すコード。この情報がなかなか見つからなかった。Future で返るが今回はさっさと文字列にする。

  def extractBody(res: HttpResponse): String = {
    val body: Future[String] = Unmarshal(res.entity).to[String]

    Await.result(body, Duration.Inf)
  }

次のような回りくどい例がネットで見つかるが、上記のように素直に Unmarshal を使えばよいと思う。

    import akka.util.ByteString

    val body: Future[String] = res.entity.dataBytes.runFold(ByteString.empty)(_ ++ _).map(_.utf8String)

フィードからタイトル部分を抜き出す

さくっと抜き出すだけのコード。

  def postProcess(res: HttpResponse): Seq[String] = {
    val content = extractBody(res)

    val xml = XML.loadString(content)
    val titles = for (t <- xml \\ "title") yield t.text

    println("success: %d titles found (%d bytes)".format(titles.size, content.length))

    titles.toSeq
  }

メインの処理を実装する

メインのコード、前半。

  def main(args: Array[String]): Unit = {
    val timeout = 10.0

    val reqList: Seq[Future[HttpResponse]] = for (uri <- uriList) yield request(uri)

    val futureSeq = reqList.map { req =>
      req.map(decodeResponse(_)).map(postProcess(_)).recover {
        case e =>
          println(e.getMessage)
          Seq()
      }
    }

    ???
  }

結果を decodeResponse に渡してから postProcess に放り込む。いちおう recover() でエラーを拾っている。

メインのコード、後半。

    val result: Seq[String] = try {
      Await.result(scala.concurrent.Future.sequence(futureSeq), Duration(timeout, SECONDS))
        .flatMap(_.asInstanceOf[Seq[String]])
    } catch {
      case e: TimeoutException => {
        println("timed out !: %s".format(e))
        Seq()
      }
    }

    println("result: total %d entries found.".format(result.size))

Await.result でそれぞれの Future の実行結果を待ち合わせる。flatMapSeq[String] に変換して終了。

sbt> run    
[info] Running Main
success: 13 titles found (10089 bytes)
success: 43 titles found (21760 bytes)
success: 9 titles found (2852 bytes)
result: total 65 entries found.
[success] Total time: 2 s, completed Dec 30, 2017 12:58:00 PM

これで、複数のサイトの情報をパラレルに取得して結果を統合することができた。

プロキシを使う

10.0.9 以降は https プロキシを使えるようだが unstable とのこと。私の環境ではうまく動かせなかったが参考までに。

  def request(uri: String): Future[HttpResponse] = {
    ...

    Http().singleRequest(req, settings = connectionPoolSettings)
  }

  lazy val connectionPoolSettings: ConnectionPoolSettings = {
    import akka.http.scaladsl.ClientTransport
    import java.net.InetSocketAddress

    case class ProxyConfig(host: String, port: Int)

    val proxyConfig = Some(ProxyConfig("localhost", 8888))
    val clientTransport =
      proxyConfig.map(p => ClientTransport.httpsProxy(InetSocketAddress.createUnresolved(p.host, p.port)))
        .getOrElse(ClientTransport.TCP)

    ConnectionPoolSettings(system).withTransport(clientTransport)
  }

まとめ

  • 今回のような単純なユースケースだとわりとカジュアルに使えそう
  • 公式ドキュメントは読みにくいが探せばネットに情報はある

これからは Dispatch を使う場面で Akka HTTP Client を採用することを積極的に検討してよいと思う。