0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

AWS SQS的なキュー操作をRedis上で

Posted at

はじめに

AWS SQSは、可視性タイムアウトや、キューやメッセージなど、分散システムでキューを利用しやすい仕組みが構築されている。分散処理をするのは何もAWS上だけではなく、同コンピュータ内のアプリケーション間連携、オンプレ環境など、使いたいシチュエーションが意外とある。今回は、Reidsを使って、SQS的なキュー操作が可能なPythonを用意した。PythonからRedisへアクセスするには、reids-pyを利用した。

作ったもの

キュー操作

1. キューの作成、データのセット(dict型に対応)

redisObj = RediQ()    
for i in range(1000):
    redisObj.set_message({'value':"a"+str(i),'number':1000-i})

2. キューの長さの確認(可視キュー/不可視キュー)

print(redisObj.check_queue_length())
print(redisObj.check_holdqueue_length())

3. メッセージの取り出し、消費

for i in range((redisObj.check_queue_length())):
    mes = redisObj.get_message()
    mesId = mes['messageID']
    #---something job----
    redisObj.consume_message(messageID=mesId)

参考:メッセージ実体

{'queueID': 'rediQ:b4a9e49ccac14106bcc3feaa11a08c6d',
 'setTime': '2025-04-05T17:09:54.182529',
 'messageID': 'rediQ:b4a9e49ccac14106bcc3feaa11a08c6d_38bdd7576ca446439cd587bff71e818c',
 'content': {'value': 'a999', 'number': 1},
 'visibleTime': '100'}

今回作ったもの

ReDiQ.py
import redis
import uuid
from datetime import datetime
import json
class RediQ:
    def __init__(self):
        self.redis = redis.Redis(host='localhost', port=6379, db=0)
        name = 'rediQ:' + str(uuid.uuid4()).replace("-","")
        self.QueueName = name
        self.holdQueueName = name + "_hold"
        self.visibleTime = 100 #visible time

    def set_message(self, dictValue):
        u_ = self.QueueName + "_" + str(uuid.uuid4()).replace("-","")
        self.redis.rpush(self.QueueName, u_)
        hv_ = {"content":json.dumps(dictValue)}
        hv_["setTime"] = datetime.now().isoformat()
        hv_["messageID"] = u_
        hv_["queueID"] = self.QueueName
        hv_["visibleTime"] = self.visibleTime
        for ky in list(hv_.keys()):
            self.redis.hset(u_, ky, str(hv_[ky]))

    def get_message(self):
        self._reset_visiblity()
        b_ = self.redis.lpop(self.QueueName)
        self.redis.rpush(self.holdQueueName, b_)
        r_ = {}
        hv_ = self.redis.hgetall(b_)
        for ky in list(hv_.keys()):
            r_[ky.decode('utf-8')] = hv_[ky].decode('utf-8')
        r_["content"] = json.loads(r_["content"])
        return r_
    
    def consume_message(self,messageID):
        self._reset_visiblity()
        if self._check_message_visibility(messageID):    
            if not(self.redis.exists(messageID) == 0):
                self.redis.delete(messageID)
                self.redis.lrem(self.holdQueueName, 0, messageID)
        else:
            pass
    
    def _reset_visiblity(self):
        def _reset_visiblity_func():
            b_ = self.redis.rpop(self.holdQueueName)
            t_ = self.redis.hget(b_, "setTime").decode('utf-8')
            v_ = int(self.redis.hget(b_, "visibleTime").decode('utf-8'))
            dt = datetime.now() - datetime.fromisoformat(t_)
            secFromLastUpdate = dt.total_seconds()
            if secFromLastUpdate > v_:
                self.redis.rpush(self.QueueName, b_)
            else:
                self.redis.rpush(self.holdQueueName, b_)
        len_ = self.redis.llen(self.holdQueueName)
        for _ in range(len_):                
            _reset_visiblity_func()

    def _check_message_visibility(self,mesID):
        if self.redis.exists(mesID) == 0:
            return False
        else:
            hv_ = self.redis.hgetall(mesID)
            dt = datetime.now() - datetime.fromisoformat(hv_[b"setTime"].decode('utf-8'))
            secFromLastUpdate = dt.total_seconds()
            if secFromLastUpdate > int(hv_[b"visibleTime"].decode('utf-8')):
                return False
            else:
                return True

    def check_queue_length(self):
        self._reset_visiblity()
        return self.redis.llen(self.QueueName)
    
    def check_holdqueue_length(self):
        self._reset_visiblity()
        return self.redis.llen(self.holdQueueName)
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?