生产者,发送消息的一方,图中左侧的client。
消费者,接收消息的一方,图中后侧的client。
消息中间件的服务节点,一般一个RabbitMQ Broker看成一台RabbitMQ服务器。
消息包含两部分:消息体和标签。消息体(payload)是一个带有业务逻辑结构的数据,比如一个 JSON 字符串。消息的标签用来表述这条消息 , 比如一个交换器的名称和一个路由键。 生产者把消息交由 RabbitMQ, RabbitMQ 之后会根据标签把消息发送给感兴趣的消费者。
连接其实是一条TCP连接,如果是生产者还是消费者都需要和Broker建立连接。
信道是建立在 Connection 之上的虚拟连接, RabbitMQ 处理的每条 AMQP 指令都是通过信道完成的。
为什么还要引入信道呢?试想这 样一个场景, 一个应用程序中有很多个线程需要从 RabbitMQ 中消费消息,或者生产消息,那 么必然需要建立很多个 Connection,也就是许多个 TCP 连接。然而对于操作系统而言,建立和 销毁 TCP 连接是非常昂贵的开销,如果遇到使用高峰,性能瓶颈也随之显现。 RabbitMQ 采用 类似 NIO‘ (Non-blocking 1/0) 的做法,选择 TCP 连接复用,不仅可以减少性能开销,同时也便于管理。
队列是 RabbitMQ 的内部对象,用于存储消息,当多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin,即轮询) 给多个消费者进行处理,而不是每个消费者都收到所有的消息井处理。
生产者将消息发送到 Exchange (交换器,通常也 可以用大写的 "X" 来表示),由交换器将消息路由到一个或者多个队列中。如果路由不到,或 许会返回给生产者,或许直接丢弃。
生产者将消息发给交换器的时候, 一般会指定一个 RoutingKey,用 来指定这个消息的路由规则,而这个 RoutingKey 需要与交换器类型和绑定键 (BindingKey) 联合使用才能最终生效。
RabbitMQ 中通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个绑定键 (BindingKey), 这样 RabbitMQ 就知道如何正确地将消息路由到队列了。
生产者将消息发送给交换器时, 需要一个 RoutingKey, 当 BindingKey 和 RoutingKey 相匹配时, 消息会被路由到对应的队列中。在绑定多个队列到同一个交换器的时候, 这些绑定允许使用相同的 BindingKey。 BindingKey 并不是在所有的情况下都生效,它依赖于交换器类型, 比 如 fanout 类型的交换器就会无视 BindingKey, 而是将消息路由到所有绑定到该交换器的队列中 。
RabbitMQ 常用的交换器类型有 fanout、 direct、 topic、 headers 这四种。
它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中,相当于广播模式。
把消息路由到那些 BindingKey 和 RoutingKey完全匹配的队列中。
将消息路由到 BindingKey 和 RoutingKey 相匹配的队列中。
headers 类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。
package com.spring.hello.demo.mq; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; public class RabbitProducer { private static final String EXCHANGE_NAME = "exchange_demo"; private static final String ROUTING_KEY = " routingkey_demo"; private static final String QUEUE_NAME = "queue_demo"; private static final String IP_ADDRESS = "192.168.93.131"; private static final int PORT = 5672;// RabbitMQ public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(IP_ADDRESS); factory.setPort(PORT); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection();// 创建连接 Channel channel = connection.createChannel(); // 创建信道 //创建一个 type="direct"、持久化的、非自动删除的交换器 channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null); // 创建一个持久化、非排他的、非自动删除的队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 将交换器与队列通过路由键绑定 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); // 发送一条持久化的消息: Hello World! String message = "Hello World!"; channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); // 关闭资源 channel.close(); connection.close(); } } package com.spring.hello.demo.mq; import java.io.IOException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Address; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class RabbitConsumer { private static final String QUEUE_NAME = "queue_demo"; private static final String IP_ADDRESS = "192.168.93.131"; private static final int PORT = 5672;// RabbitMQ public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Address[] addresses = new Address[] { new Address(IP_ADDRESS, PORT) }; ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); // 这里的连接方式与生产者的 demo 略有不同, 注意辨别区别 Connection connection = factory.newConnection(addresses); // 创建连接 final Channel channel = connection.createChannel(); // 创建信道 channel.basicQos(64); // 设置客户端最多接收未被 ack 的消息的个数 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { System.out.println("recv message: " + new String(body)); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } //channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(QUEUE_NAME, consumer); // 等待回调函数执行完毕之后, 关闭资源 TimeUnit.SECONDS.sleep(15); channel.close(); connection.close(); } }
原文:https://www.cnblogs.com/lostyears/p/10927131.html