Module: github.com/pgujjeti/go-distr-ep
Version: 0.2.2-alpha
Repository: https://github.com/pgujjeti/go-distr-ep.git
Documentation: pkg.go.dev

# README

go-distr-ep

This project provides a distributed event processor framework for Go. It is useful when the events for a given key must be processed sequentially.

go-distr-ep uses Redis to persist events (for fault tolerance) and to distribute them among the participating processors.

Features

  • Events are processed in order, for a given key
  • Events for a given key are processed sequentially (i.e. not concurrently)
  • Events can be scheduled to be executed after a delay
  • Fault-tolerance (Events are persisted to an external cache, redis)

Dependencies

  • Redis
  • go-redis (Redis client)
  • redsync (distributed locks)

Example

package main

import (
	"time"

	// NOTE: the three import paths below are assumptions, not confirmed by this
	// README; match them to the package path and versions in your go.mod.
	distr_ep "github.com/pgujjeti/go-distr-ep"
	"github.com/redis/go-redis/v9"
	log "github.com/sirupsen/logrus"
)

type TestCallbackImpl struct {
	callbackName string
}

// ProcessEvent is invoked for each event of a key, one event at a time.
func (t *TestCallbackImpl) ProcessEvent(key string, val interface{}, start bool) bool {
	if start {
		// start key processing - initialize things like cache, etc that you need
		log.Infof("Start Processing %s", key)
	}

	// process the event here

	// Return true once processing is completed for this key;
	// return false to keep receiving events for it.
	return false
}

func main() {
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	defer client.Close()

	callbackImpl := &TestCallbackImpl{callbackName: "test-callback"}
	dep := &distr_ep.DistributedEventProcessor{
		RedisClient: client,
		Namespace:   "test1",
		// LockTTL:     time.Second * 1000,
		CleanupDur: time.Second * 1,
		Callback:   callbackImpl,
		LogLevel:   log.InfoLevel,
		// AtLeastOnce: true,
		Scheduling: true,
	}
	if err := dep.Init(); err != nil {
		log.Errorf("error initializing.. %v", err)
		return
	}

	// Add an event for processing.
	// Start: true indicates that this event triggers processing for the key.
	// Val is the event payload; a placeholder string is used here.
	evt := &distr_ep.DistrEvent{Key: "key1", Val: "hello", Start: true}
	dep.AddEvent(evt)
	// Schedule an event for processing after 5 seconds
	dep.ScheduleEvent(evt, time.Second*5)
}

Local Test

You can run a local Redis cluster using the grokzen/redis-cluster Docker image:

docker run -d --name redis-cluster -e "IP=0.0.0.0" -p 7000-7005:7000-7005 grokzen/redis-cluster:latest

Note: On macOS, port 7000 is already bound by the AirPlay Receiver.

Design

The Distributed Event Processor (DEP) framework's goals are:

  • Distributed: Events can be submitted by multiple clients
  • Sequential: Events should be processed in-order, for a given key
  • Resilient: Event processing should be resilient (fail-over)
  • Sticky: Events for a given key are processed by the same processor, within the prescribed TTL

Components

DEP is designed to be a simple framework with minimal overhead. It uses Redis to store events and to manage distributed locks.

We use the go-redis and redsync libraries for Redis client operations and distributed locks, respectively.

Event Submission

When a new event is submitted for a given key:

  • If this is a start event, the Key is added to the list of PendingKeys
  • The event is pushed to the end of a Redis LIST specific to this Key
  • The TTL of the LIST key is renewed to the configured value
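
The three submission steps can be illustrated with a small in-memory model. This is only a sketch: the `store` type, the `submit` method and the `listTTL` constant are hypothetical names invented for this example, and plain Go slices and maps stand in for the Redis structures. It is not the library's implementation.

```go
package main

import (
	"fmt"
	"time"
)

// listTTL is a hypothetical stand-in for the configured TTL of a Key's event LIST.
const listTTL = time.Minute

// store is an in-memory model of the Redis structures; it is not part of go-distr-ep.
type store struct {
	pendingKeys []string             // models the PendingKeys LIST
	events      map[string][]string  // models one event LIST per Key
	expiresAt   map[string]time.Time // models the TTL on each event LIST
}

func newStore() *store {
	return &store{events: map[string][]string{}, expiresAt: map[string]time.Time{}}
}

// submit applies the three submission steps in order.
func (s *store) submit(key, val string, start bool) {
	if start {
		s.pendingKeys = append(s.pendingKeys, key) // start event: queue the Key
	}
	s.events[key] = append(s.events[key], val) // push to the end of the Key's LIST
	s.expiresAt[key] = time.Now().Add(listTTL) // renew the LIST's TTL
}

func main() {
	s := newStore()
	s.submit("key1", "created", true)
	s.submit("key1", "updated", false)
	fmt.Println(s.pendingKeys, s.events["key1"])
}
```

Only the start event adds the Key to the pending list, so later events for the same Key extend its event LIST without queuing the Key a second time.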

PendingKeys Consumer

The PendingKeys Consumer monitors for pending keys that need to be processed. It runs on all the participating nodes/processors. For each pending Key, it performs these steps:

  • Move a Key from PendingKeys to a consumer-specific LIST
  • Kick off the Key Processor for this Key
  • Remove the Key from the consumer-specific LIST
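
The consumer steps above can be sketched with in-memory queues. The `queues` type and the `consumeOne` and `removeKey` functions are hypothetical names for this illustration, and the sketch calls the processor synchronously for simplicity. How the library moves the Key inside Redis is not described here, so that part is deliberately left abstract.

```go
package main

import "fmt"

// queues is an in-memory model of the Redis LISTs; it is not part of go-distr-ep.
type queues struct {
	pending []string            // models the shared PendingKeys LIST
	offload map[string][]string // models one consumer-specific LIST per consumer
}

func newQueues(keys ...string) *queues {
	return &queues{pending: keys, offload: map[string][]string{}}
}

// consumeOne runs the three consumer steps for one Key and reports whether
// a Key was available.
func (q *queues) consumeOne(consumerID string, process func(key string)) bool {
	if len(q.pending) == 0 {
		return false
	}
	key := q.pending[0]
	q.pending = q.pending[1:]
	q.offload[consumerID] = append(q.offload[consumerID], key) // step 1: move

	process(key) // step 2: kick off the Key Processor

	q.offload[consumerID] = removeKey(q.offload[consumerID], key) // step 3: remove
	return true
}

// removeKey returns list without any occurrence of key.
func removeKey(list []string, key string) []string {
	kept := make([]string, 0, len(list))
	for _, k := range list {
		if k != key {
			kept = append(kept, k)
		}
	}
	return kept
}

func main() {
	q := newQueues("key1", "key2")
	for q.consumeOne("consumer-a", func(key string) { fmt.Println("processing", key) }) {
	}
}
```

Holding the Key in a consumer-specific list between the move and the removal means a Key that was taken but not yet handed off is still recorded somewhere if the consumer stops midway.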

Key Processor

The Key Processor attempts to acquire the lock for the specified key and, if successful, processes the pending events:

  • Attempt to acquire the Redis lock for this Key
  • If the Key lock is NOT acquired, stop processing
  • If the Key lock is acquired, add this Key to the processor's list of active Keys
  • Start processing events from the Key's event LIST, using a blocking wait of EventPollTimeout (default 1 hour)
  • If there are no events after EventPollTimeout, stop the processor
  • For each event in the list (at-most-once semantics):
    • Pop the event
    • Invoke Callback.ProcessEvent(key, val, start) as a goroutine
    • Renew the Redis Key lock while the event is being processed
    • If ProcessEvent() returns true to indicate that this key is now completely processed, stop the processor
  • When processing stops for the Key:
    • Release the Key lock
    • Remove the Key from the processor's list of active keys
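
The control flow of the processor loop described above can be sketched in-process. Everything here is an assumption made for illustration: a Go channel stands in for the Key's event LIST, a mutex-guarded map stands in for the Redis lock, and `pollTimeout` is a shortened stand-in for EventPollTimeout. Lock renewal and the active-Keys list are omitted to keep the sketch short.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// pollTimeout stands in for EventPollTimeout; it is shortened for the demo.
const pollTimeout = 50 * time.Millisecond

// event and callback are hypothetical types modelled on the README example.
type event struct {
	val   interface{}
	start bool
}

type callback func(key string, val interface{}, start bool) bool

// locks is an in-process stand-in for the Redis key lock.
type locks struct {
	mu   sync.Mutex
	held map[string]bool
}

func newLocks() *locks { return &locks{held: map[string]bool{}} }

func (l *locks) tryAcquire(key string) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.held[key] {
		return false
	}
	l.held[key] = true
	return true
}

func (l *locks) release(key string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	delete(l.held, key)
}

// processKey returns the number of events handled, or -1 if the lock was not acquired.
func processKey(l *locks, key string, events <-chan event, cb callback) int {
	if !l.tryAcquire(key) {
		return -1 // another processor owns this Key
	}
	defer l.release(key) // always release when processing stops
	handled := 0
	for {
		select {
		case ev := <-events: // popped before the callback runs: at-most-once
			handled++
			if cb(key, ev.val, ev.start) {
				return handled // callback reported the Key as complete
			}
		case <-time.After(pollTimeout):
			return handled // no event arrived within the poll timeout
		}
	}
}

func main() {
	events := make(chan event, 2)
	events <- event{val: "created", start: true}
	events <- event{val: "closed"}
	n := processKey(newLocks(), "key1", events, func(key string, val interface{}, start bool) bool {
		fmt.Println(key, val, start)
		return val == "closed"
	})
	fmt.Println("handled", n)
}
```

Because the event is removed from the queue before the callback runs, a crash during the callback loses that event. That is the at-most-once trade-off named in the list above.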

Key Lock

The redis key lock is the central piece to achieve sequential processing. It allows us to prevent concurrent execution for the same key. Only one event processor at a time can acquire the lock and thus process events for that key.

We use the open-source library redsync for distributed locks.
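
To make the role of the lock TTL and its renewal concrete, here is a minimal in-memory model of an expiring lock. It is not redsync and does not use its API. The `ttlLocks` and `lease` types are hypothetical, and times are passed in as plain Unix milliseconds so the behaviour is deterministic.

```go
package main

import "fmt"

// lease records who holds a Key lock and when it expires (Unix milliseconds).
type lease struct {
	owner     string
	expiresMs int64
}

// ttlLocks is an in-memory model of expiring locks; it is not redsync.
type ttlLocks map[string]lease

// acquire succeeds if the Key is unlocked or its previous lease has expired.
func (t ttlLocks) acquire(key, owner string, nowMs, ttlMs int64) bool {
	if cur, ok := t[key]; ok && cur.expiresMs > nowMs {
		return false
	}
	t[key] = lease{owner: owner, expiresMs: nowMs + ttlMs}
	return true
}

// renew extends the lease, but only for the current, unexpired owner.
func (t ttlLocks) renew(key, owner string, nowMs, ttlMs int64) bool {
	cur, ok := t[key]
	if !ok || cur.owner != owner || cur.expiresMs <= nowMs {
		return false
	}
	t[key] = lease{owner: owner, expiresMs: nowMs + ttlMs}
	return true
}

// release drops the lease if owner still holds it.
func (t ttlLocks) release(key, owner string) {
	if cur, ok := t[key]; ok && cur.owner == owner {
		delete(t, key)
	}
}

func main() {
	locks := ttlLocks{}
	fmt.Println(locks.acquire("key1", "proc-a", 0, 1000))    // proc-a takes the lock
	fmt.Println(locks.acquire("key1", "proc-b", 500, 1000))  // refused while the lease is live
	fmt.Println(locks.renew("key1", "proc-a", 900, 1000))    // lease now runs until 1900
	fmt.Println(locks.acquire("key1", "proc-b", 2000, 1000)) // lease expired, proc-b takes over
}
```

The expiry is what lets another processor take over a Key whose owner has died, and renewal is what keeps a healthy but slow owner from losing the lock mid-event.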

Client Monitor

Client Monitor is a background process that detects Clients that might have terminated midway.

  • Each Client does a periodic check-in, updating its TTL in a Redis ZSET
  • If a Client is detected as inactive, all the Keys that it was processing are resubmitted to the Pending Keys list (which is de-queued by the PendingKeys Consumers)
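
A small in-memory model shows how check-ins and the inactivity sweep fit together. The `monitor` type and its methods are hypothetical. A map from client ID to last check-in time (in Unix seconds) plays the role of the ZSET, and the inactivity timeout is passed in as an assumed parameter.

```go
package main

import (
	"fmt"
	"sort"
)

// monitor is an in-memory model of the Client Monitor state; it is not part of go-distr-ep.
type monitor struct {
	lastSeen map[string]int64    // models the ZSET: member = client ID, score = last check-in
	active   map[string][]string // models each client's active-Keys LIST
	pending  []string            // models the Pending Keys LIST
}

func newMonitor() *monitor {
	return &monitor{lastSeen: map[string]int64{}, active: map[string][]string{}}
}

// checkIn records that the client was alive at time now.
func (m *monitor) checkIn(clientID string, now int64) {
	m.lastSeen[clientID] = now
}

// sweep finds clients whose last check-in is older than timeout, moves their
// active Keys back to the pending list, and returns the inactive client IDs.
func (m *monitor) sweep(now, timeout int64) []string {
	var inactive []string
	for id, seen := range m.lastSeen {
		if now-seen > timeout {
			inactive = append(inactive, id)
		}
	}
	sort.Strings(inactive) // map iteration order is random; sort for stable output
	for _, id := range inactive {
		m.pending = append(m.pending, m.active[id]...)
		delete(m.active, id)
		delete(m.lastSeen, id)
	}
	return inactive
}

func main() {
	m := newMonitor()
	m.checkIn("client-a", 100)
	m.checkIn("client-b", 100)
	m.active["client-a"] = []string{"key1", "key2"}
	m.checkIn("client-b", 130) // only client-b keeps checking in
	fmt.Println(m.sweep(140, 30), m.pending)
}
```

Once the Keys are back on the pending list, any live consumer can pick them up, which is how processing fails over from a dead Client.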

Scheduled Events (Optional feature)

DEP also supports scheduled events. The Event Scheduler runs a periodic check for scheduled events. It also uses a Redis ZSET (like the Client Monitor) to monitor for events that are expiring.
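
The periodic check can be pictured as "take every entry whose due time has passed". The sketch below models that with two maps standing in for the ZSET (due times) and the HSET (payloads). The `scheduler` type, its methods and the event IDs are hypothetical, and times are plain epoch seconds.

```go
package main

import (
	"fmt"
	"sort"
)

// scheduler is an in-memory model of the scheduler state; it is not part of go-distr-ep.
type scheduler struct {
	dueAt   map[string]int64  // models the ZSET: member = event ID, score = due epoch
	payload map[string]string // models the HSET: event ID -> payload
}

func newScheduler() *scheduler {
	return &scheduler{dueAt: map[string]int64{}, payload: map[string]string{}}
}

// schedule stores an event that becomes due delay seconds after now.
func (s *scheduler) schedule(id, payload string, now, delay int64) {
	s.dueAt[id] = now + delay
	s.payload[id] = payload
}

// popDue removes and returns the payloads of all events due at or before now,
// earliest first (ties broken by event ID).
func (s *scheduler) popDue(now int64) []string {
	var ids []string
	for id, due := range s.dueAt {
		if due <= now {
			ids = append(ids, id)
		}
	}
	sort.Slice(ids, func(i, j int) bool {
		if s.dueAt[ids[i]] != s.dueAt[ids[j]] {
			return s.dueAt[ids[i]] < s.dueAt[ids[j]]
		}
		return ids[i] < ids[j]
	})
	out := make([]string, 0, len(ids))
	for _, id := range ids {
		out = append(out, s.payload[id])
		delete(s.dueAt, id)
		delete(s.payload, id)
	}
	return out
}

func main() {
	s := newScheduler()
	s.schedule("e1", "send reminder", 0, 10)
	s.schedule("e2", "expire session", 0, 5)
	fmt.Println(s.popDue(4))  // nothing is due yet
	fmt.Println(s.popDue(10)) // both events are due, earliest first
}
```

In this model, each due payload would then be submitted as a regular event for its Key.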

Redis Keys

The following table lists the Redis keys used:

| Item | Redis Key | Remarks |
|------|-----------|---------|
| Key Event List | dep:{DEP_NS}:key-el:{KEY} | Redis LIST to store events by Key |
| Key Lock | dep:{DEP_NS}:key-lk:{KEY} | Redis lock name by Key |
| Pending Keys | {dep:{DEP_NS}:pk}-pending | Redis LIST for pending Keys that need to be processed |
| Client: Pending Keys Offload | {dep:{DEP_NS}:pk}-ol:{CLIENT_ID} | Redis LIST for offloaded pending Keys (Client specific) |
| Client: Active Keys | dep:{DEP_NS}:pk-active:{CLIENT_ID} | Redis LIST for Client's Active Keys |
| Client Monitor: Active Clients ZSet | dep:{DEP_NS}:mon-zset | Client Monitor: Redis ZSet. Clients update their TTL periodically |
| Client Monitor: Lock | dep:{DEP_NS}:mon-zset:lk | Client Monitor Lock. Required to run the monitoring job |
| Scheduled Events ZSet | dep:{DEP_NS}:sch-zset | Scheduler Events ZSet: Expiry is determined by Epoch |
| Scheduled Event Payload HSet | dep:{DEP_NS}:sch-hset | Scheduler Events Payload |
| Scheduler Lock | dep:{DEP_NS}:sch-zset:lk | Scheduler Job Lock |
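
As an illustration of how these patterns expand, the helpers below build a few of the key names with `fmt.Sprintf`. The function names are hypothetical and are not the library's API; only the key patterns come from the table.

```go
package main

import "fmt"

// eventListKey builds the name of the per-Key event LIST.
func eventListKey(ns, key string) string {
	return fmt.Sprintf("dep:%s:key-el:%s", ns, key)
}

// keyLockName builds the name of the per-Key lock.
func keyLockName(ns, key string) string {
	return fmt.Sprintf("dep:%s:key-lk:%s", ns, key)
}

// pendingKeysKey builds the name of the shared Pending Keys LIST.
func pendingKeysKey(ns string) string {
	return fmt.Sprintf("{dep:%s:pk}-pending", ns)
}

// offloadKey builds the name of a client's offload LIST.
func offloadKey(ns, clientID string) string {
	return fmt.Sprintf("{dep:%s:pk}-ol:%s", ns, clientID)
}

func main() {
	fmt.Println(eventListKey("test1", "key1"))   // dep:test1:key-el:key1
	fmt.Println(keyLockName("test1", "key1"))    // dep:test1:key-lk:key1
	fmt.Println(pendingKeysKey("test1"))         // {dep:test1:pk}-pending
	fmt.Println(offloadKey("test1", "client-a")) // {dep:test1:pk}-ol:client-a
}
```

The Pending Keys and offload names both begin with the same `{...}` section. In Redis Cluster, a hash tag like this makes keys hash to the same slot, which is presumably why these two names are wrapped in braces.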

# Constants

https://redis.io/docs/reference/cluster-spec/#hash-tags REDIS_PK_HASH_PREFIX = "{dep:%s:pk-}".