首页 > 其他 > 详细

RabbitMQ实现消息的发送和数据同步

时间:2020-08-27 09:48:13      阅读:75      评论:0      收藏:0      [点我收藏+]

引言

    最近参与了一个智慧城市综合管理平台的项目,核心业务是基于视频监控对城市车辆的相关信息进行管理和统计分析。项目涉及多个平台,以及与其他公司合作开发业务的场景,需要对车辆数据进行同步和共享,其中用到了消息中间件RabbitMQ,用于车辆大数据的消息发送、监控以及不同系统间的信息同步。下面简单梳理一下使用RabbitMQ发送消息的过程和业务,具体的RabbitMQ服务端和客户端配置在此就不讲解了。

  1:BMS向车辆大数据同步数据时使用的消息实体CLS_VO_Ignite_Message

package com.tiandy.easy7.core.vo;

import java.util.List;

/**
 * Message structure used when BMS synchronises data to the vehicle big-data platform.
 *
 * <p>A plain mutable bean: every field has a getter and a setter, and the
 * snake_case property names are kept as they are because they appear in the
 * accessor names.
 */
public class CLS_VO_Ignite_Message {

    /** Marks the message as coming from the BMS system. */
    private String source;

    /** Message type, e.g. {@code user}. */
    private String type;

    /** Operation: {@code add}, {@code del} or {@code update}. */
    private String operate;

    /** User information. */
    private CLS_VO_Ignite_User user;

    /** Permission (role) information of the user. */
    private List<CLS_VO_Ignite_Role> role_list;

    /** Tollgate-to-role list. */
    private List<CLS_VO_Ignite_Tollgate> tollgate_list;

    /** Camera-to-role list. */
    private List<CLS_VO_Ignite_Camera> camera_list;

    /** Area parameters. */
    private List<CLS_VO_AreaParam> area_list;

    /** Collection of longitude/latitude coordinates. */
    private List<CLS_VO_GisCoordinates> gis_list;

    /** Domain information. */
    private CLS_VO_DomainInfoEx domainInfo;

    /** Creates an empty message; all fields stay {@code null}. */
    public CLS_VO_Ignite_Message() {
    }

    /**
     * Creates a user/role/device message. {@code gis_list} and
     * {@code domainInfo} are left {@code null}.
     */
    public CLS_VO_Ignite_Message(String source, String type, String operate,
                                 CLS_VO_Ignite_User user,
                                 List<CLS_VO_Ignite_Role> role_list,
                                 List<CLS_VO_Ignite_Tollgate> tollgate_list,
                                 List<CLS_VO_Ignite_Camera> camera_list,
                                 List<CLS_VO_AreaParam> area_list) {
        this.source = source;
        this.type = type;
        this.operate = operate;
        this.user = user;
        this.role_list = role_list;
        this.tollgate_list = tollgate_list;
        this.camera_list = camera_list;
        this.area_list = area_list;
    }

    /** Creates a message that only carries GIS coordinates besides the header fields. */
    public CLS_VO_Ignite_Message(String source, String type, String operate,
                                 List<CLS_VO_GisCoordinates> gis_list) {
        this(source, type, operate, null, null, null, null, null);
        this.gis_list = gis_list;
    }

    /** Creates a message that only carries domain information besides the header fields. */
    public CLS_VO_Ignite_Message(String source, String type, String operate,
                                 CLS_VO_DomainInfoEx domainInfo) {
        this(source, type, operate, null, null, null, null, null);
        this.domainInfo = domainInfo;
    }

    // ---- header fields ----

    public String getSource() {
        return this.source;
    }

    public void setSource(String source) {
        this.source = source;
    }

    public String getType() {
        return this.type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getOperate() {
        return this.operate;
    }

    public void setOperate(String operate) {
        this.operate = operate;
    }

    // ---- payload fields ----

    public CLS_VO_Ignite_User getUser() {
        return this.user;
    }

    public void setUser(CLS_VO_Ignite_User user) {
        this.user = user;
    }

    public List<CLS_VO_Ignite_Role> getRole_list() {
        return this.role_list;
    }

    public void setRole_list(List<CLS_VO_Ignite_Role> role_list) {
        this.role_list = role_list;
    }

    public List<CLS_VO_Ignite_Tollgate> getTollgate_list() {
        return this.tollgate_list;
    }

    public void setTollgate_list(List<CLS_VO_Ignite_Tollgate> tollgate_list) {
        this.tollgate_list = tollgate_list;
    }

    public List<CLS_VO_Ignite_Camera> getCamera_list() {
        return this.camera_list;
    }

    public void setCamera_list(List<CLS_VO_Ignite_Camera> camera_list) {
        this.camera_list = camera_list;
    }

    public List<CLS_VO_AreaParam> getArea_list() {
        return this.area_list;
    }

    public void setArea_list(List<CLS_VO_AreaParam> area_list) {
        this.area_list = area_list;
    }

    public List<CLS_VO_GisCoordinates> getGis_list() {
        return this.gis_list;
    }

    public void setGis_list(List<CLS_VO_GisCoordinates> gis_list) {
        this.gis_list = gis_list;
    }

    public CLS_VO_DomainInfoEx getDomainInfo() {
        return this.domainInfo;
    }

    public void setDomainInfo(CLS_VO_DomainInfoEx domainInfo) {
        this.domainInfo = domainInfo;
    }
}

2:发送RabbitMQ消息的帮助类CLS_RabbitMQClientSend

package com.tiandy.easy7.its.rabbitmq;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import cn.jpush.api.utils.StringUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.tiandy.easy7.core.bo.CLS_BO_User;
import easy7.datatype.CLS_Easy7_Error;
import easy7.datatype.CLS_Easy7_Types;
import org.apache.log4j.Logger;

/**
 * Static helper that publishes BMS synchronisation messages to RabbitMQ.
 *
 * <p>One connection and one channel are shared by the whole class. Call
 * {@link #initialize()} before {@link #sendMsg(String)}; {@link #connectStatus}
 * tells whether the last initialisation succeeded. The class performs no
 * synchronisation of its own.
 */
public class CLS_RabbitMQClientSend {

    private static final Logger log = Logger.getLogger(CLS_RabbitMQClientSend.class);

    /** Durable fanout exchange that every sync message is published to. */
    private static final String SYNC_EXCHANGE = "bms_sync_user_auth";

    /** Shared RabbitMQ connection. */
    private static Connection connection = null;
    /** Shared RabbitMQ channel. */
    private static Channel channel = null;
    /** {@code true} once the connection is open and the exchange has been declared. */
    public static boolean connectStatus = false;

    /** @return the shared connection, or {@code null} when none is open */
    public Connection getConnection() {
        return connection;
    }

    /** @return the shared channel, or {@code null} when none is open */
    public Channel getChannel() {
        return channel;
    }

    /**
     * (Re)creates the connection and channel and declares the sync exchange.
     *
     * <p>Any previously opened connection/channel is closed first. Failures are
     * logged and reported through {@link #connectStatus}; nothing is thrown.
     */
    public static void initialize() {
        try {
            ConnectionFactory factory = CLS_RabbitMQUtil.getRabbitMQConnectionFactory();
            // Drop the old connection and channel before opening new ones.
            closeConnection();
            connection = factory.newConnection();
            channel = connection.createChannel();

            try {
                // Durable exchange in FANOUT (broadcast) mode.
                channel.exchangeDeclare(SYNC_EXCHANGE, BuiltinExchangeType.FANOUT, true);
                connectStatus = true;
            } catch (Exception e) {
                connectStatus = false;
                log.error("CLS_RabbitMQClientSend method initialize error! exchangeDeclare failed", e);
            }
        } catch (Exception e) {
            connectStatus = false;
            // Keep the cause: without it a failed connect cannot be diagnosed.
            log.error("CLS_RabbitMQClientSend method initialize error!", e);
        }
    }

    /**
     * Publishes a message to the sync exchange (BMS to vehicle big-data).
     *
     * @param info message body; it is encoded as UTF-8 before publishing
     * @return {@code CLS_Easy7_Error.ERROR_PARAM} when {@code info} is empty,
     *         {@code CLS_Easy7_Error.ERROR_OK} when the publish call returned normally,
     *         {@code CLS_Easy7_Error.ERROR_REQUEST_FAILED} when no channel is available
     *         or publishing threw
     */
    public static int sendMsg(String info) {
        if (StringUtils.isEmpty(info)) {
            return CLS_Easy7_Error.ERROR_PARAM;
        }
        Channel current = channel;
        if (current == null) {
            log.error("CLS_RabbitMQClientSend method sendMsg error! channel is not initialized");
            return CLS_Easy7_Error.ERROR_REQUEST_FAILED;
        }
        try {
            // Log the text itself; logging the byte[] only prints its identity.
            log.debug("CLS_RabbitMQClientSend sendMsg:" + info);
            // Explicit charset so the wire format does not depend on the JVM default.
            current.basicPublish(SYNC_EXCHANGE, "", null, info.getBytes("UTF-8"));
            return CLS_Easy7_Error.ERROR_OK;
        } catch (Exception e) {
            log.error("CLS_RabbitMQClientSend method sendMsg error!", e);
            return CLS_Easy7_Error.ERROR_REQUEST_FAILED;
        }
    }

    /**
     * Closes the channel and the connection, best effort.
     *
     * <p>Both references are cleared and {@link #connectStatus} is reset even
     * when closing fails, so callers never see a stale "connected" state.
     */
    public static void closeConnection() {
        connectStatus = false;
        try {
            if (channel != null && channel.isOpen()) {
                channel.close();
            }
        } catch (Exception e) {
            log.error("CLS_RabbitMQClientSend method closeConnection error! close channel failed", e);
        } finally {
            channel = null;
        }
        try {
            if (connection != null && connection.isOpen()) {
                connection.close();
            }
        } catch (Exception e) {
            log.error("CLS_RabbitMQClientSend method closeConnection error! close connection failed", e);
        } finally {
            connection = null;
        }
    }

}

3:RabbitMQ的核心帮助类CLS_RabbitMQUtil

package com.tiandy.easy7.its.rabbitmq;

import javax.annotation.Resource;

import com.tiandy.easy7.core.util.Tools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;

import net.sf.json.JSONObject;

import com.rabbitmq.client.ConnectionFactory;
import com.tiandy.easy7.core.bo.CLS_BO_SystemInfo;
import com.tiandy.easy7.core.po.TabSystemInfo;
import com.tiandy.easy7.core.util.MyApplicationContextUtil;
import com.tiandy.easy7.core.vo.CLS_VO_Result;
import com.tiandy.easy7.face.rabbitmq.CLS_RabbitMQExchangeRecive;

import easy7.datatype.CLS_Easy7_Types;

import java.net.InetAddress;

/**
 * RabbitMQ utility: resolves the broker host and hands out the shared
 * {@link ConnectionFactory}.
 */
public class CLS_RabbitMQUtil {

    private static final Logger log = LoggerFactory.getLogger(CLS_RabbitMQUtil.class);

    /** Host used when no broker address has been configured yet. */
    private static final String DEFAULT_HOST = "127.0.0.1";

    /** Utility class, not instantiable. */
    private CLS_RabbitMQUtil() {
    }

    /**
     * Returns the shared connection factory with its host refreshed.
     *
     * <p>The host is re-read through {@link #getRabbitMQIP()} on every call;
     * when none is configured it falls back to {@code 127.0.0.1}.
     *
     * @return the singleton {@link ConnectionFactory}
     */
    public static final ConnectionFactory getRabbitMQConnectionFactory() {
        // After a platform restart the RabbitMQ IP has to be read again.
        getRabbitMQIP();
        // First use of the platform: nothing configured yet, default to localhost.
        if (null == CLS_Easy7_Types.rabbitmq_host_for_bj || "".equals(CLS_Easy7_Types.rabbitmq_host_for_bj)) {
            CLS_Easy7_Types.rabbitmq_host_for_bj = DEFAULT_HOST;
        }
        RabbitMQConnectionFactory.factory.setHost(CLS_Easy7_Types.rabbitmq_host_for_bj);
        return RabbitMQConnectionFactory.factory;
    }

    /**
     * Reads the RabbitMQ IP from the system-info record and stores it in
     * {@code CLS_Easy7_Types.rabbitmq_host_for_bj}.
     *
     * <p>The value is taken from {@code content.sParam.rabbitmqIp} of the
     * result returned for {@code CLS_Easy7_Types.EASY7_ACTIVEMQ_SID}. When the
     * record cannot be read or holds no IP, the method logs an error and
     * returns without changing anything. On success it also fills
     * {@code CLS_Easy7_Types.LOCALHOST_IP} if that is still empty.
     */
    public static void getRabbitMQIP() {
        // The bean is looked up explicitly; field injection does not apply to static members.
        ApplicationContext ctx = MyApplicationContextUtil.getContext();
        CLS_BO_SystemInfo boSystemInfo = ctx.getBean(CLS_BO_SystemInfo.class);
        CLS_VO_Result result = null;
        String rabbitmq_ip = null;
        try {
            result = boSystemInfo.getSystemInfo(CLS_Easy7_Types.EASY7_ACTIVEMQ_SID);
            JSONObject json = JSONObject.fromObject(result);
            if (json.getInt("ret") == 0) {
                String content = json.getString("content");
                if (content != null && !"".equals(content)) {
                    String sParam = JSONObject.fromObject(content).getString("sParam");
                    if (sParam != null && !"".equals(sParam)) {
                        rabbitmq_ip = JSONObject.fromObject(sParam).getString("rabbitmqIp");
                    }
                }
            }
        } catch (Exception e1) {
            log.error("rabbitmq id is not exist!", e1);
            return;
        }

        if (result == null || rabbitmq_ip == null || "".equals(rabbitmq_ip)) {
            log.error("rabbitmq ip is not exist!");
            return;
        }
        CLS_Easy7_Types.rabbitmq_host_for_bj = rabbitmq_ip;

        // The local server IP serves as the routing key for permission requests from lower levels.
        try {
            if (null == CLS_Easy7_Types.LOCALHOST_IP || "".equals(CLS_Easy7_Types.LOCALHOST_IP)) {
                CLS_Easy7_Types.LOCALHOST_IP = Tools.getLinuxLocalIp();
            }
        } catch (Exception e) {
            log.error("get LinuxIp error", e);
        }
    }

    /** Holder for the singleton connection factory, configured when first accessed. */
    private static class RabbitMQConnectionFactory {
        private static final ConnectionFactory factory = new ConnectionFactory();

        static {
            try {
                // NOTE(review): these values are hard-coded here; they could be moved
                // to a properties file, and the credentials should not stay in source.
                factory.setPort(5673);                      // MQ port
                factory.setUsername("admin");               // MQ user name
                factory.setPassword("123456");              // MQ password
                factory.setRequestedHeartbeat(10);          // heartbeat, in seconds
                factory.setAutomaticRecoveryEnabled(true);  // recover the connection automatically
            } catch (Exception e) {
                log.error("init rabbitmq connection factory error", e);
            }
        }
    }
}

4:获取需要发送MQ的消息

public CLS_VO_Ignite_Message deleteHostCamera(){
//定义一个空的list模拟数据,具体实际业务中里边肯定是有数据的(需要发现车辆的数据消息) List
<CLS_VO_Ignite_Camera> cameralist = new ArrayList<CLS_VO_Ignite_Camera>(); //定义拼接向Rabbit发送的消息体 (json格式的数据) CLS_VO_Ignite_Message message = new CLS_VO_Ignite_Message(CLS_Easy7_Types.MESSAGE_SOURCE_BMS,CLS_Easy7_Types.MESSAGE_TYPE_CAMERA, CLS_Easy7_Types.MESSAGE_OPERATE_DEL,null,null,null,cameralist,null); return message; }

5:发送消息

public void sendRabbitMQMessage(){
  //如发送的消息:
//{"id":"f6560157-24ae-475b-9736-34072684b008","reviewInfo":"","status":1,"ids":[],"currentUserId":"admin","reviewUser":"admin"}
CLS_VO_Ignite_Message message
=deleteHostCamera(); //消息为空时不发送消息 if(null != message.getSource() && !"".equals(message.getSource())){ int sendResult = CLS_RabbitMQClientSend.sendMsg(JSONObject.fromObject(message).toString()); //同步消息失败,错误代码使用-40,数据库操作回滚 if(sendResult != 0){ TransactionAspectSupport.currentTransactionStatus().setRollbackOnly(); //事务回滚 result.setRet( -40);//请求失败 return result; } } }

其他应用或平台接收到MQ中的消息后,解析JSON数据,转换为对应的数据实体,再将数据添加到相应的库表中即可,这样就完成了消息的发送和数据的同步!

至此,发送消息的过程就完成了!

RabbitMQ实现消息的发送和数据同步

原文:https://www.cnblogs.com/zhaosq/p/13424903.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!