集群部署
RocketMQ的最佳实践
Producer
1 import com.alibaba.fastjson.JSON; 2 import com.alibaba.fastjson.JSONArray; 3 import org.apache.rocketmq.client.exception.MQClientException; 4 import org.apache.rocketmq.client.producer.DefaultMQProducer; 5 import org.apache.rocketmq.client.producer.SendCallback; 6 import org.apache.rocketmq.client.producer.SendResult; 7 import org.apache.rocketmq.common.message.Message; 8 9 import java.io.ByteArrayInputStream; 10 import java.io.ObjectInputStream; 11 import java.io.Serializable; 12 import java.util.ArrayList; 13 import java.util.HashMap; 14 import java.util.List; 15 import java.util.Map; 16 17 /** 18 * @Author 18011618 19 * @Date 10:41 2018/7/17 20 * @Function 消息生产者 21 * 22 * Producer最佳最佳实践 23 * 1.?个应?尽可能??个Topic,消息?类型?tags来标识,tags可以由应??由设置。只有发送消息设置了tags,消费?在订阅消息时,才可以利?tags在broker做消息过滤。(完成) 24 * 2.每个消息在业务层?的唯?标识码,要设置到keys字段,?便将来定位消息丢失问题。(完成) 25 * 3.消息发送成功或者失败,要打印消息?志,务必打印sendResult和key字段(完成) 26 * 4.对于消息不可丢失应?,务必要有消息重发机制。例如:消息发送失败,存储到数据库,能有定时程序尝试重发或者??触发重发。 27 * 5.某些应?如果不关注消息是否发送成功,请直接使?sendOneWay?法发送消息。 28 */ 29 public class Producer { 30 public static final List<Map<String,Object>> memList=new ArrayList<>(); 31 public static void main(String[] args) throws MQClientException { 32 DefaultMQProducer producer = new DefaultMQProducer("test-group"); 33 producer.setNamesrvAddr("192.168.83.128:9876"); 34 producer.setInstanceName("rmq-instance"); 35 36 producer.start(); 37 try { 38 for (int i = 0; i < 100; i++) { 39 User user = new User(); 40 user.setId((int)(Math.random()*100)); 41 user.setLoginName("abc" + i); 42 user.setPwd(String.valueOf(i)); 43 Message message = new Message("log-topic", "user-tag", JSON.toJSONString(user).getBytes()); 44 //System.out.println("生产者发送消息:" + JSON.toJSONString(user)); 45 //异步发送 46 producer.send(message, new SendCallback() { 47 @Override 48 public void onSuccess(SendResult sendResult) { 49 System.out.println("发送消息成功!result is : " + user.toString()); 50 } 51 52 @Override 53 public void onException(Throwable throwable) { 54 
throwable.printStackTrace(); 55 //消息发送失败,存进数据库以便日后手动重新发送(模拟) 56 Map<String,Object> memMap=new HashMap(); 57 memMap.put("topic","log-topic"); 58 memMap.put("tag","user-tag"); 59 memMap.put("body",JSON.toJSONString(user).getBytes()); 60 memList.add(memMap); 61 System.out.println("发送消息失败!result is : " + JSON.toJSONString(new String(message.getBody()))); 62 } 63 }); 64 } 65 } catch (Exception e) { 66 e.printStackTrace(); 67 } 68 producer.shutdown(); 69 } 70 71 /** 72 * byte转对象 73 * @param bytes 74 * @return 75 */ 76 private static Object ByteToObject(byte[] bytes) { 77 Object obj = null; 78 try { 79 // bytearray to object 80 ByteArrayInputStream bi = new ByteArrayInputStream(bytes); 81 ObjectInputStream oi = new ObjectInputStream(bi); 82 83 84 obj = oi.readObject(); 85 bi.close(); 86 oi.close(); 87 } catch (Exception e) { 88 System.out.println("translation" + e.getMessage()); 89 e.printStackTrace(); 90 } 91 return obj; 92 } 93 /** 94 * 发送用户消息 95 */ 96 static class User implements Serializable { 97 private String loginName; 98 private String pwd; 99 private int id; 100 101 public int getId() { 102 return id; 103 } 104 105 public void setId(int id) { 106 this.id = id; 107 } 108 109 public String getLoginName() { 110 return loginName; 111 } 112 113 public void setLoginName(String loginName) { 114 this.loginName = loginName; 115 } 116 117 public String getPwd() { 118 return pwd; 119 } 120 121 public void setPwd(String pwd) { 122 this.pwd = pwd; 123 } 124 125 @Override 126 public String toString() { 127 return "User{" + 128 "loginName=‘" + loginName + ‘\‘‘ + 129 ", pwd=‘" + pwd + ‘\‘‘ + 130 ", id=" + id + 131 ‘}‘; 132 } 133 } 134 }
Consumer
1 import com.alibaba.fastjson.JSONObject; 2 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; 3 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; 4 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; 5 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; 6 import org.apache.rocketmq.client.exception.MQClientException; 7 import org.apache.rocketmq.common.message.MessageExt; 8 9 import java.util.HashSet; 10 import java.util.List; 11 import java.util.Set; 12 13 /** 14 * Consumer最佳实践 15 * 1.消费过程要做到幂等 16 * 2.尽量使?批量?式消费,可以很?程度上提?消费吞吐量。 17 * 3.优化每条消息的消费过程 18 */ 19 public class Consumer { 20 21 public static void main(String[] args) throws MQClientException { 22 Set<Integer> set=new HashSet<>(); 23 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-group"); 24 25 consumer.setNamesrvAddr("192.168.83.128:9876"); 26 consumer.setInstanceName("rmq-instance"); 27 //设置topic和Tag 28 consumer.subscribe("log-topic", "user-tag"); 29 //从RocketMq拿到数据后执行的操作 30 consumer.registerMessageListener(new MessageListenerConcurrently() { 31 public ConsumeConcurrentlyStatus consumeMessage( 32 List<MessageExt> msgs, ConsumeConcurrentlyContext context) { 33 System.out.println("msgs.size为:"+msgs.size()); 34 for (MessageExt msg : msgs) { 35 JSONObject jsonObject = JSONObject.parseObject(new String(msg.getBody())); 36 boolean addResult = set.add(jsonObject.getIntValue("id")); 37 if(!addResult){ 38 System.out.println("重复数据,id为"+jsonObject.getIntValue("id")); 39 } 40 System.out.println("消费者消费数据:"+new String(msg.getBody())); 41 } 42 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; 43 } 44 }); 45 consumer.start(); 46 } 47 }
原文:https://www.cnblogs.com/zhujianqiang/p/14871124.html