Categorygithub.com/just1689/entity-sync
module
5.2.0+incompatible
Repository: https://github.com/just1689/entity-sync.git
Documentation: pkg.go.dev

# README

Distributed Entity Sync

 codebeat badge  version
Push entities to websocket clients onchange to keep clients in sync.

Features

  • Stateless server. Servers do not need to know about each other or which clients are connected to other servers. This allows the server to scale without synchronizing them.
  • When you change something on the server side, provide the EntityKey to the bridge and all clients will be pushed the entity.
  • Multiple subscriptions. Each client can subscribe to multiple entities and multiples keys in each entity.
  • Multiple responses. You can send back several rows. This is great if updating the client means sending them rows from tables in foreign keys etc.
  • Database / repository agnostic. This library can take a function that you implement to use whichever database, driver, client or interface you choose to implement.
  • A helper package for a simple one call setup (see entitysync/entitysync.go)
  • Queue agnostic. Comes with working NSQ integration but you can choose to provide anything you can wrap in shared.EntityHandler and shared.EntityByteHandler.
  • Add a secret to a client. Accept a secret from the ws and set in client state. Pass secret to the handler to ensure the user is permitted to request the KeyEntity or filter the results they ask for.
  • Function for incoming websocket requests that don't match any concern for this library to pass through.

Roadmap

  • Providing the websocket context.
  • Only one lookup per server on change.
  • Consider improving the security model.

Example

Server setup

Connect the server to EntitySync. Wire the your mux to the bridge and provide a method that can resolve an EntityKey.

// Provide a configuration
config := es.Config{
    Mux:     mux.NewRouter(),
    NSQAddr: *nsqAddr,
}
//Setup entitySync with that configuration
entitySync := es.Setup(config)

//Register an entity and tell the library how to fetch and what to write to the client
entitySync.RegisterEntityAndDBHandler(entityType, func(entityKey shared.EntityKey, secret string, handler shared.ByteHandler) {
    item := fetch(entityKey)
    b, _ := json.Marshal(item)
    handler(b)
})

//Start a listener and provide the mux for routes / handling
l, _ = net.Listen("tcp", *listenLocal)
http.Serve(l, config.Mux)

Connect clients

Connect any number of clients:

  1. Connect to the server over websocket ws://host:port/ws/entity-sync/
  2. Send a subscription request
{
    "action": "subscribe",
    "body": {
        "id": "100",
        "entity": "items"
    }
}

Mutate entity & notify

Make some change to the item in question where it is persisted and then call bridge.NotifyAllOfChange(entityKey) where entityKey is a shared.EntityKey.

All connected clients over websockets will receive messages for the EntityKey/s to which they are subscribed.

Sending a secret from the client

{
    "action": "secret",
    "body": "my-super-secret-secret-123"
}

Other examples

Sending multiple rows to the client

entitySync.RegisterEntityAndDBHandler("report", func(entityKey shared.EntityKey, secret string, handler shared.ByteHandler) {
    item := controller.GetReportByID(entityKey.ID)
    b, _ := json.Marshal(item)
    handler(b)
        
    user := controller.GetUserByID(item.CreatedBy)
    b, _ = json.Marshal(user)
    handler(b)

    department := controller.GetDeparmentByID(item.departmentID)
    b, _ = json.Marshal(department)
    handler(b)
    
})

Checking if a user is allowed to receive the push notification

entitySync.RegisterEntityAndDBHandler("report", func(entityKey shared.EntityKey, secret string, handler shared.ByteHandler) {
    session, err := contoller.GetSessionBySecret(secret)
    if err != nil {
        logrus.Errorln(err)
        return
    }
	
    item := controller.GetReportByID(entityKey.ID)
    b, _ := json.Marshal(item)
    handler(b)
    
})

Provide your own queue

EntitySync is built using NSQ. In theory you can use whatever you like. You will need to provide two functions to the library. The one will allow it to produce a publisher and return a method that will be called to publish. The other is a subscriber and is provided a method for sending.


var queueAddr = "localhost:4000"

func setup() {
    mux := http.NewServeMux()
    databaseHub := esdb.NewDatabaseHub()
    
    // The bridge matches communication from ws to nsq and from nsq to ws.
    // It also calls on the db to resolve entityKey
    bridge := esbridge.BuildBridge(
        BuildPublisher(queueAddr),
        BuildSubscriber(queueAddr),
        databaseHub.PullDataAndPush,
    )
    
    //Pass the mux and a client builder to the libraries handlers
    esweb.SetupMuxBridge(mux, bridge.ClientBuilder)
    
    ...
}

var BuildPublisher shared.AddressableEntityHandler = func(addr string) shared.EntityHandler {
	return func(entityType shared.EntityType) shared.ByteHandler {
		return func(b []byte) {
			//TODO: setup the publisher client
			...
			qPublisher.publish(entityType.GetQueueName(), b)
		}
	}
}

var BuildSubscriber shared.AddressableEntityByteHandler = func(addr string) shared.EntityByteHandler {
	return func(entityType shared.EntityType, callback shared.ByteHandler) {
		func subscribeNSQ(qAddr string, entityType shared.EntityType, f shared.ByteHandler) {
		}(qArr, entityType, f func(b []byte {
			//TODO: connect to the nats client
			...
			natsHandler(in []byte) {
				f(in)
			}
		}))
	}
}

Pass through ws

You can provide a method that will allow for pass-through handling of websocket messages.

// Provide a configuration
config := es.Config{
    Mux:            mux.NewRouter(),
    NSQAddr:        *nsqAddr,
    WSPassThrough:  func(secret string, b []byte) {
    	//TODO: handle incoming websocket message
    } 
}
//Setup entitySync with that configuration
entitySync := es.Setup(config)

//Register an entity and tell the library how to fetch and what to write to the client
entitySync.RegisterEntityAndDBHandler(entityType, func(entityKey shared.EntityKey, secret string, handler shared.ByteHandler) {
    item := fetch(entityKey)
    b, _ := json.Marshal(item)
    handler(b)
})

//Start a listener and provide the mux for routes / handling
l, _ = net.Listen("tcp", *listenLocal)
http.Serve(l, config.Mux)

# Packages

No description provided by the author
No description provided by the author