Categorygithub.com/kerkerj/delayedqueue
modulepackage
1.0.1
Repository: https://github.com/kerkerj/delayedqueue.git
Documentation: pkg.go.dev

# README

Go DelayedQueue

Go Reference Actions Status codecov

Delayed queue with Redis implementation

ref: https://redislabs.com/ebook/part-2-core-concepts/chapter-6-application-components-in-redis/6-4-task-queues/6-4-2-delayed-tasks/

Basic concept:

  1. User redis sorted set as the delayed queue
  2. When enqueuing data to the delayed queue, we use execution timestamp as the score, e.g. use 1620199248 as the score (ZSet)
  3. When de-queuing data, we use ZRangeByScore and ZRem with current timestamp to check if there is data.
  4. When there is nothing in the queue, sleep for a period.
Workflow Chart:
+----------+           +--------------+                   +--------------+
|          |           |              |                   |              |
|  Caller  |           | DelayedQueue |                   | WorkerQueue  |
|          |           | Consumer     +----------------+  | Consumers    |
|          |           |              |                |  |              |
+----+-----+           +---+-^--------+                |  +-----+-^------+
     |                     | |                         |        | |
     |1.Add jobs with      | |2.Consume and add jobs   |        | |3.Consume and process the jobs
     | a delayed time      | | to working queue        |        | |
     |                     | |                         |        | |
+----v---------------------v-+-----------------------+ |  +-----v-+-----------------------------------+
|                                                    | |  |                                           |
|  DelayedQueue(Redis Sorted Set)                    | +-->  WorkingQueue(Redis List)[work_q]         |
| +-----------+------------------------------------+ |    | +------------------------------------+    |
| | score     | member                             | |    | | member                             |    |
| |(timestamp)|[queueName,funcName,{"arg1":"1"}]   | |    | |[queueName,funcName,{"arg1":"1"}]   |    |
| +-----------+------------------------------------+ |    | +------------------------------------+    |
| |1620299103 |["work_q","CallAPI","{"arg1":"1"}"] | |    | |["work_q","CallAPI","{"arg1":"1"}"] |    |
| +-----------+------------------------------------+ |    | +------------------------------------+    |
| |1620100100 | ...                                | |    | | ...                                |    |
| +-----------+------------------------------------+ |    | +------------------------------------+    |
|                                                    |    |                                           |
+----------------------------------------------------+    +-------------------------------------------+

Chart created by: https://asciiflow.com/

Example is in example folder.

# Packages

No description provided by the author

# Functions

New initializes the delayed queue.
WithCustomZapLogger allows to use custom zap logger.
WithPollingCount overwrites the polling count of delayed queue.
WithPollingInterval overwrites the polling interval of delayed queue.
WithRedisKeyDelayedQueue overwrites the redis key of delayed queue.
WithRedisKeyWorkingQueue overwrites the redis key of working queue.
WithWorkerCount overwrites the worker count (the number of goroutines which poll items from working queue).

# Constants

DefaultMaxPopCount is the number of pulling items from Redis worker queue on each time.

# Structs

DelayedItem see WorkingItem.
Service is the delayed queue service.
WorkingItem [queueName, funcName, {"arg1":"1"}] Reasons why we use custom json Marshaller: 1.

# Type aliases

ServiceOption allows to customize the service's options.