整个会话始于<stream:stream>止于</stream:stream>,其中又主要有三大元素<message/>,<presence/>和<iq/>(Info/Query)。
<Message>public void connect() { Log.d(LOGTAG, "connect()..."); submitLoginTask(); } private void submitConnectTask() { Log.d(LOGTAG, "submitConnectTask()..."); addTask(new ConnectTask()); } private void submitRegisterTask() { Log.d(LOGTAG, "submitRegisterTask()..."); submitConnectTask(); addTask(new RegisterTask()); } private void submitLoginTask() { Log.d(LOGTAG, "submitLoginTask()..."); submitRegisterTask(); addTask(new LoginTask()); } //连接服务器 private class ConnectTask implements Runnable { final XmppManager xmppManager; private ConnectTask() { this.xmppManager = XmppManager.this; } public void run() { Log.i(LOGTAG, "ConnectTask.run()..."); if (!xmppManager.isConnected()) { // Create the configuration for this new connection ConnectionConfiguration connConfig = new ConnectionConfiguration( xmppHost, xmppPort); // connConfig.setSecurityMode(SecurityMode.disabled); connConfig.setSecurityMode(SecurityMode.required); connConfig.setSASLAuthenticationEnabled(false); connConfig.setCompressionEnabled(false); XMPPConnection connection = new XMPPConnection(connConfig); xmppManager.setConnection(connection); try { // Connect to the server connection.connect(); Log.i(LOGTAG, "XMPP connected successfully"); // packet provider 这个地方是重点,实现了一个推送数据解析器,把服务器发送的XML文件转化为IQ Packet ProviderManager.getInstance().addIQProvider("notification", "androidpn:iq:notification", new NotificationIQProvider()); } catch (XMPPException e) { Log.e(LOGTAG, "XMPP connection failed", e); } xmppManager.runTask(); } else { Log.i(LOGTAG, "XMPP connected already"); xmppManager.runTask(); } } } /** * A runnable task to register a new user onto the server. 
*/ private class RegisterTask implements Runnable { final XmppManager xmppManager; private RegisterTask() { xmppManager = XmppManager.this; } public void run() { Log.i(LOGTAG, "RegisterTask.run()..."); if (!xmppManager.isRegistered()) { final String newUsername = newRandomUUID(); final String newPassword = newRandomUUID(); Registration registration = new Registration(); PacketFilter packetFilter = new AndFilter(new PacketIDFilter( registration.getPacketID()), new PacketTypeFilter( IQ.class)); PacketListener packetListener = new PacketListener() { public void processPacket(Packet packet) { Log.d("RegisterTask.PacketListener", "processPacket()....."); Log.d("RegisterTask.PacketListener", "packet=" + packet.toXML()); if (packet instanceof IQ) { IQ response = (IQ) packet; if (response.getType() == IQ.Type.ERROR) { if (!response.getError().toString().contains( "409")) { Log.e(LOGTAG, "Unknown error while registering XMPP account! " + response.getError() .getCondition()); } } else if (response.getType() == IQ.Type.RESULT) { xmppManager.setUsername(newUsername); xmppManager.setPassword(newPassword); Log.d(LOGTAG, "username=" + newUsername); Log.d(LOGTAG, "password=" + newPassword); Editor editor = sharedPrefs.edit(); editor.putString(Constants.XMPP_USERNAME, newUsername); editor.putString(Constants.XMPP_PASSWORD, newPassword); editor.commit(); Log .i(LOGTAG, "Account registered successfully"); xmppManager.runTask(); } } } }; connection.addPacketListener(packetListener, packetFilter); registration.setType(IQ.Type.SET); registration.addAttribute("username", newUsername); registration.addAttribute("password", newPassword); connection.sendPacket(registration); } else { Log.i(LOGTAG, "Account registered already"); xmppManager.runTask(); } } } /** * A runnable task to log into the server. 
*/ private class LoginTask implements Runnable { final XmppManager xmppManager; private LoginTask() { this.xmppManager = XmppManager.this; } public void run() { Log.i(LOGTAG, "LoginTask.run()..."); if (!xmppManager.isAuthenticated()) { Log.d(LOGTAG, "username=" + username); Log.d(LOGTAG, "password=" + password); try { xmppManager.getConnection().login( xmppManager.getUsername(), xmppManager.getPassword(), XMPP_RESOURCE_NAME); Log.d(LOGTAG, "Loggedn in successfully"); // connection listener if (xmppManager.getConnectionListener() != null) { xmppManager.getConnection().addConnectionListener( xmppManager.getConnectionListener()); } // packet filter PacketFilter packetFilter = new PacketTypeFilter( NotificationIQ.class); // packet listener 这里也是重点,将客户端接收到的IQ Packet进行逻辑处理,进行显示等等 PacketListener packetListener = xmppManager .getNotificationPacketListener(); connection.addPacketListener(packetListener, packetFilter); xmppManager.runTask(); } catch (XMPPException e) { Log.e(LOGTAG, "LoginTask.run()... xmpp error"); Log.e(LOGTAG, "Failed to login to xmpp server. Caused by: " + e.getMessage()); String INVALID_CREDENTIALS_ERROR_CODE = "401"; String errorMessage = e.getMessage(); if (errorMessage != null && errorMessage .contains(INVALID_CREDENTIALS_ERROR_CODE)) { xmppManager.reregisterAccount(); return; } xmppManager.startReconnectionThread(); } catch (Exception e) { Log.e(LOGTAG, "LoginTask.run()... other error"); Log.e(LOGTAG, "Failed to login to xmpp server. Caused by: " + e.getMessage()); xmppManager.startReconnectionThread(); } } else { Log.i(LOGTAG, "Logged in already"); xmppManager.runTask(); } } }到这里客户端与服务器的连接已经建立,后面等待接收推送消息的过程可以解析分析。
/**
 * IQ packet carrying a pushed notification.
 *
 * <p>NOTE(review): the {@code id} field and the accessors used by the
 * listener and provider below (getId/setId, getApiKey, getTitle, ...) are not
 * declared in this excerpt -- presumably elided; confirm against the full
 * class.
 */
public class NotificationIQ extends IQ {

    /**
     * Serializes the child element as
     * {@code <notification xmlns="androidpn:iq:notification">}, containing an
     * {@code <id>} element when {@code id} is non-null.
     */
    @Override
    public String getChildElementXML() {
        StringBuilder buf = new StringBuilder();
        buf.append("<").append("notification").append(" xmlns=\"").append(
                "androidpn:iq:notification").append("\">");
        if (id != null) {
            buf.append("<id>").append(id).append("</id>");
        }
        buf.append("</").append("notification").append("> ");
        return buf.toString();
    }
}

/**
 * Packet listener that turns a received {@link NotificationIQ} into an
 * {@code ACTION_SHOW_NOTIFICATION} broadcast.
 *
 * <p>NOTE(review): the {@code xmppManager} field is not declared in this
 * excerpt -- presumably elided; confirm against the full class.
 */
public class NotificationPacketListener implements PacketListener {

    /**
     * Copies the notification fields into intent extras and broadcasts the
     * intent. Packets that are not a NotificationIQ are ignored.
     */
    @Override
    public void processPacket(Packet packet) {
        if (packet instanceof NotificationIQ) {
            NotificationIQ notification = (NotificationIQ) packet;

            if (notification.getChildElementXML().contains(
                    "androidpn:iq:notification")) {
                String notificationId = notification.getId();
                String notificationApiKey = notification.getApiKey();
                String notificationTitle = notification.getTitle();
                String notificationMessage = notification.getMessage();
                String notificationUri = notification.getUri();

                Intent intent = new Intent(Constants.ACTION_SHOW_NOTIFICATION);
                intent.putExtra(Constants.NOTIFICATION_ID, notificationId);
                intent.putExtra(Constants.NOTIFICATION_API_KEY,
                        notificationApiKey);
                intent.putExtra(Constants.NOTIFICATION_TITLE,
                        notificationTitle);
                intent.putExtra(Constants.NOTIFICATION_MESSAGE,
                        notificationMessage);
                intent.putExtra(Constants.NOTIFICATION_URI, notificationUri);

                xmppManager.getContext().sendBroadcast(intent);
            }
        }
    }
}

/**
 * IQ provider that parses the children of a {@code <notification>} element
 * into a {@link NotificationIQ}.
 */
public class NotificationIQProvider implements IQProvider {

    /**
     * Reads parser events until the closing {@code notification} tag and
     * copies the text of the id, apiKey, title, message and uri elements into
     * a new NotificationIQ. Other start tags are ignored.
     *
     * @param parser pull parser to read events from
     * @return the populated notification packet
     * @throws Exception if the parser fails while reading
     */
    @Override
    public IQ parseIQ(XmlPullParser parser) throws Exception {
        NotificationIQ notification = new NotificationIQ();

        boolean done = false;
        while (!done) {
            int eventType = parser.next();
            if (eventType == XmlPullParser.START_TAG) {
                // Capture the name once: nextText() advances the parser, so
                // the element name must be read before consuming the text.
                String tagName = parser.getName();
                if ("id".equals(tagName)) {
                    notification.setId(parser.nextText());
                } else if ("apiKey".equals(tagName)) {
                    notification.setApiKey(parser.nextText());
                } else if ("title".equals(tagName)) {
                    notification.setTitle(parser.nextText());
                } else if ("message".equals(tagName)) {
                    notification.setMessage(parser.nextText());
                } else if ("uri".equals(tagName)) {
                    notification.setUri(parser.nextText());
                }
            } else if (eventType == XmlPullParser.END_TAG
                    && "notification".equals(parser.getName())) {
                done = true;
            } else if (eventType == XmlPullParser.END_DOCUMENT) {
                // Input ended before </notification>: stop instead of
                // looping forever on a truncated stanza.
                done = true;
            }
        }

        return notification;
    }
}

Smack使用起来还是很简单的,它把固定请求到XML文本转化的过程都封装好了,并且提供了一组接口用于插入自定义的数据内容的转化过程,将协议内容的实现和业务逻辑进行封装隔离。
http://guangboo.org/2013/01/30/xmpp-introduction
实际上,其他推送系统(包括GCM、XMPP方案)的原理都与此类似。
下面自己搭建基于此方案的推送测试框架。
运行后可以看界面,注意一下DeviceId为9774d56d682e549c,在启动服务之后(实际就是进行了MQTT连接)就在服务的界面上看到了这样一个Topic,点击send就可以直接给客户端发送消息了,看客户端的日志已经收到了。
// Creates a new connection given the broker address and initial topic public MQTTConnection(String brokerHostName, String initTopic) throws MqttException { // Create connection spec String mqttConnSpec = "tcp://" + brokerHostName + "@" + MQTT_BROKER_PORT_NUM; // Create the client and connect mqttClient = MqttClient.createMqttClient(mqttConnSpec, MQTT_PERSISTENCE); String clientID = MQTT_CLIENT_ID + "/" + mPrefs.getString(PREF_DEVICE_ID, ""); mqttClient.connect(clientID, MQTT_CLEAN_START, MQTT_KEEP_ALIVE); // register this client app has being able to receive messages mqttClient.registerSimpleHandler(this); // Subscribe to an initial topic, which is combination of client ID and device ID. initTopic = MQTT_CLIENT_ID + "/" + initTopic; subscribeToTopic(initTopic); log("Connection established to " + brokerHostName + " on topic " + initTopic); // Save start time mStartTime = System.currentTimeMillis(); // Star the keep-alives startKeepAlives(); } /* * Send a request to the message broker to be sent messages published with * the specified topic name. Wildcards are allowed. */ private void subscribeToTopic(String topicName) throws MqttException { if ((mqttClient == null) || (mqttClient.isConnected() == false)) { // quick sanity check - don't try and subscribe if we don't have // a connection log("Connection error" + "No connection"); } else { String[] topics = { topicName }; mqttClient.subscribe(topics, MQTT_QUALITIES_OF_SERVICE); } } // Schedule application level keep-alives using the AlarmManager private void startKeepAlives() { Intent i = new Intent(); i.setClass(this, PushService.class); i.setAction(ACTION_KEEPALIVE); PendingIntent pi = PendingIntent.getService(this, 0, i, 0); AlarmManager alarmMgr = (AlarmManager)getSystemService(ALARM_SERVICE); alarmMgr.setRepeating(AlarmManager.RTC_WAKEUP, System.currentTimeMillis() + KEEP_ALIVE_INTERVAL, KEEP_ALIVE_INTERVAL, pi); } private synchronized void keepAlive() { try { // Send a keep alive, if there is a connection. 
if (mStarted == true && mConnection != null) { mConnection.sendKeepAlive(); } } catch (MqttException e) { log("MqttException: " + (e.getMessage() != null? e.getMessage(): "NULL"), e); mConnection.disconnect(); mConnection = null; cancelReconnect(); } } public void sendKeepAlive() throws MqttException { log("Sending keep alive"); // publish to a keep-alive topic publishToTopic(MQTT_CLIENT_ID + "/keepalive", mPrefs.getString(PREF_DEVICE_ID, "")); } /* * Sends a message to the message broker, requesting that it be published * to the specified topic. */ private void publishToTopic(String topicName, String message) throws MqttException { if ((mqttClient == null) || (mqttClient.isConnected() == false)) { // quick sanity check - don't try and publish if we don't have // a connection log("No connection to public to"); } else { mqttClient.publish(topicName, message.getBytes(), MQTT_QUALITY_OF_SERVICE, MQTT_RETAINED_PUBLISH); } } // Disconnect public void disconnect() { try { stopKeepAlives(); mqttClient.disconnect(); } catch (MqttPersistenceException e) { log("MqttException" + (e.getMessage() != null? e.getMessage():" NULL"), e); } }这几个方法就包含了,建立连接,订阅,发送心跳,发布,断开连接等主要操作,这也正是MQTT协议的正常工作模式。
tokudu.com/post/50024574938/how-to-implement-push-notifications-for-android
参考:http://blog.csdn.net/aa2650/article/details/17027845
最后就是大公司的行为,比如腾讯,自有协议实现,这里就不讨论了,里面水很深,基本上他们的协议都是基于TCP/UDP来自己实现的。基于SMS这里提一下:工作模式就是当服务器有新内容时,发送一条类似短信的信令给客户端,客户端收到后从服务器中下载新内容,SMS的及时性由通信网络和手机的MODEM模块来保证。
当然咯在Android平台上还有一个课题,就是如何让长连接的进程保持生存的问题,因为遇到内存不足的情况这个进程的生存能力是有质疑的,作为第三方是各有各的招,逮住一切的机会让自己重启来保持运行,作为OEM厂商完全有将其设计为系统进程的杀手锏。
原文:http://blog.csdn.net/hehui1860/article/details/44063957