LoginSignup
1
0

More than 5 years have passed since last update.

redis.clients.jedis.Response<T> を Functor, Monad に仕立て上げる

Last updated at Posted at 2015-07-21

以下の2エントリの続きもの。

redis.clients.jedis.Response についておさらい

Jedisの Pipeline を使うと、get, set などの各処理は Response に包まれた返り値として取得され、sync 後に get することで実際の値が取得される。

// Queue two GETs on one pipeline; the Response values are only read after sync().
using(new Jedis("192.168.53.52", 6379)) { jedis =>
  val pipeline = jedis.pipelined()
  val second = pipeline.get("key2")
  val third = pipeline.get("key3")
  pipeline.sync()
  val pair = (second.get, third.get)
  println(pair)
}

sync 呼び出し前に get しようとすると JedisDataException: Please close pipeline or multi block before calling this method. が発生する。

ClusteredPipeline リファクタリング案

さて、前回からの続きで、ClusteredPipeline に各読み書きメソッドの追加を検討する。
こんな感じにしたかったが…

package com.zaneli.redis.jedis

import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.{HostAndPort, Jedis, JedisSlotBasedConnectionHandler, JedisPool, Pipeline, Response}
import redis.clients.util.JedisClusterCRC16

import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.language.reflectiveCalls
import scala.util.Try

/**
 * Pipeline facade for a Redis Cluster: each command is routed, by the slot of its key,
 * to a per-pool Jedis connection whose `Pipeline` is created on first use and then reused.
 *
 * `lrange` and `hgetAll` are declared with Scala collection types on purpose. The Jedis
 * pipeline hands back `java.util` collections, so this version is expected to be rejected
 * by the compiler with the "found / required" errors quoted right after this listing.
 *
 * @param nodes   cluster nodes handed to the `JedisSlotBasedConnectionHandler`
 * @param config  pool configuration handed to the handler
 * @param timeout timeout handed to the handler (unit not visible here; presumably milliseconds — confirm)
 */
class ClusteredPipeline(nodes: Set[HostAndPort], config: GenericObjectPoolConfig, timeout: Int) extends AutoCloseable {

  /** A borrowed Jedis connection together with the pipeline opened on it. */
  private[this] case class Connection(jedis: Jedis, pipeline: Pipeline)

  // One connection/pipeline per pool, filled lazily by getPipeline.
  private[this] val connections: mutable.Map[JedisPool, Connection] = mutable.Map.empty

  // Anonymous subclass that adds two accessors on top of the stock handler. They are new
  // members of an anonymous class, so calls go through its structural type
  // (hence the reflectiveCalls import).
  private[this] val handler = new JedisSlotBasedConnectionHandler(nodes, config, timeout) {
    /** Looks up the pool for `slot` in the handler's `cache`. */
    def getSlotPool(slot: Int): JedisPool = {
      cache.getSlotPool(slot)
    }

    /** Borrows from `pool` when one is given, otherwise falls back to the handler's no-arg `getConnection()`. */
    def getConnection(pool: JedisPool): Jedis = {
      if (pool != null) pool.getResource() else getConnection()
    }
  }

  /** Queues a GET for `key` on the pipeline that owns the key's slot. */
  def get(key: String): Response[String] = {
    exec(key)(_.get(key))
  }

  /** Queues a SET of `key` to `value` on the pipeline that owns the key's slot. */
  def set(key: String, value: String): Response[String] = {
    exec(key)(_.set(key, value))
  }

  /** Queues an LRANGE for `key`; declared with a Scala `List` (see the class note on the compile error). */
  def lrange(key: String, start: Long, end: Long): Response[List[String]] = {
    exec(key)(_.lrange(key, start, end))
  }

  /** Queues an HGETALL for `key`; declared with a Scala `Map` (see the class note on the compile error). */
  def hgetAll(key: String): Response[Map[String, String]] = {
    exec(key)(_.hgetAll(key))
  }

  /** Resolves the slot of `key`, picks the matching pool's pipeline and runs `cmd` against it. */
  private[this] def exec[A](key: String)(cmd: Pipeline => Response[A]): Response[A] = {
    val slot = JedisClusterCRC16.getSlot(key)
    val pool = handler.getSlotPool(slot)
    cmd(getPipeline(pool))
  }

  /** Returns the pipeline already opened for `pool`, opening (and remembering) one on first use. */
  private[this] def getPipeline(pool: JedisPool): Pipeline = {
    connections.getOrElseUpdate(pool, {
      val jedis = handler.getConnection(pool)
      Connection(jedis, jedis.pipelined())
    }).pipeline
  }

  /** Calls `sync()` on every pipeline opened so far. */
  def sync(): Unit = {
    connections.values.foreach(_.pipeline.sync())
  }

  /** Closes every borrowed connection (a failure while closing is discarded by `Try`) and forgets them all. */
  override def close(): Unit = {
    connections.values.foreach(c => Try(c.jedis.close()))
    connections.clear()
  }
}

これは以下のコンパイルエラーになる。

[error]  found   : redis.clients.jedis.Response[java.util.List[String]]
[error]  required: redis.clients.jedis.Response[scala.collection.immutable.List[String]]
[error]     exec(key)(_.lrange(key, start, end))
[error]                       ^
[error]  found   : redis.clients.jedis.Response[java.util.Map[String,String]]
[error]  required: redis.clients.jedis.Response[scala.collection.immutable.Map[String,String]]
[error]     exec(key)(_.hgetAll(key))
[error]                        ^
[error] two errors found

仕方なく、import java.util.{List => JList, Map => JMap} みたいなインポートを追加して、
def lrange(key: String, start: Long, end: Long): Response[JList[String]]
def hgetAll(key: String): Response[JMap[String, String]]
とすることでコンパイルは通るが、呼び出し側でScalaのコレクションに変換したりゴニョゴニョするのも不便なので
もう少し何とかできないものか。
Functorにして、mapで値の変換を行えないか試みることにする。

scalaz.Functor の導入

こんな package object を作る。

package.scala
package com.zaneli.redis

import redis.clients.jedis.{Builder, Response}

import scalaz.Functor

package object jedis {

  /**
   * scalaz `Functor` instance for Jedis `Response`.
   *
   * `map` does not call `fa.get` itself: the source response is stored as the data of a new
   * response, and `f` is applied inside that response's `Builder`. Presumably Jedis runs the
   * builder when `get` is called on the returned response — confirm against the Jedis
   * `Response` implementation.
   *
   * The type is spelled out because this is a public implicit definition.
   */
  implicit val responseFunctor: Functor[Response] = new Functor[Response] {
    def map[A, B](fa: Response[A])(f: A => B): Response[B] = {
      val mapped = new Response[B](new Builder[B] {
        // `o` is whatever was passed to `set` below, i.e. the original Response[A].
        override def build(o: scala.Any): B = f(o.asInstanceOf[Response[A]].get)
      })
      mapped.set(fa)
      mapped
    }
  }
}

ClusteredPipeline はこうなる。

ClusteredPipeline.scala(一部抜粋)
...

import scala.collection.JavaConversions._

...

class ClusteredPipeline(nodes: Set[HostAndPort], config: GenericObjectPoolConfig, timeout: Int) extends AutoCloseable {

  import scalaz.Scalaz._

  ...

  // Queues LRANGE on the key's pipeline and maps the Java list inside the Response to a Scala List.
  def lrange(key: String, start: Long, end: Long): Response[List[String]] =
    exec(key) { pipeline =>
      pipeline.lrange(key, start, end).map(javaList => javaList.toList)
    }

  // Queues HGETALL on the key's pipeline and maps the Java map inside the Response to a Scala Map.
  def hgetAll(key: String): Response[Map[String, String]] =
    exec(key) { pipeline =>
      pipeline.hgetAll(key).map(javaMap => javaMap.toMap)
    }

  ...
}

これでScalaの List, Map として扱うことができる。
また、呼び出し元でさらに値を加工したい場合にも Option などでお馴染みの書き方ができる。

JedisTest5.scala
package com.zaneli.redis.jedis

import redis.clients.jedis.HostAndPort

import scala.util.{Failure, Success}

/** Demo: decorate each pipelined GET result through `map` on `Response`. */
object JedisTest5 extends App {

  import scalaz.Scalaz._

  val nodes = Set(6379, 6380, 6381).map(port => new HostAndPort("192.168.53.52", port))
  val cluster = new JedisPipelinedCluster(nodes)
  val result = cluster.pipelined { cp =>
    val first = cp.get("key1")
    val second = cp.get("key2")
    val third = cp.get("key3")
    cp.sync()
    // Values are only read (via `get`) once sync() has been called.
    val responses = List(first, second, third)
    responses.map { response =>
      val decorated = response.map(value => value + "!!")
      decorated.get
    }
  }
  result match {
    case Failure(e) => e.printStackTrace()
    case Success(xs) => xs.foreach(println) // prints "value1!!", "value2!!", "value3!!"
  }
}

scalaz.Monad の導入

package object の定義をこのように変更。

package.scala
package com.zaneli.redis

import redis.clients.jedis.{Builder, Response}

import scalaz.Monad

package object jedis {

  /**
   * scalaz `Monad` instance for Jedis `Response`.
   *
   * Both operations build a fresh `Response` around a custom `Builder`. Presumably Jedis runs
   * the builder when `get` is called on that response — confirm against the Jedis `Response`
   * implementation.
   *
   * The type is spelled out because this is a public implicit definition.
   */
  implicit val responseMonad: Monad[Response] = new Monad[Response] {
    /** Wraps an already available value; `a` is evaluated once, when it is handed to `set`. */
    override def point[A](a: => A): Response[A] = {
      val wrapped = new Response(new Builder[A] {
        // The stored data is the value itself, so building is just a cast back to A.
        override def build(o: scala.Any): A = o.asInstanceOf[A]
      })
      wrapped.set(a)
      wrapped
    }

    /** Chains `f` after `fa`; neither `fa.get` nor `f` runs until the builder is invoked. */
    override def bind[A, B](fa: Response[A])(f: A => Response[B]): Response[B] = {
      val chained = new Response[B](new Builder[B] {
        // `o` is the original Response[A] passed to `set` below.
        override def build(o: scala.Any): B = f(o.asInstanceOf[Response[A]].get).get
      })
      chained.set(fa)
      chained
    }
  }
}

これで for {...} yield {...} の中で使える。

JedisTest6.scala
package com.zaneli.redis.jedis

import redis.clients.jedis.HostAndPort

import scala.util.{Failure, Success}

/** Demo: combine three pipelined GET results with a for-comprehension over `Response`. */
object JedisTest6 extends App {

  import scalaz.Scalaz._

  val nodes = Set(6379, 6380, 6381).map(port => new HostAndPort("192.168.53.52", port))
  val cluster = new JedisPipelinedCluster(nodes)
  val result = cluster.pipelined { cp =>
    val first = cp.get("key1")
    val second = cp.get("key2")
    val third = cp.get("key3")
    cp.sync()
    // The comprehension only composes the responses; the value is read by `get` afterwards.
    val concatenated = for (a <- first; b <- second; c <- third) yield a + b + c
    concatenated.get
  }
  result match {
    case Failure(e) => e.printStackTrace()
    case Success(x) => println(x) // prints "value1value2value3"
  }
}

List[Response[A]] を Response[List[A]] にできる。

JedisTest7.scala
package com.zaneli.redis.jedis

import redis.clients.jedis.HostAndPort

/** Demo: turn a list of pipelined responses into one response of a list via `sequence`. */
object JedisTest7 extends App {

  import scalaz.Scalaz._

  val nodes = Set(6379, 6380, 6381).map(port => new HostAndPort("192.168.53.52", port))
  val cluster = new JedisPipelinedCluster(nodes)
  val result = cluster.pipelined { cp =>
    // List arguments are evaluated left to right, so the GETs are queued as key1, key2, key3.
    val responses = List(cp.get("key1"), cp.get("key2"), cp.get("key3"))
    cp.sync()
    val sequenced = responseMonad.sequence(responses)
    sequenced.get()
  }
  result match {
    case scala.util.Failure(e) => e.printStackTrace()
    case scala.util.Success(x) => println(x) // prints "List(value1, value2, value3)"
  }
}
1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0