Module: github.com/lytics/grid/v3
Version: 3.2.15
Repository: https://github.com/lytics/grid.git
Documentation: pkg.go.dev

# README

grid

Grid is a library for distributed processing. Its main goal is to help schedule fine-grained stateful computations, which grid calls actors, and to send data between them. Its only service dependency is an Etcd v3 server, used for discovery and coordination. Grid uses gRPC for communication and sends Protobuf messages.

Example

Below is a basic example of starting a grid application. If a "leader" definition is registered, the leader actor is started for you when Serve is called. The "leader" actor can be thought of as the entry point into your distributed application. You don't have to use it, but it is often convenient.

No matter how many processes participate in the grid, only one leader actor is started per namespace; it is a singleton. The actor named "leader" is also special in one more way. If the process currently running it dies, the leader is restarted on another peer, provided more than one peer is participating in the grid.

```go
func main() {
    etcd, err := etcdv3.New(...)
    ...

    server, err := grid.NewServer(etcd, grid.ServerCfg{Namespace: "mygrid"})
    ...

    server.RegisterDef("leader", func(_ []byte) (grid.Actor, error) { return &LeaderActor{...}, nil })
    server.RegisterDef("worker", func(_ []byte) (grid.Actor, error) { return &WorkerActor{...}, nil })

    lis, err := net.Listen("tcp", ...)
    ...

    err = server.Serve(lis)
    ...
}
```
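
The leader's failover behaviour can be pictured with a small in-process simulation. This is a sketch under stated assumptions, not grid's implementation. A mutex-guarded map (the hypothetical registry type) stands in for etcd. The peerDied method stands in for a dead process's registrations going away. Surviving peers then simply try to claim the name "leader" again.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ErrAlreadyRegistered is a local, hypothetical stand-in echoing grid's error.
var ErrAlreadyRegistered = errors.New("already registered")

// registry is a hypothetical in-memory stand-in for etcd: it maps an
// actor name to the peer currently running it.
type registry struct {
	mu     sync.Mutex
	owners map[string]string
}

func newRegistry() *registry {
	return &registry{owners: make(map[string]string)}
}

// claim registers name for peer, failing if another peer already holds it.
func (r *registry) claim(name, peer string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.owners[name]; ok {
		return ErrAlreadyRegistered
	}
	r.owners[name] = peer
	return nil
}

// peerDied removes every record owned by peer, as if its session expired.
func (r *registry) peerDied(peer string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	for name, owner := range r.owners {
		if owner == peer {
			delete(r.owners, name)
		}
	}
}

// electLeader lets each live peer try to claim "leader"; at most one wins.
func electLeader(r *registry, peers []string) string {
	for _, p := range peers {
		_ = r.claim("leader", p) // losers get ErrAlreadyRegistered
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.owners["leader"]
}

func main() {
	r := newRegistry()
	peers := []string{"peer-a", "peer-b", "peer-c"}
	fmt.Println("leader on:", electLeader(r, peers)) // leader on: peer-a

	// peer-a crashes; its record goes away and a survivor takes over.
	r.peerDied("peer-a")
	fmt.Println("leader on:", electLeader(r, peers[1:])) // leader on: peer-b
}
```

The first claim wins and every later claim fails, which is what makes the leader a singleton. Once the owner's record is removed, the next claim succeeds.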

Actor

Anything that implements the Actor interface is an actor. Actors typically represent the central work of your application.

```go
type Actor interface {
    Act(ctx context.Context)
}
```

Example Actor, Part 1

Below is an actor that starts other actors, which is a typical way to structure an application with grid. Here the leader actor starts a worker on each peer in the grid. Actors are started by sending an ActorStart message to a peer. Each actor must have a name that is unique within its namespace. The name is registered in Etcd to ensure it is unique across all the processes of the grid.

```go
const timeout = 2 * time.Second

type LeaderActor struct {
    client *grid.Client
}

func (a *LeaderActor) Act(ctx context.Context) {
    // Discover participating peers.
    peers, err := a.client.Query(timeout, grid.Peers)
    ...

    for _, peer := range peers {
        // Actor names are unique, registered in etcd.
        // There can never be more than one actor with
        // a given name. When an actor exits or panics
        // its record is removed from etcd.
        start := grid.NewActorStart("worker-for-%v", peer.Peer())
        start.Type = "worker"

        // Start a new actor on the given peer. The message
        // "ActorStart" is special. When sent to the mailbox
        // of a peer, that peer will start an actor based on
        // the definition.
        res, err := a.client.Request(timeout, peer.Name(), start)
        ...
    }

    ...
}
```
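
The naming step can be sketched on its own. The function index below describes NewActorStart as building the name from a format string and defaulting the type to the name. Assuming that behaviour, the hypothetical local helpers newActorStart and planWorkers show why overriding Type matters. Every worker gets a distinct name, but all of them share the registered definition "worker".

```go
package main

import "fmt"

// actorStart is a hypothetical stand-in for grid's ActorStart message,
// keeping only the two fields used in the example above.
type actorStart struct {
	Name string
	Type string
}

// newActorStart formats the name like fmt.Sprintf and defaults Type to it.
func newActorStart(format string, v ...interface{}) *actorStart {
	name := fmt.Sprintf(format, v...)
	return &actorStart{Name: name, Type: name}
}

// planWorkers builds one start message per peer, each with a unique name.
func planWorkers(peers []string) []*actorStart {
	starts := make([]*actorStart, 0, len(peers))
	for _, p := range peers {
		s := newActorStart("worker-for-%v", p)
		s.Type = "worker" // must match the name given to RegisterDef
		starts = append(starts, s)
	}
	return starts
}

func main() {
	for _, s := range planWorkers([]string{"peer-1", "peer-2"}) {
		fmt.Printf("%s -> type %s\n", s.Name, s.Type)
	}
}
```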

Example Actor, Part 2

An actor will typically need to receive data to work on. This may come from the filesystem or a database, but it can also come from messages sent to a mailbox. Just like actors, a mailbox is unique by name. Etcd is used to register the name and guarantee that only one such mailbox exists.

```go
const size = 10

type WorkerActor struct {
    server *grid.Server
}

func (a *WorkerActor) Act(ctx context.Context) {
    name, err := grid.ContextActorName(ctx)
    ...

    // Listen to a mailbox with the same
    // name as the actor.
    mailbox, err := grid.NewMailbox(a.server, name, size)
    ...
    defer mailbox.Close()

    for {
        select {
        case <-ctx.Done():
            // The actor was asked to stop.
            return
        case req := <-mailbox.C:
            switch req.Msg().(type) {
            case PingMsg:
                // Every request must receive a response or an ack.
                err := req.Respond(&PongMsg{
                    ...
                })
                ...
            }
        }
    }
}
```
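
The mailbox pattern above can be approximated with a buffered channel. The local names mailbox, request, deliver and respond are hypothetical and are not grid's types. The buffer size plays the role of the size argument to NewMailbox. A full buffer produces an error in the spirit of grid's ErrReceiverBusy, and a second respond call fails like ErrAlreadyResponded. The sketch assumes a single goroutine answers each request.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical local errors echoing grid's ErrReceiverBusy and ErrAlreadyResponded.
var (
	errReceiverBusy     = errors.New("receiver busy")
	errAlreadyResponded = errors.New("already responded")
)

// request carries a message and a one-slot channel for its reply.
type request struct {
	msg       interface{}
	reply     chan interface{}
	responded bool
}

// respond sends the reply once; later calls return an error.
func (r *request) respond(v interface{}) error {
	if r.responded {
		return errAlreadyResponded
	}
	r.responded = true
	r.reply <- v // capacity 1, so this never blocks
	return nil
}

// mailbox is a bounded queue of requests.
type mailbox struct {
	C chan *request
}

func newMailbox(size int) *mailbox {
	return &mailbox{C: make(chan *request, size)}
}

// deliver enqueues without blocking and reports busy when the buffer is full.
func (m *mailbox) deliver(msg interface{}) (*request, error) {
	req := &request{msg: msg, reply: make(chan interface{}, 1)}
	select {
	case m.C <- req:
		return req, nil
	default:
		return nil, errReceiverBusy
	}
}

type ping struct{}
type pong struct{}

func main() {
	mb := newMailbox(1)
	req, _ := mb.deliver(&ping{})
	_, err := mb.deliver(&ping{}) // the buffer of 1 is already full
	fmt.Println(err)              // receiver busy

	// Receiver side: take the request and answer it by message type.
	in := <-mb.C
	switch in.msg.(type) {
	case *ping:
		_ = in.respond(&pong{})
	}
	_, isPong := (<-req.reply).(*pong)
	fmt.Println(isPong) // true
}
```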

Example Actor, Part 3

Each actor receives a context as a parameter of its Act method. That context is created by the peer that started the actor and contains several useful values, which can be extracted with the Context* functions.

```go
func (a *WorkerActor) Act(ctx context.Context) {
    // The ID of the actor in etcd.
    id, err := grid.ContextActorID(ctx)
    ...

    // The name of the actor, as given in ActorStart.
    name, err := grid.ContextActorName(ctx)
    ...

    // The namespace of the grid this actor is associated with.
    namespace, err := grid.ContextActorNamespace(ctx)
    ...

    log.Printf("actor %v (id %v) running in namespace %v", name, id, namespace)
}
```

Example Actor, Part 4

An actor can exit whenever it wants, but it must exit when its context signals done, so an actor should always monitor its context's Done channel.

```go
func (a *WorkerActor) Act(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            // Stop requested, clean up and exit.
            return
        case ...
        }
    }
}
```

Example Actor, Part 5

Each actor is registered in etcd, so each actor's name acts like a mutex. If code requests the same actor to be started twice, the second request receives an error indicating that the actor is already registered.

```go
const timeout = 2 * time.Second

type LeaderActor struct {
    client *grid.Client
}

func (a *LeaderActor) Act(ctx context.Context) {
    // Pick a peer to run the worker on.
    peers, err := a.client.Query(timeout, grid.Peers)
    ...
    peer := peers[0].Name()

    start := grid.NewActorStart("worker-%d", 0)
    start.Type = "worker"

    // First request to start.
    _, err = a.client.Request(timeout, peer, start)

    // Second request will fail, if the first succeeded.
    _, err = a.client.Request(timeout, peer, start)
}
```
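
The mutex-like behaviour of actor names can be simulated in process. In this sketch a mutex-guarded set (the hypothetical names type) stands in for etcd. The run method registers the name, runs the actor body, and removes the record when the body returns or panics. This matches the comment in Part 1 that a record is removed when an actor exits or panics.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errAlreadyRegistered = errors.New("actor already registered")

// names is a hypothetical in-process stand-in for the etcd registry.
type names struct {
	mu    sync.Mutex
	taken map[string]bool
}

func (n *names) register(name string) error {
	n.mu.Lock()
	defer n.mu.Unlock()
	if n.taken[name] {
		return errAlreadyRegistered
	}
	n.taken[name] = true
	return nil
}

func (n *names) deregister(name string) {
	n.mu.Lock()
	defer n.mu.Unlock()
	delete(n.taken, name)
}

// run registers name, runs act, and always removes the record afterwards,
// even if act panics (the panic is recovered and returned as an error).
func (n *names) run(name string, act func()) (err error) {
	if err := n.register(name); err != nil {
		return err
	}
	defer n.deregister(name)
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("actor %s panicked: %v", name, r)
		}
	}()
	act()
	return nil
}

func main() {
	n := &names{taken: map[string]bool{}}
	_ = n.run("worker-0", func() {
		// While worker-0 is running, a second start must fail.
		fmt.Println(n.run("worker-0", func() {})) // actor already registered
	})
	// After it exits, the name is free again.
	fmt.Println(n.run("worker-0", func() {}))                // <nil>
	fmt.Println(n.run("worker-1", func() { panic("boom") })) // actor worker-1 panicked: boom
	fmt.Println(n.register("worker-1"))                      // <nil>
}
```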

Testing

Without running the following setup command, go test panics. The init function of the package golang.org/x/net/trace runs twice, and the second run finds the /debug/requests HTTP handler already registered:

```
$ go test
panic: /debug/requests is already registered. You may have two independent copies of golang.org/x/net/trace in your binary, trying to maintain separate state. This may involve a vendored copy of golang.org/x/net/trace.
```

The workaround is to create a vendor directory:

```
go mod vendor
```

Note: vendor/ is included in the .gitignore file.

Kubernetes + Grid

The examples above are meant to give an intuitive sense of what the grid library does. However, what it does not do is:

  1. Package up your configuration and binaries
  2. Start your VMs
  3. Start your processes on those VMs
  4. Autoscale your VMs when resources run low
  5. Reschedule crashed processes
  6. etc...

This is intentional, as other tools already do these things. At the top of our list are Kubernetes and Docker, which together perform all of the above.

Grid comes into the picture once you start building out your application logic and need things like coordination and messaging. Grid handles these under the hood with Etcd and gRPC, taking care of some boilerplate code for you.

Sending Messages

Sending messages is always done through the client. The client configuration has only one required parameter: the namespace of the grid to connect to. Different namespaces can communicate by simply creating clients for the namespace they wish to send messages to.

```go
const timeout = 2 * time.Second

func Example() {
    etcd, err := etcdv3.New(...)
    ...

    client, err := grid.NewClient(etcd, grid.ClientCfg{Namespace: "myapp"})
    ...

    res, err := client.Request(timeout, "some-mailbox-name", &MyMsg{
        ...
    })

    // ... process the response ...
}
```

Broadcasting Messages

Broadcasting messages is a way for the client to send messages to a group of actors. There are currently two different strategies for message broadcasting:

  • First-one-wins, where the request context is canceled as soon as one actor responds to the message.
  • Delivery to all actors, which waits for every actor to respond or time out.

```go
const (
    timeout  = 2 * time.Second
    numRetry = 3
)

func Example() {
    etcd, err := etcdv3.New(...)
    ...

    client, err := grid.NewClient(etcd, grid.ClientCfg{Namespace: "myapp"})
    ...

    grp := grid.NewListGroup("actor-1", "actor-2", "actor-3")

    // Make a request to each actor in the group in parallel; the first
    // result back cancels all the other requests.
    fastest, err := client.Broadcast(timeout, grp.Fastest(), &MyMsg{...})
    ...

    // Deliver to all actors in the group, retry just those that
    // were not successful in the previous try, and fold new
    // results into previous results.
    res := grid.BroadcastResult{}
    var tmp grid.BroadcastResult
    retry.X(numRetry, func() bool {
        tmp, err = client.Broadcast(timeout, grp.ExceptSuccesses(res), &MyMsg{...})
        res.Add(tmp)
        return err != nil
    })
}
```

Registering Messages

Every type of message must be registered before use. Each message must be a Protobuf message. See the Go Protobuf Tutorial for more information, or the example below:

```proto
syntax = "proto3";
package msg;

message Person {
    string name = 1;
    string email = 2;
    ...
}
```

A message must be registered before it is used. Registration can be done inside init functions, in the main function, or just before the message is first sent or received.

```go
func main() {
    grid.Register(msg.Person{})

    ...
}
```
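
Registration gives a receiver a way to turn bytes back into a concrete Go type. The sketch below assumes that this is the purpose of Register. It shows the idea with a hypothetical name-to-reflect.Type table and uses JSON instead of Protobuf. It is not grid's codec.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"reflect"
)

var errUnregistered = errors.New("message type not registered")

// registry maps a type name to its Go type (hypothetical, not grid's).
var registry = map[string]reflect.Type{}

// register records the type of v, passed by value as in grid.Register(msg.Person{}).
func register(v interface{}) {
	t := reflect.TypeOf(v)
	registry[t.String()] = t
}

// envelope is what travels on the wire in this sketch: a type name plus bytes.
type envelope struct {
	Type string
	Data []byte
}

func marshal(v interface{}) (envelope, error) {
	t := reflect.TypeOf(v)
	if t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	if _, ok := registry[t.String()]; !ok {
		return envelope{}, errUnregistered
	}
	data, err := json.Marshal(v)
	return envelope{Type: t.String(), Data: data}, err
}

// unmarshal rebuilds a *T for the registered type named in the envelope.
func unmarshal(e envelope) (interface{}, error) {
	t, ok := registry[e.Type]
	if !ok {
		return nil, errUnregistered
	}
	v := reflect.New(t)
	if err := json.Unmarshal(e.Data, v.Interface()); err != nil {
		return nil, err
	}
	return v.Interface(), nil
}

type Person struct {
	Name  string
	Email string
}

func main() {
	register(Person{})
	e, _ := marshal(&Person{Name: "Ada", Email: "ada@example.com"})
	v, _ := unmarshal(e)
	p := v.(*Person)
	fmt.Println(e.Type, p.Name) // main.Person Ada
}
```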

# Functions

ContextActorID returns the ID that is used to register the actor in etcd.
ContextActorName returns just the actor name (i.e. without the namespace) associated with this context.
ContextActorNamespace returns the namespace of the grid this actor is associated with.
NewActorStart creates a message with the name of the actor to start; its type will be equal to its name unless it is changed: start := NewActorStart("worker"). Format names can also be used for more complicated names; just remember to override the type: start := NewActorStart("worker-%d-group-%d", i, j); start.Type = "worker".
NewClient using the given etcd client and configuration.
NewListGroup creates a new Group.
NewQueryEvent creates a new QueryEvent.
NewServer for the grid.
Register a message so it may be sent and received.

# Constants

Actors filter for query.
Mailboxes filter for query.
Peers filter for query.

# Variables

Enum value maps for Delivery_Ver.
Enum value maps for Delivery_Ver.
ErrAlreadyRegistered when a mailbox is created but someone else has already created it.
ErrAlreadyResponded when respond is called multiple times on a request.
ErrContextFinished when the context signals done before the request could receive a response from the receiver.
ErrDefNotRegistered when an actor type which has never been registered is requested for start.
ErrIncompleteBroadcast when the Broadcast cannot successfully request an actor in the Group.
ErrInvalidActorName when the actor name contains invalid character codes.
ErrInvalidActorType when the actor type contains invalid character codes.
ErrInvalidContext when a context does not contain the requested values.
ErrInvalidMailboxName when a mailbox name contains invalid character codes.
ErrInvalidName when name contains invalid character codes.
ErrInvalidNamespace when namespace contains invalid character codes.
ErrNilActor when an actor definition has been registered but returns a nil actor and nil error when creating an actor.
ErrNilClient when a client method is called on a nil client.
ErrNilEtcd when the etcd argument is nil.
ErrNilGroup when a method that requires a group is called with a nil group.
ErrReceiverBusy when the message buffer of a mailbox is full; consider a larger size when creating the mailbox.
ErrServerNotRunning when an operation that requires a running server is requested while the server is not running.
ErrUnknownMailbox when a message is received by a peer for a mailbox the peer does not serve, likely the mailbox has moved between the time of discovery and the message receive.
ErrUnregisteredMailbox when a mailbox name does not exist in the registry, likely it was never created or has died.
ErrWatchClosedUnexpectedly when a query watch closes before it was requested to close, likely due to some etcd issue.
Wire_ServiceDesc is the grpc.ServiceDesc for Wire service.

# Structs

Client for grid-actors or non-actors to make requests to grid-actors.
ClientCfg where the only required argument is Namespace, other fields with their zero value will receive defaults.
Group defines a group of actors.
GRPCMailbox for receiving messages.
QueryEvent indicating that an entity has been discovered, lost, or some error has occurred with the watch.
Result stores the result of a Request.
Server of a grid.
ServerCfg where the only required argument is Namespace, other fields with their zero value will receive defaults.
UnimplementedWireServer must be embedded to have forward compatible implementations.

# Interfaces

Actor that does work.
Logger hides the logging function Printf behind a simple interface so libraries such as logrus can be used.
Request which must receive an ack or response.
UnsafeWireServer may be embedded to opt out of forward compatibility for this service.
WireClient is the client API for Wire service.
WireServer is the server API for Wire service.

# Type aliases

BroadcastResult is used to store the results of the Broadcast.
EventType categorizing the event.
MakeActor using the given data to parameterize the making of the actor; the data is optional.