consumers
package main
import (
"fmt"
"github.com/nsqio/go-nsq"
"sync"
)
func main() {
testNSQ()
}
type NSQHandler struct {
}
func (this *NSQHandler) HandleMessage(msg *nsq.Message) error {
fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
return nil
}
func testNSQ() {
url := "134.175.118.1:4150"
waiter := sync.WaitGroup{}
waiter.Add(1)
go func() {
defer waiter.Done()
config:=nsq.NewConfig()
config.MaxInFlight=1
//for i := 0; i<10; i++ {
consumer, err := nsq.NewConsumer("a-test", "a2", config)
if nil != err {
fmt.Println("err", err)
return
}
consumer.AddHandler(&NSQHandler{})
err = consumer.ConnectToNSQD(url)
if nil != err {
fmt.Println("err", err)
return
}
//}
select{}
}()
waiter.Wait()
}
producer
package main
import (
"fmt"
"github.com/nsqio/go-nsq"
"io/ioutil"
"log"
"strconv"
)
var nullLogger = log.New(ioutil.Discard, "", log.LstdFlags)
var producer *nsq.Producer
func initProducer(str string) (err error) {
config := nsq.NewConfig()
producer, err = nsq.NewProducer(str, config)
if err != nil {
fmt.Printf("create producer failed, err:")
return err
}
return nil
}
func main() {
err := initProducer("134.175.118.1:4150")
if err != nil {
fmt.Printf("create producer failed, err:")
return
}
// 2. 生产者ping
errPing := producer.Ping()
if errPing != nil {
log.Fatalln("fail to ping 127.0.0.1:4151", errPing)
}
// 3. 设置不输出info级别的日志
producer.SetLogger(nullLogger, nsq.LogLevelInfo)
// 4. 生产者发布消息
for i := 0; i < 100; i ++ {
message := "msg index "+ strconv.Itoa(i + 10000) + "六大件";
err2 := producer.Publish("a-test", []byte(message))
if err2 != nil {
log.Panic("Producer could not publish!")
}
}
fmt.Println("iver ")
// 5. 生产者停止执行
producer.Stop()
}
原文:https://www.cnblogs.com/dennylau/p/12675675.html