Categorygithub.com/ahmedkamals/eventually
modulepackage
1.0.0
Repository: https://github.com/ahmedkamals/eventually.git
Documentation: pkg.go.dev

# README

Eventually CircleCI

license release Travis CI Coverage Status codecov GolangCI Go Report Card Codacy Badge GoDoc DepShield Badge FOSSA Status Join the chat at https://gitter.im/ahmedkamals/eventually

 _____                 _               _ _
| ____|_   _____ _ __ | |_ _   _  __ _| | |_   _
|  _| \ \ / / _ \ '_ \| __| | | |/ _` | | | | | |
| |___ \ V /  __/ | | | |_| |_| | (_| | | | |_| |
|_____| \_/ \___|_| |_|\__|\__,_|\__,_|_|_|\__, |
                                           |___/

Eventually is a library that helps implement CQRS by providing an event bus that facilitates Pub-Sub operations.

Table of Contents

✨ Features

  • Pub/Sub model.
  • Delayed publish.
  • Scheduled publish.

🏎️ Getting Started

Prerequisites

Installation

go get -u github.com/ahmedkamals/eventually

Examples

Main

// main.go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"os"
	"time"

	"github.com/ahmedkamals/colorize"
	"github.com/ahmedkamals/eventually"
)

type (
	// eventLogger hands log messages over to logChan; see its Log method.
	eventLogger struct {
		logChan chan string
	}

	// errorQueue hands reported errors over to errChan; see its Report method.
	errorQueue struct {
		errChan chan error
	}

	// universalPayload is the empty payload type attached to the
	// "universal" topic descriptor built in main.
	universalPayload struct{}
)

const (
	// MailboxDefaultSize is the size passed to eventually.NewInbox for every
	// example consumer's mail box.
	MailboxDefaultSize = 10
	// universal is the name of the topic whose consumers are registered with
	// SubscribeToAll instead of Subscribe.
	universal          = "universal"
)

var (
	// colorized is the colorize helper bound to os.Stdout; the example uses it
	// to colour everything it prints.
	colorized = colorize.NewColorable(os.Stdout)
)

// newEventLogger builds an eventually.Logger whose messages are pushed onto
// logChan.
func newEventLogger(logChan chan string) eventually.Logger {
	logger := new(eventLogger)
	logger.logChan = logChan

	return logger
}

// Log offers message to the log channel without ever blocking the caller.
func (e *eventLogger) Log(message string) {
	select {
	case e.logChan <- message:
		// Handed over to whoever drains the log channel.
	default:
		// The send would block (e.g. the log queue is at capacity), so the
		// message is dropped.
	}
}

// newErrorQueue builds an eventually.ErrorQueue whose reported errors are
// pushed onto errChan.
func newErrorQueue(errChan chan error) eventually.ErrorQueue {
	queue := new(errorQueue)
	queue.errChan = errChan

	return queue
}

// Report offers err to the error channel without ever blocking the caller.
func (e *errorQueue) Report(err error) {
	select {
	case e.errChan <- err:
		// Handed over to whoever drains the error channel.
	default:
		// The send would block (e.g. the error queue is at capacity), so the
		// error is dropped.
	}
}

// main runs the example end to end: it starts the log and error monitors,
// creates and runs the event bus, subscribes the example consumers, publishes
// immediately, after a delay and on a ticker, and prints the bus state after
// publishing, unsubscribing and unregistering.
func main() {
	logChan := make(chan string, 10)
	errChan := make(chan error, 10)

	// Drain both channels in the background for the lifetime of the program.
	go monitorLogMessages(logChan)
	go monitorErrors(errChan)

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	store := eventually.NewEventStore()
	logger := newEventLogger(logChan)
	errQueue := newErrorQueue(errChan)
	eventBus := eventually.NewBus(store, logger, errQueue, 100)
	eventBus.Run(ctx)

	topics := map[string]eventually.Descriptor{
		universal:         eventually.NewDescriptor(universal, new(universalPayload), 1),
		"example":         eventually.NewDescriptor("example", "examplePayload", 1),
		"anotherExample":  eventually.NewDescriptor("anotherExample", "anotherExamplePayload", 2),
		"publishAfter":    eventually.NewDescriptor("publishAfter", "publishAfterPayload", 1),
		"schedulePublish": eventually.NewDescriptor("schedulePublish", "schedulePublishPayload", 1),
	}
	exampleTopic := topics["example"]
	anotherExampleTopic := topics["anotherExample"]
	publishAfterTopic := topics["publishAfter"]
	scheduledTopic := topics["schedulePublish"]

	// Every consumer gets its own UUID and its own inbox.
	newConsumer := func() eventually.Consumer {
		return newExampleConsumer(eventually.NewUUID(), eventually.NewInbox(MailboxDefaultSize))
	}

	topicsConsumersMap := map[eventually.Descriptor][]eventually.Consumer{
		topics[universal]:   {newConsumer()},
		exampleTopic:        {newConsumer(), newConsumer(), newConsumer()},
		anotherExampleTopic: {newConsumer()},
		publishAfterTopic:   {newConsumer()},
		scheduledTopic:      {newConsumer()},
	}

	subscribe(eventBus, topicsConsumersMap, errChan)

	eventBus.Publish(exampleTopic, anotherExampleTopic)
	eventBus.PublishAfter(ctx, time.Second, publishAfterTopic)
	eventBus.SchedulePublish(ctx, time.NewTicker(800*time.Millisecond), scheduledTopic)

	time.Sleep(88 * time.Millisecond)
	fmt.Printf("%s, Subscribed events[%d]: %#v\n", colorized.White("After publishing"), eventBus.Length(), eventBus.Topics())

	firstExampleConsumer := topicsConsumersMap[exampleTopic][0]
	eventBus.Unsubscribe(firstExampleConsumer, exampleTopic)
	time.Sleep(88 * time.Millisecond)
	fmt.Printf("%s, Subscribed events[%d]: %#v\n", colorized.Magenta("After unsubscribing"), eventBus.Length(), eventBus.Topics())

	eventBus.Unregister(anotherExampleTopic)
	time.Sleep(888 * time.Millisecond)
	fmt.Printf("%s, Subscribed events[%d]: %#v\n", colorized.Yellow("After unregistering"), eventBus.Length(), eventBus.Topics())

	// Keep the process alive until the delayed and scheduled publishes are done.
	time.Sleep(3 * time.Second)
}

// subscribe starts one reader goroutine per consumer and registers every
// consumer on eventBus: consumers listed under the `universal` topic are
// registered with SubscribeToAll, all others with Subscribe on their topic.
// The result of each registration call (nil included) is sent to errChan.
// The function returns after a short fixed delay that gives the
// subscriptions time to take effect.
//
// The reader goroutines loop forever and are never stopped; they end only
// when the process exits.
func subscribe(eventBus eventually.Bus, topicsConsumersMap map[eventually.Descriptor][]eventually.Consumer, errChan chan error) {
	for topic, consumers := range topicsConsumersMap {
		for _, consumer := range consumers {
			go func(consumer eventually.Consumer) {
				for {
					message := consumer.ReadMessage()
					if message == nil {
						continue
					}
					// Payloads are not guaranteed to be strings (the universal
					// descriptor carries a *universalPayload), so format the
					// value instead of asserting .(string), which would panic
					// this goroutine. For string payloads fmt.Sprint returns
					// the string unchanged, so the output is the same.
					payload := fmt.Sprint(message.Payload())
					fmt.Printf(
						"%s[%s] Inbox: got message %s\n",
						colorized.Green(consumer.String()),
						consumer.AggregateID(),
						colorized.Orange(payload),
					)
				}
			}(consumer)

			switch topic.Name() {
			case universal:
				errChan <- eventBus.SubscribeToAll(consumer)

			default:
				errChan <- eventBus.Subscribe(consumer, topic)
			}
		}
	}
	// Delay till subscription happens.
	<-time.After(8 * time.Millisecond)
}

// monitorLogMessages prints every message received on logChan in cyan and
// returns once the channel is closed.
func monitorLogMessages(logChan chan string) {
	for {
		message, open := <-logChan
		if !open {
			return
		}
		fmt.Println(colorized.Cyan(message))
	}
}

// monitorErrors prints every error received on errChan in red, skipping nil
// values and anything that matches io.EOF. It returns once the channel is
// closed.
func monitorErrors(errChan chan error) {
	for err := range errChan {
		if err == nil || errors.Is(err, io.EOF) {
			continue
		}
		fmt.Println(colorized.Red(err.Error()))
	}
}

Consumer

// consumer.go
package main

import (
	"fmt"
	"github.com/ahmedkamals/eventually"
	"reflect"
)

type (
	// exampleConsumer is the sample consumer used by the example: it pairs
	// an identifier with the inbox its messages are read from.
	exampleConsumer struct {
		id      eventually.UUID
		mailBox eventually.Inbox
	}
)

// newExampleConsumer returns a consumer identified by ID that keeps its
// messages in mailBox.
func newExampleConsumer(ID eventually.UUID, mailBox eventually.Inbox) eventually.Consumer {
	consumer := new(exampleConsumer)
	consumer.id = ID
	consumer.mailBox = mailBox

	return consumer
}

// AggregateID returns the identifier stored in the consumer's id field.
func (ec *exampleConsumer) AggregateID() eventually.UUID {
	return ec.id
}

// MatchCriteria returns a matcher that accepts every topic descriptor.
func (ec *exampleConsumer) MatchCriteria() eventually.Match {
	acceptAll := func(_ eventually.Descriptor) bool { return true }

	return acceptAll
}

// Drop passes descriptor to the consumer's inbox via Receive.
func (ec *exampleConsumer) Drop(descriptor eventually.Descriptor) {
	ec.mailBox.Receive(descriptor)
}

// ReadMessage returns whatever the inbox's Read yields.
// NOTE(review): callers in main.go check the result for nil, so Read can
// presumably return nil — confirm against the Inbox implementation.
func (ec *exampleConsumer) ReadMessage() eventually.Descriptor {
	return ec.mailBox.Read()
}

// OnSubscribe invokes callback with topic and this consumer.
// NOTE(review): presumably called by the bus when the consumer is subscribed
// to topic — confirm against the Bus implementation.
func (ec *exampleConsumer) OnSubscribe(topic eventually.Descriptor, callback eventually.Notification) {
	callback(topic, ec)
}

// OnUnsubscribe invokes callback with topic and this consumer.
// NOTE(review): presumably called by the bus when the consumer is
// unsubscribed from topic — confirm against the Bus implementation.
func (ec *exampleConsumer) OnUnsubscribe(topic eventually.Descriptor, callback eventually.Notification) {
	callback(topic, ec)
}

// Signout forwards to the Signout method of the consumer's inbox.
func (ec *exampleConsumer) Signout() {
	ec.mailBox.Signout()
}

// GoString renders the consumer as "<type name>[<id>]"; fmt uses it for the
// %#v verb.
func (ec *exampleConsumer) GoString() string {
	typeName := ec.String()

	return fmt.Sprintf("%s[%s]", typeName, ec.id)
}

// String returns the name of the consumer's struct type, obtained through
// reflection on the pointer receiver.
func (ec *exampleConsumer) String() string {
	structType := reflect.TypeOf(ec).Elem()

	return structType.Name()
}

Sample output

πŸ•ΈοΈ Tests

make test

Benchmarks

Benchmarks Flamegraph

🔥 Todo:
  • Stats collection about published messages.
  • Storing published messages on a persistence medium (memory, disk) for a defined period of time.

🤝 Contribution

Please refer to the CONTRIBUTING.md file.

Git Hooks

To run the tests automatically on each commit, set up the Git hooks as follows:

ln -sf ../../assets/git/hooks/pre-commit.sh .git/hooks/pre-commit && \
ln -sf ../../assets/git/hooks/pre-push.sh .git/hooks/pre-push     && \
ln -sf ../../assets/git/hooks/commit-msg.sh .git/hooks/commit-msg

πŸ‘¨β€πŸ’» Credits

🆓 LICENSE

Eventually is released under the MIT license; please refer to the LICENSE.md file.

FOSSA Status

Happy Coding 🙂

Analytics

# Packages

No description provided by the author

# Functions

NewBus creates new Bus.
NewDescriptor returns a new topic descriptor.
NewDispatcher creates a new Dispatcher.
NewEventStore creates a new EventStore.
NewInbox creates a new Inbox.
NewSerializer creates a new Serializer.
NewUUID creates new UUID.

# Interfaces

No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author

# Type aliases

No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author