以下の2エントリの続きもの。
redis.clients.jedis.Response についておさらい
Jedisの Pipeline
を使うと、get
, set
などの各処理は Response
に包まれた返り値として取得され、sync
後に get
することで実際の値が取得される。
using(new Jedis("192.168.53.52", 6379)) { jedis =>
val p = jedis.pipelined()
val v2 = p.get("key2")
val v3 = p.get("key3")
p.sync()
println((v2.get, v3.get))
}
sync
呼び出し前に get
しようとすると JedisDataException: Please close pipeline or multi block before calling this method.
が発生する。
ClusteredPipeline リファクタリング案
さて、前回からの続きで、ClusteredPipeline
に各読み書きメソッドの追加を検討する。
こんな感じにしたかったが…
package com.zaneli.redis.jedis
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.{HostAndPort, Jedis, JedisSlotBasedConnectionHandler, JedisPool, Pipeline, Response}
import redis.clients.util.JedisClusterCRC16
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.language.reflectiveCalls
import scala.util.Try
class ClusteredPipeline(nodes: Set[HostAndPort], config: GenericObjectPoolConfig, timeout: Int) extends AutoCloseable {
private[this] case class Connection(jedis: Jedis, pipeline: Pipeline)
private[this] val connections: mutable.Map[JedisPool, Connection] = mutable.Map.empty
private[this] val handler = new JedisSlotBasedConnectionHandler(nodes, config, timeout) {
def getSlotPool(slot: Int): JedisPool = {
cache.getSlotPool(slot)
}
def getConnection(pool: JedisPool): Jedis = {
if (pool != null) pool.getResource() else getConnection()
}
}
def get(key: String): Response[String] = {
exec(key)(_.get(key))
}
def set(key: String, value: String): Response[String] = {
exec(key)(_.set(key, value))
}
def lrange(key: String, start: Long, end: Long): Response[JList[String]] = {
exec(key)(_.lrange(key, start, end))
}
def hgetAll(key: String): Response[Map[String, String]] = {
exec(key)(_.hgetAll(key))
}
private[this] def exec[A](key: String)(cmd: Pipeline => Response[A]): Response[A] = {
val slot = JedisClusterCRC16.getSlot(key)
val pool = handler.getSlotPool(slot)
val pipeline = getPipeline(pool)
cmd(pipeline)
}
private[this] def getPipeline(pool: JedisPool): Pipeline = {
connections.get(pool).map(_.pipeline).getOrElse {
val j = handler.getConnection(pool)
val p = j.pipelined()
connections.put(pool, Connection(j, p))
p
}
}
def sync(): Unit = {
connections.values.foreach(_.pipeline.sync())
}
override def close(): Unit = {
connections.values.foreach(j => Try(j.jedis.close()))
connections.clear()
}
}
これは以下のコンパイルエラーになる。
[error] found : redis.clients.jedis.Response[java.util.List[String]]
[error] required: redis.clients.jedis.Response[scala.collection.immutable.List[String]]
[error] exec(key)(_.lrange(key, start, end))
[error] ^
[error] found : redis.clients.jedis.Response[java.util.Map[String,String]]
[error] required: redis.clients.jedis.Response[scala.collection.immutable.Map[String,String]]
[error] exec(key)(_.hgetAll(key))
[error] ^
[error] two errors found
仕方なく、import java.util.{List => JList, Map => JMap}
みたいなインポートを追加して、
def lrange(key: String, start: Long, end: Long): Response[JList[String]]
def hgetAll(key: String): Response[JMap[String, String]]
とすることでコンパイルは通るが、呼び出し側でScalaのコレクションに変換したりゴニョゴニョするのも不便なので
もう少し何とかできないものか。
Functor
にして、map
で値の変換を行えないか試みることにする。
scalaz.Functor の導入
こんな package object を作る。
package com.zaneli.redis
import redis.clients.jedis.{Builder, Response}
import scalaz.Functor
package object jedis {
implicit val responseFunctor = new Functor[Response] {
def map[A, B](fa: Response[A])(f: A => B): Response[B] = {
val r = new Response[B](new Builder[B] {
override def build(o: scala.Any): B = f(o.asInstanceOf[Response[A]].get)
})
r.set(fa)
r
}
}
}
ClusteredPipeline
はこうなる。
...
import scala.collection.JavaConversions._
...
class ClusteredPipeline(nodes: Set[HostAndPort], config: GenericObjectPoolConfig, timeout: Int) extends AutoCloseable {
import scalaz.Scalaz._
...
def lrange(key: String, start: Long, end: Long): Response[List[String]] = {
exec(key)(_.lrange(key, start, end).map(_.toList))
}
def hgetAll(key: String): Response[Map[String, String]] = {
exec(key)(_.hgetAll(key).map(_.toMap))
}
...
}
これでScalaの List
, Map
として扱うことができる。
また、呼び出し元でさらに値を加工したい場合にも Option
などでお馴染みの書き方ができる。
package com.zaneli.redis.jedis
import redis.clients.jedis.HostAndPort
import scala.util.{Failure, Success}
object JedisTest5 extends App {
import scalaz.Scalaz._
val nodes = Set(6379, 6380, 6381).map { port =>
new HostAndPort("192.168.53.52", port)
}
val cluster = new JedisPipelinedCluster(nodes)
val result = cluster.pipelined { cp =>
val v1 = cp.get("key1")
val v2 = cp.get("key2")
val v3 = cp.get("key3")
cp.sync()
List(v1, v2, v3).map(_.map(_ + "!!").get)
}
result match {
case Success(xs) => xs.foreach(println) // 「value1!!」「value2!!」「value3!!」出力
case Failure(e) => e.printStackTrace()
}
}
scalaz.Monad の導入
package object の定義をこのように変更。
package com.zaneli.redis
import redis.clients.jedis.{Builder, Response}
import scalaz.Monad
package object jedis {
implicit val responseMonad = new Monad[Response] {
override def point[A](a: => A): Response[A] = {
val r = new Response(new Builder[A] {
override def build(o: scala.Any): A = o.asInstanceOf[A]
})
r.set(a)
r
}
override def bind[A, B](fa: Response[A])(f: A => Response[B]): Response[B] = {
val r = new Response[B](new Builder[B] {
override def build(o: scala.Any): B = f(o.asInstanceOf[Response[A]].get).get
})
r.set(fa)
r
}
}
}
これで for {...} yield {...}
の中で使える。
package com.zaneli.redis.jedis
import redis.clients.jedis.HostAndPort
import scala.util.{Failure, Success}
object JedisTest6 extends App {
import scalaz.Scalaz._
val nodes = Set(6379, 6380, 6381).map { port =>
new HostAndPort("192.168.53.52", port)
}
val cluster = new JedisPipelinedCluster(nodes)
val result = cluster.pipelined { cp =>
val r1 = cp.get("key1")
val r2 = cp.get("key2")
val r3 = cp.get("key3")
cp.sync()
val r = for {
v1 <- r1
v2 <- r2
v3 <- r3
} yield {
v1 + v2 + v3
}
r.get
}
result match {
case Success(x) => println(x) // 「value1value2value3」出力
case Failure(e) => e.printStackTrace()
}
}
List
の Response
を Response
の List
にできる。
package com.zaneli.redis.jedis
import redis.clients.jedis.HostAndPort
object JedisTest7 extends App {
import scalaz.Scalaz._
val nodes = Set(6379, 6380, 6381).map { port =>
new HostAndPort("192.168.53.52", port)
}
val cluster = new JedisPipelinedCluster(nodes)
val result = cluster.pipelined { cp =>
val r1 = cp.get("key1")
val r2 = cp.get("key2")
val r3 = cp.get("key3")
cp.sync()
responseMonad.sequence(List(r1, r2, r3)).get()
}
result match {
case scala.util.Success(x) => println(x) // 「List(value1, value2, value3)」出力
case scala.util.Failure(e) => e.printStackTrace()
}
}