github.com/danclive/mqtt
module, package
Version: 0.4.0
Repository: https://github.com/danclive/mqtt.git
Documentation: pkg.go.dev

# README

Chinese documentation

mqtt provides:

  • An MQTT broker that fully implements MQTT protocol V3.1.1.
  • A Go MQTT broker package that can be embedded and extended in your own projects.
  • An MQTT packet pack/unpack package for implementing MQTT clients or for testing.
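
To give a feel for what packing and unpacking involves, here is a minimal sketch of the "remaining length" field that follows the first byte of every MQTT 3.1.1 packet. It uses only the standard library and does not use this module's packets package; the function names encodeRemainingLength and decodeRemainingLength are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// encodeRemainingLength encodes n with the MQTT variable-length scheme:
// 7 data bits per byte, with the top bit set when more bytes follow.
func encodeRemainingLength(n int) ([]byte, error) {
	if n < 0 || n > 268435455 { // largest value that fits in 4 bytes
		return nil, errors.New("remaining length out of range")
	}
	var out []byte
	for {
		b := byte(n % 128)
		n /= 128
		if n > 0 {
			b |= 0x80 // continuation bit
		}
		out = append(out, b)
		if n == 0 {
			return out, nil
		}
	}
}

// decodeRemainingLength decodes the value and reports how many bytes it used.
func decodeRemainingLength(buf []byte) (value, used int, err error) {
	multiplier := 1
	for i, b := range buf {
		value += int(b&0x7F) * multiplier
		if b&0x80 == 0 {
			return value, i + 1, nil
		}
		if i == 3 { // a fourth byte must not ask for a fifth
			return 0, 0, errors.New("malformed remaining length")
		}
		multiplier *= 128
	}
	return 0, 0, errors.New("incomplete remaining length")
}

func main() {
	enc, err := encodeRemainingLength(321)
	if err != nil {
		panic(err)
	}
	fmt.Printf("% x\n", enc)
	fmt.Println(decodeRemainingLength(enc))
}
```

The value 321 encodes to the two bytes c1 02, which matches the worked example in the MQTT 3.1.1 specification.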

Installation

$ go get -u github.com/danclive/mqtt

Features

  • Provides hook methods to customize broker behaviour (authentication, ACL, etc.). See hooks.go for more details.
  • Supports TLS/SSL and WebSocket.
  • Lets users write plugins. See plugin.go and /plugin for more details.
  • Lets extensions interact with the server. See the Server interface in server.go and example_test.go for more details.
  • Provides metrics via Prometheus (plugin: prometheus).
  • Provides a RESTful API for interacting with the server (plugin: management).

Limitations

  • Retained messages are not persisted when the server exits.
  • Cluster is not supported.

Get Started

Built-in MQTT broker

$ cd cmd/broker
$ go run main.go

The broker listens on port 1883 for TCP and on port 8080 for WebSocket. It loads the following plugins:

  • management: Listens on port 8081 and provides a RESTful API service.
  • prometheus: Listens on port 8082 and serves as a Prometheus exporter on the /metrics path.
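
One way to check that the TCP listener is up is to send a bare MQTT 3.1.1 CONNECT packet and read back the CONNACK. The sketch below uses only the standard library, not this module's API; buildConnect, probe, and the MQTT_ADDR environment variable are hypothetical names introduced for illustration, and the program only touches the network when MQTT_ADDR is set.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"net"
	"os"
	"time"
)

// buildConnect returns a minimal MQTT 3.1.1 CONNECT packet with the clean
// session flag set and no will, user name, or password.
func buildConnect(clientID string, keepAlive uint16) ([]byte, error) {
	if len(clientID) == 0 || len(clientID) > 23 {
		return nil, errors.New("client ID must be 1 to 23 bytes in this sketch")
	}
	body := []byte{
		0x00, 0x04, 'M', 'Q', 'T', 'T', // protocol name
		0x04,                                  // protocol level 4 = MQTT 3.1.1
		0x02,                                  // connect flags: clean session
		byte(keepAlive >> 8), byte(keepAlive), // keep alive, big endian
		0x00, byte(len(clientID)), // client ID length prefix
	}
	body = append(body, clientID...)
	// One length byte is enough because the body stays under 128 bytes.
	return append([]byte{0x10, byte(len(body))}, body...), nil
}

// probe sends CONNECT to addr and returns the CONNACK return code.
func probe(addr string) (byte, error) {
	conn, err := net.DialTimeout("tcp", addr, 3*time.Second)
	if err != nil {
		return 0, err
	}
	defer conn.Close()
	if err := conn.SetDeadline(time.Now().Add(3 * time.Second)); err != nil {
		return 0, err
	}
	pkt, err := buildConnect("probe-client", 30)
	if err != nil {
		return 0, err
	}
	if _, err := conn.Write(pkt); err != nil {
		return 0, err
	}
	ack := make([]byte, 4) // a 3.1.1 CONNACK is always 4 bytes
	if _, err := io.ReadFull(conn, ack); err != nil {
		return 0, err
	}
	if ack[0] != 0x20 || ack[1] != 0x02 {
		return 0, errors.New("unexpected reply, not a CONNACK")
	}
	return ack[3], nil
}

func main() {
	pkt, err := buildConnect("probe-client", 30)
	if err != nil {
		panic(err)
	}
	fmt.Printf("% x\n", pkt)
	// Example: MQTT_ADDR=localhost:1883 go run probe.go
	if addr := os.Getenv("MQTT_ADDR"); addr != "" {
		code, err := probe(addr)
		if err != nil {
			fmt.Println("probe failed:", err)
			return
		}
		fmt.Printf("CONNACK return code: 0x%02x\n", code)
	}
}
```

A return code of 0x00 means the broker accepted the connection.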

Docker

$ docker build -t mqtt .
$ docker run -p 1883:1883 -p 8081:8081 -p 8082:8082 mqtt

Build with external source code

The built-in MQTT broker is deliberately minimal: it does not implement features such as authentication or ACL. It is easy, however, to write your own plugins to extend the broker.

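package main

import (
	"context"
	"log"
	"net"
	"net/http"
	"os"
	"os/signal"
	"syscall"

	"go.uber.org/zap"

	"github.com/danclive/mqtt"
	// The plugin import paths below are assumed from the repository's /plugin directory.
	"github.com/danclive/mqtt/plugin/management"
	"github.com/danclive/mqtt/plugin/prometheus"
)

func main() {
	// TCP listener for MQTT clients
	ln, err := net.Listen("tcp", ":1883")
	if err != nil {
		log.Fatalln(err)
	}
	// WebSocket server for MQTT over WebSocket
	ws := &mqtt.WsServer{
		Server: &http.Server{Addr: ":8080"},
		Path:   "/ws",
	}

	// Production logger; swap in zap.NewDevelopment() for verbose local output.
	l, err := zap.NewProduction()
	if err != nil {
		log.Fatalln(err)
	}
	s := mqtt.NewServer(
		mqtt.WithTCPListener(ln),
		mqtt.WithWebsocketServer(ws),
		// Add your plugins
		mqtt.WithPlugin(management.New(":8081", nil)),
		mqtt.WithPlugin(prometheus.New(&http.Server{
			Addr: ":8082",
		}, "/metrics")),
		mqtt.WithLogger(l),
	)

	s.Run()
	// Block until SIGINT or SIGTERM arrives, then stop the broker gracefully.
	signalCh := make(chan os.Signal, 1)
	signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
	<-signalCh
	s.Stop(context.Background())
}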

See /examples for more details.

Documentation

godoc

Hooks

mqtt implements the following hooks:

  • OnAccept (only for TCP/SSL, not for WS/WSS)
  • OnConnect
  • OnConnected
  • OnSessionCreated
  • OnSessionResumed
  • OnSessionTerminated
  • OnSubscribe
  • OnSubscribed
  • OnUnsubscribed
  • OnMsgArrived
  • OnAcked
  • OnMsgDropped
  • OnDeliver
  • OnClose
  • OnStop

See /examples/hook for more details.
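
As an illustration of the kind of logic an OnConnect hook might wrap, the sketch below maps a user name and password to an MQTT 3.1.1 CONNACK return code. It is standard-library Go and does not use this module's hook types, whose exact signatures are not shown here; authenticate and its parameters are hypothetical.

```go
package main

import "fmt"

// CONNACK return codes defined by MQTT 3.1.1.
const (
	connAccepted       byte = 0x00
	connBadCredentials byte = 0x04 // bad user name or password
	connNotAuthorized  byte = 0x05
)

// authenticate is a hypothetical helper. It checks the credentials against an
// in-memory table and then against a block list. A real deployment would
// store password hashes and compare them in constant time.
func authenticate(users map[string]string, blocked map[string]bool, user, pass string) byte {
	want, ok := users[user]
	if !ok || want != pass {
		return connBadCredentials
	}
	if blocked[user] {
		return connNotAuthorized
	}
	return connAccepted
}

func main() {
	users := map[string]string{"alice": "s3cret", "bob": "hunter2"}
	blocked := map[string]bool{"bob": true}

	fmt.Printf("0x%02x\n", authenticate(users, blocked, "alice", "s3cret"))
	fmt.Printf("0x%02x\n", authenticate(users, blocked, "alice", "wrong"))
	fmt.Printf("0x%02x\n", authenticate(users, blocked, "bob", "hunter2"))
}
```

A hook would return the resulting code to the broker, which would place it in the CONNACK packet sent to the client.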

Stop the Server

Call server.Stop() to stop the broker gracefully. It performs the following steps:

  1. Closes all open TCP listeners and shuts down all open WebSocket servers.
  2. Closes all idle connections.
  3. Waits until all connections have been closed.
  4. Triggers OnStop().
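
The ordering above can be sketched with standard-library pieces. This is not the library's implementation: shutdown, fakeListener, and demo are hypothetical names, the listener is a stand-in for a net.Listener (which also satisfies io.Closer), and the idle-connection step is folded into the wait.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"sync"
	"time"
)

// shutdown closes the listeners, waits for connection handlers to finish
// (bounded by ctx), and only then fires the stop callback.
func shutdown(ctx context.Context, listeners []io.Closer, conns *sync.WaitGroup, onStop func()) error {
	// Step 1: close listeners so no new connections are accepted.
	for _, ln := range listeners {
		if err := ln.Close(); err != nil {
			return err
		}
	}
	// Steps 2 and 3: wait until every handler has returned, or ctx expires.
	done := make(chan struct{})
	go func() {
		conns.Wait()
		close(done)
	}()
	select {
	case <-done:
	case <-ctx.Done():
		return ctx.Err()
	}
	// Step 4: fire the stop callback.
	onStop()
	return nil
}

// fakeListener stands in for a real listener in this sketch.
type fakeListener struct{ closed bool }

func (f *fakeListener) Close() error {
	f.closed = true
	return nil
}

// demo runs shutdown against one fake listener and one short-lived
// "connection", with a timeout given in milliseconds.
func demo(timeoutMs int) (listenerClosed, stopped bool, err error) {
	ln := &fakeListener{}
	var conns sync.WaitGroup
	conns.Add(1)
	go func() {
		defer conns.Done()
		time.Sleep(20 * time.Millisecond) // simulated in-flight work
	}()
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutMs)*time.Millisecond)
	defer cancel()
	err = shutdown(ctx, []io.Closer{ln}, &conns, func() { stopped = true })
	return ln.closed, stopped, err
}

func main() {
	fmt.Println(demo(1000))
}
```

Since the README's example calls s.Stop(context.Background()), passing a context with a timeout instead is one plausible way to bound how long the wait in step 3 may take.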

Test

Unit Test

$ go test -race . && go test -race ./packets
$ cd packets
$ go test -race .

Integration Test

The broker passes the paho.mqtt.testing suite.

TODO

  • Support MQTT V3 and V5.
  • Support bridge mode and maybe cluster.

Breaking changes may occur when these new features are added.

# Functions

LoggerWithField adds fields to a new logger.
NewMessage creates a message for the publish service.
NewServer returns an MQTT server instance with the given options.
Retained sets the retained flag on the message.
WithConfig sets the config of the server.
WithHook sets the hooks of the server.
WithPlugin sets the plugin(s) of the server.
WithTCPListener sets the TCP listener of the server.
WithWebsocketServer sets the WebSocket server(s) of the server.

# Constants

Client status.
Default configuration.

# Variables

DefaultConfig is the default config used by NewServer().
Error.
Error.
ErrInvalWsMsgType [MQTT-6.0.0-1].

# Structs

ClientStats provides the statistics of client connections.
MessageStats represents the statistics of PUBLISH packet, separated by QOS.
PacketBytes represents the total bytes of each packet type that have been received or sent.
PacketCount represents the total number of packets of each type that have been received or sent.
PacketStats represents the statistics of MQTT Packet.
ServerStats is the collection of global statistics.
SessionStats is the collection of statistics for each session.
WsServer is used to build websocket server.

# Interfaces

Client represents an MQTT client.
ClientOptionsReader is mainly used in callback functions.
Plugin.
PublishService provides the ability to publish messages to the broker.
Server interface represents an MQTT server instance.
SessionStatsManager interface provides the ability to access the statistics of the session.
StatsManager interface provides the ability to access the statistics of the server.

# Type aliases

OnAccept will be called after a new connection is established in the TCP server (it has no effect for WebSocket). If it returns false, the connection is closed immediately.
OnAcked will be called when the ack packet for a published QoS 1 or QoS 2 message is received.
OnClose will be called after the TCP connection of the client has been closed.
OnConnect will be called when a valid CONNECT packet is received. It returns the return code for the CONNACK packet.
OnConnected will be called when an MQTT client connects successfully.
OnDeliver will be called when publishing a message to a client.
OnMsgArrived returns whether the received PUBLISH packet will be delivered. If it returns false, the packet is not forwarded.
OnMsgDropped will be called after a message has been dropped.
OnSessionCreated will be called when a session is created.
OnSessionResumed will be called when a session is resumed.
OnSessionTerminated will be called when a session is terminated.
OnStop will be called on server.Stop().
OnSubscribe returns the maximum available QoS for the topic:
  • 0x00 - Success - Maximum QoS 0
  • 0x01 - Success - Maximum QoS 1
  • 0x02 - Success - Maximum QoS 2
  • 0x80 - Failure
OnSubscribed will be called after the topic has been subscribed successfully.
OnUnsubscribed will be called after the topic has been unsubscribed.
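
To illustrate the return codes that OnSubscribe works with, here is a minimal sketch of a prefix-based ACL that caps the granted QoS or rejects the subscription with 0x80. It is standard-library Go only; rule and grantQoS are hypothetical and do not reflect this module's hook signature. Prefix matching is a simplification, so wildcard filters such as "#" match no rule and are rejected.

```go
package main

import (
	"fmt"
	"strings"
)

const subFailure byte = 0x80 // SUBACK failure return code

// rule grants at most maxQoS to topic filters that start with prefix.
type rule struct {
	prefix string
	maxQoS byte
}

// grantQoS returns the requested QoS capped by the first matching rule,
// or 0x80 when the request is invalid or no rule matches.
func grantQoS(rules []rule, topicFilter string, requested byte) byte {
	if requested > 2 {
		return subFailure
	}
	for _, r := range rules {
		if strings.HasPrefix(topicFilter, r.prefix) {
			if requested < r.maxQoS {
				return requested
			}
			return r.maxQoS
		}
	}
	return subFailure
}

func main() {
	rules := []rule{
		{prefix: "public/", maxQoS: 2},
		{prefix: "sensors/", maxQoS: 1},
	}
	fmt.Printf("0x%02x\n", grantQoS(rules, "public/news", 2))
	fmt.Printf("0x%02x\n", grantQoS(rules, "sensors/temp", 2))
	fmt.Printf("0x%02x\n", grantQoS(rules, "admin/config", 0))
}
```

Returning a capped QoS rather than a failure lets a client keep its subscription at a lower delivery guarantee, which is the behaviour the 0x00 to 0x02 codes describe.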