Categorygithub.com/ngicks/eventqueue
repositorypackage
0.3.0
Repository: https://github.com/ngicks/eventqueue.git
Documentation: pkg.go.dev

# Packages

No description provided by the author

# README

eventqueue

A generic FIFO event queue.

eventqueue queues up events (tasks) and sends them to a Sink in FIFO order.

Usage

eventqueue queues up event objects of type E (whatever you want), and then writes them to the Sink once events are available.

Sink is an interface defined as follows:

// Sink is the destination that EventQueue writes to once it receives events.
// EventQueue serializes calls to Write, so Write may be a goroutine-unsafe method.
type Sink[E any] interface {
	// Write writes the event object to the Sink.
	// If Write returns an error, the event is put back at the head of the queue.
	Write(ctx context.Context, event E) error
}

You can Push an arbitrary number of events into eventqueue, as long as the system has enough memory. It then tries to Write them to the Sink in FIFO order.

eventqueue can also Reserve an event that will occur after fn func(context.Context) (E, error) returns. The passed fn is called in a new goroutine, and once fn returns with a nil error, the event returned from fn enters eventqueue.

// main demonstrates the basic eventqueue workflow: pushing events, reserving
// events that are produced later by a function, running the queue against a
// channel-backed sink, inspecting queue lengths, cancelling reservations and
// finally draining and stopping the queue.
func main() {
	// Unbuffered channel sink.
	sink := eventqueue.NewChannelSink[int](0)
	// New accepts any Sink implementation; sink is passed as that interface.
	q := eventqueue.New[int](sink)

	for i := 0; i < 10; i++ {
		q.Push(i)
	}

	// q can also Reserve an event that happens after fn returns.
	// If fn returns with a nil error, the returned E enters the queue.
	q.Reserve(func(ctx context.Context) (int, error) {
		time.Sleep(500 * time.Millisecond)
		return 999, nil
	})

	// q can also cancel an fn that takes too long.
	// Of course, this works if and only if fn respects the given ctx.
	q.Reserve(func(ctx context.Context) (int, error) {
		timer := time.NewTimer(time.Hour)
		// Release the timer when fn returns early on cancellation; otherwise
		// the one-hour timer would stay alive after the reservation is gone.
		defer timer.Stop()
		select {
		case <-ctx.Done():
			return 0, ctx.Err()
		case <-timer.C:
			return 2999, nil
		}
	})

	var wg sync.WaitGroup

	ctx, cancel := context.WithCancel(context.Background())

	// Run the queue in the background until ctx is cancelled.
	wg.Add(1)
	go func() {
		defer wg.Done()
		remaining, err := q.Run(ctx)
		fmt.Printf("tasks remaining in queue: %d, err = %v\n", remaining, err) // tasks remaining in queue: 0, err = <nil>
	}()

	// Consume the 10 pushed events plus the one reserved event (999).
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < 11; i++ {
			fmt.Printf("received: %d\n", <-sink.Outlet())
			/*
				received: 0
				received: 1
				received: 2
				received: 3
				received: 4
				received: 5
				received: 6
				received: 7
				received: 8
				received: 9
				received: 999
			*/
		}
		fmt.Println("done")
	}()

	// At this point:
	queued, reserved := q.Len()
	fmt.Printf("queued = %d, reserved = %d\n", queued, reserved) // queued = 10, reserved = 2
	waitCh := q.WaitReserved()
	// Receiving from waitCh observes that a reserved fn has returned.
	<-waitCh

	// And now:
	queued, reserved = q.Len()
	fmt.Printf("queued = %d, reserved = %d\n", queued, reserved) // queued = 1, reserved = 1

	// Request cancellation of the remaining reserved fn, then wait until
	// waitCh is closed.
	q.CancelReserved()
	for range waitCh {
	}
	// Then:
	queued, reserved = q.Len()
	fmt.Printf("queued = %d, reserved = %d\n", queued, reserved) // queued = 0, reserved = 0

	q.Drain()

	// Stop Run and wait for both goroutines to finish.
	cancel()
	wg.Wait()
}