RabbitMQ是一个消息代理,用来负责接收和转发消息。
生产者,消费者,代理可以驻留在不同主机或同一主机,一个应用可以是生产者也可以是消费者
接下来我们来实现RabbitMQ的“Hello World”,生产者将“Hello World”发送进队列中,消费者将其接收并打印
**go get github.com/streadway/amqp
连接RabbitMQ
conn,err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatal(err)
}
defer conn.Close()
RabbitMQ的连接已经为我们抽象了socket的连接,同时为我们处理了协议版本号和身份认证等等
创建通道
ch,err := conn.Channel()
if err != nil {
log.Fatal(err)
}
defer ch.Close()
在使用其他API完成任务的时候我们首先通过以上方式创建通道
在开始发送消息之前我们首先应该声明一个队列。声明队列之后我们就可以将消息发送至队列当中
q, err := ch.QueueDeclare(
"hello", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
log.Fatal(err)
}
body := "Hello World!"
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
ContentType: "text/plain",
Body: []byte(body),
})
if err != nil {
log.Fatal(err)
}
队列的声明是一个幂等性操作,如果不存在该队列的话则会创建。此处注意,如果队列存在,修改了队列参数并不会影响已经存在的队列,并且会返回错误。消息内容是一个字节数组,所以我们必须进行编码
连接,创建通道,队列
在接收端我们同样需要像发送端一样连接RabbitMQ,创建通道后再创建队列,注意此处队列的创建是跟发送端的队列完全匹配的。队列在接收端也创建是因为我们接收端有可能比发送端先启动,所以为了保证我们要消费的队列存在我们在此处也进行创建
消费消息
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
log.Fatal(err)
}
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
使用通道消费队列中的消息,当队列有消息的时候将会异步的推送给我们
原文:https://www.cnblogs.com/develop-SZT/p/10706553.html