2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

続・Jedisパイプライン処理を(無理やり)Redis Cluster対応させる

Last updated at Posted at 2015-07-17

Twitter で @todesking さんからいただいたアドバイスを元に改良を試みる。

JedisPipelinedCluster.scala
package com.zaneli.redis.jedis

import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.{HostAndPort, Protocol}
import redis.clients.jedis.exceptions.{JedisClusterMaxRedirectionsException, JedisMovedDataException}
import scalikejdbc.using

import scala.annotation.tailrec
import scala.util.{Failure, Try}

class JedisPipelinedCluster(
    nodes: Set[HostAndPort],
    config: GenericObjectPoolConfig = new GenericObjectPoolConfig(),
    timeout: Int = Protocol.DEFAULT_TIMEOUT,
    maxRedirections: Int = 5) {

  def pipelined[A](cmd: ClusteredPipeline => A): Try[A] =
    pipelined(cmd, maxRedirections)

  @tailrec
  private[this] def pipelined[A](cmd: ClusteredPipeline => A, redirections: Int): Try[A] = {
    val result = Try {
      using(new ClusteredPipeline(nodes, config, timeout)) { cp =>
        cmd(cp)
      }
    }
    result match {
      case Failure(e: JedisMovedDataException) =>
        if (redirections <= 0) {
          Failure(new JedisClusterMaxRedirectionsException("Too many Cluster redirections?"))
        } else {
          pipelined(cmd, redirections - 1)
        }
      case r => r
    }
  }
}

JedisPipelinedCluster前回とあまり変わらないが、
キーを別途渡してここでslot生成、コネクション取得、直接Pipeline操作、というのをやめ、
ClusteredPipeline に処理を任せるようにした。

ClusteredPipeline.scala
package com.zaneli.redis.jedis

import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.{HostAndPort, Jedis, JedisSlotBasedConnectionHandler, JedisPool, Pipeline, Response}
import redis.clients.util.JedisClusterCRC16

import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.language.reflectiveCalls
import scala.util.Try

class ClusteredPipeline(nodes: Set[HostAndPort], config: GenericObjectPoolConfig, timeout: Int) extends AutoCloseable {

  private[this] case class Connection(jedis: Jedis, pipeline: Pipeline)

  private[this] val connections: mutable.Map[JedisPool, Connection] = mutable.Map.empty

  private[this] val handler = new JedisSlotBasedConnectionHandler(nodes, config, timeout) {
    def getSlotPool(slot: Int): JedisPool = {
      cache.getSlotPool(slot)
    }
    def getConnection(pool: JedisPool): Jedis = {
      if (pool != null) pool.getResource() else getConnection()
    }
  }

  def exec[A](key: String)(cmd: Pipeline => String => Response[A]): Response[A] = {
    val slot = JedisClusterCRC16.getSlot(key)
    val pool = handler.getSlotPool(slot)
    val pipeline = getPipeline(pool)
    cmd(pipeline)(key)
  }

  private[this] def getPipeline(pool: JedisPool): Pipeline = {
    connections.get(pool).map(_.pipeline).getOrElse {
      val j = handler.getConnection(pool)
      val p = j.pipelined()
      connections.put(pool, Connection(j, p))
      p
    }
  }

  def sync(): Unit = {
    connections.values.foreach(_.pipeline.sync())
  }

  override def close(): Unit = {
    connections.values.foreach(j => Try(j.jedis.close()))
    connections.clear()
  }
}

JedisSlotBasedConnectionHandlerprotected で持っている cache に触りたかったので少々強引だがこのように。
(JedisPool を直接取りたかったので、この辺り相当の処理を個別に行っている。)

同じ JedisPool からすでに Jedis を取得済みであれば connections に保持した Pipeline を使い回し、
まだ取得していなければ新たに Jedis を取得してそこから Pipeline を取る。

mutable.MapJedisPipeline も保持するというややこしい事をしているが、
前者は最終的に close するために、後者は exec で使い回すために保持するようにした。

Mapのキーが JedisPool なのもイマイチな気がするが…。
一応、同じノードに書かれた key2key3 では cache.getSlotPool で同じ JedisPool が取得される事は確認した。

JedisTest4.scala
package com.zaneli.redis.jedis

import redis.clients.jedis.HostAndPort

import scala.util.{Failure, Success}

object JedisTest4 extends App {

  val nodes = Set(6379, 6380, 6381).map { port =>
    new HostAndPort("192.168.53.52", port)
  }
  val cluster = new JedisPipelinedCluster(nodes)
  val result = cluster.pipelined { cp =>
    val v1 = cp.exec("key1")(_.get)
    val v2 = cp.exec("key2")(_.get)
    val v3 = cp.exec("key3")(_.get)
    cp.sync()
    (v1.get, v2.get, v3.get)
  }
  result match {
    case Success((v1, v2, v3)) => println((v1, v2, v3))
    case Failure(e) => e.printStackTrace()
  }
}

このように使う。
前回記事の通り、key1key2``key3 では別のノードに書かれるため同一 Pipeline では処理できないが、
ClusteredPipeline 内で適宜 Pipeline の生成・使い回しを行っているので
呼び出し側はこのように一度に処理できる。

補足としては、以下のように汎用的な execClusteredPipeline に用意したつもりだったが、

def exec[A](key: String)(cmd: Pipeline => String => Response[A]): Response[A]

結局 get くらいしかシグネチャが合うものがないので、
set, hget など Pipeline が持つ Response[_] を返すメソッド全てを
ClusteredPipeline にも持たせるほうが都合が良かったかな、と思う。

2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?