Category: github.com/chinagocoder/go-queue
module
0.2.0
Repository: https://github.com/chinagocoder/go-queue.git
Documentation: pkg.go.dev

# README

go-queue

A Pub/Sub framework for Apache Pulsar. Reference: https://github.com/zeromicro/go-queue

installation

go get -u github.com/chinagocoder/go-queue

pulsar

Pulsar Pub/Sub framework

consumer example

config.yaml

Name: pulsar
Brokers:
  - 127.0.0.1:6650
Topic: topic_message
Conns: 1
Processors: 1
SubscriptionName: topic_message

consumer code

package main

import (
	"fmt"
	"github.com/chinagocoder/go-queue/pq"
	"github.com/zeromicro/go-zero/core/conf"
)

// main loads the queue configuration from config.yaml, prints it, registers a
// handler that prints the value of every message it is given, and then runs
// the queue until Start returns.
func main() {
	var cfg pq.Conf
	conf.MustLoad("config.yaml", &cfg)
	fmt.Println(cfg)

	// printValue ignores the message key and echoes only the value.
	printValue := func(_, value string) error {
		fmt.Printf("=> %s\n", value)
		return nil
	}

	queue := pq.MustNewQueue()
	queue.AddQueue(cfg, pq.WithHandle(printValue))
	// Stop is deferred so it runs once Start has returned.
	defer queue.Stop()

	queue.Start()
}

producer code

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"github.com/chinagocoder/go-queue/pq"
	"github.com/zeromicro/go-zero/core/cmdline"
	"math/rand"
	"strconv"
	"time"
)

// message is the document the producer example marshals to JSON and pushes as
// the body of each message.
type message struct {
	// Key is serialized under the JSON name "key".
	Key     string `json:"key"`
	// Value is serialized under the JSON name "value".
	Value   string `json:"value"`
	// Payload is serialized under the JSON name "message" (not "payload").
	Payload string `json:"message"`
}

// main pushes three JSON-encoded messages to the "topic_message" topic via a
// pusher created for the broker address 127.0.0.1:6650. Every push passes
// pq.WithDeliverAt with a time 10 seconds in the future. After the loop it
// calls cmdline.EnterToContinue before exiting.
func main() {
	pusher := pq.NewPusher(
		[]string{
			"127.0.0.1:6650",
		}, "topic_message",
	)

	// The ticker paces the loop: one iteration per tick.
	ticker := time.NewTicker(time.Millisecond)
	for round := 0; round < 3; round++ {
		<-ticker.C

		count := rand.Intn(100)
		m := message{
			Key:     strconv.FormatInt(time.Now().UnixNano(), 10),
			Value:   fmt.Sprintf("%d,%d", round, count),
			Payload: fmt.Sprintf("%d,%d", round, count),
		}
		body, err := json.Marshal(m)
		if err != nil {
			// Report the encoding failure and skip this round instead of
			// pushing an empty body.
			fmt.Println(err)
			continue
		}

		fmt.Println(string(body))

		t := time.Now().Add(10 * time.Second)

		if _, err := pusher.Push(
			context.Background(), []byte(strconv.FormatInt(time.Now().UnixNano(), 10)), body, pq.WithDeliverAt(t),
		); err != nil {
			fmt.Println(err)
		}
	}
	// Stop the ticker before blocking below; it is no longer needed.
	ticker.Stop()

	cmdline.EnterToContinue()
}

# Packages

No description provided by the author
No description provided by the author