github.com/soypat/natiu-mqtt (module, package)
Version: 0.5.1
Repository: https://github.com/soypat/natiu-mqtt.git
Documentation: pkg.go.dev

# README

natiu-mqtt

A dead-simple, extensible and correct MQTT implementation.

Natiu: Means mosquito in the Guaraní language, a language spoken primarily in Paraguay. Commonly written as ñati'û or ñati'ũ.

Highlights

  • Modular

    • Client implementation leaves allocating parts up to the Decoder interface type. Users can choose between non-allocating and allocating implementations of the 3-method interface.
    • RxTx type lets one build an MQTT implementation from scratch for any transport. No server/client logic defined at this level.
  • No unneeded allocations: The PUBLISH application message is not handled by this library; the user receives an io.Reader with the underlying transport bytes. This prevents allocations on natiu-mqtt's side.

  • V3.1.1: Compliant with MQTT version 3.1.1 for QoS0 interactions. QoS1 and QoS2 are WIP.

  • No external dependencies: Nada. Nope.

  • Data oriented design: Minimizes abstractions or objects for the data on the wire.

  • Fuzz tested, robust: Decoding implementation fuzzed to prevent adversarial user input from crashing application (95% coverage).

  • Simplicity: A simple base package yields simple implementations for different transports. See Implementations section.

  • Runtime-what?: Unlike other MQTT implementations, natiu-mqtt uses no channels, no interface conversions and no goroutines: as little runtimey stuff as possible. You get the best of Go's concrete types when using Natiu's API. Why? Because MQTT deserialization and serialization are an embarrassingly serial and concrete problem.

Goals

This implementation will have a simple embedded-systems implementation in the package top level. This implementation will be transport agnostic and non-concurrent. This will make it far easier to modify and reason about. The transport dependent implementations will have their own subpackage, so one package for TCP transport, another for UART, PPP etc.

  • Minimal, if any, heap allocations.
  • Support for TCP transport.
  • User owns payload bytes.

Implementations

Examples

API subject to change before v1.0.0 release.

Example use of Client

	// Create new client.
	client := mqtt.NewClient(mqtt.ClientConfig{
		Decoder: mqtt.DecoderNoAlloc{make([]byte, 1500)},
		OnPub: func(_ mqtt.Header, _ mqtt.VariablesPublish, r io.Reader) error {
			message, _ := io.ReadAll(r)
			log.Println("received message:", string(message))
			return nil
		},
	})

	// Get a transport for MQTT packets.
	const defaultMQTTPort = ":1883"
	conn, err := net.Dial("tcp", "127.0.0.1"+defaultMQTTPort)
	if err != nil {
		fmt.Println(err)
		return
	}

	// Prepare for CONNECT interaction with server.
	var varConn mqtt.VariablesConnect
	varConn.SetDefaultMQTT([]byte("salamanca"))
	ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
	err = client.Connect(ctx, conn, &varConn) // Connect to server.
	cancel()
	if err != nil {
		// Error or loop until connect success.
		log.Fatalf("connect attempt failed: %v\n", err)
	}

	// Ping forever until error.
	for {
		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
		pingErr := client.Ping(ctx)
		cancel()
		if pingErr != nil {
			log.Fatal("ping error: ", pingErr, " with disconnect reason:", client.Err())
		}
		log.Println("ping success!")
	}

Example: Low level packet management with RxTx type

package main

import (
	"log"
	"net"

	mqtt "github.com/soypat/natiu-mqtt"
)

func main() {
	// Dial a TCP connection.
	const defaultMQTTPort = ":1883"
	conn, err := net.Dial("tcp", "127.0.0.1"+defaultMQTTPort)
	if err != nil {
		log.Fatal(err)
	}

	// Create the RxTx MQTT IO handler.
	rxtx, err := mqtt.NewRxTx(conn, mqtt.DecoderLowmem{UserBuffer: make([]byte, 1500)})
	if err != nil {
		log.Fatal(err)
	}
	// Add a handler on CONNACK packet.
	rxtx.OnConnack = func(rt *mqtt.RxTx, vc mqtt.VariablesConnack) error {
		log.Printf("%v received, SP=%v, rc=%v", rt.LastReceivedHeader.String(), vc.SessionPresent(), vc.ReturnCode.String())
		return nil
	}

	// Prepare to send first MQTT packet over wire.
	varConnect := mqtt.VariablesConnect{
		ClientID:      []byte("salamanca"),
		Protocol:      []byte("MQTT"),
		ProtocolLevel: 4,
		KeepAlive:     60,
		CleanSession:  true,
		WillMessage:   []byte("MQTT is okay, I guess"),
		WillTopic:     []byte("mqttnerds"),
		WillRetain:    true,
	}
	// Header set automatically for all packets that are not PUBLISH.
	err = rxtx.WriteConnect(&varConnect)
	if err != nil {
		log.Fatal(err)
	}
}

Why not just use paho?

Some issues with Eclipse's Paho implementation:

  • Inherent data races on API side. The implementation is so notoriously hard to modify that this issue has been in a frozen state.
  • Calling Client.Disconnect when the client is already disconnected blocks indefinitely and can cause deadlock or spin with Paho's implementation.
  • If there is an issue with the network and Reconnect is enabled, then Paho's Reconnect spins. There is no way to prevent this.
  • Interfaces used for ALL data types. This is not necessary and makes it difficult to work with since there is no in-IDE documentation on interface methods.
  • No lower level abstraction of MQTT for use in embedded systems with non-TCP transport.
  • Uses the any interface type for the payload, which could simply be a byte slice...

I found these issues after a 2 hour dev session. There will undoubtedly be more if I were to try to actually get it working...

# Functions

DecodeHeader receives transp, an io.ByteReader that reads from an underlying arbitrary transport protocol.
NewClient creates a new MQTT client with the configuration parameters provided.
NewHeader creates a new Header for a packetType and returns an error if invalid arguments are passed in.
NewPublishFlags returns PUBLISH packet flags and an error if the flags were to create a malformed packet according to MQTT specification.
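
To make "malformed packet according to MQTT specification" concrete, the sketch below builds the 4-bit PUBLISH flags and rejects the two combinations MQTT v3.1.1 forbids: a QoS value of 3, and DUP set on a QoS 0 message. This is not the body of NewPublishFlags; publishFlags and its signature are hypothetical names used for illustration only.

```go
package main

import (
	"errors"
	"fmt"
)

// publishFlags is a hypothetical helper (not natiu-mqtt's NewPublishFlags).
// PUBLISH flag layout per MQTT v3.1.1: bit 3 = DUP, bits 2-1 = QoS, bit 0 = RETAIN.
func publishFlags(qos uint8, dup, retain bool) (uint8, error) {
	if qos > 2 {
		// Both QoS bits set to 1 is a protocol violation.
		return 0, errors.New("invalid QoS: must be 0, 1 or 2")
	}
	if dup && qos == 0 {
		// DUP must be 0 for all QoS 0 messages.
		return 0, errors.New("DUP flag set on a QoS 0 PUBLISH")
	}
	flags := qos << 1
	if dup {
		flags |= 1 << 3
	}
	if retain {
		flags |= 1
	}
	return flags, nil
}

func main() {
	flags, err := publishFlags(1, false, true)
	fmt.Printf("flags=%04b err=%v\n", flags, err)

	_, err = publishFlags(0, true, false)
	fmt.Println("err:", err)
}
```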

# Constants

Accepted protocol as per MQTT v3.1.1.
Accepted protocol level as per MQTT v3.1.1.
The CONNACK Packet is the packet sent by the Server in response to a CONNECT Packet received from a Client.
A CONNECT packet is sent from Client to Server, it is a Client request to connect to a Server.
The DISCONNECT Packet is the final Control Packet sent from the Client to the Server.
Reserved flags for PUBREL, SUBSCRIBE and UNSUBSCRIBE packet types.
The PINGREQ Packet is sent from a Client to the Server.
A PINGRESP Packet is sent by the Server to the Client in response to a PINGREQ Packet.
A PUBACK Packet is the response to a PUBLISH Packet with QoS level 1.
The PUBCOMP Packet is the response to a PUBREL Packet.
A PUBLISH Control Packet is sent from a Client to a Server or from Server to a Client to transport an Application Message.
A PUBREC Packet is the response to a PUBLISH Packet with QoS 2.
A PUBREL Packet is the response to a PUBREC Packet.
A SUBACK Packet is sent by the Server to the Client to confirm receipt and processing of a SUBSCRIBE Packet.
The SUBSCRIBE Packet is sent from the Client to the Server to create one or more Subscriptions.
The UNSUBACK Packet is sent by the Server to the Client to confirm receipt of an UNSUBSCRIBE Packet.
An UNSUBSCRIBE Packet is sent by the Client to the Server, to unsubscribe from topics.
QoS0 at most once delivery.
QoS1 at least once delivery.
QoS2 Exactly once delivery.
QoSSubfail marks a failure in SUBACK.

# Variables

ErrBadRemainingLen is passed to Rx's OnRxError after decoding a header with a remaining length that does not conform to MQTT v3.1.1 packet specifications.
natiu-mqtt depends on user provided buffers for string and byte slice allocation.

# Structs

Client is an asynchronous MQTT v3.1.1 client implementation which is safe for concurrent use.
ClientConfig is used to configure a new Client.
DecoderNoAlloc implements the [Decoder] interface for unmarshalling Variable headers of MQTT packets.
Header represents the bytes preceding the payload in an MQTT packet.
Rx implements a bare minimum MQTT v3.1.1 protocol transport layer handler.
RxCallbacks groups all functionality executed on data receipt, both successful and unsuccessful.
SubscribeRequest is relevant only to SUBSCRIBE packets where several SubscribeRequest each encode a topic filter that is to be matched on the server side and a desired QoS for each matched topic.
Tx implements a bare minimum MQTT v3.1.1 protocol transport layer handler for transmitting packets.
TxCallbacks groups functionality executed on transmission success or failure of an MQTT packet.
VariablesConnect: all strings in the variable header must be UTF-8 encoded, except the password, which may be binary data.
VariablesPublish represents the variable header of a PUBLISH packet.
VariablesSuback represents the variable header of a SUBACK packet.
VariablesSubscribe represents the variable header of a SUBSCRIBE packet.
VariablesUnsubscribe represents the variable header of an UNSUBSCRIBE packet.
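
As a rough picture of what "several SubscribeRequest each encode a topic filter ... and a desired QoS" means on the wire, the sketch below serializes a list of requests the way MQTT v3.1.1 lays out a SUBSCRIBE payload: a 2-byte big-endian length, the topic filter bytes, then one QoS byte, repeated per request. The subscribeRequest type and appendSubscribePayload function are hypothetical stand-ins written for this example; they are not the library's types or encoder.

```go
package main

import (
	"errors"
	"fmt"
)

// subscribeRequest is a hypothetical stand-in for illustration only.
type subscribeRequest struct {
	TopicFilter []byte
	QoS         uint8
}

// appendSubscribePayload appends the SUBSCRIBE payload for reqs to dst and
// returns the extended slice, so the caller controls the backing buffer.
func appendSubscribePayload(dst []byte, reqs []subscribeRequest) ([]byte, error) {
	if len(reqs) == 0 {
		return dst, errors.New("SUBSCRIBE needs at least one topic filter")
	}
	for _, req := range reqs {
		n := len(req.TopicFilter)
		if n == 0 || n > 0xFFFF {
			return dst, errors.New("topic filter length out of range")
		}
		if req.QoS > 2 {
			return dst, errors.New("requested QoS must be 0, 1 or 2")
		}
		dst = append(dst, byte(n>>8), byte(n)) // 2-byte big-endian length prefix
		dst = append(dst, req.TopicFilter...)
		dst = append(dst, req.QoS)
	}
	return dst, nil
}

func main() {
	buf := make([]byte, 0, 64) // user-provided buffer, reused between packets
	payload, err := appendSubscribePayload(buf, []subscribeRequest{
		{TopicFilter: []byte("a/b"), QoS: 1},
		{TopicFilter: []byte("sensors/#"), QoS: 0},
	})
	fmt.Printf("% x err=%v\n", payload, err)
}
```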

# Interfaces

Decoder provides an abstraction for an MQTT variable header decoding implementation.

# Type aliases

ConnectReturnCode represents the CONNACK return code, which is the second byte in the variable header.
PacketFlags represents the LSB 4 bits in the first byte in an MQTT fixed header.
PacketType represents the 4 MSB bits in the first byte in an MQTT fixed header.
QoSLevel represents the Quality of Service specified by the client.
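
The relationship between PacketType, PacketFlags and QoSLevel can be sketched with plain bit operations on the first byte of the fixed header. The helper names below (splitFirstByte, qosFromPublishFlags) are invented for this example and use bare uint8 values rather than the library's types.

```go
package main

import "fmt"

// splitFirstByte separates the first byte of an MQTT fixed header into the
// packet type (4 most significant bits) and the packet flags (4 least
// significant bits).
func splitFirstByte(b byte) (packetType, flags uint8) {
	return b >> 4, b & 0x0F
}

// qosFromPublishFlags extracts the QoS level from PUBLISH flags, where it
// occupies bits 2-1. The result is only meaningful for PUBLISH packets.
func qosFromPublishFlags(flags uint8) uint8 {
	return (flags >> 1) & 0x03
}

func main() {
	// 0x82 is the first byte of a SUBSCRIBE packet: type 8, reserved flags 0b0010.
	ptype, flags := splitFirstByte(0x82)
	fmt.Printf("type=%d flags=%04b\n", ptype, flags)

	// 0x3B is a PUBLISH (type 3) with DUP, QoS 1 and RETAIN set.
	ptype, flags = splitFirstByte(0x3B)
	fmt.Printf("type=%d flags=%04b qos=%d\n", ptype, flags, qosFromPublishFlags(flags))
}
```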