Twitter で @todesking さんからいただいたアドバイスを元に改良を試みる。
- https://twitter.com/todesking/status/622017179328774144
- https://twitter.com/todesking/status/622023114793201665
package com.zaneli.redis.jedis
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.{HostAndPort, Protocol}
import redis.clients.jedis.exceptions.{JedisClusterMaxRedirectionsException, JedisMovedDataException}
import scalikejdbc.using
import scala.annotation.tailrec
import scala.util.{Failure, Try}
class JedisPipelinedCluster(
nodes: Set[HostAndPort],
config: GenericObjectPoolConfig = new GenericObjectPoolConfig(),
timeout: Int = Protocol.DEFAULT_TIMEOUT,
maxRedirections: Int = 5) {
def pipelined[A](cmd: ClusteredPipeline => A): Try[A] =
pipelined(cmd, maxRedirections)
@tailrec
private[this] def pipelined[A](cmd: ClusteredPipeline => A, redirections: Int): Try[A] = {
val result = Try {
using(new ClusteredPipeline(nodes, config, timeout)) { cp =>
cmd(cp)
}
}
result match {
case Failure(e: JedisMovedDataException) =>
if (redirections <= 0) {
Failure(new JedisClusterMaxRedirectionsException("Too many Cluster redirections?"))
} else {
pipelined(cmd, redirections - 1)
}
case r => r
}
}
}
JedisPipelinedCluster
は前回とあまり変わらないが、
キーを別途渡してここでslot
生成、コネクション取得、直接Pipeline
操作、というのをやめ、
ClusteredPipeline
に処理を任せるようにした。
package com.zaneli.redis.jedis
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.{HostAndPort, Jedis, JedisSlotBasedConnectionHandler, JedisPool, Pipeline, Response}
import redis.clients.util.JedisClusterCRC16
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.language.reflectiveCalls
import scala.util.Try
class ClusteredPipeline(nodes: Set[HostAndPort], config: GenericObjectPoolConfig, timeout: Int) extends AutoCloseable {
private[this] case class Connection(jedis: Jedis, pipeline: Pipeline)
private[this] val connections: mutable.Map[JedisPool, Connection] = mutable.Map.empty
private[this] val handler = new JedisSlotBasedConnectionHandler(nodes, config, timeout) {
def getSlotPool(slot: Int): JedisPool = {
cache.getSlotPool(slot)
}
def getConnection(pool: JedisPool): Jedis = {
if (pool != null) pool.getResource() else getConnection()
}
}
def exec[A](key: String)(cmd: Pipeline => String => Response[A]): Response[A] = {
val slot = JedisClusterCRC16.getSlot(key)
val pool = handler.getSlotPool(slot)
val pipeline = getPipeline(pool)
cmd(pipeline)(key)
}
private[this] def getPipeline(pool: JedisPool): Pipeline = {
connections.get(pool).map(_.pipeline).getOrElse {
val j = handler.getConnection(pool)
val p = j.pipelined()
connections.put(pool, Connection(j, p))
p
}
}
def sync(): Unit = {
connections.values.foreach(_.pipeline.sync())
}
override def close(): Unit = {
connections.values.foreach(j => Try(j.jedis.close()))
connections.clear()
}
}
JedisSlotBasedConnectionHandler
が protected
で持っている cache
に触りたかったので少々強引だがこのように。
(JedisPool
を直接取りたかったので、この辺り相当の処理を個別に行っている。)
同じ JedisPool
からすでに Jedis
を取得済みであれば connections
に保持した Pipeline
を使い回し、
まだ取得していなければ新たに Jedis
を取得してそこから Pipeline
を取る。
mutable.Map
で Jedis
も Pipeline
も保持するというややこしい事をしているが、
前者は最終的に close
するために、後者は exec
で使い回すために保持するようにした。
Mapのキーが JedisPool
なのもイマイチな気がするが…。
一応、同じノードに書かれた key2
と key3
では cache.getSlotPool
で同じ JedisPool
が取得される事は確認した。
package com.zaneli.redis.jedis
import redis.clients.jedis.HostAndPort
import scala.util.{Failure, Success}
object JedisTest4 extends App {
val nodes = Set(6379, 6380, 6381).map { port =>
new HostAndPort("192.168.53.52", port)
}
val cluster = new JedisPipelinedCluster(nodes)
val result = cluster.pipelined { cp =>
val v1 = cp.exec("key1")(_.get)
val v2 = cp.exec("key2")(_.get)
val v3 = cp.exec("key3")(_.get)
cp.sync()
(v1.get, v2.get, v3.get)
}
result match {
case Success((v1, v2, v3)) => println((v1, v2, v3))
case Failure(e) => e.printStackTrace()
}
}
このように使う。
前回記事の通り、key1
と key2``key3
では別のノードに書かれるため同一 Pipeline
では処理できないが、
ClusteredPipeline
内で適宜 Pipeline
の生成・使い回しを行っているので
呼び出し側はこのように一度に処理できる。
補足としては、以下のように汎用的な exec
を ClusteredPipeline
に用意したつもりだったが、
def exec[A](key: String)(cmd: Pipeline => String => Response[A]): Response[A]
結局 get
くらいしかシグネチャが合うものがないので、
set
, hget
など Pipeline
が持つ Response[_]
を返すメソッド全てを
ClusteredPipeline
にも持たせるほうが都合が良かったかな、と思う。