Categorygithub.com/Agent-Plus/go-grpc-broker
modulepackage
1.0.0-pre.8
Repository: https://github.com/agent-plus/go-grpc-broker.git
Documentation: pkg.go.dev

# README

go-grpc-broker

go-grpc-broker is a plain module for building an exchange service in the publisher-subscriber style, where a publisher can push a message to all subscribers in fanout mode or communicate with a single consumer in RPC mode.

RPC mode

RPC mode is designed to run a function on a remote service and wait for the result. A topic in exclusive mode joins exactly two members. If one side produces a message and the other side is not ready to handle it, the broker returns an error.

                  +-------------+
                  |Exchange     |
                  ++-----------++
           -----> | Topic A     | ------->
Publisher         | (exclusive) |         Subscriber
           <----- |             | <-------
                  ++-----------++
                  |             |
                  +-------------+

Simple ping-pong

package main

import (
    cbr "github.com/Agent-Plus/go-grpc-broker/client"

    "flag"
    "fmt"
    "time"
)

// Command-line flags of the ping-pong example.
var (
    // guid is handed to cbr.WithAuthentication as the client identifier.
    guid = flag.String("guid", "", "client identifier")
    // ping, when set, makes this instance publish the first "ping" message.
    ping = flag.Bool("ping", false, "start ping request")
)

// main dials the broker at 127.0.0.1:8090, serves the "foo-ping" topic and
// answers every "ping" message with "pong" and every "pong" with "ping".
// Messages with any other id are ignored. When started with -ping it also
// publishes the initial "ping", retrying once per second until the publish
// succeeds.
func main() {
    flag.Parse()

    cli := cbr.New(cbr.WithAuthentication(*guid, "secret"))
    if err := cli.Dial("127.0.0.1:8090"); err != nil {
        panic(err)
    }

    // hdFunc replies to a received message with its counterpart id.
    hdFunc := func(msg *cbr.Message) {
        resp := cbr.NewMessage()
        switch msg.Id {
        case "ping":
            resp.Id = "pong"
        case "pong":
            resp.Id = "ping"
        default:
            // Not part of the ping-pong exchange; nothing to answer.
            return
        }

        // Slow the exchange down so the output stays readable.
        time.Sleep(1 * time.Second)
        fmt.Println("received: ", msg.Id, ", send: ", resp.Id)
        if err := cli.Publish("foo-ping", resp, nil); err != nil {
            panic(err)
        }
    }

    // Subscribe exactly once: a second `cc :=` in the same scope does not
    // compile.
    // NOTE(review): the trailing `true` presumably requests the exclusive
    // topic mode described above — confirm against the client package.
    cc := cli.StartServe(hdFunc, "foo-ping", "", true)
    defer cc.Close()

    if *ping {
        // Kick off the exchange. The peer may not be subscribed yet, in which
        // case Publish fails and the attempt is repeated after one second.
        go func() {
            for {
                time.Sleep(1 * time.Second)

                msg := cbr.NewMessage()
                msg.Id = "ping"
                err := cli.Publish("foo-ping", msg, nil)
                if err == nil {
                    return
                }
                fmt.Println(err, ", retry at 1 sec")
            }
        }()
    }

    // Nothing ever sends on or closes stop, so main blocks here until the
    // process is terminated.
    stop := make(chan struct{})
    <-stop
}

Start Sender A

go run . -ping -guid GUID

Start Sender B

go run . -guid GUID

In addition, note the client extension, which lets you write handlers in the familiar net/http handler style:

package main

import (
    cbr "github.com/Agent-Plus/go-grpc-broker/client"

    "flag"
    "fmt"
    "strconv"
    "time"
)

// Command-line flags of the request/response example.
var (
    // guid is handed to cbr.WithAuthentication as the client identifier.
    guid = flag.String("guid", "", "client identifier")
    // ping selects sender mode; without it the instance registers the
    // "ping/pong" handler and only responds.
    ping = flag.Bool("ping", false, "start ping request")
)

// main dials the broker at 127.0.0.1:8090 through a ServeMux-wrapped client
// and serves the "foo-ping" topic. With -ping it acts as the sender and issues
// a "ping/pong" GET request every second; without it, it registers the
// "ping/pong" handler and answers incoming requests.
func main() {
    flag.Parse()

    mux := cbr.NewServeMux(cbr.New(cbr.WithAuthentication(*guid, "secret")))
    if err := mux.Dial("127.0.0.1:8090"); err != nil {
        panic(err)
    }

    sub := mux.StartServe("foo-ping", "", true)
    defer sub.Close()

    if !*ping {
        // receive/response mode
        responder := func(w cbr.ResponseWriter, req *cbr.Message) {
            time.Sleep(1 * time.Second)

            // The reply id is the Unix time taken after the pause.
            replyID := strconv.FormatInt(time.Now().Unix(), 10)
            w.SetId(replyID)
            fmt.Println("received: ", req.Id, ", send: ", replyID)

            // Only a "ping" body gets a "pong" body back.
            if string(req.Body) == "ping" {
                w.SetBody([]byte("pong"))
            }

            if err := w.Publish("foo-ping", nil); err != nil {
                panic(err)
            }
        }
        mux.Get("ping/pong", cbr.HandlerFunc(responder))
    } else {
        // sender mode
        go func() {
            for {
                tick := time.NewTimer(1 * time.Second)
                <-tick.C

                req := cbr.NewMessage()
                // set message header that points action type for the resource
                cbr.SetMessageActionGet(req, "ping/pong")
                // message must have id
                req.Id = strconv.FormatInt(time.Now().Unix(), 10)
                req.Body = []byte("ping")

                reply, err := mux.PublishRequest("foo-ping", req, nil)
                if err != nil {
                    fmt.Println(err, ", retry at 1 sec")
                } else {
                    fmt.Println("sent ping with id", req.Id, "received response: ", reply.Id)
                }

                tick.Stop()
            }
        }()
    }

    // Block forever; the channel is never written to or closed.
    forever := make(chan struct{})
    <-forever
}

Start Sender A

go run . -ping -guid GUID

Start Sender B

go run . -guid GUID

Fanout mode

The idea of fanout mode is to deliver each message to all subscribers of the topic named by the producer, without any acknowledgement guarantee.

                           +----------------+ 
                           |Exchange        |         +-----------+ 
                           |                |      -- |Subscriber | 
                           |+--------------+|  ---/   +-----------+ 
 Publisher             --- || Topic A      ||-/
 (Topic=A, Message) --/    |+--------------+|---\     +-----------+ 
                           |                |    ---- |Subscriber | 
                           |                |         +-----------+ 
                           +----------------+ 

Run gRPC server

package main

import (
	gbr "github.com/Agent-Plus/go-grpc-broker"
)

// main starts an exchange server on 127.0.0.1:8090 that authenticates clients
// against a fixed, in-memory user store.
func main() {
	// NOTE(review): keys look like client GUIDs and values like their
	// secrets — confirm against NewDummyAuthentication.
	auth := gbr.NewDummyAuthentication(map[string]string{
		"6704be61-3d72-4241-a740-ffb0d6c56da8": "secret",
	})

	srv := gbr.NewExchangeServer(auth)
	srv.Run("127.0.0.1:8090")
}

Recreate api

Required plugin

go install google.golang.org/grpc/cmd/protoc-gen-go-grpc

Create

protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative api/api.proto

# Packages

No description provided by the author
No description provided by the author

# Functions

ChainStreamInterceptor is shorthand for grpc.ChainStreamInterceptor.
ChainUnaryInterceptor is shorthand for grpc.ChainUnaryInterceptor.
No description provided by the author
No description provided by the author
New creates Exchange.
NewChannel allocates new channel to keep communication.
NewDummyAuthentication creates a new DummyAuthentication, which can be prefilled with the given user store.
NewExchangeServer creates new Exchange Server with given authenticator and additional options for the grpc.Server.

# Constants

ConnIdCtxKey is connection tag key.
ExclusiveMode is RPC where the conversation is one to one.
Fanout is the default subscription mode, where each consumer receives the message.
RPCMode is a subscription mode where the topic has one consumer and many publishers.

# Variables

ErrAttempExceeded is raised when consumer cannot receive message and attempts to deliver were exceeded.
ErrChangeTopicMode is raised when new subscriber tries to change existing topic mode.
ErrChannelNotConsumed is raised for topics that are not in FanoutMode when either the publisher is not a consumer of the topic with ExclusiveMode, or there is no consumer in the topic with RPCMode or ExclusiveMode.
ErrDeliveryTimeout is raised when consumer channel does not pull message in time.
ErrInvalidUserPass is raised on invalid authentication: unknown user or wrong password.
ErrSubscribeStandAloneChannel rejects `Subscribe` action for the channel which is not in the `Exchange` scope.
ErrSubscribeRPCFull is raised when a subscription attempt is rejected for any reason.
ErrUknonwChannel is raised on an attempt to retrieve an unknown channel by token identifier from the registry.
ErrUknonwToken is raised when the metadata does not contain the required token.
ErrUknonwTopic is raised on an attempt to retrieve an unknown topic.
ErrWaitTimeout is raised on attempt to push to the blocked channel.

# Structs

Channel represents adapter to pull message or to push message to others channels of the exchange topic.
CircuitErrors represents the collection of errors that happened during the loop.
DummyAuthentication represents the authentication stub in the tests.
Exchange represents the collection of the client channels and topics with their subscribers.
ExchangeServer wraps Exchange collection and implements api.ExchangeServer.

# Interfaces

Authenticator describes credentials validator.
No description provided by the author