生产者:
public class SendLogDirect { private static final String EXCHANGE_NAME="exchange_demo"; private static final String ROUTING_KEY="routingkey_demo"; private static final String QUEUE_NAME="queue_demo"; public static void main(String[] args) throws Exception { ConnectionFactory factory = Common.getFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); /** * 声明转发器和类型 可用的转发器类型Direct Topic Headers Fanout * Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。 * Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。 * 很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。 * Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。 * 因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。 */ channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null); /** * 指定队列 * queue: 队列名称 * durable: 是否持久化, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失,如果想重启之后还存在就要使队列持久化, * 保存到Erlang自带的Mnesia数据库中,当rabbitmq重启之后会读取该数据库 * exclusive:是否排外的,有两个作用, * 1:当连接关闭时connection.close()该队列是否会自动删除; * 2:该队列是否是私有的private, * 如果不是排外的,可以使用两个消费者都访问同一个队列,没有任何问题, * 如果是排外的,会对当前队列加锁,其他通道channel是不能访问的, * 如果强制访问会报异常,一般等于true的话用于一个队列只能有一个消费者来消费的场景 * autoDelete:是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除, * 可以通过RabbitMQ Management,查看某个队列的消费者数量,当consumers = 0时队列就会自动删除 * arguments:队列中的消息什么时候会自动被删除? */ channel.queueDeclare(QUEUE_NAME, true, false, false, null); /** * 绑定交换器和队列 */ channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); String msg = "hello word11"; /** * exchange:转发器 * routingKey:指定routingKey * props:消息为持久化 —— MessageProperties.PERSISTENT_TEXT_PLAIN * body:msg字节 */ channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); System.out.println("send msg:"+msg); channel.close(); connection.close(); } }
消费者:
public class ReceiveLogsDirect { private static final String EXCHANGE_NAME="exchange_demo"; private static final String ROUTING_KEY="routingkey_demo"; private static final String QUEUE_NAME="queue_demo"; public static void main(String[] args) throws Exception { ConnectionFactory factory = Common.getFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null); channel.queueDeclare(QUEUE_NAME, true, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); /** * 公平转发,设置客户端最多接收未被ack的消息的个数,只有在消费者空闲的时候会发送下一条信息,同一时间每次发给一个消息给一个worker。 * 一个生产者与多个消费者时,避免RabbitMQ服务器可能一直发送多个消息给一个worker,而另一个可能几乎不做任何事情。 */ channel.basicQos(1); /** * 实现消费者 */ Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { System.out.println("recv msg:"+new String(body)); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } /** * 发送应答 */ channel.basicAck(envelope.getDeliveryTag(), false); } }; /** * 为队列指定消费者 * queue: 队列 * ack: * true:Round-robin 转发 消费者被杀死,消息会丢失 * false:消息应答 ,为了保证消息永远不会丢失,RabbitMQ支持消息应答(message acknowledgments)。 * 消费者发送应答给RabbitMQ,告诉它信息已经被接收和处理,然后RabbitMQ可以自由的进行信息删除。 * 如果消费者被杀死而没有发送应答,RabbitMQ会认为该信息没有被完全的处理,然后将会重新转发给别的消费者。 * 通过这种方式,你可以确认信息不会被丢失,即使消者偶尔被杀死。 * 消费者需要耗费特别特别长的时间是允许的。 * consumer:消费者 */ channel.basicConsume(QUEUE_NAME, false,consumer); //等待回调函数执行完毕之后,关闭资源 TimeUnit.SECONDS.sleep(5); /** * 不关闭channel、connection可以实现持续监听 */ // channel.close(); // connection.close(); } }
原文:https://www.cnblogs.com/yifanSJ/p/8988331.html