在《(一)androidpn-server tomcat版源码解析之--项目启动》这篇中,已经描述了整个推送服务器的启动过程,并且把握到了消息的入口即XmppIoHandler这个类。今天我将继续往下分析核心代码,主要分为3大块:连接创建、消息的发送、连接关闭。
/** * Invoked from an I/O processor thread when a new connection has been created. */ public void sessionCreated(IoSession session) throws Exception { log.debug("sessionCreated()..."); session.getConfig().setBothIdleTime(IDLE_TIME);; } /** * Invoked when a connection has been opened. */ public void sessionOpened(IoSession session) throws Exception { log.debug("sessionOpened()..."); log.debug("remoteAddress=" + session.getRemoteAddress()); // Create a new XML parser XMLLightweightParser parser = new XMLLightweightParser("UTF-8"); session.setAttribute(XML_PARSER, parser); // Create a new connection Connection connection = new Connection(session); session.setAttribute(CONNECTION, connection); session.setAttribute(STANZA_HANDLER, new StanzaHandler(serverName, connection)); } /** * Invoked when a connection is closed. */ public void sessionClosed(IoSession session) throws Exception { log.debug("sessionClosed()..."); Connection connection = (Connection) session.getAttribute(CONNECTION); connection.close(); } /** * Invoked with the related IdleStatus when a connection becomes idle. */ public void sessionIdle(IoSession session, IdleStatus status) throws Exception { log.debug("sessionIdle()..."); Connection connection = (Connection) session.getAttribute(CONNECTION); if (log.isDebugEnabled()) { log.debug("Closing connection that has been idle: " + connection); } connection.close(); } /** * Invoked when any exception is thrown. */ public void exceptionCaught(IoSession session, Throwable cause) throws Exception { log.debug("exceptionCaught()..."); log.error(cause); } /** * Invoked when a message is received. 
*/ public void messageReceived(IoSession session, Object message) throws Exception { log.debug("messageReceived()..."); log.debug("RCVD: " + message); // Get the stanza handler StanzaHandler handler = (StanzaHandler) session .getAttribute(STANZA_HANDLER); // Get the XMPP packet parser int hashCode = Thread.currentThread().hashCode(); XMPPPacketReader parser = parsers.get(hashCode); if (parser == null) { parser = new XMPPPacketReader(); parser.setXPPFactory(factory); parsers.put(hashCode, parser); } // The stanza handler processes the message try { handler.process((String) message, parser); } catch (Exception e) { log.error( "Closing connection due to error while processing message: " + message, e); Connection connection = (Connection) session .getAttribute(CONNECTION); connection.close(); } } /** * Invoked when a message written by IoSession.write(Object) is sent out. */ public void messageSent(IoSession session, Object message) throws Exception { log.debug("messageSent()..."); }
对应的处理类为:XmppIoHandler -> StanzaHandler -> PacketRouter -> 以下具体处理类:
- IQAuthHandler.java -> IQHandler.java
- IQRegisterHandler.java
- IQRosterHandler.java
- PresenceUpdateHandler.java
XmppIoHandler.java -> sessionCreated:
/**
 * Called when a new connection has been created.
 * Logs the event and sets the session's both-idle time to IDLE_TIME.
 *
 * @param session the newly created session
 */
public void sessionCreated(IoSession session) throws Exception {
    log.debug("sessionCreated()...");
    session.getConfig().setBothIdleTime(IDLE_TIME);
}
public void sessionOpened(IoSession session) throws Exception { log.debug("sessionOpened()..."); log.debug("remoteAddress=" + session.getRemoteAddress()); // Create a new XML parser XMLLightweightParser parser = new XMLLightweightParser("UTF-8"); session.setAttribute(XML_PARSER, parser); // Create a new connection Connection connection = new Connection(session); session.setAttribute(CONNECTION, connection); session.setAttribute(STANZA_HANDLER, new StanzaHandler(serverName, connection)); }
/**
 * Creates a stanza handler for one connection.
 *
 * @param serverName the server name
 * @param connection the connection
 */
public StanzaHandler(String serverName, Connection connection) {
    this.serverName = serverName;
    this.connection = connection;

    // Per the original author's note: the PacketRouter constructor initializes
    // three router implementations -- MessageRouter, PresenceRouter and IQRouter --
    // which handle the three XMPP message types respectively.
    this.router = new PacketRouter();

    this.notificationService = ServiceLocator.getNotificationService();
    this.notificationManager = new NotificationManager();
}
public void messageReceived(IoSession session, Object message) throws Exception { log.debug("messageReceived()..."); log.debug("RCVD: " + message); // Get the stanza handler StanzaHandler handler = (StanzaHandler) session .getAttribute(STANZA_HANDLER); // Get the XMPP packet parser int hashCode = Thread.currentThread().hashCode(); XMPPPacketReader parser = parsers.get(hashCode); if (parser == null) { parser = new XMPPPacketReader(); parser.setXPPFactory(factory); parsers.put(hashCode, parser); } // The stanza handler processes the message try { handler.process((String) message, parser);//###这个方法中,程序会根据xmpp的xml头来判断消息类型,并且传递到对应的Router处理类 } catch (Exception e) { log.error( "Closing connection due to error while processing message: " + message, e); Connection connection = (Connection) session .getAttribute(CONNECTION); connection.close(); } }
IQRouter.java -> IQRouter
/**
 * Constructor.
 * Obtains the SessionManager via getInstance(), registers the auth, register
 * and roster IQ handlers (in that order), and looks up the notification
 * service through the ServiceLocator.
 */
public IQRouter() {
    this.sessionManager = SessionManager.getInstance();

    // Handlers are added in this fixed order
    this.iqHandlers.add(new IQAuthHandler());
    this.iqHandlers.add(new IQRegisterHandler());
    this.iqHandlers.add(new IQRosterHandler());

    this.notificationService = ServiceLocator.getNotificationService();
}
/**
 * Returns the client sessions held in clientSessions.
 * NOTE(review): the result is whatever clientSessions.values() yields; if
 * clientSessions is a java.util.Map this is a view rather than a copy -- confirm.
 *
 * @return a collection that contains all client sessions
 */
public Collection<ClientSession> getSessions() {
    final Collection<ClientSession> sessions = clientSessions.values();
    return sessions;
}
/**
 * Hands the packet to the associated connection.
 * Does nothing when there is no connection or the connection is closed.
 *
 * @param packet the packet to deliver
 */
public void deliver(Packet packet) {
    if (connection == null || connection.isClosed()) {
        return;
    }
    connection.deliver(packet);
}
/** * Delivers the packet to this connection (without checking the recipient). * * @param packet the packet to deliver */ public void deliver(Packet packet) { log.debug("SENT: " + packet.toXML()); if (!isClosed()) { IoBuffer buffer = IoBuffer.allocate(4096); buffer.setAutoExpand(true); boolean errorDelivering = false; try { XMLWriter xmlSerializer = new XMLWriter(new IoBufferWriter( buffer, (CharsetEncoder) encoder.get()), new OutputFormat()); xmlSerializer.write(packet.getElement()); xmlSerializer.flush(); buffer.flip(); ioSession.write(buffer);//###通过mina的isSession对象传递 } catch (Exception e) { log.debug("Connection: Error delivering packet" + "\n" + this.toString(), e); errorDelivering = true; } if (errorDelivering) { close(); } else { session.incrementServerPacketCount(); } } }
/**
 * Invoked when a connection is closed.
 * Closes the Connection stored on the session, if there is one.
 *
 * @param session the session that was closed
 */
public void sessionClosed(IoSession session) throws Exception {
    log.debug("sessionClosed()...");
    Connection connection = (Connection) session.getAttribute(CONNECTION);
    // The attribute may be absent if it was never stored on this session.
    if (connection != null) {
        connection.close();
    }
}
/** * Closes the session including associated socket connection, * notifing all listeners that the channel is shutting down. */ public void close() { boolean closedSuccessfully = false; synchronized (this) { if (!isClosed()) { try { deliverRawText("</stream:stream>", false); } catch (Exception e) { // Ignore } if (session != null) { session.setStatus(Session.STATUS_CLOSED); } ioSession.close(false); closed = true; closedSuccessfully = true; } } if (closedSuccessfully) { notifyCloseListeners(); } }
原创文章,转载请声明出处:http://spjich.iteye.com/blog/2226149 下一篇:(二)androidpn-server tomcat版源码解析之--push消息处理