继续上一篇,这篇主要讲通过mina往B端发送消息,并接受消息,mina是一个网络通信框架,封装了javaNIO,简单易用,网上有很多关于他的介绍,在此不赘述了。
如上篇所介绍,完成功能,需要五个类:
PoolListener:监听,用来在系统启动的时候创建连接;
SessionPool:连接池;
SendHandler:处理类;
CharsetEncoder:编码;
CharsetDecoder:解码:
B为我们提供了6个端口,每个端口可建立3个长连接,因此,在系统时,就要创建长连接,下面是一个监听类:
import javax.servlet.ServletContextEvent; import javax.servlet.ServletContextListener; /** * 初始化连接 * @author yuanfubiao * */ public class PoolListener implements ServletContextListener { @Override public void contextDestroyed(ServletContextEvent sce) { } @Override public void contextInitialized(ServletContextEvent sce) { String nds_ip = sce.getServletContext().getInitParameter("nds_ip"); String nds_ports = sce.getServletContext().getInitParameter("nds_ports"); SessionPool pool = new SessionPool(); try { pool.init(nds_ip, nds_ports); } catch (Exception e) { e.printStackTrace(); } } }
下面是监听配置,是配置在web.xml中:
<display-name>Apache-Axis2</display-name> <context-param> <param-name>nds_ip</param-name> <param-value>XX.XXX.XXX.XXX</param-value> </context-param> <context-param> <param-name>nds_ports</param-name> <param-value>12210,12211,12212,12213,12214,12215</param-value> </context-param> <listener> <listener-class>cn.net.easyway.nds.PoolListener</listener-class> </listener>
下面是自己维护的一个连接池,同样使用并发包中的ConcurrentHashMap实现,他也是线程安全的,代码如下:
import java.net.InetSocketAddress; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.mina.core.future.ConnectFuture; import org.apache.mina.core.service.IoConnector; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.transport.socket.nio.NioSocketConnector; public class SessionPool { private static Log logger = LogFactory.getLog(SessionPool.class); private static int connNum = 0; private static String ip = null; private static Map<String,Integer> connNumPorts = new HashMap<String, Integer>(); private static ConcurrentHashMap<String, IoSession> pool = new ConcurrentHashMap<String, IoSession>(); /** * 初始化:读取配置文件,创建长连接 * @throws Exception */ public void init(String nds_ip,String nds_ports) throws Exception{ String[] ports = nds_ports.split(","); ip = nds_ip; for(int i=0;i<ports.length;i++){ int port = Integer.parseInt(ports[i]); ConnectFuture future = null; for(int j=0;j<3;j++){ String connNum = this.getConnNums(); logger.info("创建连接号---->>>>>" + connNum); connNumPorts.put(connNum, port); future = SessionPool.createConnect(ip, port); if(future.isConnected()){ logger.info("创建连接------->" + future.getSession()); pool.put(connNum, future.getSession()); }else{ logger.error("连接创建错误,请检查IP和端口配置!" + future); } } } } /** * 获取一个连接 * @param num * @return */ public static IoSession getSession(String strNum){ logger.info("IP端口号:" + ip + "连接序列号:" + strNum + "端口号:" + connNumPorts.get(strNum)); IoSession session = pool.get(strNum); if(null == session || !session.isClosing()){ ConnectFuture newConn = createConnect(ip, connNumPorts.get(strNum)); if(!newConn.isConnected()){ newConn = createConnect(ip,connNumPorts.get(strNum)); } session = newConn.getSession(); pool.replace(strNum, session); } return session; } /** * 创建连接 * @param ip * @param port * @return */ private static ConnectFuture createConnect(String strIp,int intPort){ IoConnector connector = new NioSocketConnector(); connector.getFilterChain().addLast("codec" ,new ProtocolCodecFilter(new CharsetCodecFactory())); connector.setHandler(new SendHandler()); ConnectFuture future = connector.connect(new InetSocketAddress(strIp,intPort)); connector.getSessionConfig().setReadBufferSize(128); future.awaitUninterruptibly(); return future; } /** * 生成连接序列号 * @return */ private synchronized String getConnNums(){ if(18 == connNum){ connNum = 0; } connNum++; return String.format("%02x", connNum); } }
因此,在项目启动的时候就会有18个连接自动创建,并放在pool中等待我们的使用。下面是业务处理类,需要继承IoHandlerAdapter类,并且实现以下几个方法:
import nds.framework.security.NDSMD5; import org.apache.commons.codec.binary.Hex; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.core.session.IoSession; import cm.custom.service.reception.RecResponse; import cm.custom.service.reception.ReceptionResponseServiceStub; /** * 业务处理 * @author yuanfubiao * */ public class SendHandler extends IoHandlerAdapter { private static Log logger = LogFactory.getLog(SendHandler.class); @Override public void exceptionCaught(IoSession session, Throwable cause) throws Exception { logger.error("连接出错", cause); } @Override /** * 设置空闲时间 */ public void sessionCreated(IoSession session) throws Exception { session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, 60); } /** * 接受到消息后,通过WS发送给用户管理系统 */ @Override public void messageReceived(IoSession session, Object message) throws Exception { String result = message.toString().trim(); String temp = result.substring(0, result.length()-16).trim(); logger.info("接受到的数据:" + result); //验证签名 String signature = null; String securityKey = "12345678"; try { byte binSignature[] = NDSMD5.signPacket(temp.getBytes(), securityKey); signature = new String(Hex.encodeHex(binSignature)); } catch (Exception e) { e.printStackTrace(); } String packet = temp + signature.toUpperCase().trim(); if(!result.equalsIgnoreCase(packet)){ logger.error("数字签名不正确!错误指令:" + result); return; } logger.info("接受到的数据:" + packet); RecResponse res = new RecResponse(); res.setResponse(temp); ReceptionResponseServiceStub stub = new ReceptionResponseServiceStub(); stub.recResponse(res); } /** * 连接空闲时,发送心跳包 */ @Override public void sessionIdle(IoSession session, IdleStatus status) throws Exception { if(status == IdleStatus.BOTH_IDLE){ session.write("heartbeat"); } } }
一般我们在写socket程序时,用阻塞的方式读取消息,一般是根据消息换行符或者特殊字符,或者对方关闭流来证明一条信息读取完成,在mina中,有默认的编解码方式,但也可以自定义,比如以长度来判断一条消息是否读取完毕:
编码
import java.nio.charset.Charset; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolEncoderAdapter; import org.apache.mina.filter.codec.ProtocolEncoderOutput; /** * 编码 * @author yuanfubiao * */ public class CharsetEncoder extends ProtocolEncoderAdapter{ private final static Charset charset = Charset.forName("utf-8"); @Override public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception { IoBuffer buff = IoBuffer.allocate(100).setAutoExpand(true); buff.putString(message.toString(), charset.newEncoder()); buff.flip(); out.write(buff); } }
解码
import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.CumulativeProtocolDecoder; import org.apache.mina.filter.codec.ProtocolDecoderOutput; /** * 解码 * @author yuanfubiao * */ public class CharsetDecoder extends CumulativeProtocolDecoder{ private static Log logger = LogFactory.getLog(CharsetDecoder.class); @Override protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { if(in.remaining() >= 9){ //心跳为最小传输长度 byte[] headBytes = new byte[in.limit()]; logger.info("接收到消息" + headBytes.toString()); in.get(headBytes, 0, 9); String head = new String(headBytes).trim(); if("heartbeat".equalsIgnoreCase(head)){ return true; } int lenPack = Integer.parseInt(head.substring(5, 9), 16)-9; if(in.remaining() == lenPack){ //验证消息长度 byte[] bodyBytes = new byte[in.limit()]; in.get(bodyBytes,0,lenPack); String body = new String(bodyBytes); out.write(head.trim()+body.trim()); return true; } in.flip(); return false; } return false; } }
原文:http://blog.csdn.net/stubbornpotatoes/article/details/27845541