RabbitMqBase
消息队列小结虽然这个消息队列我只是知道应用场景在哪里,还没有实际操作到,但是原理还是要知道的。这些知识点就像珍珠,万一哪天就用到了。在没具体学习之前,我一直在想: 这玩意不就是个队列吗。还能玩出花来?结果,一研究,还真的打脸了。
首先,这玩意用的是AMQP
协议,并且只是占用了一个tcp
连接,然后就会问到,多个消息,那不得卡死啊,不。他用了channel
,这个东西可以有多个,可以理解为他开辟了线程,这样就避免了网络连接的资源浪费。
消息 => 交换机 => 通过绑定的key
=> 队列 => 消费者
交换机发布消息的方式有
direct
: 消息一根肠子到队列fanout
: 交换机发布消息,无需指定key
,绑定在交换机上面的队列就会知道消息topic
: 交换机发布消息根据规则匹配到满足规则的key
上, 和正则匹配差不多一个意思对于我们想发布一个定时任务,我们不想让这个任务占用我们自己的电脑资源,就要发布完任务,然后就关电脑睡觉。这时候,我们就需要一个mq
服务器,把消息塞进去,首先想的就是让消息自己根据时间戳排序,不好意思,想多了,很难实现。那就换一种思路,消息设置过期时间,然后就把消息发到别的转换机上,再去发到指定队列上,任务监听队列就好了。
代码实现
发布消息send.go
package dead_q
import (
"github.com/streadway/amqp"
"go-mq/conf"
"log"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func Send() {
url := conf.Host
conn, err := amqp.Dial(url)
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
body := "dead-last"
// 将消息发送到延时队列上
err = ch.Publish(
"delay-exchange", // exchange 这里为空则不选择 exchange
"", // routing key
true, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
Expiration: "5000", // 设置五秒的过期时间
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
第一个交换机为了设置超时时间和下一个交换机rec.go
package rec
import (
"github.com/streadway/amqp"
"go-mq/conf"
"log"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func Rec() {
url := conf.Host
conn, err := amqp.Dial(url)
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明一个主要使用的 exchange
err = ch.ExchangeDeclare(
"delay-exchange", // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
/**
* 注意,这里是重点!!!!!
* 声明一个延时队列, ?我们的延时消息就是要发送到这里
*/
q, errDelay := ch.QueueDeclare(
"delay-qune", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
amqp.Table{
// 当消息过期时把消息发送到 logs 这个 exchange
"x-dead-letter-exchange":"dead-exchange",
}, // arguments
)
failOnError(errDelay, "Failed to declare a delay_queue")
err = ch.QueueBind(
q.Name, // queue name, 这里指的是 test_logs
"", // routing key
"delay-exchange", // exchange
false,
nil)
failOnError(err, "Failed to bind a queue")
// 这里监听的是 test_logs
msgs, err := ch.Consume(
q.Name, // queue name, 这里指的是 test_logs
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf(" [x] %s", d.Body)
d.Ack(false)
}
}()
log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<-forever
}
最后的队列用于任务消费dedrec.go
package dead_rec
import (
"github.com/streadway/amqp"
"go-mq/conf"
"log"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func Rec() {
url := conf.Host
conn, err := amqp.Dial(url)
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明一个主要使用的 exchange
err = ch.ExchangeDeclare(
"dead-exchange", // name
amqp.ExchangeFanout, // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
/**
* 注意,这里是重点!!!!!
* 声明一个延时队列, ?我们的延时消息就是要发送到这里
*/
arg2 := make(map[string]interface{})
arg2["x-dead-letter-exchange"] = "dead-q"
arg2["x-max-length"] = 3
delayQune, _ := ch.QueueDeclare(
"dead-qune", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
arg2, // arguments
)
err = ch.QueueBind(
delayQune.Name, // queue name, 这里指的是 test_logs
"", // routing key
"dead-exchange", // exchange
false,
nil)
failOnError(err, "Failed to bind a queue")
// 这里监听的是 test_logs
msgs, err := ch.Consume(
delayQune.Name, // queue name, 这里指的是 test_logs
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf(" [x] %s", d.Body)
d.Ack(false)
}
}()
log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<-forever
}
消费者公平消费
if err := ch.Qos(1, 0, false); err != nil {
failOnError(err, "Failed to set QoS")
}
设置队列属性
arg2 := make(map[string]interface{})
arg2["x-dead-letter-exchange"] = "dead-q"
arg2["x-max-length"] = 3
delayQune, _ := ch.QueueDeclare(
"dead-qune", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
arg2, // arguments
)
原文:https://www.cnblogs.com/maomaomaoge/p/14129435.html