LoginSignup
13
11

More than 5 years have passed since last update.

Jedisパイプライン処理を(無理やり)Redis Cluster対応させる

Last updated at Posted at 2015-07-17

続きを書いた。

前提

host 192.168.53.52, port 6379, 6380, 6381 のRedis Cluster構成を作り、以下のデータを作成した。
本記事はこれを前提としている。

redis-cli -h 192.168.53.52 -p 6379 -c

192.168.53.52:6379> set key1 value1
-> Redirected to slot [9189] located at 192.168.53.52:6380
OK
192.168.53.52:6379> set key2 value2
-> Redirected to slot [4998] located at 192.168.53.52:6379
OK
192.168.53.52:6379> set key3 value3
OK
192.168.53.52:6379> get key1
-> Redirected to slot [9189] located at 192.168.53.52:6380
"value1"
192.168.53.52:6379> get key2
-> Redirected to slot [4998] located at 192.168.53.52:6379
"value2"
192.168.53.52:6379> get key3
"value3"

また、本題にはあまり関係ないが using メソッドはscalikejdbcのLoanPatternをお借りした。

Jedis 2.7.2 現状の実装

JavaのクライアントライブラリJedisではstill under development ながらもRedis Clusterに対応している。

JedisTest1.scala
package com.zaneli.redis.jedis

import redis.clients.jedis.{HostAndPort, JedisCluster}
import scalikejdbc.using

import scala.collection.JavaConversions._

object JedisTest1 extends App {

  val nodes = Set(6379, 6380, 6381).map { port =>
    new HostAndPort("192.168.53.52", port)
  }
  using(new JedisCluster(nodes)) { cluster =>
    val v1 = cluster.get("key1")
    val v2 = cluster.get("key2")
    val v3 = cluster.get("key3")
    println((v1, v2, v3))
  }
}

しかし、パイプライン処理には対応していないらしくJedisClusterクラスには pipelined 的なメソッドがない。
ちなみにRedis Cluster対応していないクライアントでパイプライン処理しようとすると以下のExceptionが発生。

JedisTest2.scala
package com.zaneli.redis.jedis

import redis.clients.jedis.{HostAndPort, Jedis}
import scalikejdbc.using

object JedisTest2 extends App {

  val nodes = Set(6379, 6380, 6381).map { port =>
    new HostAndPort("192.168.53.52", port)
  }
  using(new Jedis(nodes.head.getHost, nodes.head.getPort)) { jedis =>
    val p = jedis.pipelined()
    val v1 = p.get("key1")
    val v2 = p.get("key2")
    val v3 = p.get("key3")
    p.sync()
    println((v1.get, v2.get, v3.get))
  }
}
Exception in thread "main" redis.clients.jedis.exceptions.JedisMovedDataException: MOVED 9189 192.168.53.52:6380
    at redis.clients.jedis.Protocol.processError(Protocol.java:108)
    at redis.clients.jedis.Protocol.process(Protocol.java:142)
    at redis.clients.jedis.Protocol.read(Protocol.java:196)
    at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:288)
    at redis.clients.jedis.Connection.getAll(Connection.java:258)
    at redis.clients.jedis.Connection.getAll(Connection.java:250)
    at redis.clients.jedis.Pipeline.sync(Pipeline.java:85)
    at com.zaneli.redis.jedis.JedisTest2$$anonfun$2.apply(JedisTest2.scala:16)

(ところで今回の例では key2 と key3 はたまたま同じノードに書かれるため、上記の val v1 = p.get("key1"), v1.get を除くと読み取りに成功する。)

JedisPipelinedClusterを自作してがんばる

既存の JedisCluster での実装を参考にしつつ、強引気味ではあるが対応を試みる。

こちらのグループでも議論されているが、
そもそもパイプライン処理はRedis Cluster環境でも同じノード内でしか行えないため、
JedisTest2.scala とあまり変わらずノードの特定をもう少しマシにできるくらいが落とし所か…。

JedisPipelinedCluster.scala
package com.zaneli.redis.jedis

import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.{HostAndPort, JedisSlotBasedConnectionHandler, Pipeline, Protocol}
import redis.clients.jedis.exceptions.{JedisClusterMaxRedirectionsException, JedisMovedDataException}
import redis.clients.util.JedisClusterCRC16
import scalikejdbc.using

import scala.annotation.tailrec
import scala.collection.JavaConversions._
import scala.util.{Failure, Try}

class JedisPipelinedCluster(
    nodes: Set[HostAndPort],
    config: GenericObjectPoolConfig = new GenericObjectPoolConfig(),
    timeout: Int = Protocol.DEFAULT_TIMEOUT,
    maxRedirections: Int = 5) {

  private[this] val handler = new JedisSlotBasedConnectionHandler(nodes, config, timeout)

  def pipelined[A](key: String)(cmd: Pipeline => A): Try[A] =
    pipelined(key, cmd, maxRedirections)

  @tailrec
  private[this] def pipelined[A](key: String, cmd: Pipeline => A, redirections: Int): Try[A] = {
    val result = Try {
      val slot = JedisClusterCRC16.getSlot(key)
      using(handler.getConnectionFromSlot(slot)) { jedis =>
        val pipeline = jedis.pipelined()
        cmd(pipeline)
      }
    }
    result match {
      case Failure(e: JedisMovedDataException) =>
        if (redirections <= 0) {
          Failure(new JedisClusterMaxRedirectionsException("Too many Cluster redirections?"))
        } else {
          pipelined(key, cmd, redirections - 1)
        }
      case r => r
    }
  }
}

JedisTest3.scala
package com.zaneli.redis.jedis

import redis.clients.jedis.HostAndPort

import scala.util.{Failure, Success}

object JedisTest3 extends App {

  val nodes = Set(6379, 6380, 6381).map { port =>
    new HostAndPort("192.168.53.52", port)
  }
  val cluster = new JedisPipelinedCluster(nodes)
  val result = cluster.pipelined("key2") { p =>
    val v2 = p.get("key2")
    val v3 = p.get("key3")
    p.sync()
    (v2.get, v3.get)
  }
  result match {
    case Success((v2, v3)) => println((v2, v3))
    case Failure(e) => e.printStackTrace()
  }
}

…うーん、結局ノード特定のためのキーを別途渡しているのが残念…。
cluster.pipelined("key1") を渡して p.get("key2") したりすると結局 JedisClusterMaxRedirectionsException が発生する。

もうちょっと実用的に

Redis Clusterのslotは、キー全体ではなく {...} で囲んだ部分に絞る事ができる。
つまり、複数キーを同じノードに書きたい場合、キーのフォーマットを {...} とし、この中の文字列を一致させればいい。

こんなcase classがあり、それぞれの値をRedisに書き込みたいとする。

SuperWrestler.scala
package com.zaneli.redis.jedis

import scala.collection.JavaConversions._
import scala.util.Try

case class SuperWrestler(id: Int, name: String, power: Int, favoriteHolds: List[String], family: Map[String, String])

object SuperWrestler extends RedisAccessor {
  override def prefix(id: Int): String = s"sw$id"

  def read(id: Int)(implicit cluster: JedisPipelinedCluster): Try[SuperWrestler] = {
    cluster.pipelined(prefix(id)) { p =>
      val name = p.get(key(id, "name"))
      val power = p.get(key(id, "power"))
      val favoriteHolds = p.lrange(key(id, "favoriteHolds"), 0, -1)
      val family = p.hgetAll(key(id, "family"))
      p.sync()
      SuperWrestler(id, name.get, power.get.toInt, favoriteHolds.get.toList, family.get.toMap)
    }
  }

  def write(sw: SuperWrestler)(implicit cluster: JedisPipelinedCluster): Boolean = {
    cluster.pipelined(prefix(sw.id)) { p =>
      p.set(key(sw.id, "name"), sw.name)
      p.set(key(sw.id, "power"), sw.power.toString)
      p.del(key(sw.id, "favoriteHolds"), key(sw.id, "family"))
      sw.favoriteHolds.map(p.rpush(key(sw.id, "favoriteHolds"), _))
      sw.family.map { case (k, v) => p.hset(key(sw.id, "family"), k, v) }
      p.sync()
    }.isSuccess
  }
}

trait RedisAccessor {
  def prefix(id: Int): String
  def key(id: Int, name: String): String = s"{${prefix(id)}}$name"
}

SuperWrestlerTest.scala
package com.zaneli.redis.jedis

import redis.clients.jedis.HostAndPort

object SuperWrestlerTest {

  def main(args: Array[String]) {
    val nodes = Set(6379, 6380, 6381).map { port =>
      new HostAndPort("192.168.53.52", port)
    }
    implicit val cluster = new JedisPipelinedCluster(nodes)

    val kinnikuman = SuperWrestler(
      1, "キン肉マン", 95, List("キン肉バスター", "キン肉ドライバー", "マッスルスパーク"), Map("父" -> "キン肉真弓", "母" -> "キン肉小百合"))

    val terryman = SuperWrestler(
      2, "テリーマン", 95, List("スピニングトゥホールド", "カーフブランディング", "テキサスコンドルキック"), Map("父" -> "ドリーマン"))

    val robinMask = SuperWrestler(
      3, "ロビンマスク", 96, List("タワーブリッジ", "ロビンスペシャル"), Map("父" -> "ロビンナイト"))

    Seq(kinnikuman, terryman, robinMask) foreach SuperWrestler.write

    (1 to 3) foreach (i => println(SuperWrestler.read(i)))
  }
}

SuperWrestler 内でのキープレフィックスを固定して、一つの SuperWrestler の情報は全て同じノードに書く。
こういう用途なら、今回作った JedisPipelinedCluster を使うと SuperWrestler 読み書きの際には少しは便利、かな…?

ちなみに今回の例では、キン肉マンはport6381, テリーマンはport6379, ロビンマスクはport6380にバラけて書かれた。 :)

13
11
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
13
11