引言
最近参与了一个智慧城市综合管理平台的项目,主要核心业务就是针对视频监控管理统计分析城市车辆的相关信息,涉及到几个平台以及和其他公司合作开发业务的场景,需要对车辆数据进行同步和共享,中间就用到了RabbitMQ的消息中间件,对车辆的大数据进行发送监控和不同系统间的信息同步,下边就简单梳理讲解一下RabbitMQ发送消息的过程和业务。具体的RabbitMQ服务端和客户端的配置再此就不讲解了
1:车辆大数据的 BMS同步车辆大数据消息实体CLS_VO_Ignite_Message
package com.tiandy.easy7.core.vo; import java.util.List; /** * BMS同步车辆大数据消息结构 */ public class CLS_VO_Ignite_Message { private String source; //标识消息来源于BMS系统 private String type; //user-用户 private String operate; //add-新增,del-删除,update-更新 private CLS_VO_Ignite_User user; //用户信息 private List<CLS_VO_Ignite_Role> role_list; //用户的权限信息 private List<CLS_VO_Ignite_Tollgate> tollgate_list; //卡口对应角色清单 private List<CLS_VO_Ignite_Camera> camera_list; //相机对应角色清单 private List<CLS_VO_AreaParam> area_list; private List<CLS_VO_GisCoordinates> gis_list; //经纬度集合 private CLS_VO_DomainInfoEx domainInfo; public CLS_VO_Ignite_Message(){} public CLS_VO_Ignite_Message(String source, String type, String operate, CLS_VO_Ignite_User user, List<CLS_VO_Ignite_Role> role_list, List<CLS_VO_Ignite_Tollgate> tollgate_list, List<CLS_VO_Ignite_Camera> camera_list,List<CLS_VO_AreaParam> area_list) { this.source = source; this.type = type; this.operate = operate; this.user = user; this.role_list = role_list; this.tollgate_list = tollgate_list; this.camera_list = camera_list; this.area_list = area_list; } public CLS_VO_Ignite_Message(String source,String type, String operate, List<CLS_VO_GisCoordinates> gis_list) { this.source = source; this.type = type; this.operate = operate; this.gis_list = gis_list; } public CLS_VO_Ignite_Message(String source,String type, String operate, CLS_VO_DomainInfoEx domainInfo) { this.source = source; this.type = type; this.operate = operate; this.domainInfo = domainInfo; } public CLS_VO_DomainInfoEx getDomainInfo() { return domainInfo; } public void setDomainInfo(CLS_VO_DomainInfoEx domainInfo) { this.domainInfo = domainInfo; } public String getSource() { return source; } public void setSource(String source) { this.source = source; } public String getType() { return type; } public void setType(String type) { this.type = type; } public String getOperate() { return operate; } public void setOperate(String operate) { this.operate = operate; } public CLS_VO_Ignite_User getUser() { return user; } public void setUser(CLS_VO_Ignite_User user) { this.user = user; } public List<CLS_VO_Ignite_Role> getRole_list() { return role_list; } public void setRole_list(List<CLS_VO_Ignite_Role> role_list) { this.role_list = role_list; } public List<CLS_VO_Ignite_Tollgate> getTollgate_list() { return tollgate_list; } public void setTollgate_list(List<CLS_VO_Ignite_Tollgate> tollgate_list) { this.tollgate_list = tollgate_list; } public List<CLS_VO_Ignite_Camera> getCamera_list() { return camera_list; } public void setCamera_list(List<CLS_VO_Ignite_Camera> camera_list) { this.camera_list = camera_list; } public List<CLS_VO_AreaParam> getArea_list() { return area_list; } public void setArea_list(List<CLS_VO_AreaParam> area_list) { this.area_list = area_list; } public List<CLS_VO_GisCoordinates> getGis_list() { return gis_list; } public void setGis_list(List<CLS_VO_GisCoordinates> gis_list) { this.gis_list = gis_list; } }
2:发送RabbitMQ消息的帮助类RabbitMQClientSend
package com.tiandy.easy7.its.rabbitmq; import java.io.IOException; import java.util.HashMap; import java.util.Map; import cn.jpush.api.utils.StringUtils; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties; import com.tiandy.easy7.core.bo.CLS_BO_User; import easy7.datatype.CLS_Easy7_Error; import easy7.datatype.CLS_Easy7_Types; import org.apache.log4j.Logger; public class CLS_RabbitMQClientSend { private static final Logger log = Logger.getLogger(CLS_RabbitMQClientSend.class); //rabbitmq连接 private static Connection connection = null; //rabbitmq通道 private static Channel channel = null ; //连接状态标识 public static boolean connectStatus = false; public Connection getConnection() { return connection; } public Channel getChannel() { return channel; } //初始化rabbitmq连接工厂和通道 public static void initialize(){ try { //连接工厂 ConnectionFactory factory= CLS_RabbitMQUtil.getRabbitMQConnectionFactory(); //关闭连接与通道 closeConnection(); connection = factory.newConnection(); channel = connection.createChannel(); try { //声明一个持久化的交换器,名称为BMS_SYNC_EXCHANGE_NAME 类型为FANOUT(广播模式) channel.exchangeDeclare("bms_sync_user_auth", BuiltinExchangeType.FANOUT ,true); connectStatus = true ; } catch (Exception e) { connectStatus = false; e.printStackTrace(); } } catch (Exception e) { connectStatus = false ; log.error("CLS_RabbitMQClientSend method initialize error!"); } } /** * 向消息中间件发送消息(从BMS同步到车辆大数据) * @param info 消息 * @return */ public static int sendMsg(String info) { //校验 if(StringUtils.isEmpty(info) ){ return CLS_Easy7_Error.ERROR_PARAM; } try { log.debug("CLS_RabbitMQClientSend sendMsg:" + info.getBytes()); channel.basicPublish("bms_sync_user_auth", "", null, info.getBytes()); return CLS_Easy7_Error.ERROR_OK; } catch (Exception e) { log.error("CLS_RabbitMQClientSend method sendMsg error!"); return CLS_Easy7_Error.ERROR_REQUEST_FAILED; } } //关闭连接 public static void closeConnection(){ try { if (channel != null) { if(channel.isOpen()) { channel.close(); channel = null; } } } catch (Exception e) { e.printStackTrace(); } try { if (connection != null) { if(connection.isOpen()) { connection.close(); connection = null; } } } catch (Exception e) { e.printStackTrace(); } } }
3:RabbitMQ的核心帮助类RabbitMQUtil
package com.tiandy.easy7.its.rabbitmq; import javax.annotation.Resource; import com.tiandy.easy7.core.util.Tools; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; import net.sf.json.JSONObject; import com.rabbitmq.client.ConnectionFactory; import com.tiandy.easy7.core.bo.CLS_BO_SystemInfo; import com.tiandy.easy7.core.po.TabSystemInfo; import com.tiandy.easy7.core.util.MyApplicationContextUtil; import com.tiandy.easy7.core.vo.CLS_VO_Result; import com.tiandy.easy7.face.rabbitmq.CLS_RabbitMQExchangeRecive; import easy7.datatype.CLS_Easy7_Types; import java.net.InetAddress; //MQ工具类 public class CLS_RabbitMQUtil { private static final Logger log = LoggerFactory.getLogger(CLS_RabbitMQUtil.class); @Resource(name="boSystemInfo") private static CLS_BO_SystemInfo boSystemInfo; //构造方法私有 private CLS_RabbitMQUtil(){ } //得到rabbitmq连接工厂 public static final ConnectionFactory getRabbitMQConnectionFactory(){ //平台重启从数据库获取rabbitMq的IP getRabbitMQIP(); //当平台第一次使用时,默认为127.0.0.1 if(null == CLS_Easy7_Types.rabbitmq_host_for_bj || "".equals(CLS_Easy7_Types.rabbitmq_host_for_bj)){ CLS_Easy7_Types.rabbitmq_host_for_bj = "127.0.0.1"; } RabbitMQConnectionFactory.factory.setHost(CLS_Easy7_Types.rabbitmq_host_for_bj); //127.0.0.1 return RabbitMQConnectionFactory.factory; } /** * 获取RabbitMQ IP地址 */ public static void getRabbitMQIP() { ApplicationContext ctx = MyApplicationContextUtil.getContext(); CLS_BO_SystemInfo boSystemInfo = ctx.getBean(CLS_BO_SystemInfo.class); CLS_VO_Result result = null; JSONObject json = null; JSONObject jsonContent = null; JSONObject jsonParam = null; int ret; String rabbitmq_ip = null; try { //LS_Easy7_Types.EASY7_ACTIVEMQ_SID ="activemq-server-config-2017728";//activemqid result = boSystemInfo.getSystemInfo(CLS_Easy7_Types.EASY7_ACTIVEMQ_SID); json = JSONObject.fromObject(result); ret = json.getInt("ret"); if(ret == 0) { String content = json.getString("content"); if(content != null && !"".equals(content)) { jsonContent = JSONObject.fromObject(content); String sParam = jsonContent.getString("sParam"); if(sParam != null && !"".equals(sParam)) { jsonParam = JSONObject.fromObject(sParam); rabbitmq_ip = jsonParam.getString("rabbitmqIp"); } } } } catch (Exception e1) { e1.printStackTrace(); log.error("rabbitmq id is not exist!"); return; } if(jsonParam != null) { jsonParam.clear(); jsonParam = null; } if(jsonContent != null) { jsonContent.clear(); jsonContent = null; } if(json != null) { json.clear(); json = null; } if(result == null || rabbitmq_ip == null || "".equals(rabbitmq_ip)) { log.error("rabbitmq ip is not exist!"); return; } "127.0.0.1" = rabbitmq_ip; //获取本地服务器IP,作为接受下级权限申请的routinfKey try{ if(null == CLS_Easy7_Types.LOCALHOST_IP || "".equals(CLS_Easy7_Types.LOCALHOST_IP)){ CLS_Easy7_Types.LOCALHOST_IP = Tools.getLinuxLocalIp(); } }catch (Exception e){ log.error("get LinuxIp error" + e); } } //rabbitmq连接工厂单例 private static class RabbitMQConnectionFactory{ private static ConnectionFactory factory = new ConnectionFactory(); static{ try { //可以通过properties配置文件配置 factory.setPort(5673);//MQ端口 factory.setUsername(admin);//MQ用户名 factory.setPassword(123456);//MQ密码 factory.setRequestedHeartbeat(10);//设置心跳 (秒) factory.setAutomaticRecoveryEnabled(true);//自动恢复连接 } catch (Exception e) { e.printStackTrace(); } } } }
4:获取需要发送MQ的消息
public CLS_VO_Ignite_Message deleteHostCamera(){
//定义一个空的list模拟数据,具体实际业务中里边肯定是有数据的(需要发现车辆的数据消息) List<CLS_VO_Ignite_Camera> cameralist = new ArrayList<CLS_VO_Ignite_Camera>(); //定义拼接向Rabbit发送的消息体 (json格式的数据) CLS_VO_Ignite_Message message = new CLS_VO_Ignite_Message(CLS_Easy7_Types.MESSAGE_SOURCE_BMS,CLS_Easy7_Types.MESSAGE_TYPE_CAMERA, CLS_Easy7_Types.MESSAGE_OPERATE_DEL,null,null,null,cameralist,null); return message; }
5:发送消息
public void sendRabbitMQMessage(){ //如发送的消息:
//{"id":"f6560157-24ae-475b-9736-34072684b008","reviewInfo":"","status":1,"ids":[],"currentUserId":"admin","reviewUser":"admin"}
CLS_VO_Ignite_Message message =deleteHostCamera(); //消息为空时不发送消息 if(null != message.getSource() && !"".equals(message.getSource())){ int sendResult = CLS_RabbitMQClientSend.sendMsg(JSONObject.fromObject(message).toString()); //同步消息失败,错误代码使用-40,数据库操作回滚 if(sendResult != 0){ TransactionAspectSupport.currentTransactionStatus().setRollbackOnly(); //事务回滚 result.setRet( -40);//请求失败 return result; } } }
其他应用或平台接收到MQ中的消息后,解析JSON数据,转换为对于的数据实体,将数据添加到相应的库中表中即可,这样就完成了消息的发送和数据的同步!
至此,发现消息得过程就完成了!
原文:https://www.cnblogs.com/zhaosq/p/13424903.html