Categorygithub.com/proxima-one/streamdb-client-go

# README

Golang client for proxima streams:

1) Proxima streams client (ProximaClient)

  • Provides basic access to the gRPC endpoint (no additional logic for error handling, retries, or state management)
  • GetTransitionsAfter(...) returns the requested number of transitions after the given state
  • GetStreams(...) returns a channel of transitions and a channel of errors, backed by a gRPC stream
  • GetStreamBasedOnRpc(...) is the same as GetStreams(...), but is based on RPC calls instead of a gRPC stream
	//setup connection
	cfg := config.NewConfigFromFileOverwriteOptions(
		"config.yaml",
		config.WithChannelSize(10000),
		config.WithState(model.Genesis()),
	)
    client := NewProximaClient(cfg)
    if /*request-response model*/() {
             stream, errc, err = reader.client.GetStreamBasedOnRpc(ctx, model.StreamState{
             StreamID: reader.config.GetStreamID(),
             State:    reader.lastState,
            })
        }
	if /*streaming model*/() {
             stream, errc, err = reader.client.GetStream(ctx, model.StreamState{
             StreamID: reader.config.GetStreamID(),
             State:    reader.lastState,
	    })
        }
		
    //read stream
	for {
        select {
        case <-ctx.Done():
            return
        case err := <-errc:
            if err != nil {
                return
            }
        case transition, ok := <-stream:
            //process transition
        }
    }

2) Proxima streams reader (StreamReader)

  • Provides easy access to streams, built on top of ProximaClient
  • Handles state management (state is stored in memory)
  • Can apply an async function to every transition, for example JSON parsing of the payload
  • Tries to restore the connection if no messages are received from the stream for some period of time

//StreamConnectionOption struct {
//Type            string //type of connection (grpc, rpc) StreamConnectionOptionTypeRpc, StreamConnectionOptionTypeStream
//ReconnectTime   time.Duration // restart connection every ReconnectTime
//WatchDogTimeout time.Duration // restart connection if WatchDogTimeout is exceeded
//}

reader := client.NewStreamReader(cfg, client.JsonParsingPreprocessFunc)
//you can write your own map function instead of JsonParsingPreprocessFunc: fn(transition *model.Transition) -> (any, error)
startParams := client.NewDefaultStreamConnectionOption()
startErr := reader.Start(context.Background(), startParams)


for {
    data, err := reader.ReadNext()
	// check err here
	// if err is not nil, stream processing has stopped
	// if the error is not critical, call Start again to resume:
	// continueErr := reader.Start(context.Background(), startParams)
	mapValue, err := data.Preprocess.PreprocessingResult()
	//get access to required data from stream item
	//process data
	//...
}

# Packages

No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author