首页 > 其他 > 详细

RabbitMQ学习笔记之五种模式及消息确认机制

时间:2019-11-12 16:53:05      阅读:96      评论:0      收藏:0      [点我收藏+]

本文详细介绍简单模式Simple、工作模式Work、发布订阅模式Publish/Subscribe、Topic、Routing。

Maven依赖引用

<dependencies>
        <!-- Unit testing only; keep it off the runtime classpath. -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>

        <!-- RabbitMQ Java client used by every example below. -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>4.0.2</version>
        </dependency>
        <!-- Logging: keep slf4j-api and its log4j binding on the same version. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.10</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.10</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
    </dependencies>

连接RabbitMQ服务公用方法

package com.test.testboot.mq;


import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Static helper that opens connections to the RabbitMQ broker used by the
 * examples in this package.
 */
public final class ConnectionUtil {

    private ConnectionUtil() {
        // Static utility class; not meant to be instantiated.
    }

    /**
     * Opens a new connection to the broker at 127.0.0.1:5672, virtual host "/".
     *
     * <p>The misspelled name is kept because existing callers use it; new code
     * should prefer {@link #getConnection()}.
     *
     * @return a newly opened connection; the caller is responsible for closing it
     * @throws IOException if the connection cannot be established
     * @throws TimeoutException if the connection attempt times out
     */
    public static Connection getConection() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // Broker address and port.
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        // Virtual host (not a network host name).
        factory.setVirtualHost("/");
        // NOTE(review): the user name is empty here, which looks like a redacted
        // value; fill in the account that matches the password before running.
        factory.setUsername("");
        factory.setPassword("123456");
        return factory.newConnection();
    }

    /**
     * Correctly spelled alias for {@link #getConection()}.
     *
     * @return a newly opened connection; the caller is responsible for closing it
     * @throws IOException if the connection cannot be established
     * @throws TimeoutException if the connection attempt times out
     */
    public static Connection getConnection() throws IOException, TimeoutException {
        return getConection();
    }
}

模式1:简单队列模式(Simple)

消息生产者p将消息放入队列
消费者监听队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列删除
(隐患,消息可能没有被消费者正确处理,已经消失了,无法恢复)

应用场景:聊天室

生产者

package com.test.testboot.mq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Simple-queue producer: publishes one message to {@code test_simple_queue}
 * through the default exchange.
 */
public class Sender {

    private static final String QUEUE_NAME = "test_simple_queue";

    /**
     * Declares the queue, publishes a single message and closes the connection.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if connecting or closing the channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConection();
        try {
            Channel channel = connection.createChannel();
            // Non-durable, non-exclusive, not auto-deleted, no extra arguments.
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            String msg = "Hello World!";
            System.out.println("sendMsg:" + msg);
            // Encode explicitly as UTF-8: the consumer decodes the body as UTF-8.
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes("UTF-8"));

            channel.close();
        } finally {
            // Always release the connection, even when declaring/publishing fails.
            connection.close();
        }
    }
}

消费者

package com.test.testboot.mq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Simple-queue consumer: prints every message delivered from
 * {@code test_simple_queue}, using automatic acknowledgement.
 */
public class Recv {

    private static final String QUEUE_NAME = "test_simple_queue";

    /**
     * Declares the queue and registers a consumer on it. The connection is left
     * open so that deliveries keep arriving.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if connecting times out
     * @throws InterruptedException kept in the signature for compatibility
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtil.getConection();
        Channel channel = connection.createChannel();

        // Declare the queue so the consumer can start before the producer.
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // DefaultConsumer replaces the legacy QueueingConsumer/nextDelivery() polling loop.
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            /** Invoked for each delivered message. */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
                String recvMsg = new String(body, "UTF-8");
                System.out.println("recvMsg:" + recvMsg);
            }
        };

        // autoAck = true: messages are acknowledged as soon as they are delivered.
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}

模式2:工作模式(Work)

生产者将消息放入队列
多个消费者同时监听同一个队列,消息如何被消费?(与具体的分发方式有关系)
C1,C2共同争抢当前消息队列的内容,谁先拿到消息,谁来负责消费
应用场景:红包;大型项目中的资源调度过程(直接由最空闲的系统争抢到资源处理任务)

轮询分发(Round-Robin)
生产者

package com.test.testboot.mq.work;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.test.testboot.mq.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Work-queue producer (round-robin example): publishes ten messages to
 * {@code test_work_queue}, which is shared by two consumers.
 *
 * <pre>
 *               |--C1
 * p-----Queue---|
 *               |--C2
 * </pre>
 */
public class Send {

    private static final String QUEUE_NAME = "test_work_queue";

    /**
     * Publishes "Hello0" .. "Hello9" with a growing pause between messages.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if connecting or closing the channel times out
     * @throws InterruptedException if the pause between messages is interrupted
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection conection = ConnectionUtil.getConection();
        try {
            Channel channel = conection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            for (int i = 0; i < 10; i++) {
                String msg = "Hello" + i;
                System.out.println("[WorkQueue] Send :" + msg);
                // Encode explicitly as UTF-8: the consumers decode the body as UTF-8.
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes("UTF-8"));
                Thread.sleep(i * 10);
            }
            channel.close();
        } finally {
            // Release the connection even if publishing fails or the sleep is interrupted.
            conection.close();
        }
    }

}

消费者1

package com.test.testboot.mq.work;

import com.rabbitmq.client.*;
import com.test.testboot.mq.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Work-queue consumer 1 (round-robin example): takes about one second per
 * message and relies on automatic acknowledgement.
 */
public class Recv1 {
    private static final String QUEUE_NAME = "test_work_queue";

    /**
     * Declares the queue and registers the consumer; the connection stays open.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if connecting times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection conection = ConnectionUtil.getConection();
        Channel channel = conection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /** Invoked for each delivered message. */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("Recv[1] recv msg:" + msg);

                try {
                    // Simulate one second of work.
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    // Restore the interrupt flag instead of silently swallowing it.
                    Thread.currentThread().interrupt();
                } finally {
                    System.out.println("Recv[1] down");
                }
            }
        };

        /*
         * autoAck = true:  the message counts as consumed as soon as it has been
         *                  delivered, whether or not processing succeeds.
         * autoAck = false: the consumer must acknowledge explicitly; until then the
         *                  broker keeps the message as unacknowledged.
         */
        boolean autoAck = true;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}

消费者2

package com.test.testboot.mq.work;

import com.rabbitmq.client.*;
import com.test.testboot.mq.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Work-queue consumer 2 (round-robin example): takes about two seconds per
 * message and relies on automatic acknowledgement.
 */
public class Recv2 {
    private static final String QUEUE_NAME = "test_work_queue";

    /**
     * Declares the queue and registers the consumer; the connection stays open.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if connecting times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection conection = ConnectionUtil.getConection();
        Channel channel = conection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /** Invoked for each delivered message. */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("Recv[2] recv msg:" + msg);

                try {
                    // Simulate two seconds of work.
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    // Restore the interrupt flag instead of silently swallowing it.
                    Thread.currentThread().interrupt();
                } finally {
                    System.out.println("Recv[2] down");
                }
            }
        };

        // Automatic acknowledgement: delivered messages count as consumed immediately.
        boolean autoAck = true;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}

公平分发(Work Fair)
生产者

package com.test.testboot.mq.workfair;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.test.testboot.mq.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Work-queue producer (fair-dispatch example): publishes ten messages to
 * {@code test_work_queue}.
 *
 * <pre>
 *               |--C1
 * p-----Queue---|
 *               |--C2
 * </pre>
 */
public class Send {

    private static final String QUEUE_NAME = "test_work_queue";

    /**
     * Publishes "Hello0" .. "Hello9" with a growing pause between messages.
     *
     * <p>No prefetch limit is set here: {@code basicQos} limits deliveries to
     * consumers on a channel, and this channel only publishes. Fair dispatch is
     * configured by the consumers (prefetch of 1 plus manual acknowledgement).
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if connecting or closing the channel times out
     * @throws InterruptedException if the pause between messages is interrupted
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection conection = ConnectionUtil.getConection();
        try {
            Channel channel = conection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            for (int i = 0; i < 10; i++) {
                String msg = "Hello" + i;
                System.out.println("[WorkQueue] Send :" + msg);
                // Encode explicitly as UTF-8: the consumers decode the body as UTF-8.
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes("UTF-8"));
                Thread.sleep(i * 5);
            }
            channel.close();
        } finally {
            // Release the connection even if publishing fails or the sleep is interrupted.
            conection.close();
        }
    }

}

消费者1

package com.test.testboot.mq.workfair;

import com.rabbitmq.client.*;
import com.test.testboot.mq.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Work-queue consumer 1 (fair-dispatch example): prefetch of one message,
 * about one second of work per message, manual acknowledgement.
 */
public class Recv1 {
    private static final String QUEUE_NAME = "test_work_queue";

    /**
     * Declares the queue and registers the consumer; the connection stays open.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if connecting times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection conection = ConnectionUtil.getConection();
        // final: captured by the anonymous consumer below.
        final Channel channel = conection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // At most one unacknowledged message is delivered to this consumer at a time.
        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /** Invoked for each delivered message. */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("Recv[1] recv msg:" + msg);

                try {
                    // Simulate one second of work.
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    // Restore the interrupt flag instead of silently swallowing it.
                    Thread.currentThread().interrupt();
                } finally {
                    System.out.println("Recv[1] down");
                    // Manual acknowledgement of this single delivery.
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        // false = manual acknowledgement (the consumer calls basicAck itself).
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}

消费者2

package com.test.testboot.mq.workfair;

import com.rabbitmq.client.*;
import com.test.testboot.mq.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Work-queue consumer 2 (fair-dispatch example): prefetch of one message,
 * about two seconds of work per message, manual acknowledgement.
 */
public class Recv2 {
    private static final String QUEUE_NAME = "test_work_queue";

    /**
     * Declares the queue and registers the consumer; the connection stays open.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if connecting times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection conection = ConnectionUtil.getConection();
        // final: captured by the anonymous consumer below.
        final Channel channel = conection.createChannel();

        // At most one unacknowledged message is delivered to this consumer at a time.
        channel.basicQos(1);

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /** Invoked for each delivered message. */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("Recv[2] recv msg:" + msg);

                try {
                    // Simulate two seconds of work.
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    // Restore the interrupt flag instead of silently swallowing it.
                    Thread.currentThread().interrupt();
                } finally {
                    System.out.println("Recv[2] down");
                    // Manual acknowledgement of this single delivery.
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        // false = manual acknowledgement (the consumer calls basicAck itself).
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}

模式三:订阅模式(publish/subscribe)

生产者将消息交给交换机
由交换机根据发布订阅的模式设定将消息同步到所有的绑定队列中;
后端的消费者都能拿到消息

应用场景:短信、邮件群发,群聊天,广告

生产者

package com.test.testboot.mq.ps;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.test.testboot.mq.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Publish/subscribe producer: publishes one message to the fanout exchange
 * {@code test_exchange_fanout}.
 *
 * <pre>
 *                           |--C1
 * p---exchange----Queue-----|
 *                           |--C2
 * </pre>
 */
public class Send {

    private static final String EXCHANGE_NAME = "test_exchange_fanout";

    /**
     * Declares the fanout exchange, publishes one message and closes the connection.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if connecting or closing the channel times out
     * @throws InterruptedException kept in the signature for compatibility
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection conection = ConnectionUtil.getConection();
        try {
            Channel channel = conection.createChannel();

            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

            String msg = "Hello";
            System.out.println("Send :" + msg);
            // Empty routing key; encode explicitly as UTF-8 to match the consumers.
            channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes("UTF-8"));

            channel.close();
        } finally {
            // Release the connection even if declaring or publishing fails.
            conection.close();
        }
    }

}

消费者1

package com.test.testboot.mq.ps;

import com.rabbitmq.client.*;
import com.test.testboot.mq.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Publish/subscribe consumer 1: binds {@code test_queue_fanout_email} to the
 * fanout exchange and acknowledges each message manually.
 */
public class Recv1 {
    private static final String QUEUE_NAME = "test_queue_fanout_email";
    private static final String EXCHANGE_NAME = "test_exchange_fanout";

    /**
     * Declares exchange and queue, binds them and registers the consumer; the
     * connection stays open.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if connecting times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection conection = ConnectionUtil.getConection();
        // final: captured by the anonymous consumer below.
        final Channel channel = conection.createChannel();

        // Declare the exchange with the same type as the producer so that the
        // bind below also works when this consumer is started first.
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // At most one unacknowledged message is delivered at a time.
        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /** Invoked for each delivered message. */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("[1] recv msg:" + msg);

                try {
                    // Simulate one second of work.
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    // Restore the interrupt flag instead of silently swallowing it.
                    Thread.currentThread().interrupt();
                } finally {
                    System.out.println("[1] down");
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        // false = manual acknowledgement.
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}

消费者2

package com.test.testboot.mq.ps;

import com.rabbitmq.client.*;
import com.test.testboot.mq.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Publish/subscribe consumer 2: binds {@code test_queue_fanout_sms} to the
 * fanout exchange and acknowledges each message manually.
 */
public class Recv2 {
    private static final String QUEUE_NAME = "test_queue_fanout_sms";
    private static final String EXCHANGE_NAME = "test_exchange_fanout";

    /**
     * Declares exchange and queue, binds them and registers the consumer; the
     * connection stays open.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if connecting times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection conection = ConnectionUtil.getConection();
        // final: captured by the anonymous consumer below.
        final Channel channel = conection.createChannel();

        // Declare the exchange with the same type as the producer so that the
        // bind below also works when this consumer is started first.
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // At most one unacknowledged message is delivered at a time.
        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /** Invoked for each delivered message. */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("[2]recv msg:" + msg);

                try {
                    // Simulate two seconds of work.
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    // Restore the interrupt flag instead of silently swallowing it.
                    Thread.currentThread().interrupt();
                } finally {
                    System.out.println("[2] down");
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        // Must be false: handleDelivery acknowledges manually. With autoAck = true
        // the explicit basicAck would acknowledge the delivery a second time and
        // the broker would close the channel.
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}

模式四:路由模式(Routing)

生产者发送消息到交换机,同时绑定一个路由Key,交换机根据路由key对下游绑定的队列进行路由key的判断,满足路由key的队列才会接收到消息,消费者消费消息

应用场景:项目中的error报错

生产者

package com.test.testboot.mq.routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.test.testboot.mq.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Routing-mode producer: publishes one message with routing key "warning" to
 * the direct exchange {@code test_exchange_direct}.
 *
 * @author Mr.ADiao
 */
public class Send {
    private static final String EXCHANGE_NAME = "test_exchange_direct";

    /**
     * Declares the direct exchange, publishes one message and closes the connection.
     *
     * <p>No prefetch limit is set: {@code basicQos} limits deliveries to consumers
     * on a channel, and this channel only publishes.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if connecting or closing the channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection conection = ConnectionUtil.getConection();
        try {
            Channel channel = conection.createChannel();

            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            String msg = "Hello Routing";
            String routingKey = "warning";

            // Encode explicitly as UTF-8: the consumers decode the body as UTF-8.
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes("UTF-8"));
            System.out.println("send:" + msg);
            channel.close();
        } finally {
            // Release the connection even if declaring or publishing fails.
            conection.close();
        }
    }
}

消费者1

package com.test.testboot.mq.routing;

import com.rabbitmq.client.*;
import com.test.testboot.mq.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


/**
 * Routing-mode consumer 1: receives only messages published with routing key
 * "error" and acknowledges them manually.
 */
public class Recv1 {
    private static final String EXCHANGE_NAME = "test_exchange_direct";
    private static final String QUEUE_NAME = "test_queue_direct1";

    /**
     * Declares exchange and queue, binds them and registers the consumer; the
     * connection stays open.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if connecting times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConection();
        // final: captured by the anonymous consumer below.
        final Channel channel = connection.createChannel();

        // Declare the exchange with the same type as the producer so that the
        // bind below also works when this consumer is started first.
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");

        // At most one unacknowledged message is delivered at a time.
        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /** Invoked for each delivered message. */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("[1] recv msg:" + msg);

                try {
                    // Simulate one second of work.
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    // Restore the interrupt flag instead of silently swallowing it.
                    Thread.currentThread().interrupt();
                } finally {
                    System.out.println("[1] down");
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        // false = manual acknowledgement.
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);

    }
}

消费者2

package com.test.testboot.mq.routing;

import com.rabbitmq.client.*;
import com.test.testboot.mq.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Routing-mode consumer 2: receives messages published with routing key
 * "error", "info" or "warning" and acknowledges them manually.
 *
 * @author Mr.ADiao
 */
public class Recv2 {
    private static final String EXCHANGE_NAME = "test_exchange_direct";
    private static final String QUEUE_NAME = "test_queue_direct2";

    /**
     * Declares exchange and queue, binds the queue once per routing key and
     * registers the consumer; the connection stays open.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if connecting times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConection();
        // final: captured by the anonymous consumer below.
        final Channel channel = connection.createChannel();

        // Declare the exchange with the same type as the producer so that the
        // binds below also work when this consumer is started first.
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // One binding per routing key this queue should receive.
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");

        // At most one unacknowledged message is delivered at a time.
        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /** Invoked for each delivered message. */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("[2] recv msg:" + msg);

                try {
                    // Simulate one second of work.
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    // Restore the interrupt flag instead of silently swallowing it.
                    Thread.currentThread().interrupt();
                } finally {
                    System.out.println("[2] down");
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        // false = manual acknowledgement.
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);

    }
}

模式五:主题模式(Topics)

topic模式也称为通配符模式,其实他相对于routing模式最大的好处就是他多了一种匹配模式的路由,怎么理解匹配呢,其实就相当于我们之前正则的.*这种,不过他的匹配机制可能不是这种(其实除了匹配规则外,他的作用就和routing模式一样?)

匹配规则:

路由键routing key必须是由点号(.)分隔的单词列表,绑定键binding key也必须是这种形式。以特定路由键发送的消息将会发送到所有绑定键与之匹配的队列中。但绑定键有两种特殊的情况:
①*(星号)仅代表一个单词
②#(井号)代表任意个单词(零个或多个)

示例:

*.apple.* : 匹配以任意一个单词开头、中间为 .apple.、以任意一个单词结尾的路由键,比如 a.apple.b、asd.apple.qewf 等(注意 * 只代表一个单词)。

product.# : 只要以 product. 开头的都匹配,它可以匹配 product.a、product.a.b、product.b.c 等。

生产者

package com.adiao.rabbitmq.topics;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.adiao.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * Topic-mode producer: publishes one message with routing key
 * "myTopic.key2" to the topic exchange {@code test_exchange_topic}.
 */
public class Send {

    private static final String EXCHANGE_NAME = "test_exchange_topic";

    /**
     * Declares the topic exchange, publishes one message and closes the connection.
     *
     * <p>No prefetch limit is set: {@code basicQos} limits deliveries to consumers
     * on a channel, and this channel only publishes.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if connecting or closing the channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = ConnectionUtils.getConnection();
        try {
            Channel channel = connection.createChannel();

            channel.exchangeDeclare(EXCHANGE_NAME, "topic");

            String msg = "This is Topic Msg";

            // Encode explicitly as UTF-8: the consumers decode the body as UTF-8.
            channel.basicPublish(EXCHANGE_NAME, "myTopic.key2", null, msg.getBytes("UTF-8"));

            System.out.println("Send:" + msg);

            channel.close();
        } finally {
            // Release the connection even if declaring or publishing fails.
            connection.close();
        }
    }

}

消费者1

package com.adiao.rabbitmq.topics;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.adiao.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

/**
 * Topic-mode consumer 1: binds {@code test_queue_topic1} with the pattern
 * "myTopic.#" and acknowledges each message manually.
 */
public class Recv1 {
    private static final String EXCHANGE_NAME = "test_exchange_topic";
    private static final String QUEUE_NAME = "test_queue_topic1";

    /**
     * Declares exchange and queue, binds them and registers the consumer; the
     * connection stays open.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if connecting times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = ConnectionUtils.getConnection();

        // final: captured by the anonymous consumer below.
        final Channel channel = connection.createChannel();

        // Declare the exchange with the same type as the producer so that the
        // bind below also works when this consumer is started first.
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "myTopic.#");

        // At most one unacknowledged message is delivered at a time.
        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /** Invoked for each delivered message. */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    BasicProperties properties, byte[] body) throws IOException {

                System.out.println("[1] Recv:" + new String(body, "UTF-8"));

                // Manual acknowledgement of this single delivery.
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        // false = manual acknowledgement.
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }

}

消费者2

package com.adiao.rabbitmq.topics;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.adiao.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

/**
 * Topic-mode consumer 2: binds {@code test_queue_topic2} with the exact key
 * "myTopic.key1" and acknowledges each message manually.
 */
public class Recv2 {

    private static final String EXCHANGE_NAME = "test_exchange_topic";
    private static final String QUEUE_NAME = "test_queue_topic2";

    /**
     * Declares exchange and queue, binds them and registers the consumer; the
     * connection stays open.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if connecting times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = ConnectionUtils.getConnection();

        // final: captured by the anonymous consumer below.
        final Channel channel = connection.createChannel();

        // Declare the exchange with the same type as the producer so that the
        // bind below also works when this consumer is started first.
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "myTopic.key1");

        // At most one unacknowledged message is delivered at a time.
        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /** Invoked for each delivered message. */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    BasicProperties properties, byte[] body) throws IOException {

                System.out.println("[2]Recv:" + new String(body, "UTF-8"));

                // Manual acknowledgement of this single delivery.
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        // false = manual acknowledgement.
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }

}

消息确认机制之事务机制
AMQP协议自带机制

弊端:降低RabbitMQ的吞吐量

生产者

package com.adiao.rabbitmq.tx;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.adiao.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * Transaction-mode producer: publishes inside an AMQP transaction and rolls
 * back when anything fails before the commit.
 */
public class Send {

    private static final String QUEUE_NAME = "test_queue_tx";

    /**
     * Publishes one message inside a transaction. A deliberate division by zero
     * before the commit demonstrates the rollback path.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if connecting or closing the channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = ConnectionUtils.getConnection();
        try {
            Channel channel = connection.createChannel();

            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            String msg = "This is tx";

            try {
                // Start the transaction.
                channel.txSelect();
                // Encode explicitly as UTF-8: the consumer decodes the body as UTF-8.
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes("UTF-8"));
                // Deliberate failure to exercise the rollback branch.
                int i = 1 / 0;
                System.out.println("Send" + msg);
                // Commit the transaction.
                channel.txCommit();
            } catch (Exception e) {
                e.printStackTrace();
                // Roll the transaction back so the message is discarded.
                channel.txRollback();
                System.out.println("rollback");
            }
            channel.close();
        } finally {
            // Release the connection even if rollback or channel close fails.
            connection.close();
        }

    }

}

消费者

package com.adiao.rabbitmq.tx;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.adiao.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

/**
 * Transaction-mode consumer: prints every message delivered from
 * {@code test_queue_tx}, using automatic acknowledgement.
 */
public class Recv {

    private static final String QUEUE_NAME = "test_queue_tx";

    /**
     * Declares the queue and registers an auto-acknowledging consumer; the
     * connection stays open.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if connecting times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection conn = ConnectionUtils.getConnection();
        Channel ch = conn.createChannel();
        ch.queueDeclare(QUEUE_NAME, false, false, false, null);

        DefaultConsumer printer = new DefaultConsumer(ch) {
            /** Invoked for each delivered message. */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    BasicProperties properties, byte[] body) throws IOException {
                String text = new String(body, "UTF-8");
                System.out.println("Recv:" + text);
            }
        };

        // true = automatic acknowledgement.
        ch.basicConsume(QUEUE_NAME, true, printer);
    }

}

消息确认机制之Confirm同步确认
生产者

package com.adiao.rabbitmq.confirm;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.adiao.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * Publisher-confirm producer (synchronous, single message): publishes one
 * message and blocks until the broker confirms or rejects it.
 */
public class Send {
    private static final String QUEUE_NAME = "test_queue_confirm";

    /**
     * Publishes one message in confirm mode and reports the outcome.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if connecting or closing the channel times out
     * @throws InterruptedException if waiting for the confirmation is interrupted
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtils.getConnection();
        try {
            Channel channel = connection.createChannel();

            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // Put the channel into publisher-confirm mode.
            channel.confirmSelect();

            String msg = "This is Confirm";

            // Encode explicitly as UTF-8: the consumer decodes the body as UTF-8.
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes("UTF-8"));

            // Block until the broker has confirmed or rejected the message.
            if (!channel.waitForConfirms()) {
                System.out.println("Send Faile");
            } else {
                System.out.println("Send Success");
            }

            channel.close();
        } finally {
            // Release the connection even if publishing or waiting fails.
            connection.close();
        }
    }

}

消费者

package com.adiao.rabbitmq.confirm;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.adiao.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

/**
 * Publisher-confirm example consumer: prints every message delivered from
 * {@code test_queue_confirm} and acknowledges it manually.
 */
public class Recv {

    private static final String QUEUE_NAME = "test_queue_confirm";

    /**
     * Declares the queue and registers a manually acknowledging consumer; the
     * connection stays open.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if connecting times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection conn = ConnectionUtils.getConnection();
        final Channel ch = conn.createChannel();
        ch.queueDeclare(QUEUE_NAME, false, false, false, null);

        DefaultConsumer ackingPrinter = new DefaultConsumer(ch) {
            /** Invoked for each delivered message. */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    BasicProperties properties, byte[] body) throws IOException {
                String text = new String(body, "UTF-8");
                System.out.println("Recv:" + text);
                // Acknowledge only this delivery (multiple = false).
                long tag = envelope.getDeliveryTag();
                ch.basicAck(tag, false);
            }
        };

        // false = manual acknowledgement.
        ch.basicConsume(QUEUE_NAME, false, ackingPrinter);
    }

}

批量确认生产者

package com.adiao.rabbitmq.confirm;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.adiao.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * Publisher-confirm producer (synchronous, batch): publishes several messages
 * and then waits once for all of them to be confirmed.
 */
public class SendManyMsg {
    private static final String QUEUE_NAME = "test_queue_confirm";

    /**
     * Publishes a batch in confirm mode. A deliberate division by zero on the
     * sixth iteration simulates a failure in the middle of the batch.
     *
     * @param args unused
     * @throws IOException if a broker operation fails or a message is rejected
     * @throws TimeoutException if connecting or closing the channel times out
     * @throws InterruptedException if waiting for the confirmations is interrupted
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtils.getConnection();
        try {
            Channel channel = connection.createChannel();

            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // Put the channel into publisher-confirm mode.
            channel.confirmSelect();

            String msg = "This is Confirm";
            for (int i = 0; i < 10; i++) {
                if (i == 5) {
                    // Deliberate failure to simulate an error mid-batch.
                    int j = i / 0;
                }
                // Encode explicitly as UTF-8: the consumer decodes the body as UTF-8.
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes("UTF-8"));
            }

            // One wait covers every message published since confirmSelect().
            channel.waitForConfirmsOrDie();
            System.out.println("全部发送完成");
            channel.close();
        } finally {
            // Release the connection even when the simulated failure is thrown.
            connection.close();
        }
    }

}

异步确认生产者

package com.adiao.rabbitmq.confirm;

import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeoutException;

import com.adiao.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;

public class SendAsyn {

    private static final String QUEUE_NAME="test_queue_confirm";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtils.getConnection();
        
        Channel channel = connection.createChannel();
        
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        
        channel.confirmSelect();
        channel.addConfirmListener(new ConfirmListener() {
            
            public void handleNack(long deliveryTag, boolean multiple)
                    throws IOException {
                System.out.println("未确认消息标识:"+deliveryTag);
                
            }
            
            public void handleAck(long deliveryTag, boolean multiple)
                    throws IOException {
                System.out.println(String.format("已确认消息标识:%d (%b)", deliveryTag,multiple));
            }
        });
        for (int i = 0; i <10; i++) {
            String msg=new Date().getTime()+ ":This is Asyn Confirm";
            System.out.println("Send:"+msg);
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            Thread.sleep(1000);
            
        }
        
        
        
        channel.close();
        
        connection.close();
    }

}

RabbitMQ学习笔记之五种模式及消息确认机制

原文:https://www.cnblogs.com/mradiao/p/11842880.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!