在之前内容中我们通过一个队列实现了消息的发送跟接收。接下来我们创建工作队列(Work Queue),用于在多个工作者之间分配耗时的任务
工作队列(任务队列)背后的核心主要是避免立即执行资源密集型的任务,必须等待其工作完成。我们将任务封装为消息后将其发送到队列,后台的工作进程将弹出任务并最终执行,当我们运行很多Worker时候,任务将在它们之间共享
为了确保消息不会丢失,RabbitMQ支持消息确认,消费者消费了一个消息之后会发送一个ack给RabbitMQ,这样RabbitMQ就可以删除掉这个消息
如果一个消费者异常(通道关闭或链接关闭或TCP链接丢失)没有发送ACK给rabbitMQ,rabbitMQ会将该消息重新放入队列当中。此时如果有其他消费者在线,rabbitMQ会重新将该消息再次投递到另一个消费者
msgs,err := ch.Consume(
q.Name,
"",
false,//将autoAck设置为false,则需要在消费者每次消费完成
// 消息的时候调用d.Ack(false)来告诉RabbitMQ该消息已经消费
false,
false,
false,
nil,
)
FailError(err,"Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs{
log.Printf("Received a message: %s", d.Body)
dot_count := bytes.Count(d.Body, []byte("."))
t := time.Duration(dot_count)
time.Sleep(t * time.Second)
log.Printf("Done")
//multiple为true的时候:此次交付和之前没有确认的交付都会在通过同一个通道交付,这在批量处理的时候很有用
//为false的时候只交付本次。只有该方法执行了,RabbitMQ收到该确认才会将消息删除
d.Ack(false)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
使用以上设置后,我们可以保证即使worker在执行任务的时候意外退出也不会丢失消息。在worker意外退出的不久之后消息将会被重新投递。确认ack必须使用接收到消息的通道,如果使用不同的通道将会导致一个通道协议异常
忘记确认ack
Listing queues for vhost / ...
name messages_ready messages_unacknowledged
hello 0 1
我们已经知道如何确保即使消费者意外退出的情况下保证任务不会丢失。但是如果RabbitMQ服务停止的话任务还是会丢失。当RabbitMQ退出或异常的时候,它将会丢失队列和消息,除非你设置RabbitMQ的两个地方:将队列和消息进行标记为持久的
首先设置队列durable为true
q, err := ch.QueueDeclare(
"hello", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
RabbitMQ不允许使用不同参数重新定义一个已经存在的队列,所以队列已经存在的话修改了上面的配置后运行程序是不会改变已经存在的队列的
然后设置消息为持久化存储:
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false,
amqp.Publishing {
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
})
注意:设置消息持久化并不能保证消息不会丢失,因为仍然有一小段时间片处于RabbitMQ收到消息但是还没保存,它可能只是保存在内存当中。但是已经满足我们的基本使用,如果你需要强保证的话可以使用publisher confirms
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
注意:消费者必须要设置,生产者不用设置
func main() {
conn,err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failError(err,"send:Failed to connect to RabbitMQ")
defer conn.Close()
ch,err := conn.Channel()
failError(err,"Failed to open a channel")
defer ch.Close()
q,err := ch.QueueDeclare(
"task_queue",
true,// 设置为true之后RabbitMQ将永远不会丢失队列,否则重启或异常退出的时候会丢失
false,
false,
false,
nil,
)
failError(err,"Failed to declare a queue")
fmt.Println(q.Name)
body := bodyFrom(os.Args)
//生产者将消息发送到默认交换器中,不是发送到队列中
ch.Publish(
"",//默认交换器
q.Name,//使用队列的名字来当作route-key是因为声明的每一个队列都有一个隐式路由到默认交换器
false,
false,
amqp.Publishing{
DeliveryMode:amqp.Persistent,
ContentType:"text/plain",
Body:[]byte(body),
})
failError(err,"Failed to publish a message")
log.Printf(" [x] Sent %s",body)
}
func bodyFrom(args []string)string {
var s string
if len(args) < 2 || os.Args[1] == "" {
s = "hello"
}else {
s = strings.Join(args[1:]," ")
}
return s
}
func failError(err error,msg string) {
if err != nil {
log.Fatal("%s : %s",msg,err)
}
}
func main() {
conn,err := amqp.Dial("amqp://guest:guest@localhost:5672/")
FailError1(err,"receive:Failed to connect to RabbitMQ")
defer conn.Close()
ch,err := conn.Channel()
FailError1(err,"receive:Failed to open a channel")
defer ch.Close()
q,err := ch.QueueDeclare(
"task_queue",
true,
false,
false,
false,
nil,
)
err = ch.Qos(
1, //// 在没有返回ack之前,最多只接收1个消息
0,
false,
)
FailError1(err,"Failed to set Qos")
msgs,err := ch.Consume(
q.Name,
"",
false,//将autoAck设置为false,则需要在消费者每次消费完成
// 消息的时候调用d.Ack(false)来告诉RabbitMQ该消息已经消费
false,
false,
false,
nil,
)
FailError1(err,"Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs{
log.Printf("Received a message: %s", d.Body)
dot_count := bytes.Count(d.Body, []byte("."))
t := time.Duration(dot_count)
fmt.Println()
time.Sleep(t * time.Second)
log.Printf("Done")
//multiple为true的时候:此次交付和之前没有确认的交付都会在通过同一个通道交付,这在批量处理的时候很有用
//为false的时候只交付本次。只有该方法执行了,RabbitMQ收到该确认才会将消息删除
d.Ack(false)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
func FailError1(err error,msg string) {
if err != nil {
log.Fatal("%s : %s",msg,err)
}
}
原文:https://www.cnblogs.com/develop-SZT/p/10706561.html