続きを書いた。
前提
host 192.168.53.52, port 6379, 6380, 6381 のRedis Cluster構成を作り、以下のデータを作成した。
本記事はこれを前提としている。
redis-cli -h 192.168.53.52 -p 6379 -c
192.168.53.52:6379> set key1 value1
-> Redirected to slot [9189] located at 192.168.53.52:6380
OK
192.168.53.52:6379> set key2 value2
-> Redirected to slot [4998] located at 192.168.53.52:6379
OK
192.168.53.52:6379> set key3 value3
OK
192.168.53.52:6379> get key1
-> Redirected to slot [9189] located at 192.168.53.52:6380
"value1"
192.168.53.52:6379> get key2
-> Redirected to slot [4998] located at 192.168.53.52:6379
"value2"
192.168.53.52:6379> get key3
"value3"
また、本題にはあまり関係ないが using
メソッドはscalikejdbcのLoanPatternをお借りした。
Jedis 2.7.2 現状の実装
JavaのクライアントライブラリJedisではstill under development ながらもRedis Clusterに対応している。
package com.zaneli.redis.jedis
import redis.clients.jedis.{HostAndPort, JedisCluster}
import scalikejdbc.using
import scala.collection.JavaConversions._
object JedisTest1 extends App {
val nodes = Set(6379, 6380, 6381).map { port =>
new HostAndPort("192.168.53.52", port)
}
using(new JedisCluster(nodes)) { cluster =>
val v1 = cluster.get("key1")
val v2 = cluster.get("key2")
val v3 = cluster.get("key3")
println((v1, v2, v3))
}
}
しかし、パイプライン処理には対応していないらしく、JedisCluster
クラスには pipelined
的なメソッドがない。
ちなみにRedis Cluster対応していないクライアントでパイプライン処理しようとすると以下のExceptionが発生。
package com.zaneli.redis.jedis
import redis.clients.jedis.{HostAndPort, Jedis}
import scalikejdbc.using
object JedisTest2 extends App {
val nodes = Set(6379, 6380, 6381).map { port =>
new HostAndPort("192.168.53.52", port)
}
using(new Jedis(nodes.head.getHost, nodes.head.getPort)) { jedis =>
val p = jedis.pipelined()
val v1 = p.get("key1")
val v2 = p.get("key2")
val v3 = p.get("key3")
p.sync()
println((v1.get, v2.get, v3.get))
}
}
Exception in thread "main" redis.clients.jedis.exceptions.JedisMovedDataException: MOVED 9189 192.168.53.52:6380
at redis.clients.jedis.Protocol.processError(Protocol.java:108)
at redis.clients.jedis.Protocol.process(Protocol.java:142)
at redis.clients.jedis.Protocol.read(Protocol.java:196)
at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:288)
at redis.clients.jedis.Connection.getAll(Connection.java:258)
at redis.clients.jedis.Connection.getAll(Connection.java:250)
at redis.clients.jedis.Pipeline.sync(Pipeline.java:85)
at com.zaneli.redis.jedis.JedisTest2$$anonfun$2.apply(JedisTest2.scala:16)
(ところで今回の例では key2 と key3 はたまたま同じノードに書かれるため、上記の val v1 = p.get("key1")
, v1.get
を除くと読み取りに成功する。)
JedisPipelinedClusterを自作してがんばる
既存の JedisCluster での実装を参考にしつつ、強引気味ではあるが対応を試みる。
こちらのグループでも議論されているが、
そもそもパイプライン処理はRedis Cluster環境でも同じノード内でしか行えないため、
JedisTest2.scala
とあまり変わらずノードの特定をもう少しマシにできるくらいが落とし所か…。
package com.zaneli.redis.jedis
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.{HostAndPort, JedisSlotBasedConnectionHandler, Pipeline, Protocol}
import redis.clients.jedis.exceptions.{JedisClusterMaxRedirectionsException, JedisMovedDataException}
import redis.clients.util.JedisClusterCRC16
import scalikejdbc.using
import scala.annotation.tailrec
import scala.collection.JavaConversions._
import scala.util.{Failure, Try}
class JedisPipelinedCluster(
nodes: Set[HostAndPort],
config: GenericObjectPoolConfig = new GenericObjectPoolConfig(),
timeout: Int = Protocol.DEFAULT_TIMEOUT,
maxRedirections: Int = 5) {
private[this] val handler = new JedisSlotBasedConnectionHandler(nodes, config, timeout)
def pipelined[A](key: String)(cmd: Pipeline => A): Try[A] =
pipelined(key, cmd, maxRedirections)
@tailrec
private[this] def pipelined[A](key: String, cmd: Pipeline => A, redirections: Int): Try[A] = {
val result = Try {
val slot = JedisClusterCRC16.getSlot(key)
using(handler.getConnectionFromSlot(slot)) { jedis =>
val pipeline = jedis.pipelined()
cmd(pipeline)
}
}
result match {
case Failure(e: JedisMovedDataException) =>
if (redirections <= 0) {
Failure(new JedisClusterMaxRedirectionsException("Too many Cluster redirections?"))
} else {
pipelined(key, cmd, redirections - 1)
}
case r => r
}
}
}
package com.zaneli.redis.jedis
import redis.clients.jedis.HostAndPort
import scala.util.{Failure, Success}
object JedisTest3 extends App {
val nodes = Set(6379, 6380, 6381).map { port =>
new HostAndPort("192.168.53.52", port)
}
val cluster = new JedisPipelinedCluster(nodes)
val result = cluster.pipelined("key2") { p =>
val v2 = p.get("key2")
val v3 = p.get("key3")
p.sync()
(v2.get, v3.get)
}
result match {
case Success((v2, v3)) => println((v2, v3))
case Failure(e) => e.printStackTrace()
}
}
…うーん、結局ノード特定のためのキーを別途渡しているのが残念…。
cluster.pipelined("key1")
を渡して p.get("key2")
したりすると結局 JedisClusterMaxRedirectionsException
が発生する。
もうちょっと実用的に
Redis Clusterのslotは、キー全体ではなく {...}
で囲んだ部分に絞る事ができる。
つまり、複数キーを同じノードに書きたい場合、キーのフォーマットを {...}
とし、この中の文字列を一致させればいい。
こんなcase classがあり、それぞれの値をRedisに書き込みたいとする。
package com.zaneli.redis.jedis
import scala.collection.JavaConversions._
import scala.util.Try
case class SuperWrestler(id: Int, name: String, power: Int, favoriteHolds: List[String], family: Map[String, String])
object SuperWrestler extends RedisAccessor {
override def prefix(id: Int): String = s"sw$id"
def read(id: Int)(implicit cluster: JedisPipelinedCluster): Try[SuperWrestler] = {
cluster.pipelined(prefix(id)) { p =>
val name = p.get(key(id, "name"))
val power = p.get(key(id, "power"))
val favoriteHolds = p.lrange(key(id, "favoriteHolds"), 0, -1)
val family = p.hgetAll(key(id, "family"))
p.sync()
SuperWrestler(id, name.get, power.get.toInt, favoriteHolds.get.toList, family.get.toMap)
}
}
def write(sw: SuperWrestler)(implicit cluster: JedisPipelinedCluster): Boolean = {
cluster.pipelined(prefix(sw.id)) { p =>
p.set(key(sw.id, "name"), sw.name)
p.set(key(sw.id, "power"), sw.power.toString)
p.del(key(sw.id, "favoriteHolds"), key(sw.id, "family"))
sw.favoriteHolds.map(p.rpush(key(sw.id, "favoriteHolds"), _))
sw.family.map { case (k, v) => p.hset(key(sw.id, "family"), k, v) }
p.sync()
}.isSuccess
}
}
trait RedisAccessor {
def prefix(id: Int): String
def key(id: Int, name: String): String = s"{${prefix(id)}}$name"
}
package com.zaneli.redis.jedis
import redis.clients.jedis.HostAndPort
object SuperWrestlerTest {
def main(args: Array[String]) {
val nodes = Set(6379, 6380, 6381).map { port =>
new HostAndPort("192.168.53.52", port)
}
implicit val cluster = new JedisPipelinedCluster(nodes)
val kinnikuman = SuperWrestler(
1, "キン肉マン", 95, List("キン肉バスター", "キン肉ドライバー", "マッスルスパーク"), Map("父" -> "キン肉真弓", "母" -> "キン肉小百合"))
val terryman = SuperWrestler(
2, "テリーマン", 95, List("スピニングトゥホールド", "カーフブランディング", "テキサスコンドルキック"), Map("父" -> "ドリーマン"))
val robinMask = SuperWrestler(
3, "ロビンマスク", 96, List("タワーブリッジ", "ロビンスペシャル"), Map("父" -> "ロビンナイト"))
Seq(kinnikuman, terryman, robinMask) foreach SuperWrestler.write
(1 to 3) foreach (i => println(SuperWrestler.read(i)))
}
}
SuperWrestler
内でのキープレフィックスを固定して、一つの SuperWrestler
の情報は全て同じノードに書く。
こういう用途なら、今回作った JedisPipelinedCluster
を使うと SuperWrestler
読み書きの際には少しは便利、かな…?
ちなみに今回の例では、キン肉マンはport6381, テリーマンはport6379, ロビンマスクはport6380にバラけて書かれた。 :)