Categorygithub.com/linfeip/gomq
module
0.0.0-20241119032417-3bd38eb94bbc
Repository: https://github.com/linfeip/gomq.git
Documentation: pkg.go.dev

# README

简介

使用golang实现极简的消息队列, 数据传输用的grpc, 数据根据topic分组, 每个组中有多个queue, 每个queue进行顺序消费, 多个queue并发消费

特性

  • 高可用
  • 消费订阅
  • 支持Topic*订阅
  • 主从复制
  • 使用gRPC通信
  • 指标收集 (Prometheus)

实现多节点主从复制

func TestBroker(t *testing.T) {
	var b0 = NewBroker("node0", "./node0", "localhost:13000")
	err := b0.Open(true)
	if err != nil {
		t.Fatal(err)
	}

	time.Sleep(3 * time.Second)

	var b1 = NewBroker("node1", "./node1", "localhost:13001")
	err = b1.Open(false)
	if err != nil {
		t.Fatal(err)
	}

	err = b0.Join("node1", "localhost:13001")
	if err != nil {
		t.Fatal(err)
	}

	for {
		time.Sleep(time.Second)

		err := b0.EnqueueTo(0, &protocol.CommitLogPacket{
			Topic:    "testTopic",
			Magic:    1,
			Body:     []byte("HelloBroker"),
			MsgId:    "1",
			Checksum: 1,
		})

		if err != nil {
			t.Fatal(err)
		}

		v, err := b0.Dequeue("testTopic", 0)
		if err != nil {
			t.Fatal(err)
		}

		logger.Infof("dequeue packet: %s queue: %d", v.GetTopic(), v.GetQueue())
	}
}

压测

普通的进行发布消费


func BenchmarkRocket(b *testing.B) {
	b.ReportAllocs()
	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			err := testProducer.SendOneWay(context.Background(), testMsg)
			if err != nil {
				panic(err)
			}
		}
	})
}

func BenchmarkGoMQ(b *testing.B) {
	b.ReportAllocs()
	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			_, err := testGomqPubsub.Publish(context.Background(), testPublishArgs)
			if err != nil {
				panic(err)
			}
		}
	})
}
goos: windows
goarch: amd64
pkg: github.com/linfeip/gomq/example
cpu: Intel(R) Core(TM) i7-7700K CPU @ 4.20GHz
BenchmarkRocket
BenchmarkRocket-8   	   77319	     71759 ns/op	    3587 B/op	      61 allocs/op
BenchmarkGoMQ
BenchmarkGoMQ-8     	  201670	     29778 ns/op	    5184 B/op	      94 allocs/op
PASS

# Packages

No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author