github.com/wind-c/comqtt
2.6.0
Repository: https://github.com/wind-c/comqtt.git
Documentation: pkg.go.dev

# README


Comqtt

A lightweight, high-performance MQTT server in Go (v3.0|v3.1.1|v5.0)

Comqtt is an embeddable, high-performance MQTT broker written in Go. It supports distributed clustering and complies with the MQTT v3.0, v3.1.1, and v5.0 specifications, making it suitable for IoT and smart-home projects. The server can be used either as a standalone binary or embedded as a library in your own projects. Comqtt message throughput is comparable with everyone's favourites such as Mosquitto, Mosca, and VerneMQ.

:+1: Comqtt code is cleaner, easier to read, customize, and extend than other Mqtt Broker code! :heart_eyes:

:+1: If you like this project or it's useful to you, please give it a STAR, let more people know about it, and help contribute to its maintenance! :muscle:

📦 💬 See Github Discussions for discussions about releases

Ongoing discussion about current and future releases can be found at https://github.com/wind-c/comqtt/discussions

Developers in China can join wechat group discussions at https://github.com/wind-c/comqtt/discussions/32

What is MQTT?

MQTT stands for MQ Telemetry Transport. It is a publish/subscribe, extremely simple and lightweight messaging protocol, designed for constrained devices and low-bandwidth, high-latency or unreliable networks. Learn more

When is this repo updated?

Unless it's a critical issue, new releases typically go out over the weekend. At some point in the future this repo may be converted to an organisation, or collaborators added if the project continues to grow.

Comqtt Features

  • Full MQTT v5 feature compliance, with compatibility for MQTT v3.1.1 and v3.0.0.
  • TCP, Websocket (including SSL/TLS), and Dashboard listeners.
  • File-based server, auth, storage, and bridge configuration. Click to see config examples.
  • Auth and ACL plugins support Redis, HTTP, MySQL, and PostgreSQL.
  • Packets are bridged to Kafka according to the configured rules.
  • Single-machine mode supports local storage with BBolt, Badger, and Redis.
  • The hook design pattern makes it easy to develop plugins for Auth, Bridge, and Storage.
  • Cluster support is based on Gossip and Raft. Click to Cluster README.

Roadmap

  • Dashboard.
  • Rule engine.
  • Bridge (other MQTT brokers, RocketMQ, RabbitMQ).
  • Enhanced Metrics support.
  • CoAP.

Quick Start

Running the Broker with Go

Comqtt can be used as a standalone broker. Simply check out this repository and run the cmd/single/main.go entrypoint, which exposes TCP (:1883), Websocket (:1882), and dashboard (:8080) listeners.

Build

cd cmd
go build -o comqtt ./single/main.go

Start

./comqtt
or
./comqtt --conf=./config/single.yml

To use the bridge and multiple authentication capabilities, start the broker with a configuration file. Click to config example.

Using Docker

A simple Dockerfile is provided for running the cmd/single/main.go Websocket, TCP, and Stats server:

docker build -t comqtt:latest .
docker run -p 1883:1883 -p 1882:1882 -p 8080:8080 comqtt:latest

Developing with Comqtt

Importing as a package

Importing Comqtt as a package requires just a few lines of code to get started.

package main

import (
  "log"

  "github.com/wind-c/comqtt/v2/mqtt"
  "github.com/wind-c/comqtt/v2/mqtt/hooks/auth"
  "github.com/wind-c/comqtt/v2/mqtt/listeners"
)

func main() {
  // Create the new MQTT Server.
  server := mqtt.New(nil)

  // Allow all connections.
  _ = server.AddHook(new(auth.AllowHook), nil)

  // Create a TCP listener on a standard port.
  tcp := listeners.NewTCP("t1", ":1883", nil)
  err := server.AddListener(tcp)
  if err != nil {
    log.Fatal(err)
  }

  err = server.Serve()
  if err != nil {
    log.Fatal(err)
  }
}

Examples of running the broker with various configurations can be found in the mqtt/examples folder.
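
To check that the embedded broker above is accepting connections without pulling in a client library, one option is to hand-build a minimal MQTT v3.1.1 CONNECT packet and write it to a TCP connection on :1883. The sketch below only builds and prints the packet bytes. buildConnect and the "demo" client ID are hypothetical names used for illustration; they are not part of Comqtt.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// buildConnect returns a minimal MQTT v3.1.1 CONNECT packet with the
// clean-session flag set and no username, password, or will message.
// It assumes the packet body is shorter than 128 bytes, so the
// remaining length fits in a single byte.
func buildConnect(clientID string, keepAlive uint16) []byte {
	// Variable header: protocol name "MQTT", protocol level 4, connect flags.
	body := []byte{0x00, 0x04, 'M', 'Q', 'T', 'T', 0x04, 0x02}
	body = binary.BigEndian.AppendUint16(body, keepAlive)
	// Payload: length-prefixed client identifier.
	body = binary.BigEndian.AppendUint16(body, uint16(len(clientID)))
	body = append(body, clientID...)

	// Fixed header: packet type CONNECT (1 << 4), then the remaining length.
	return append([]byte{0x10, byte(len(body))}, body...)
}

func main() {
	pk := buildConnect("demo", 60)
	fmt.Printf("% x\n", pk)
}
```

Writing these bytes to a connection opened with net.Dial("tcp", "localhost:1883") should, if the broker accepts the client, produce a 4-byte CONNACK reply beginning 0x20 0x02.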

Network Listeners

The server comes with a variety of pre-packaged network listeners which allow the broker to accept connections on different protocols. The current listeners are:

| Listener | Usage |
| --- | --- |
| listeners.NewTCP | A TCP listener |
| listeners.NewUnixSock | A Unix Socket listener |
| listeners.NewNet | A net.Listener listener |
| listeners.NewWebsocket | A Websocket listener |
| listeners.NewHTTPStats | An HTTP $SYS info dashboard |
| listeners.NewHTTPHealthCheck | An HTTP healthcheck listener to provide health check responses for e.g. cloud infrastructure |

Use the listeners.Listener interface to develop new listeners. If you do, please let us know!

A *listeners.Config may be passed to configure TLS.

Examples of usage can be found in the mqtt/examples folder or cmd/single/main.go.

Server Options and Capabilities

A number of configurable options are available which can be used to alter the behaviour or restrict access to certain features in the server.

server := mqtt.New(&mqtt.Options{
  Capabilities: mqtt.Capabilities{
    MaximumSessionExpiryInterval: 3600,
    Compatibilities: mqtt.Compatibilities{
      ObscureNotAuthorized: true,
    },
  },
  ClientNetWriteBufferSize: 1024,
  ClientNetReadBufferSize: 1024,
  SysTopicResendInterval: 10,
})

Review the mqtt.Options, mqtt.Capabilities, and mqtt.Compatibilities structs for a comprehensive list of options.

Event Hooks

A universal event hooks system allows developers to hook into various parts of the server and client life cycle to add and modify functionality of the broker. These universal hooks are used to provide everything from authentication, persistent storage, to debugging tools.

Hooks are stackable - you can add multiple hooks to a server, and they will be run in the order they were added. Some hooks modify values, and these modified values will be passed to the subsequent hooks before being returned to the runtime code.

| Type | Import | Info |
| --- | --- | --- |
| Access Control | mqtt/hooks/auth.AllowHook | Allow access to all connecting clients and read/write to all topics. |
| Access Control | mqtt/hooks/auth.Auth | Rule-based access control ledger. |
| Persistence | mqtt/hooks/storage/bolt | Persistent storage using BoltDB (deprecated). |
| Persistence | mqtt/hooks/storage/badger | Persistent storage using BadgerDB. |
| Persistence | mqtt/hooks/storage/redis | Persistent storage using Redis. |
| Debugging | mqtt/hooks/debug | Additional debugging output to visualise packet flow. |

Many of the internal server functions are now exposed to developers, so you can make your own hooks by using the above as examples. If you do, please open an issue and let everyone know!

Authentication

Auth and ACL currently support the following back-end stores: Redis, MySQL, PostgreSQL, and HTTP. User passwords can be stored using one of the following algorithms, selected by number: 0 none (plain text), 1 bcrypt (cost=10), 2 md5, 3 sha1, 4 sha256, 5 sha512, 6 hmac-sha1, 7 hmac-sha256, 8 hmac-sha512.

The following examples use PostgreSQL and the bcrypt algorithm.

Postgresql

The schema required is as follows:

BEGIN;
CREATE TABLE mqtt_user (
    id serial PRIMARY KEY,
    username TEXT NOT NULL UNIQUE,
    password TEXT NOT NULL,
    allow smallint DEFAULT 1 NOT NULL,
    created timestamp with time zone DEFAULT NOW(),
    updated timestamp
);

CREATE TABLE mqtt_acl(
    id serial PRIMARY KEY,
    username TEXT NOT NULL,
    topic TEXT NOT NULL,
    access smallint DEFAULT 3 NOT NULL,
    created timestamp with time zone DEFAULT NOW(),
    updated timestamp
);
CREATE INDEX mqtt_acl_username_idx ON mqtt_acl(username);
COMMIT;

Note that passwords for MQTT clients are stored in PostgreSQL as bcrypt hashes. To create or update MQTT clients, you can generate a hash with this Python snippet:

import bcrypt
salt = bcrypt.gensalt(rounds=10)
hashed = bcrypt.hashpw(b"VeryVerySecretPa55w0rd", salt)
print(f"Password hash for MQTT client: {hashed.decode()}")

Go snippet:

import (
	"fmt"

	"golang.org/x/crypto/bcrypt"
)

// pwd is the plain-text password as a []byte; bcrypt.DefaultCost is 10.
hashed, err := bcrypt.GenerateFromPassword(pwd, bcrypt.DefaultCost)
if err != nil {
	return
}
fmt.Println("Password hash for MQTT client:", string(hashed))

MySQL

The schema required is as follows:

BEGIN;
CREATE TABLE auth (
    id INT AUTO_INCREMENT PRIMARY KEY,
    username VARCHAR(255) NOT NULL UNIQUE,
    password VARCHAR(255) NOT NULL,
    allow SMALLINT DEFAULT 1 NOT NULL,
    created TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated TIMESTAMP NULL
);
CREATE TABLE acl (
    id INT AUTO_INCREMENT PRIMARY KEY,
    username VARCHAR(255) NOT NULL,
    topic VARCHAR(255) NOT NULL,
    access SMALLINT DEFAULT 3 NOT NULL,
    created TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated TIMESTAMP NULL
);
CREATE INDEX acl_username_idx ON acl(username);
COMMIT;

Access Control

Allow Hook

By default, Comqtt uses a DENY-ALL access control rule. To allow connections, this must be overridden using an Access Control hook. The simplest of these hooks is the auth.AllowHook, which provides ALLOW-ALL rules to all connections, subscriptions, and publishing. It's also the simplest hook to use:

server := mqtt.New(nil)
_ = server.AddHook(new(auth.AllowHook), nil)

Don't do this if you are exposing your server to the internet or untrusted networks - it should really be used for development, testing, and debugging only.

Auth Ledger

The Auth Ledger hook provides a sophisticated mechanism for defining access rules in a struct format. Auth ledger rules come in two forms: Auth rules (connection), and ACL rules (publish/subscribe).

Auth rules have 4 optional criteria and an assertion flag:

| Criteria | Usage |
| --- | --- |
| Client | client id of the connecting client |
| Username | username of the connecting client |
| Password | password of the connecting client |
| Remote | the remote address or ip of the client |
| Allow | true (allow this user) or false (deny this user) |

ACL rules have 3 optional criteria and a filter match:

| Criteria | Usage |
| --- | --- |
| Client | client id of the connecting client |
| Username | username of the connecting client |
| Remote | the remote address or ip of the client |
| Filters | an array of filters to match |

Rules are processed in index order (0,1,2,3), returning on the first matching rule. See hooks/auth/ledger.go to review the structs.

server := mqtt.New(nil)
err := server.AddHook(new(auth.Hook), &auth.Options{
    Ledger: &auth.Ledger{
    Auth: auth.AuthRules{ // Auth disallows all by default
      {Username: "peach", Password: "password1", Allow: true},
      {Username: "melon", Password: "password2", Allow: true},
      {Remote: "127.0.0.1:*", Allow: true},
      {Remote: "localhost:*", Allow: true},
    },
    ACL: auth.ACLRules{ // ACL allows all by default
      {Remote: "127.0.0.1:*"}, // local superuser allow all
      {
        // user melon can read and write to their own topic
        Username: "melon", Filters: auth.Filters{
          "melon/#":   auth.ReadWrite,
          "updates/#": auth.WriteOnly, // can write to updates, but can't read updates from others
        },
      },
      {
        // Otherwise, no clients have publishing permissions
        Filters: auth.Filters{
          "#":         auth.ReadOnly,
          "updates/#": auth.Deny,
        },
      },
    },
  },
})
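
The first-match behaviour of Auth rules can be sketched in isolation. This is a simplified illustration, not the ledger's implementation: the rule type, matches, and authOK are hypothetical names, and the trailing "*" handling is an assumption made to mirror the Remote patterns used above.

```go
package main

import (
	"fmt"
	"strings"
)

// rule is a hypothetical, simplified stand-in for an auth rule.
// An empty criterion matches any value.
type rule struct {
	Username, Password, Remote string
	Allow                      bool
}

// matches reports whether want is empty, equals got, or ends in "*" and
// shares the prefix before it. This wildcard handling is a
// simplification, not the ledger's actual matcher.
func matches(want, got string) bool {
	if want == "" || want == got {
		return true
	}
	return strings.HasSuffix(want, "*") &&
		strings.HasPrefix(got, strings.TrimSuffix(want, "*"))
}

// authOK walks the rules in index order and returns the Allow flag of
// the first rule whose criteria all match. With no match it denies.
func authOK(rules []rule, username, password, remote string) bool {
	for _, r := range rules {
		if matches(r.Username, username) &&
			matches(r.Password, password) &&
			matches(r.Remote, remote) {
			return r.Allow
		}
	}
	return false
}

func main() {
	rules := []rule{
		{Username: "peach", Password: "password1", Allow: true},
		{Remote: "127.0.0.1:*", Allow: true},
	}
	fmt.Println(authOK(rules, "peach", "password1", "10.0.0.5:4000")) // matches rule 0
	fmt.Println(authOK(rules, "melon", "wrong", "127.0.0.1:5000"))    // matches rule 1
	fmt.Println(authOK(rules, "melon", "wrong", "10.0.0.5:4000"))     // no rule matches
}
```

Because evaluation stops at the first match, more specific rules generally belong before broader ones.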

The ledger can also be stored as JSON or YAML and loaded using the Data field:

err = server.AddHook(new(auth.Hook), &auth.Options{
    Data: data, // build ledger from byte slice: yaml or json
})

See examples/auth/encoded/main.go for more information.
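
The filters in ACL rules, such as "melon/#", use standard MQTT wildcards: "+" matches exactly one topic level and "#" matches the parent level and everything beneath it. The sketch below shows that matching logic using only the standard library. filterMatches is a hypothetical helper, not Comqtt's matcher, and it ignores the special handling of topics beginning with "$".

```go
package main

import (
	"fmt"
	"strings"
)

// filterMatches reports whether topic matches the MQTT topic filter.
// "+" matches a single level; "#" matches the remaining levels,
// including the parent level itself.
func filterMatches(filter, topic string) bool {
	f := strings.Split(filter, "/")
	t := strings.Split(topic, "/")
	for i, level := range f {
		if level == "#" {
			return true
		}
		if i >= len(t) {
			return false // filter has more levels than the topic
		}
		if level != "+" && level != t[i] {
			return false
		}
	}
	// Every filter level matched; the topic must not have extra levels.
	return len(f) == len(t)
}

func main() {
	for _, topic := range []string{"melon", "melon/a/b", "melons/a"} {
		fmt.Printf("melon/# matches %q: %v\n", topic, filterMatches("melon/#", topic))
	}
	fmt.Println(filterMatches("updates/+/status", "updates/dev1/status"))
}
```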

Persistent Storage

Redis

A basic Redis storage hook is available which provides persistence for the broker. It can be added to the server in the same fashion as any other hook, with several options. It uses github.com/redis/go-redis/v9 under the hood (referenced through the rv8 import alias in the snippet below), and is completely configurable through the Options value.

err := server.AddHook(new(redis.Hook), &redis.Options{
  Options: &rv8.Options{
    Addr:     "localhost:6379", // default redis address
    Password: "",               // your password
    DB:       0,                // your redis db
  },
})
if err != nil {
  log.Fatal(err)
}

For more information on how the redis hook works, or how to use it, see the mqtt/examples/persistence/redis/main.go or hooks/storage/redis code.

Badger DB

There's also a BadgerDB storage hook if you prefer file-based storage. It can be added and configured in much the same way as the other hooks (with somewhat fewer options).

err := server.AddHook(new(badger.Hook), &badger.Options{
  Path: badgerPath,
})
if err != nil {
  log.Fatal(err)
}

For more information on how the badger hook works, or how to use it, see the mqtt/examples/persistence/badger/main.go or hooks/storage/badger code.

There is also a BoltDB hook which has been deprecated in favour of Badger, but if you need it, check mqtt/examples/persistence/bolt/main.go.

Developing with Event Hooks

Many hooks are available for interacting with the broker and client lifecycle. The function signatures for all the hooks and mqtt.Hook interface can be found in mqtt/hooks.go.

The most flexible event hooks are OnPacketRead, OnPacketEncode, and OnPacketSent. These hooks can be used to control and modify all incoming and outgoing packets.

| Function | Usage |
| --- | --- |
| OnStarted | Called when the server has successfully started. |
| OnStopped | Called when the server has successfully stopped. |
| OnConnectAuthenticate | Called when a user attempts to authenticate with the server. An implementation of this method MUST be used to allow or deny access to the server (see hooks/auth/allow_all or basic). It can be used in custom hooks to check connecting users against an existing user database. Returns true if allowed. |
| OnACLCheck | Called when a user attempts to publish or subscribe to a topic filter. As above. |
| OnSysInfoTick | Called when the $SYS topic values are published out. |
| OnConnect | Called when a new client connects, may return an error or packet code to halt the client connection process. |
| OnSessionEstablish | Called immediately after a new client connects and authenticates and immediately before the session is established and CONNACK is sent. |
| OnSessionEstablished | Called when a new client successfully establishes a session (after OnConnect). |
| OnDisconnect | Called when a client is disconnected for any reason. |
| OnAuthPacket | Called when an auth packet is received. It is intended to allow developers to create their own MQTT v5 Auth Packet handling mechanisms. Allows packet modification. |
| OnPacketRead | Called when a packet is received from a client. Allows packet modification. |
| OnPacketEncode | Called immediately before a packet is encoded to be sent to a client. Allows packet modification. |
| OnPacketSent | Called when a packet has been sent to a client. |
| OnPacketProcessed | Called when a packet has been received and successfully handled by the broker. |
| OnSubscribe | Called when a client subscribes to one or more filters. Allows packet modification. |
| OnSubscribed | Called when a client successfully subscribes to one or more filters. |
| OnSelectSubscribers | Called when subscribers have been collected for a topic, but before shared subscription subscribers have been selected. Allows recipient modification. |
| OnUnsubscribe | Called when a client unsubscribes from one or more filters. Allows packet modification. |
| OnUnsubscribed | Called when a client successfully unsubscribes from one or more filters. |
| OnPublish | Called when a client publishes a message. Allows packet modification. |
| OnPublished | Called when a client has published a message to subscribers. |
| OnPublishDropped | Called when a message to a client is dropped before delivery, such as if the client is taking too long to respond. |
| OnRetainMessage | Called when a published message is retained. |
| OnRetainPublished | Called when a retained message is published to a client. |
| OnQosPublish | Called when a publish packet with QoS >= 1 is issued to a subscriber. |
| OnQosComplete | Called when the QoS flow for a message has been completed. |
| OnQosDropped | Called when an inflight message expires before completion. |
| OnPacketIDExhausted | Called when a client runs out of unused packet ids to assign. |
| OnWill | Called when a client disconnects and intends to issue a will message. Allows packet modification. |
| OnWillSent | Called when an LWT message has been issued from a disconnecting client. |
| OnClientExpired | Called when a client session has expired and should be deleted. |
| OnRetainedExpired | Called when a retained message has expired and should be deleted. |
| StoredClients | Returns clients, e.g. from a persistent store. |
| StoredSubscriptions | Returns client subscriptions, e.g. from a persistent store. |
| StoredInflightMessages | Returns inflight messages, e.g. from a persistent store. |
| StoredRetainedMessages | Returns retained messages, e.g. from a persistent store. |
| StoredSysInfo | Returns stored system info values, e.g. from a persistent store. |

If you are building a persistent storage hook, see the existing persistent hooks for inspiration and patterns. If you are building an auth hook, you will need OnACLCheck and OnConnectAuthenticate.

Direct Publish

To publish a basic message to a topic from within the embedding application, you can use the server.Publish(topic string, payload []byte, retain bool, qos byte) error method.

err := server.Publish("direct/publish", []byte("packet scheduled message"), false, 0)

The Qos byte in this case is only used to set the upper qos limit available for subscribers, as per MQTT v5 spec.
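
In other words, under the MQTT rules a subscriber receives a message at the lower of the publish QoS and the QoS granted to its subscription. A minimal sketch of that rule follows; deliveredQos is a hypothetical helper for illustration, not a Comqtt function.

```go
package main

import "fmt"

// deliveredQos returns the QoS a subscriber receives a message at: the
// lower of the QoS the message was published with and the maximum QoS
// granted to the subscription.
func deliveredQos(publishQos, subscriptionQos byte) byte {
	if publishQos < subscriptionQos {
		return publishQos
	}
	return subscriptionQos
}

func main() {
	for _, subQos := range []byte{0, 1, 2} {
		fmt.Printf("publish qos 1, subscription qos %d -> delivered at qos %d\n",
			subQos, deliveredQos(1, subQos))
	}
}
```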

Packet Injection

If you want more control, or want to set specific MQTT v5 properties and other values, you can create your own publish packets from a client of your choice. This method allows you to inject MQTT packets (not just publish) directly into the runtime as though they had been received by a specific client. Most of the time you'll want to use the special client flag inline=true, as it has unique privileges: it bypasses all ACL and topic validation checks, meaning it can even publish to $SYS topics.

Packet injection can be used for any MQTT packet, including ping requests, subscriptions, etc. And because the Client structs and methods are now exported, you can even inject packets on behalf of a connected client (if you have very custom requirements).

cl := server.NewClient(nil, "local", "inline", true)
server.InjectPacket(cl, packets.Packet{
  FixedHeader: packets.FixedHeader{
    Type: packets.Publish,
  },
  TopicName: "direct/publish",
  Payload: []byte("scheduled message"),
})

MQTT packets still need to be correctly formed, so refer to our test packets catalogue and the MQTT v5 Specification for inspiration.

See the hooks example to see this feature in action.

Testing

Unit Tests

Comqtt tests over a thousand scenarios with thoughtfully hand written unit tests to ensure each function does exactly what we expect. You can run the tests using go:

go test --cover ./...

Paho Interoperability Test

You can check the broker against the Paho Interoperability Test by starting the broker using examples/paho/main.go, and then running the mqtt v5 and v3 tests with python3 client_test5.py from the interoperability folder.

Note that there are currently a number of outstanding issues regarding false negatives in the paho suite, and as such, certain compatibility modes are enabled in the paho/main.go example.

Performance Benchmarks

Comqtt performance is comparable with popular brokers such as Mosquitto, EMQX, and others.

Performance benchmarks were run using MQTT-Stresser on an Apple MacBook Air M2, using cmd/main.go default settings. Taking into account bursts of high and low throughput, the median scores are the most useful. Higher is better.

The values presented in the benchmark are not representative of true messages per second throughput. They rely on an unusual calculation by mqtt-stresser, but are usable as they are consistent across all brokers. Benchmarks are provided as a general performance expectation guideline only. Comparisons are performed using out-of-the-box default configurations.

mqtt-stresser -broker tcp://localhost:1883 -num-clients=2 -num-messages=10000

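| Broker | publish fastest | median | slowest | receive fastest | median | slowest |
| --- | --- | --- | --- | --- | --- | --- |
| Comqtt v2.0.0 | 124,772 | 125,456 | 124,614 | 314,461 | 313,186 | 311,910 |
| Mosquitto v2.0.15 | 155,920 | 155,919 | 155,918 | 185,485 | 185,097 | 184,709 |
| EMQX v5.0.11 | 156,945 | 156,257 | 155,568 | 17,918 | 17,783 | 17,649 |
| Rumqtt v0.21.0 | 112,208 | 108,480 | 104,753 | 135,784 | 126,446 | 117,108 |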

mqtt-stresser -broker tcp://localhost:1883 -num-clients=10 -num-messages=10000

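| Broker | publish fastest | median | slowest | receive fastest | median | slowest |
| --- | --- | --- | --- | --- | --- | --- |
| Comqtt v2.0.0 | 45,615 | 30,129 | 21,138 | 232,717 | 86,323 | 50,402 |
| Mosquitto v2.0.15 | 42,729 | 38,633 | 29,879 | 23,241 | 19,714 | 18,806 |
| EMQX v5.0.11 | 21,553 | 17,418 | 14,356 | 4,257 | 3,980 | 3,756 |
| Rumqtt v0.21.0 | 42,213 | 23,153 | 20,814 | 49,465 | 36,626 | 19,283 |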

Million Message Challenge (hit the server with 1 million messages immediately):

mqtt-stresser -broker tcp://localhost:1883 -num-clients=100 -num-messages=10000

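| Broker | publish fastest | median | slowest | receive fastest | median | slowest |
| --- | --- | --- | --- | --- | --- | --- |
| Comqtt v2.0.0 | 51,044 | 4,682 | 2,345 | 72,634 | 7,645 | 2,464 |
| Mosquitto v2.0.15 | 3,826 | 3,395 | 3,032 | 1,200 | 1,150 | 1,118 |
| EMQX v5.0.11 | 4,086 | 2,432 | 2,274 | 434 | 333 | 311 |
| Rumqtt v0.21.0 | 78,972 | 5,047 | 3,804 | 4,286 | 3,249 | 2,027 |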

Not sure what's going on with EMQX here, perhaps the docker out-of-the-box settings are not optimal, so take it with a pinch of salt as we know for a fact it's a solid piece of software.

Contributions

Contributions and feedback are both welcomed and encouraged! Open an issue to report a bug, ask a question, or make a feature request.
