Categorygithub.com/jjeffcaii/reactor-go
modulepackage
0.5.6
Repository: https://github.com/jjeffcaii/reactor-go.git
Documentation: pkg.go.dev

# README

reactor-go 🚀🚀🚀

GitHub Workflow Status codecov GoDoc Go Report Card License GitHub Release

A golang implementation for reactive-streams.

Install

go get -u github.com/jjeffcaii/reactor-go

Example

Mono

package mono_test

import (
	"context"
	"fmt"

	"github.com/jjeffcaii/reactor-go"
	"github.com/jjeffcaii/reactor-go/mono"
)

func Example() {
	gen := func(ctx context.Context, sink mono.Sink) {
		sink.Success("World")
	}
	mono.
		Create(gen).
		Map(func(input reactor.Any) (output reactor.Any, err error) {
			output = "Hello " + input.(string) + "!"
			return
		}).
		DoOnNext(func(v reactor.Any) error {
			fmt.Println(v)
			return nil
		}).
		Subscribe(context.Background())
}

// Should print
// Hello World!

Flux

package flux_test

import (
	"context"
	"fmt"

	"github.com/jjeffcaii/reactor-go"
	"github.com/jjeffcaii/reactor-go/flux"
	"github.com/jjeffcaii/reactor-go/scheduler"
)

func Example() {
	gen := func(ctx context.Context, sink flux.Sink) {
		for i := 0; i < 10; i++ {
			v := i
			sink.Next(v)
		}
		sink.Complete()
	}
	done := make(chan struct{})

	var su reactor.Subscription
	flux.Create(gen).
		Filter(func(i interface{}) bool {
			return i.(int)%2 == 0
		}).
		Map(func(input reactor.Any) (output reactor.Any, err error) {
			output = fmt.Sprintf("#HELLO_%04d", input.(int))
			return
		}).
		SubscribeOn(scheduler.Elastic()).
		Subscribe(context.Background(),
			reactor.OnSubscribe(func(s reactor.Subscription) {
				su = s
				s.Request(1)
			}),
			reactor.OnNext(func(v reactor.Any) error {
				fmt.Println("next:", v)
				su.Request(1)
				return nil
			}),
			reactor.OnComplete(func() {
				close(done)
			}),
		)
	<-done
}
// Should print:
// next: #HELLO_0000
// next: #HELLO_0002
// next: #HELLO_0004
// next: #HELLO_0006
// next: #HELLO_0008

# Packages

No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author

# Functions

IsCancelledError returns true if given error is a cancelled subscribe error.
No description provided by the author
NewSubscriber creates a Subscriber with given options.
OnComplete specified a Subscriber.OnComplete action.
OnError specified a Subscriber.OnError action.
OnNext specified a Subscriber.OnNext action.
OnSubscribe specified a Subscriber.OnSubscribe action.

# Constants

RequestInfinite means request items indefinitely.
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author

# Variables

No description provided by the author
No description provided by the author

# Structs

Item is type of element.

# Interfaces

Any is an alias of interface{} which means a value of any type.
Disposable is a disposable resource.
Processor combines the Publisher and Subscriber.
Publisher is th basic type that can be subscribed.
RawPublisher is the basic low-level Publisher that can be subscribed with a Subscriber.
Subscriber is the basic type to subscribing the Publisher and consumes the items from upstream.
Subscription represents a one-to-one lifecycle of a Subscriber subscribing to a Publisher.

# Type aliases

A group of action functions.
A group of action functions.
A group of action functions.
A group of action functions.
A group of action functions.
A group of action functions.
A group of action functions.
A group of action functions.
No description provided by the author
SignalType is type of terminal signal.
SubscriberOption is used to create a Subscriber easily.
No description provided by the author