Categorygithub.com/wisemonkeys-co/ocs-messaging
repository
0.0.5-beta.0
Repository: https://github.com/wisemonkeys-co/ocs-messaging.git
Documentation: pkg.go.dev

# Packages

No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author

# README

ocs-messaging

O que é

ocs-messaging é um framework que provê funcionalidades básicas de produção, consumo e manipulação de mensagens kafka.

Exemplos de uso

Envio de mensagem

  • Mensagem sem validação de schema

    package main
    
    import (
      "github.com/wisemonkeys-co/ocs-messaging/producer"
      "github.com/wisemonkeys-co/ocs-messaging/types"
    )
    
    func main() {
      producerConfig := map[string]interface{}{
        "bootstrap.servers": "localhost:9092",
      }
      logChannel := make(chan<- types.LogEvent)
      kp := producer.KafkaProducer{}
      kp.Init(producerConfig, nil, logChannel)
      kp.SendRawData("my-topic", []byte("foo"), []byte("bar")) // topic, key, value
    }
    
  • Mensagem com validação de schema

    package main
    
    import (
      "fmt"
    
      "github.com/wisemonkeys-co/ocs-messaging/producer"
      schemavalidator "github.com/wisemonkeys-co/ocs-messaging/schema-validator"
      "github.com/wisemonkeys-co/ocs-messaging/types"
    )
    
    /*
      Using the following json-schema
        - key (id 1)
          {
            "title": "lorem ipsum",
            "description": "dolor sit amet",
            "type": "string"
          }
        - value (id 13)
          {
            "description": "consectetur adipiscing elit",
            "type": "object",
            "properties": {
              "bar": {
                "type": "string"
              }
            },
            "required": ["bar"]
          }
    */
    
    type Foo struct {
      Bar string `json:"bar"`
    }
    
    func main() {
      fooKey := "key"
      fooValue := Foo{
        Bar: "value",
      }
      producerConfig := map[string]interface{}{
        "bootstrap.servers": "localhost:9092",
      }
      logChannel := make(chan<- types.LogEvent)
      schemaRegistryValidator := schemavalidator.SchemaRegistryValidator{}
      schemaRegistryValidator.Init("http://localhost:8081", "", "") // url, user, pass
      kp := producer.KafkaProducer{}
      kp.Init(producerConfig, &schemaRegistryValidator, logChannel)
      validationError := kp.SendSchemaBasedMessage("my-topic", fooKey, fooValue, 1, 13) // topic, key, value, key schema id, value schema id
      if validationError != nil {
        fmt.Println(validationError)
      }
    }
    
    

Consumo de mensagem

package main

import (
	"fmt"
	"os"
	"os/signal"

	"github.com/wisemonkeys-co/ocs-messaging/consumer"
	schemavalidator "github.com/wisemonkeys-co/ocs-messaging/schema-validator"
	"github.com/wisemonkeys-co/ocs-messaging/types"
)

type Foo struct {
	Bar string `json:"bar"`
}

func main() {
	// Handle "ctrl + C" signal
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)
	// Data type of messages
	var fooKey string
	var fooValue Foo
	// Consumer dependencies
	consumerConfig := map[string]interface{}{
		"bootstrap.servers": "localhost:9092",
		"group.id":          "my-consumer-group-0",
		"auto.offset.reset": "earliest", // optional
	}
	messageChannel := make(chan consumer.SimpleMessage)
	topicList := []string{"my-topic"}
	logChannel := make(chan types.LogEvent) // optional
	// Instance to decode schema-based messages
	schemaRegistryValidator := schemavalidator.SchemaRegistryValidator{}
	schemaRegistryValidator.Init("http://localhost:8081", "", "") // url, user, pass
	kc := consumer.KafkaConsumer{}
	// Logs handler (optional). Doesn't exists if logChannel is not defined
	go func() {
		for {
			logData := <-logChannel
			fmt.Printf("Received log from %s, %s, %s, %s\n\n", logData.InstanceName, logData.Type, logData.Tag, logData.Message)
		}
	}()
	// Message handler
	go func() {
		for {
			msg := <-messageChannel
			fmt.Printf("Topic %s, Offset %d\n", msg.Topic, msg.Offset)
			if msg.Key != nil {
				decodeError := schemaRegistryValidator.Decode(msg.Key, &fooKey)
				if decodeError == nil {
					fmt.Printf("Key with schema: %s\n", fooKey)
				} else {
					fmt.Printf("Raw key: %s\n", string(msg.Key))
				}
			}
			if msg.Value != nil {
				decodeError := schemaRegistryValidator.Decode(msg.Value, &fooValue)
				if decodeError == nil {
					fmt.Printf("Value with schema (fooValue.Bar): %s\n", fooValue.Bar)
				} else {
					fmt.Printf("Raw value: %s\n", string(msg.Value))
				}
			}
		}
	}()
	// Starts to consume messages
	startConsumerError := kc.StartConsumer(consumerConfig, topicList, messageChannel, logChannel)
	if startConsumerError != nil {
		fmt.Println(startConsumerError)
		return
	}
	// Wait "ctrl + C" signal
	<-c
	fmt.Println("Shutdown in progress")
	kc.StopConsumer()
}