# README
Go DelayedQueue
Delayed queue with Redis implementation
Basic concept:
- User redis
sorted set
as thedelayed queue
- When enqueuing data to the delayed queue, we use execution timestamp as the score,
e.g. use 1620199248 as the score (
ZSet
) - When de-queuing data, we use
ZRangeByScore
andZRem
with current timestamp to check if there is data. - When there is nothing in the queue, sleep for a period.
Workflow Chart:
+----------+ +--------------+ +--------------+
| | | | | |
| Caller | | DelayedQueue | | WorkerQueue |
| | | Consumer +----------------+ | Consumers |
| | | | | | |
+----+-----+ +---+-^--------+ | +-----+-^------+
| | | | | |
|1.Add jobs with | |2.Consume and add jobs | | |3.Consume and process the jobs
| a delayed time | | to working queue | | |
| | | | | |
+----v---------------------v-+-----------------------+ | +-----v-+-----------------------------------+
| | | | |
| DelayedQueue(Redis Sorted Set) | +--> WorkingQueue(Redis List)[work_q] |
| +-----------+------------------------------------+ | | +------------------------------------+ |
| | score | member | | | | member | |
| |(timestamp)|[queueName,funcName,{"arg1":"1"}] | | | |[queueName,funcName,{"arg1":"1"}] | |
| +-----------+------------------------------------+ | | +------------------------------------+ |
| |1620299103 |["work_q","CallAPI","{"arg1":"1"}"] | | | |["work_q","CallAPI","{"arg1":"1"}"] | |
| +-----------+------------------------------------+ | | +------------------------------------+ |
| |1620100100 | ... | | | | ... | |
| +-----------+------------------------------------+ | | +------------------------------------+ |
| | | |
+----------------------------------------------------+ +-------------------------------------------+
Chart created by: https://asciiflow.com/
Example is in example folder.
# Packages
No description provided by the author
# Functions
New initializes the delayed queue.
WithCustomZapLogger allows to use custom zap logger.
WithPollingCount overwrites the polling count of delayed queue.
WithPollingInterval overwrites the polling interval of delayed queue.
WithRedisKeyDelayedQueue overwrites the redis key of delayed queue.
WithRedisKeyWorkingQueue overwrites the redis key of working queue.
WithWorkerCount overwrites the worker count (the number of goroutines which poll items from working queue).
# Constants
DefaultMaxPopCount is the number of pulling items from Redis worker queue on each time.
# Structs
DelayedItem see WorkingItem.
Service is the delayed queue service.
WorkingItem [queueName, funcName, {"arg1":"1"}] Reasons why we use custom json Marshaller: 1.
# Type aliases
ServiceOption allows to customize the service's options.