python と ruby で情報をやり取りするのになんか良い方法はないかなぁとネットを探してたら「Memcached Queue」というキーワードがありました。
探してみたのですが、随分昔にはあったけど今はもう無いみたいだったので、実装してみました。
Memcached とは
簡単に言うとデータをメモリ上に記憶する NoSQL キャッシュサーバです。
メモリ上なので高速です。ですが、メモリ上なのでデータ量の上限があるのでそんなに大きなデータは扱えません(あ、うちの環境だけ?)
TCP/IPのサービスなので、インストールしておけば開発言語によらず(場合によっては遠くからでも)アクセスできます。
インストールは Mac なら brew とかでします。
$ brew install memcached
非同期の情報共有
情報を共有するためには、共通でアクセスできる「何か」が必要です。
よく使われるのは「ファイル」であったり「データベース」であったりします。
今回は、RaspberryPi3 上で稼働させるシステムを想定していたので、「ファイル」だと SDカードへの読み書きが発生して劣化(最悪起動しなくなる)するし、「データベース」はちょっと面倒かなと思い、memcached にしました。#「ファイル」もなんか面倒なイメージ
あと、時系列的に取り扱いたかったので、メッセージキューという形にしました。これなら何も考えずに扱えそうです。#投げ込んで取り出すだけ。
Rubyで実装
今回は、メッセージキューを複数持てるようにしてみました。
$ gem install memcache-client
# -*- coding: utf-8 -*-
require "memcache"
class MemcacheQueue
def initialize(**options)
defaultops = { :servers => ["localhost"], :namespace => "memcache_queue" }
options = defaultops.merge(options)
# memcache configuration
memcache_servers = options[:servers]
memcache_namespace = options[:namespace]
memcache_options = { :namespace => memcache_namespace, :timeout => 30 }
@memcache = MemCache.new(memcache_servers, memcache_options)
@expir = 0
end
# ---------------------------------------------------------------------------
def create_queue(key, size = 0)
if size.to_i.zero?
raise MemcacheQueueError.new("Can not create zero size queue.")
end
maxsize = @memcache.get("#{key}_maxsize", true)
unless maxsize.nil?
#raise MemcacheQueueError.new("Exist '#{key}' queue already.")
return true
end
@memcache.set("#{key}_maxsize", size, @expir, true)
@memcache.set("#{key}_pointer", 0, @expir, true)
@memcache.set("#{key}_counter", 0, @expir, true)
size.times do |t|
@memcache.set("#{key}_#{t}", nil, @expir, true)
end
return true
end
def delete_queue(key)
size = @memcache.get("#{key}_maxsize", true)
unless size.nil?
size.to_i.times do |t|
@memcache.delete("#{key}_#{t}")
end
@memcache.delete("#{key}_maxsize")
@memcache.delete("#{key}_pointer")
@memcache.delete("#{key}_counter")
return true
else
raise MemcacheQueueError.new("No such '#{key}' queue.")
end
end
alias :destroy_queue :delete_queue
# ---------------------------------------------------------------------------
# add data to queue
def enqueue(key, value)
maxsize = @memcache.get("#{key}_maxsize", true)
if maxsize.nil?
raise MemcacheQueueError.new("No such '#{key}' queue.")
end
maxsize = maxsize.to_i
pointer = @memcache.get("#{key}_pointer", true).to_i
counter = @memcache.get("#{key}_counter", true).to_i
if counter >= maxsize
raise MemcacheQueueError.new("Out of size '#{key}' qeueu.")
end
nextpoint = (pointer + counter) % maxsize
counter += 1
@memcache.set("#{key}_#{nextpoint}", value, @expir, true)
@memcache.set("#{key}_counter", counter, @expir, true)
end
# get data from queue
def dequeue(key)
maxsize = @memcache.get("#{key}_maxsize", true)
if maxsize.nil?
raise MemcacheQueueError.new("No such '#{key}' queue.")
end
maxsize = maxsize.to_i
pointer = @memcache.get("#{key}_pointer", true).to_i
counter = @memcache.get("#{key}_counter", true).to_i
return nil if counter.to_i <= 0
value = @memcache.get("#{key}_#{pointer}", true)
@memcache.set("#{key}_#{pointer}", nil, @expir, true)
pointer = (pointer + 1) % maxsize
counter -= 1
@memcache.set("#{key}_pointer", pointer, @expir, true)
@memcache.set("#{key}_counter", counter, @expir, true)
return value
end
def length(key)
counter = @memcache.get("#{key}_counter", true)
if counter.nil?
raise MemcacheQueueError.new("No such '#{key}' queue.")
end
return counter.to_i
end
alias :size :length
end
class MemcacheQueueError < StandardError
end
ポイントは、 @memcache.get(key, raw)
の raw を true
にするところです。標準では false
になっているので、それだと ruby の memcache-client はデータを Marshal.load して読み込むらしく、他の言語で保存したデータを正しく読み取れなくなります。
保存の時も同様に raw = true
で保存します。
#あ、使うときは特に意識しなくても大丈夫ですよ。
使い方
# -*- coding: utf-8 -*-
require_relative "./memcache_queue"
# run test :p
mq = MemcacheQueue.new
qkey = "hoge"
qsize = 10
mq.create_queue(qkey, qsize) # キー と サイズ を指定してキューを作成
mq.enqueue(qkey, "Hello") # キューに "Hello" を保存
val = mq.dequeue(qkey) # キューからデータを取り出す
puts " dequeue -> #{val}"
# キューに保存されているデータを全部取得
while mq.size(qkey) > 0
val = mq.dequeue(qkey)
puts " dequeue -> #{val}"
end
mq.delete_queue(qkey) # 使わなくなったキューは削除
Pythonで実装
続いて python での実装です。#python3 です。
$ pip install python-memcached
# -*- coding: utf-8 -*-
import hashlib
import memcache
class MemcacheQueueError(Exception):
pass
class MemcacheQueue:
def __init__(self, ):
memcache_servers = ["localhost:11211"]
memcache_namespace = "memcache_queue"
self.memcache = memcache.Client(memcache_servers)
self.autofix_keys = True # default is true
self.namespace_separator = ":" # default ':'
self.namespace = memcache_namespace
def make_cache_key(self, key):
kkey = "{}{}{}".format(self.namespace, self.namespace_separator, key)
if self.autofix_keys and (key.find(" ") > -1 or len(kkey) > 250):
hkey_obj = hashlib.sha1(b"{}".format(key))
hkey = hkey_obj.hexdigest()
key = "{}-autofixed".format(hkey)
if self.namespace is None:
return key
else:
return "{}{}{}".format(self.namespace,
self.namespace_separator, key)
# -----------------------------------------------------------------------
def create_queue(self, key, size):
if size == 0:
raise MemcacheQueueError("Can not create zero size queue.")
mkey = self.make_cache_key("{}_maxsize".format(key))
maxsize = self.memcache.get("{}".format(mkey))
if maxsize is not None:
#raise MemcacheQueueError("Exist '{}' queue already.".format(key))
return True
self.memcache.set("{}".format(mkey), size)
pkey = self.make_cache_key("{}_pointer".format(key))
self.memcache.set("{}".format(pkey), 0)
ckey = self.make_cache_key("{}_counter".format(key))
self.memcache.set("{}".format(ckey), 0)
for t in range(size):
nkey = self.make_cache_key("{}_{}".format(key, t))
self.memcache.set("{}".format(nkey), None)
return True
def delete_queue(self, key):
mkey = self.make_cache_key("{}_maxsize".format(key))
size = self.memcache.get("{}".format(mkey))
if size is not None:
for t in range(int(size)):
nkey = self.make_cache_key("{}_{}".format(key, t))
self.memcache.delete("{}".format(nkey))
self.memcache.delete("{}".format(mkey))
pkey = self.make_cache_key("{}_pointer".format(key))
self.memcache.delete("{}".format(pkey))
ckey = self.make_cache_key("{}_counter".format(key))
self.memcache.delete("{}".format(ckey))
return True
else:
raise MemcacheQueueError("No such '{}' queue.".format(key))
# -----------------------------------------------------------------------
# add data to queue
def enqueue(self, key, value):
mkey = self.make_cache_key(key)
maxsize = self.memcache.get("{}_maxsize".format(mkey))
if maxsize is None:
raise MemcacheQueueError("No such '{}' queue.".format(key))
maxsize = int(maxsize)
pkey = self.make_cache_key("{}_pointer".format(key))
pointer = int(self.memcache.get("{}".format(pkey)))
ckey = self.make_cache_key("{}_counter".format(key))
counter = int(self.memcache.get("{}".format(ckey)))
if counter >= maxsize:
raise MemcacheQueueError("Out of size '{}' qeueu.".format(key))
nextpoint = (pointer + counter) % maxsize
counter += 1
nkey = self.make_cache_key("{}_{}".format(key, nextpoint))
self.memcache.set("{}".format(nkey), value)
self.memcache.set("{}".format(ckey), counter)
# get data from queue
def dequeue(self, key):
mkey = self.make_cache_key("{}_maxsize".format(key))
maxsize = self.memcache.get("{}".format(mkey))
if maxsize is None:
raise MemcacheQueueError.new("No such '{}' queue.".format(key))
maxsize = int(maxsize)
pkey = self.make_cache_key("{}_pointer".format(key))
pointer = int(self.memcache.get("{}".format(pkey)))
ckey = self.make_cache_key("{}_counter".format(key))
counter = int(self.memcache.get("{}".format(ckey)))
if counter <= 0:
return None
nkey = self.make_cache_key("{}_{}".format(key, pointer))
value = self.memcache.get("{}".format(nkey))
self.memcache.set("{}".format(nkey), None)
pointer = (pointer + 1) % maxsize
counter -= 1
self.memcache.set("{}".format(pkey), pointer)
self.memcache.set("{}".format(ckey), counter)
return value
def length(self, key):
ckey = self.make_cache_key("{}_counter".format(key))
counter = self.memcache.get("{}".format(ckey))
if counter is None:
raise MemcacheQueueError("No such '{}' queue.".format(key))
return int(counter)
def size(self, key):
return self.length(key)
ポイントはいくつかありまして、まず、python の memcache には namespace
という概念が無いのですね。#というか memcached 自体にそう言うものがない。
memcached は ruby からしか使ったことがなかったので、ちょっと焦りましたが、キー を作る部分をごっそり持ってきて解決しました。
#def make_cache_key(self, key):
のところ。
この関数を通してキーを作成してアクセスすることで、Ruby側とスムーズにデータのやり取りを行えるようになります。
#あ、こちらも使うときは特に意識しなくても大丈夫ですよ。
使い方
# -*- coding: utf-8 -*-
import memcache_queue
# run test
mq = memcache_queue.MemcacheQueue()
qkey = "hoge"
qsize = 10
mq.create_queue(qkey, qsize) # キー と サイズ を指定してキューを作成
mq.enqueue(qkey, "Hello") # キューに "Hello" を保存
val = mq.dequeue(qkey) # キューから保存したデータを取得
print(" dequeue -> {}".format(val))
# キューに保存されているデータを全部取得
while mq.size(qkey) > 0:
val = mq.dequeue(qkey)
print(" dequeue -> {}".format(val))
mq.delete_queue(qkey) # 使わなくなったキューは削除
まとめ
これで memcached を介して ruby と python でデータのやり取りを行うことができるようになりました。あとはルールを決めて(例えば、Ruby→Python用、Python→Ruby用の二つのキューを作って使い分ける、など)アクセスすればより便利になるのではないでしょうか。