1
0

More than 3 years have passed since last update.

Memcached で メッセージキューをやってみた

Posted at

python と ruby で情報をやり取りするのになんか良い方法はないかなぁとネットを探してたら「Memcached Queue」というキーワードがありました。
探してみたのですが、随分昔にはあったけど今はもう無いみたいだったので、実装してみました。

Memcached とは

簡単に言うとデータをメモリ上に記憶する NoSQL キャッシュサーバです。
メモリ上なので高速です。ですが、メモリ上なのでデータ量の上限があるのでそんなに大きなデータは扱えません(あ、うちの環境だけ?)
TCP/IPのサービスなので、インストールしておけば開発言語によらず(場合によっては遠くからでも)アクセスできます。

インストールは Mac なら brew とかでします。

$ brew install memcached

非同期の情報共有

情報を共有するためには、共通でアクセスできる「何か」が必要です。
よく使われるのは「ファイル」であったり「データベース」であったりします。

今回は、RaspberryPi3 上で稼働させるシステムを想定していたので、「ファイル」だと SDカードへの読み書きが発生して劣化(最悪起動しなくなる)するし、「データベース」はちょっと面倒かなと思い、memcached にしました。#「ファイル」もなんか面倒なイメージ

あと、時系列的に取り扱いたかったので、メッセージキューという形にしました。これなら何も考えずに扱えそうです。#投げ込んで取り出すだけ。

Rubyで実装

今回は、メッセージキューを複数持てるようにしてみました。

$ gem install memcache-client
memcache_queue.rb
# -*- coding: utf-8 -*-
require "memcache"

class MemcacheQueue

  def initialize(**options)
    defaultops = { :servers => ["localhost"], :namespace => "memcache_queue" }
    options = defaultops.merge(options)

    # memcache configuration
    memcache_servers = options[:servers]
    memcache_namespace = options[:namespace]
    memcache_options = { :namespace => memcache_namespace, :timeout => 30 }
    @memcache = MemCache.new(memcache_servers, memcache_options)

    @expir = 0
  end

  # ---------------------------------------------------------------------------
  def create_queue(key, size = 0)
    if size.to_i.zero?
      raise MemcacheQueueError.new("Can not create zero size queue.")
    end
    maxsize = @memcache.get("#{key}_maxsize", true)
    unless maxsize.nil?
      #raise MemcacheQueueError.new("Exist '#{key}' queue already.")
      return true
    end
    @memcache.set("#{key}_maxsize", size, @expir, true)
    @memcache.set("#{key}_pointer", 0, @expir, true)
    @memcache.set("#{key}_counter", 0, @expir, true)
    size.times do |t|
      @memcache.set("#{key}_#{t}", nil, @expir, true)
    end
    return true
  end

  def delete_queue(key)
    size = @memcache.get("#{key}_maxsize", true)
    unless size.nil?
      size.to_i.times do |t|
        @memcache.delete("#{key}_#{t}")
      end
      @memcache.delete("#{key}_maxsize")
      @memcache.delete("#{key}_pointer")
      @memcache.delete("#{key}_counter")
      return true
    else
      raise MemcacheQueueError.new("No such '#{key}' queue.")
    end
  end
  alias :destroy_queue :delete_queue

  # ---------------------------------------------------------------------------
  # add data to queue
  def enqueue(key, value)
    maxsize = @memcache.get("#{key}_maxsize", true)
    if maxsize.nil?
      raise MemcacheQueueError.new("No such '#{key}' queue.")
    end
    maxsize = maxsize.to_i
    pointer = @memcache.get("#{key}_pointer", true).to_i
    counter = @memcache.get("#{key}_counter", true).to_i
    if counter >= maxsize
      raise MemcacheQueueError.new("Out of size '#{key}' qeueu.")
    end
    nextpoint = (pointer + counter) % maxsize
    counter += 1
    @memcache.set("#{key}_#{nextpoint}", value, @expir, true)
    @memcache.set("#{key}_counter", counter, @expir, true)
  end

  # get data from queue
  def dequeue(key)
    maxsize = @memcache.get("#{key}_maxsize", true)
    if maxsize.nil?
      raise MemcacheQueueError.new("No such '#{key}' queue.")
    end
    maxsize = maxsize.to_i
    pointer = @memcache.get("#{key}_pointer", true).to_i
    counter = @memcache.get("#{key}_counter", true).to_i   
    return nil if counter.to_i <= 0
    value = @memcache.get("#{key}_#{pointer}", true)
    @memcache.set("#{key}_#{pointer}", nil, @expir, true)
    pointer = (pointer + 1) % maxsize
    counter -= 1
    @memcache.set("#{key}_pointer", pointer, @expir, true)
    @memcache.set("#{key}_counter", counter, @expir, true)
    return value
  end

  def length(key)
    counter = @memcache.get("#{key}_counter", true)
    if counter.nil?
      raise MemcacheQueueError.new("No such '#{key}' queue.")
    end
    return counter.to_i
  end
  alias :size :length

end

class MemcacheQueueError < StandardError
end

ポイントは、 @memcache.get(key, raw) の raw を true にするところです。標準では false になっているので、それだと ruby の memcache-client はデータを Marshal.load して読み込むらしく、他の言語で保存したデータを正しく読み取れなくなります。
保存の時も同様に raw = true で保存します。

#あ、使うときは特に意識しなくても大丈夫ですよ。

使い方

test.rb
# -*- coding: utf-8 -*-
require_relative "./memcache_queue"

# run test :p
mq = MemcacheQueue.new

qkey = "hoge"
qsize = 10
mq.create_queue(qkey, qsize) # キー と サイズ を指定してキューを作成

mq.enqueue(qkey, "Hello") # キューに "Hello" を保存

val = mq.dequeue(qkey) # キューからデータを取り出す
puts "  dequeue -> #{val}"

# キューに保存されているデータを全部取得
while mq.size(qkey) > 0
  val = mq.dequeue(qkey)
  puts "  dequeue -> #{val}"
end

mq.delete_queue(qkey) # 使わなくなったキューは削除

Pythonで実装

続いて python での実装です。#python3 です。

$ pip install python-memcached
memcache_queue.py
# -*- coding: utf-8 -*-
import hashlib
import memcache

class MemcacheQueueError(Exception):
    pass

class MemcacheQueue:
    def __init__(self, ):
        memcache_servers = ["localhost:11211"]
        memcache_namespace = "memcache_queue"
        self.memcache = memcache.Client(memcache_servers)

        self.autofix_keys = True       # default is true
        self.namespace_separator = ":" # default ':'
        self.namespace = memcache_namespace

    def make_cache_key(self, key):
        kkey = "{}{}{}".format(self.namespace, self.namespace_separator, key)
        if self.autofix_keys and (key.find(" ") > -1 or len(kkey) > 250):
            hkey_obj = hashlib.sha1(b"{}".format(key))
            hkey = hkey_obj.hexdigest()
            key = "{}-autofixed".format(hkey)

        if self.namespace is None:
            return key
        else:
            return "{}{}{}".format(self.namespace,
                                   self.namespace_separator, key)

    # -----------------------------------------------------------------------
    def create_queue(self, key, size):
        if size == 0:
            raise MemcacheQueueError("Can not create zero size queue.")

        mkey = self.make_cache_key("{}_maxsize".format(key))
        maxsize = self.memcache.get("{}".format(mkey))
        if maxsize is not None:
            #raise MemcacheQueueError("Exist '{}' queue already.".format(key))
            return True

        self.memcache.set("{}".format(mkey), size)
        pkey = self.make_cache_key("{}_pointer".format(key))
        self.memcache.set("{}".format(pkey), 0)
        ckey = self.make_cache_key("{}_counter".format(key))
        self.memcache.set("{}".format(ckey), 0)
        for t in range(size):
            nkey = self.make_cache_key("{}_{}".format(key, t))
            self.memcache.set("{}".format(nkey), None)

        return True

    def delete_queue(self, key):
        mkey = self.make_cache_key("{}_maxsize".format(key))
        size = self.memcache.get("{}".format(mkey))
        if size is not None:
            for t in range(int(size)):
                nkey = self.make_cache_key("{}_{}".format(key, t))
                self.memcache.delete("{}".format(nkey))

            self.memcache.delete("{}".format(mkey))
            pkey = self.make_cache_key("{}_pointer".format(key))
            self.memcache.delete("{}".format(pkey))
            ckey = self.make_cache_key("{}_counter".format(key))
            self.memcache.delete("{}".format(ckey))
            return True
        else:
            raise MemcacheQueueError("No such '{}' queue.".format(key))

    # -----------------------------------------------------------------------
    # add data to queue
    def enqueue(self, key, value):
        mkey = self.make_cache_key(key)
        maxsize = self.memcache.get("{}_maxsize".format(mkey))
        if maxsize is None:
            raise MemcacheQueueError("No such '{}' queue.".format(key))

        maxsize = int(maxsize)
        pkey = self.make_cache_key("{}_pointer".format(key))
        pointer = int(self.memcache.get("{}".format(pkey)))
        ckey = self.make_cache_key("{}_counter".format(key))
        counter = int(self.memcache.get("{}".format(ckey)))
        if counter >= maxsize:
            raise MemcacheQueueError("Out of size '{}' qeueu.".format(key))

        nextpoint = (pointer + counter) % maxsize
        counter += 1
        nkey = self.make_cache_key("{}_{}".format(key, nextpoint))
        self.memcache.set("{}".format(nkey), value)
        self.memcache.set("{}".format(ckey), counter)

    # get data from queue
    def dequeue(self, key):
        mkey = self.make_cache_key("{}_maxsize".format(key))
        maxsize = self.memcache.get("{}".format(mkey))
        if maxsize is None:
            raise MemcacheQueueError.new("No such '{}' queue.".format(key))

        maxsize = int(maxsize)
        pkey = self.make_cache_key("{}_pointer".format(key))
        pointer = int(self.memcache.get("{}".format(pkey)))
        ckey = self.make_cache_key("{}_counter".format(key))
        counter = int(self.memcache.get("{}".format(ckey)))
        if counter <= 0:
            return None

        nkey = self.make_cache_key("{}_{}".format(key, pointer))
        value = self.memcache.get("{}".format(nkey))
        self.memcache.set("{}".format(nkey), None)
        pointer = (pointer + 1) % maxsize
        counter -= 1
        self.memcache.set("{}".format(pkey), pointer)
        self.memcache.set("{}".format(ckey), counter)
        return value

    def length(self, key):
        ckey = self.make_cache_key("{}_counter".format(key))
        counter = self.memcache.get("{}".format(ckey))
        if counter is None:
            raise MemcacheQueueError("No such '{}' queue.".format(key))
        return int(counter)

    def size(self, key):
        return self.length(key)    

ポイントはいくつかありまして、まず、python の memcache には namespace という概念が無いのですね。#というか memcached 自体にそう言うものがない。
memcached は ruby からしか使ったことがなかったので、ちょっと焦りましたが、キー を作る部分をごっそり持ってきて解決しました。
def make_cache_key(self, key): のところ。
この関数を通してキーを作成してアクセスすることで、Ruby側とスムーズにデータのやり取りを行えるようになります。

#あ、こちらも使うときは特に意識しなくても大丈夫ですよ。

使い方

test.py
# -*- coding: utf-8 -*-
import memcache_queue

# run test
mq = memcache_queue.MemcacheQueue()

qkey = "hoge"
qsize = 10
mq.create_queue(qkey, qsize) # キー と サイズ を指定してキューを作成

mq.enqueue(qkey, "Hello") # キューに "Hello" を保存

val = mq.dequeue(qkey) # キューから保存したデータを取得
print("  dequeue -> {}".format(val))

# キューに保存されているデータを全部取得
while mq.size(qkey) > 0:
    val = mq.dequeue(qkey)
    print("  dequeue -> {}".format(val))

mq.delete_queue(qkey) # 使わなくなったキューは削除

まとめ

これで memcached を介して ruby と python でデータのやり取りを行うことができるようになりました。あとはルールを決めて(例えば、Ruby→Python用、Python→Ruby用の二つのキューを作って使い分ける、など)アクセスすればより便利になるのではないでしょうか。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0