Categorygithub.com/memphisdev/memphis.go
modulepackage
0.2.2
Repository: https://github.com/memphisdev/memphis.go.git
Documentation: pkg.go.dev

# README

Memphis light logo

Memphis light logo

Simple as RabbitMQ, Robust as Apache Kafka, and Perfect for microservices.

Memphis UI

CNCF Silver Member

CNCF Silver Member

Sandbox - Docs - Twitter - YouTube

Discord Code Of Conduct GitHub release (latest by date)

Memphis is a next-generation alternative to traditional message brokers.

A simple, robust, and durable cloud-native message broker wrapped with
an entire ecosystem that enables cost-effective, fast, and reliable development of modern queue-based use cases.

Memphis enables the building of modern queue-based applications that require
large volumes of streamed and enriched data, modern protocols, zero ops, rapid development,
extreme cost reduction, and a significantly lower amount of dev time for data-oriented developers and data engineers.

Installation

After installing and running memphis broker,
In your project's directory:

go get github.com/memphisdev/memphis.go

Importing

import "github.com/memphisdev/memphis.go"

Connecting to Memphis

c, err := memphis.Connect("<memphis-host>", 
	"<application type username>", 
	memphis.ConnectionToken("<connection-token>"), // you will get it on application type user creation
	memphis.Password("<password>")) // depends on how Memphis deployed - default is connection token-based authentication

It is possible to pass connection configuration parameters, as function-parameters.
// function params
c, err := memphis.Connect("<memphis-host>", 
	"<application type username>", 
	memphis.ConnectionToken("<connection-token>"), // you will get it on application type user creation
	memphis.Password("<password>"), // depends on how Memphis deployed - default is connection token-based authentication
  memphis.Port(<int>), // defaults to 6666       
	memphis.Reconnect(<bool>), // defaults to true
	memphis.MaxReconnect(<int>), // defaults to 10
  memphis.ReconnectInterval(<time.Duration>) // defaults to 1 second
  memphis.Timeout(<time.Duration>) // defaults to 15 seconds
	// for TLS connection:
	memphis.Tls("<cert-client.pem>", "<key-client.pem>",  "<rootCA.pem>"),
	)

Once connected, all features offered by Memphis are available.

Disconnecting from Memphis

To disconnect from Memphis, call Close() on the Memphis connection object.

c.Close();

Creating a Station

Stations can be created from Conn
Passing optional parameters using functions
If a station already exists nothing happens, the new configuration will not be applied

s0, err = c.CreateStation("<station-name>")

s1, err = c.CreateStation("<station-name>", 
 memphis.RetentionTypeOpt(<Messages/MaxMessageAgeSeconds/Bytes>),
 memphis.RetentionVal(<int>), 
 memphis.StorageTypeOpt(<Memory/Disk>), 
 memphis.Replicas(<int>), 
 memphis.IdempotencyWindow(<time.Duration>), // defaults to 2 minutes
 memphis.SchemaName(<string>),
 memphis.SendPoisonMsgToDls(<bool>), // defaults to true
 memphis.SendSchemaFailedMsgToDls(<bool>), // defaults to true
 memphis.TieredStorageEnabled(<bool>) // defaults to false
)

Retention Types

Memphis currently supports the following types of retention:

memphis.MaxMessageAgeSeconds

The above means that every message persists for the value set in the retention value field (in seconds).

memphis.Messages

The above means that after the maximum number of saved messages (set in retention value)
has been reached, the oldest messages will be deleted.

memphis.Bytes

The above means that after maximum number of saved bytes (set in retention value)
has been reached, the oldest messages will be deleted.

Retention Values

The retention values are directly related to the retention types mentioned above,
where the values vary according to the type of retention chosen.

All retention values are of type int but with different representations as follows:

memphis.MaxMessageAgeSeconds is represented in seconds, memphis.Messages in a number of messages
and finally memphis.Bytes in a number of bytes.

After these limits are reached oldest messages will be deleted.

Storage Types

Memphis currently supports the following types of messages storage:

memphis.Disk

The above means that messages persist on disk.

memphis.Memory

The above means that messages persist on the main memory.

Destroying a Station

Destroying a station will remove all its resources (including producers and consumers).

err := s.Destroy();

Attaching a Schema to an Existing Station

err := conn.AttachSchema("<schema-name>", "<station-name>")

Detaching a Schema from Station

err := conn.DetachSchema("<station-name>")

Produce and Consume Messages

The most common client operations are producing messages and consuming messages.

Messages are published to a station and consumed from it
by creating a consumer and calling its Consume function with a message handler callback function.
Consumers are pull-based and consume all the messages in a station
unless you are using a consumers group,
in which case messages are spread across all members in this group.

Memphis messages are payload agnostic. Payloads are byte slices, i.e []byte.

In order to stop receiving messages, you have to call consumer.StopConsume().
The consumer will terminate regardless of whether there are messages in flight for the client.

Creating a Producer

// from a Conn
p0, err := c.CreateProducer(
	"<station-name>",
	"<producer-name>",
	memphis.ProducerGenUniqueSuffix()
) 

// from a Station
p1, err := s.CreateProducer("<producer-name>")

Producing a message

Without creating a producer (receiver function of the connection struct). In cases where extra performance is needed the recommended way is to create a producer first and produce messages by using the produce receiver function of it

c.Produce("station_name_c_produce", "producer_name_a", []byte("Hey There!"), []memphis.ProducerOpt{}, []memphis.ProduceOpt{})

Creating a producer first (receiver function of the producer struct).

p.Produce("<message in []byte or map[string]interface{}/[]byte or protoreflect.ProtoMessage or map[string]interface{}(schema validated station - protobuf)/struct with json tags or map[string]interface{} or interface{}(schema validated station - json schema) or []byte/string (schema validated station - graphql schema)>", memphis.AckWaitSec(15)) // defaults to 15 seconds

Add headers

hdrs := memphis.Headers{}
hdrs.New()
err := hdrs.Add("key", "value")
p.Produce(
	"<message in []byte or map[string]interface{}/[]byte or protoreflect.ProtoMessage or map[string]interface{}(schema validated station - protobuf)/struct with json tags or map[string]interface{} or interface{}(schema validated station - json schema) or []byte/string (schema validated station - graphql schema)>",
    memphis.AckWaitSec(15),
	memphis.MsgHeaders(hdrs) // defaults to empty
)

Async produce

Meaning your application won't wait for broker acknowledgement - use only in case you are tolerant for data loss

p.Produce(
	"<message in []byte or map[string]interface{}/[]byte or protoreflect.ProtoMessage or map[string]interface{}(schema validated station - protobuf)/struct with json tags or map[string]interface{} or interface{}(schema validated station - json schema) or []byte/string (schema validated station - graphql schema)>",
    memphis.AckWaitSec(15),
	memphis.AsyncProduce()
)

Message ID

Stations are idempotent by default for 2 minutes (can be configured), Idempotency achieved by adding a message id

p.Produce(
	"<message in []byte or map[string]interface{}/[]byte or protoreflect.ProtoMessage or map[string]interface{}(schema validated station - protobuf)/struct with json tags or map[string]interface{} or interface{}(schema validated station - json schema) or []byte/string (schema validated station - graphql schema)>",
    memphis.AckWaitSec(15),
	memphis.MsgId("343")
)

Destroying a Producer

p.Destroy();

Creating a Consumer

// creation from a Station
consumer0, err = s.CreateConsumer("<consumer-name>",
  memphis.ConsumerGroup("<consumer-group>"), // defaults to consumer name
  memphis.PullInterval(<pull interval time.Duration), // defaults to 1 second
  memphis.BatchSize(<batch-size> int), // defaults to 10
  memphis.BatchMaxWaitTime(<time.Duration>), // defaults to 5 seconds, has to be at least 1 ms
  memphis.MaxAckTime(<time.Duration>), // defaults to 30 sec
  memphis.MaxMsgDeliveries(<int>), // defaults to 10
  memphis.ConsumerGenUniqueSuffix(),
  memphis.ConsumerErrorHandler(func(*Consumer, error){})
  memphis.StartConsumeFromSeq(<uint64>)// start consuming from a specific sequence. defaults to 1
  memphis.LastMessages(<int64>)// consume the last N messages, defaults to -1 (all messages in the station)
)
  
// creation from a Conn
consumer1, err = c.CreateConsumer("<station-name>", "<consumer-name>", ...) 

Passing a context to a message handler

ctx := context.Background()
ctx = context.WithValue(ctx, "key", "value")
consumer.SetContext(ctx)

Processing Messages

First, create a callback function that receives a slice of pointers to memphis.Msg and an error.

Then, pass this callback into consumer.Consume function.

The consumer will try to fetch messages every pullInterval (that was given in Consumer's creation) and call the defined message handler.

func handler(msgs []*memphis.Msg, err error, ctx context.Context) {
	if err != nil {
		m := msgs[0]
		fmt.Println(string(m.Data()))
		m.Ack()
	}
}

consumer.Consume(handler)

Fetch a single batch of messages

msgs, err := conn.FetchMessages("<station-name>", "<consumer-name>",
  memphis.FetchBatchSize(<int>) // defaults to 10
  memphis.FetchConsumerGroup("<consumer-group>"), // defaults to consumer name
  memphis.FetchBatchMaxWaitTime(<time.Duration>), // defaults to 5 seconds, has to be at least 1 ms
  memphis.FetchMaxAckTime(<time.Duration>), // defaults to 30 sec
  memphis.FetchMaxMsgDeliveries(<int>), // defaults to 10
  memphis.FetchConsumerGenUniqueSuffix(),
  memphis.FetchConsumerErrorHandler(func(*Consumer, error){})
  memphis.FetchStartConsumeFromSeq(<uint64>)// start consuming from a specific sequence. defaults to 1
  memphis.FetchLastMessages(<int64>)// consume the last N messages, defaults to -1 (all messages in the station))

Fetch a single batch of messages after creating a consumer

msgs, err := consumer.Fetch(<batch-size> int)

Acknowledging a Message

Acknowledging a message indicates to the Memphis server to not
re-send the same message again to the same consumer or consumers group.

message.Ack();

Delay the message after a given duration

Delay the message and tell Memphis server to re-send the same message again to the same consumer group.
The message will be redelivered only in case Consumer.MaxMsgDeliveries is not reached yet.

message.Delay(<time.Duration>);

Get headers

Get headers per message

headers := msg.GetHeaders()

Get message sequence number

Get message sequence number

sequenceNumber, err := msg.GetSequenceNumber()

Destroying a Consumer

consumer.Destroy();

Check if broker is connected

conn.IsConnected()

# Packages

No description provided by the author

# Functions

AckWaitSec - max time in seconds to wait for an ack from memphis.
AsyncProduce - produce operation won't wait for broker acknowledgement.
BatchMaxWaitTime - max time to wait between pulls, defauls is 5 seconds.
BatchSize - pull batch size.
Connect - creates connection with memphis.
ConnectionToken - string connection token.
ConsumerErrorHandler - handler for consumer errors.
ConsumerGenUniqueSuffix - whether to generate a unique suffix for this consumer.
ConsumerGroup - consumer group name, default is "".
No description provided by the author
BatchMaxWaitTime - max time to wait between pulls, defauls is 5 seconds.
BatchSize - pull batch size.
FetchConsumerErrorHandler - handler for consumer errors.
ConsumerGenUniqueSuffix - whether to generate a unique suffix for this consumer.
ConsumerGroup - consumer group name, default is "".
MaxAckTime - max time for ack a message, in case a message not acked within this time period memphis will resend it.
MaxMsgDeliveries - max number of message deliveries, by default is 10.
GetStationDefaultOptions - returns default configuration options for the station.
IdempotencyWindow - time frame in which idempotency track messages, default is 2 minutes.
No description provided by the author
MaxAckTime - max time for ack a message, in case a message not acked within this time period memphis will resend it.
MaxMsgDeliveries - max number of message deliveries, by default is 10.
MaxReconnect - the amount of reconnect attempts.
MsgHeaders - set headers to a message.
MsgId - set an id for a message for idempotent producer.
Name - station's name.
Password - string password.
Port - default is 6666.
ProducerGenUniqueSuffix - whether to generate a unique suffix for this producer.
PullInterval - interval between pulls, default is 1 second.
Reconnect - whether to do reconnect while connection is lost.
ReconnectInterval - interval in miliseconds between reconnect attempts.
Replicas - number of replicas for the messages of the data, default is 1.
RetentionTypeOpt - retention type, default is MaxMessageAgeSeconds.
RetentionVal - number which represents the retention based on the retentionType, default is 604800.
SchemaName - shcema's name to attach.
SendPoisonMsgToDls - send poison message to dls, default is true.
SendSchemaFailedMsgToDls - send message to dls after schema validation fail, default is true.
No description provided by the author
StorageTypeOpt - persistance storage for messages of the station, default is storageTypes.FILE.
TieredStorageEnabled - enable tiered storage, default is false.
Timeout - connection timeout in miliseconds.
Tls - paths to tls cert, key and ca files.

# Constants

No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author

# Variables

No description provided by the author
No description provided by the author
No description provided by the author

# Structs

Conn - holds the connection with memphis.
Consumer - memphis consumer object.
ConsumerOpts - configuration options for a consumer.
No description provided by the author
FetchOpts - configuration options for fetch.
No description provided by the author
No description provided by the author
Msg - a received message, can be acked.
No description provided by the author
No description provided by the author
No description provided by the author
ProduceOpts - configuration options for produce operations.
Producer - memphis producer object.
No description provided by the author
ProducerOpts - configuration options for producer creation.
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
Station - memphis station object.
StationsOpts - configuration options for a station.
No description provided by the author

# Type aliases

ConsumeHandler - handler for consumed messages.
ConsumerErrHandler is used to process asynchronous errors.
ConsumerOpt - a function on the options for consumers.
No description provided by the author
FetchOpt - a function on the options fetch.
Option is a function on the options for a connection.
ProduceOpt - a function on the options for produce operations.
ProducerOpt - a function on the options for producer creation.
No description provided by the author
RetentionType - station's message retention type.
No description provided by the author
No description provided by the author
StationOpt - a function on the options for a station.
StorageType - station's message storage type.