はじめに
AWS SQSは、可視性タイムアウトや、キューやメッセージなど、分散システムでキューを利用しやすい仕組みが構築されている。分散処理をするのは何もAWS上だけではなく、同コンピュータ内のアプリケーション間連携、オンプレ環境など、使いたいシチュエーションが意外とある。今回は、Reidsを使って、SQS的なキュー操作が可能なPythonを用意した。PythonからRedisへアクセスするには、reids-pyを利用した。
作ったもの
キュー操作
1. キューの作成、データのセット(dict型に対応)
redisObj = RediQ()
for i in range(1000):
redisObj.set_message({'value':"a"+str(i),'number':1000-i})
2. キューの長さの確認(可視キュー/不可視キュー)
print(redisObj.check_queue_length())
print(redisObj.check_holdqueue_length())
3. メッセージの取り出し、消費
for i in range((redisObj.check_queue_length())):
mes = redisObj.get_message()
mesId = mes['messageID']
#---something job----
redisObj.consume_message(messageID=mesId)
参考:メッセージ実体
{'queueID': 'rediQ:b4a9e49ccac14106bcc3feaa11a08c6d',
'setTime': '2025-04-05T17:09:54.182529',
'messageID': 'rediQ:b4a9e49ccac14106bcc3feaa11a08c6d_38bdd7576ca446439cd587bff71e818c',
'content': {'value': 'a999', 'number': 1},
'visibleTime': '100'}
今回作ったもの
ReDiQ.py
import redis
import uuid
from datetime import datetime
import json
class RediQ:
def __init__(self):
self.redis = redis.Redis(host='localhost', port=6379, db=0)
name = 'rediQ:' + str(uuid.uuid4()).replace("-","")
self.QueueName = name
self.holdQueueName = name + "_hold"
self.visibleTime = 100 #visible time
def set_message(self, dictValue):
u_ = self.QueueName + "_" + str(uuid.uuid4()).replace("-","")
self.redis.rpush(self.QueueName, u_)
hv_ = {"content":json.dumps(dictValue)}
hv_["setTime"] = datetime.now().isoformat()
hv_["messageID"] = u_
hv_["queueID"] = self.QueueName
hv_["visibleTime"] = self.visibleTime
for ky in list(hv_.keys()):
self.redis.hset(u_, ky, str(hv_[ky]))
def get_message(self):
self._reset_visiblity()
b_ = self.redis.lpop(self.QueueName)
self.redis.rpush(self.holdQueueName, b_)
r_ = {}
hv_ = self.redis.hgetall(b_)
for ky in list(hv_.keys()):
r_[ky.decode('utf-8')] = hv_[ky].decode('utf-8')
r_["content"] = json.loads(r_["content"])
return r_
def consume_message(self,messageID):
self._reset_visiblity()
if self._check_message_visibility(messageID):
if not(self.redis.exists(messageID) == 0):
self.redis.delete(messageID)
self.redis.lrem(self.holdQueueName, 0, messageID)
else:
pass
def _reset_visiblity(self):
def _reset_visiblity_func():
b_ = self.redis.rpop(self.holdQueueName)
t_ = self.redis.hget(b_, "setTime").decode('utf-8')
v_ = int(self.redis.hget(b_, "visibleTime").decode('utf-8'))
dt = datetime.now() - datetime.fromisoformat(t_)
secFromLastUpdate = dt.total_seconds()
if secFromLastUpdate > v_:
self.redis.rpush(self.QueueName, b_)
else:
self.redis.rpush(self.holdQueueName, b_)
len_ = self.redis.llen(self.holdQueueName)
for _ in range(len_):
_reset_visiblity_func()
def _check_message_visibility(self,mesID):
if self.redis.exists(mesID) == 0:
return False
else:
hv_ = self.redis.hgetall(mesID)
dt = datetime.now() - datetime.fromisoformat(hv_[b"setTime"].decode('utf-8'))
secFromLastUpdate = dt.total_seconds()
if secFromLastUpdate > int(hv_[b"visibleTime"].decode('utf-8')):
return False
else:
return True
def check_queue_length(self):
self._reset_visiblity()
return self.redis.llen(self.QueueName)
def check_holdqueue_length(self):
self._reset_visiblity()
return self.redis.llen(self.holdQueueName)