# README
mqtt

mqtt provides:
- MQTT broker that fully implements the MQTT protocol V3.1.1.
- Golang MQTT broker package for secondary development.
- MQTT protocol pack/unpack package for implementing MQTT clients or testing.
Installation
$ go get -u github.com/DrmagicE/mqtt
Features
- Provide hook method to customized the broker behaviours(Authentication, ACL, etc..). See
hooks.go
for more details - Support tls/ssl and websocket
- Enable user to write plugins. See
plugin.go
and/plugin
for more details. - Provide abilities for extensions to interact with the server. See
Server
interface inserver.go
andexample_test.go
for more details. - Provide metrics (by using Prometheus). (plugin: prometheus)
- Provide restful API to interact with server. (plugin:management)
Limitations
- The retained messages are not persisted when the server exit.
- Cluster is not supported.
Get Started
Build-in MQTT broker
$ cd cmd/broker
$ go run main.go
The broker will listen on port 1883 for TCP and 8080 for websocket. The broker loads the following plugins:
- management: Listens on port
8081
, provides restful api service - prometheus: Listens on port
8082
, serve as a prometheus exporter with/metrics
path.
Docker
$ docker build -t mqtt .
$ docker run -p 1883:1883 -p 8081:8081 -p 8082:8082 mqtt
Build with external source code
The features of build-in MQTT broker are not rich enough.It is not implementing some features such as Authentication, ACL etc.. But It is easy to write your own plugins to extend the broker.
func main() {
// listener
ln, err := net.Listen("tcp", ":1883")
if err != nil {
log.Fatalln(err.Error())
return
}
// websocket server
ws := &mqtt.WsServer{
Server: &http.Server{Addr: ":8080"},
Path: "/ws",
}
if err != nil {
panic(err)
}
l, _ := zap.NewProduction()
// l, _ := zap.NewDevelopment()
s := mqtt.NewServer(
mqtt.WithTCPListener(ln),
mqtt.WithWebsocketServer(ws),
// Add your plugins
mqtt.WithPlugin(management.New(":8081", nil)),
mqtt.WithPlugin(prometheus.New(&http.Server{
Addr: ":8082",
}, "/metrics")),
mqtt.WithLogger(l),
)
s.Run()
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
<-signalCh
s.Stop(context.Background())
}
See /examples
for more details.
Documentation
Hooks
mqtt implements the following hooks:
- OnAccept (Only for tcp/ssl, not for ws/wss)
- OnConnect
- OnConnected
- OnSessionCreated
- OnSessionResumed
- OnSessionTerminated
- OnSubscribe
- OnSubscribed
- OnUnsubscribed
- OnMsgArrived
- OnAcked
- OnMsgDropped
- OnDeliver
- OnClose
- OnStop
See /examples/hook
for more detail.
Stop the Server
Call server.Stop()
to stop the broker gracefully:
- Close all open TCP listeners and shutting down all open websocket servers
- Close all idle connections
- Wait for all connections have been closed
- Trigger OnStop().
Test
Unit Test
$ go test -race . && go test -race packets
$ cd packets
$ go test -race .
Integration Test
Pass paho.mqtt.testing.
TODO
- Support MQTT V3 and V5.
- Support bridge mode and maybe cluster.
Breaking changes may occur when adding this new features.
# Functions
LoggerWithField add fields to a new logger.
NewMessage creates a message for publish service.
NewServer returns a mqtt server instance with the given options.
No description provided by the author
Retained sets retained flag to the message.
WithConfig set the config of the server.
WithHook set hooks of the server.
No description provided by the author
WithPlugin set plugin(s) of the server.
WithTCPListener set tcp listener of the server.
WithWebsocketServer set websocket server(s) of the server.
# Constants
No description provided by the author
Client status.
Client status.
Default configration.
Default configration.
Default configration.
Client status.
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
Client status.
# Variables
DefaultConfig default config used by NewServer().
Error.
Error.
ErrInvalWsMsgType [MQTT-6.0.0-1].
# Structs
ClientStats provides the statistics of client connections.
No description provided by the author
No description provided by the author
MessageStats represents the statistics of PUBLISH packet, separated by QOS.
PacketBytes represents total bytes of each packet type have been received or sent.
PacketCount represents total number of each packet type have been received or sent.
PacketStats represents the statistics of MQTT Packet.
ServerStats is the collection of global statistics.
SessionStats the collection of statistics of each session.
WsServer is used to build websocket server.
# Interfaces
Client represent.
ClientOptionsReader is mainly used in callback functions.
插件.
PublishService provides the ability to publish messages to the broker.
Server interface represents a mqtt server instance.
SessionStatsManager interface provides the ability to access the statistics of the session.
StatsManager interface provides the ability to access the statistics of the server.
# Type aliases
No description provided by the author
OnAccept 会在新连接建立的时候调用,只在TCP server中有效。如果返回false,则会直接关闭连接
OnAccept will be called after a new connection established in TCP server.
OnAcked 当客户端对qos1或qos2返回确认的时候调用
OnAcked will be called when receiving the ack packet for a published qos1 or qos2 message.
OnClose tcp连接关闭之后触发
OnClose will be called after the tcp connection of the client has been closed.
OnConnect 当合法的connect报文到达的时候触发,返回connack中响应码
OnConnect will be called when a valid connect packet is received.
OnConnected 当客户端成功连接后触发
OnConnected will be called when a mqtt client connect successfully.
OnDeliver 分发消息时触发
OnDeliver will be called when publishing a message to a client.
OnMsgArrived 返回接收到的publish报文是否允许转发,返回false则该报文不会被继续转发
OnMsgArrived returns whether the publish packet will be delivered or not.
OnMessageDropped 丢弃消息后触发
OnMsgDropped will be called after the msg dropped.
OnSessionCreated 新建session时触发
OnSessionCreated will be called when session created.
OnSessionResumed 恢复session时触发
OnSessionResumed will be called when session resumed.
OnSessionTerminated session 终止时触发
OnSessionTerminated will be called when session terminated.
OnStop will be called on server.Stop().
OnSubscribe 返回topic允许订阅的最高QoS等级
OnSubscribe returns the maximum available QoS for the topic:
0x00 - Success - Maximum QoS 0
0x01 - Success - Maximum QoS 1
0x02 - Success - Maximum QoS 2
0x80 - Failure
*/.
OnSubscribed will be called after the topic subscribe successfully.
OnUnsubscribed will be called after the topic has been unsubscribed.
No description provided by the author
No description provided by the author