整个会话始于<stream:stream>,止于</stream:stream>,其中主要有三大元素:<message/>、<presence/>和<iq/>(Info/Query)。
// Entry point for connecting: logs "connect()..." at debug level and then calls
// submitLoginTask(), which in turn submits the register and connect tasks first.
<Message> public void connect() {
Log.d(LOGTAG, "connect()...");
submitLoginTask();
}
/**
 * Logs the call and hands a freshly created {@link ConnectTask} to {@code addTask}.
 */
private void submitConnectTask() {
    Log.d(LOGTAG, "submitConnectTask()...");
    final ConnectTask connectTask = new ConnectTask();
    addTask(connectTask);
}
/**
 * Logs the call, submits the connect task first, and then hands a freshly created
 * {@link RegisterTask} to {@code addTask}.
 */
private void submitRegisterTask() {
    Log.d(LOGTAG, "submitRegisterTask()...");
    // The connect task is submitted before the register task.
    submitConnectTask();
    final RegisterTask registerTask = new RegisterTask();
    addTask(registerTask);
}
/**
 * Logs the call, submits the register task (which itself submits the connect task), and
 * then hands a freshly created {@link LoginTask} to {@code addTask}.
 */
private void submitLoginTask() {
    Log.d(LOGTAG, "submitLoginTask()...");
    // Resulting submission order: connect, register, login.
    submitRegisterTask();
    final LoginTask loginTask = new LoginTask();
    addTask(loginTask);
}
/**
 * A runnable task that connects to the XMPP server.
 *
 * <p>If the manager already reports a connection, the task only logs that fact. Otherwise it
 * builds a new {@code XMPPConnection} for {@code xmppHost}/{@code xmppPort}, stores it on the
 * manager, connects, and registers the notification IQ provider. In every case
 * {@code xmppManager.runTask()} is called at the end, including after a failed connect.
 */
private class ConnectTask implements Runnable {

    final XmppManager xmppManager;

    private ConnectTask() {
        this.xmppManager = XmppManager.this;
    }

    public void run() {
        Log.i(LOGTAG, "ConnectTask.run()...");

        if (xmppManager.isConnected()) {
            Log.i(LOGTAG, "XMPP connected already");
            xmppManager.runTask();
            return;
        }

        // Configuration for the new connection: security mode "required",
        // SASL authentication and stream compression switched off.
        final ConnectionConfiguration config =
                new ConnectionConfiguration(xmppHost, xmppPort);
        config.setSecurityMode(SecurityMode.required);
        config.setSASLAuthenticationEnabled(false);
        config.setCompressionEnabled(false);

        final XMPPConnection newConnection = new XMPPConnection(config);
        xmppManager.setConnection(newConnection);

        try {
            newConnection.connect();
            Log.i(LOGTAG, "XMPP connected successfully");

            // Key point: register the push-data parser that turns the XML sent by the
            // server into an IQ packet.
            ProviderManager.getInstance().addIQProvider(
                    "notification",
                    "androidpn:iq:notification",
                    new NotificationIQProvider());
        } catch (XMPPException e) {
            Log.e(LOGTAG, "XMPP connection failed", e);
        }

        // Runs whether or not the connect attempt succeeded.
        xmppManager.runTask();
    }
}
/**
 * A runnable task to register a new user onto the server.
 *
 * <p>If the manager reports that no account is registered yet, a random username and password
 * are generated and sent in a {@code Registration} IQ of type SET. A packet listener filtered
 * on that IQ's packet id handles the reply:
 * <ul>
 *   <li>RESULT: the credentials are stored on the manager and in {@code sharedPrefs}, and
 *       {@code xmppManager.runTask()} is called;</li>
 *   <li>ERROR: the error is logged unless its text contains "409"; {@code runTask()} is not
 *       called in this branch.</li>
 * </ul>
 * If an account is already registered, the task only logs that and calls {@code runTask()}.
 *
 * <p>The generated password is intentionally never written to the log.
 */
private class RegisterTask implements Runnable {

    final XmppManager xmppManager;

    private RegisterTask() {
        xmppManager = XmppManager.this;
    }

    public void run() {
        Log.i(LOGTAG, "RegisterTask.run()...");

        if (!xmppManager.isRegistered()) {
            final String newUsername = newRandomUUID();
            final String newPassword = newRandomUUID();

            Registration registration = new Registration();

            // Only react to the IQ reply that carries this registration's packet id.
            PacketFilter packetFilter = new AndFilter(
                    new PacketIDFilter(registration.getPacketID()),
                    new PacketTypeFilter(IQ.class));

            PacketListener packetListener = new PacketListener() {
                public void processPacket(Packet packet) {
                    Log.d("RegisterTask.PacketListener", "processPacket().....");
                    Log.d("RegisterTask.PacketListener", "packet=" + packet.toXML());

                    if (packet instanceof IQ) {
                        IQ response = (IQ) packet;
                        if (response.getType() == IQ.Type.ERROR) {
                            // Errors whose text contains "409" are deliberately not reported
                            // (presumably "account already exists" -- confirm with server).
                            if (!response.getError().toString().contains("409")) {
                                Log.e(LOGTAG,
                                        "Unknown error while registering XMPP account! "
                                                + response.getError().getCondition());
                            }
                        } else if (response.getType() == IQ.Type.RESULT) {
                            xmppManager.setUsername(newUsername);
                            xmppManager.setPassword(newPassword);
                            // Only the username is logged; the password is a secret.
                            Log.d(LOGTAG, "username=" + newUsername);

                            Editor editor = sharedPrefs.edit();
                            editor.putString(Constants.XMPP_USERNAME, newUsername);
                            editor.putString(Constants.XMPP_PASSWORD, newPassword);
                            // commit() reports failure through its return value; surface it
                            // instead of silently losing the new credentials.
                            if (!editor.commit()) {
                                Log.e(LOGTAG, "Failed to persist XMPP credentials");
                            }
                            Log.i(LOGTAG, "Account registered successfully");
                            xmppManager.runTask();
                        }
                    }
                }
            };

            connection.addPacketListener(packetListener, packetFilter);

            registration.setType(IQ.Type.SET);
            registration.addAttribute("username", newUsername);
            registration.addAttribute("password", newPassword);
            connection.sendPacket(registration);
        } else {
            Log.i(LOGTAG, "Account registered already");
            xmppManager.runTask();
        }
    }
}
/**
 * A runnable task to log into the server.
 *
 * If the manager is not yet authenticated, logs in with the manager's username and password
 * and XMPP_RESOURCE_NAME, attaches the connection listener (when one is set) and the
 * notification packet listener, and then calls xmppManager.runTask().
 * On an XMPPException whose message contains "401" the account is re-registered; on any
 * other failure the reconnection thread is started. In both failure cases runTask() is not
 * called. If already authenticated, the task only logs that and calls runTask().
 */
private class LoginTask implements Runnable {
final XmppManager xmppManager;
private LoginTask() {
this.xmppManager = XmppManager.this;
}
public void run() {
Log.i(LOGTAG, "LoginTask.run()...");
if (!xmppManager.isAuthenticated()) {
Log.d(LOGTAG, "username=" + username);
// NOTE(review): this writes the password to the debug log in cleartext -- consider removing.
Log.d(LOGTAG, "password=" + password);
try {
xmppManager.getConnection().login(
xmppManager.getUsername(),
xmppManager.getPassword(), XMPP_RESOURCE_NAME);
Log.d(LOGTAG, "Loggedn in successfully");
// connection listener (only attached when the manager provides one)
if (xmppManager.getConnectionListener() != null) {
xmppManager.getConnection().addConnectionListener(
xmppManager.getConnectionListener());
}
// packet filter: only NotificationIQ packets reach the listener below
PacketFilter packetFilter = new PacketTypeFilter(
NotificationIQ.class);
// packet listener: another key point -- it applies the client-side logic
// (display, etc.) to the IQ packets the client receives
PacketListener packetListener = xmppManager
.getNotificationPacketListener();
connection.addPacketListener(packetListener, packetFilter);
xmppManager.runTask();
} catch (XMPPException e) {
Log.e(LOGTAG, "LoginTask.run()... xmpp error");
Log.e(LOGTAG, "Failed to login to xmpp server. Caused by: "
+ e.getMessage());
// A message containing "401" is treated as invalid credentials:
// re-register the account instead of reconnecting.
String INVALID_CREDENTIALS_ERROR_CODE = "401";
String errorMessage = e.getMessage();
if (errorMessage != null
&& errorMessage
.contains(INVALID_CREDENTIALS_ERROR_CODE)) {
xmppManager.reregisterAccount();
return;
}
xmppManager.startReconnectionThread();
} catch (Exception e) {
// Any non-XMPP failure: log it and start the reconnection thread.
Log.e(LOGTAG, "LoginTask.run()... other error");
Log.e(LOGTAG, "Failed to login to xmpp server. Caused by: "
+ e.getMessage());
xmppManager.startReconnectionThread();
}
} else {
Log.i(LOGTAG, "Logged in already");
xmppManager.runTask();
}
}
}到这里客户端与服务器的连接已经建立,后面等待接收推送消息的过程可以解析分析。public class NotificationIQ extends IQ {
/**
 * Builds the child element XML for this IQ: a {@code <notification>} element in the
 * {@code androidpn:iq:notification} namespace that contains an {@code <id>} child when
 * {@code id} is non-null. The returned string ends with a single space after the closing tag.
 */
@Override
public String getChildElementXML() {
    final StringBuilder xml = new StringBuilder();
    xml.append("<notification xmlns=\"androidpn:iq:notification\">");
    if (id != null) {
        xml.append("<id>").append(id).append("</id>");
    }
    // The trailing space after the closing tag is part of the emitted text.
    xml.append("</notification> ");
    return xml.toString();
}
}
/**
 * Packet listener that turns a received {@link NotificationIQ} into a broadcast.
 *
 * <p>Packets that are not a {@code NotificationIQ}, or whose child element XML does not contain
 * {@code androidpn:iq:notification}, are ignored. Otherwise the id, API key, title, message and
 * URI are copied into an {@code ACTION_SHOW_NOTIFICATION} intent that is broadcast through the
 * manager's context.
 */
public class NotificationPacketListener implements PacketListener {

    @Override
    public void processPacket(Packet packet) {
        if (!(packet instanceof NotificationIQ)) {
            return;
        }

        final NotificationIQ iq = (NotificationIQ) packet;
        if (!iq.getChildElementXML().contains("androidpn:iq:notification")) {
            return;
        }

        final Intent broadcast = new Intent(Constants.ACTION_SHOW_NOTIFICATION);
        broadcast.putExtra(Constants.NOTIFICATION_ID, iq.getId());
        broadcast.putExtra(Constants.NOTIFICATION_API_KEY, iq.getApiKey());
        broadcast.putExtra(Constants.NOTIFICATION_TITLE, iq.getTitle());
        broadcast.putExtra(Constants.NOTIFICATION_MESSAGE, iq.getMessage());
        broadcast.putExtra(Constants.NOTIFICATION_URI, iq.getUri());

        xmppManager.getContext().sendBroadcast(broadcast);
    }
}
public class NotificationIQProvider implements IQProvider {
/**
 * Parses the children of a {@code <notification>} element into a {@link NotificationIQ}.
 *
 * <p>Reads events from the parser until the closing {@code notification} tag. The text of the
 * {@code id}, {@code apiKey}, {@code title}, {@code message} and {@code uri} child elements is
 * copied into the corresponding field; any other start tag is skipped.
 *
 * @param parser pull parser whose next events are the children of the notification element
 * @return the populated notification IQ
 * @throws Exception if the parser fails, or if the document ends before the closing
 *         {@code notification} tag is seen
 */
@Override
public IQ parseIQ(XmlPullParser parser) throws Exception {
    NotificationIQ notification = new NotificationIQ();
    boolean done = false;
    while (!done) {
        int eventType = parser.next();
        if (eventType == XmlPullParser.START_TAG) {
            // nextText() leaves the parser on the matching end tag, so at most one
            // branch can apply per start tag.
            String tagName = parser.getName();
            if ("id".equals(tagName)) {
                notification.setId(parser.nextText());
            } else if ("apiKey".equals(tagName)) {
                notification.setApiKey(parser.nextText());
            } else if ("title".equals(tagName)) {
                notification.setTitle(parser.nextText());
            } else if ("message".equals(tagName)) {
                notification.setMessage(parser.nextText());
            } else if ("uri".equals(tagName)) {
                notification.setUri(parser.nextText());
            }
        } else if (eventType == XmlPullParser.END_TAG
                && "notification".equals(parser.getName())) {
            done = true;
        } else if (eventType == XmlPullParser.END_DOCUMENT) {
            // Without this guard a truncated stanza could keep the loop spinning on
            // parsers that keep returning END_DOCUMENT.
            throw new java.io.EOFException(
                    "Document ended before </notification> was reached");
        }
    }
    return notification;
}
} Smack使用起来还是很简单的,它把固定请求到XML文本的转化过程都封装好了,并且提供了一组接口,用于插入自定义数据内容的转化过程,将协议内容的实现和业务逻辑进行封装隔离。参考:http://guangboo.org/2013/01/30/xmpp-introduction
实际上,其他推送系统(包括GCM、XMPP方案)的原理都与此类似。
下面自己搭建基于此方案的推送测试框架。
运行后可以看到界面。注意DeviceId为9774d56d682e549c:启动服务之后(实际就是建立了MQTT连接),在服务端的界面上就能看到这样一个Topic;点击send即可直接给客户端发送消息,从客户端的日志可以看到消息已经收到。
// Creates a new connection given the broker address and initial topic.
// Connects the client, registers this object as the simple handler, subscribes to
// MQTT_CLIENT_ID + "/" + initTopic, records the start time and starts the keep-alives.
public MQTTConnection(String brokerHostName, String initTopic) throws MqttException {
    // Connection spec is built as tcp://<host>@<port>
    final String brokerSpec = "tcp://" + brokerHostName + "@" + MQTT_BROKER_PORT_NUM;
    mqttClient = MqttClient.createMqttClient(brokerSpec, MQTT_PERSISTENCE);

    // The client id is MQTT_CLIENT_ID plus the stored device id (empty when unset)
    final String deviceId = mPrefs.getString(PREF_DEVICE_ID, "");
    mqttClient.connect(MQTT_CLIENT_ID + "/" + deviceId, MQTT_CLEAN_START, MQTT_KEEP_ALIVE);

    // Register this object to receive messages
    mqttClient.registerSimpleHandler(this);

    // The subscribed topic is the given topic prefixed with MQTT_CLIENT_ID
    final String fullTopic = MQTT_CLIENT_ID + "/" + initTopic;
    subscribeToTopic(fullTopic);
    log("Connection established to " + brokerHostName + " on topic " + fullTopic);

    mStartTime = System.currentTimeMillis();
    startKeepAlives();
}
/*
 * Asks the message broker to deliver messages published with the given
 * topic name. Wildcards are allowed. When there is no client or the client
 * is not connected, the request is skipped and only a log entry is written.
 */
private void subscribeToTopic(String topicName) throws MqttException {
    final boolean connected = (mqttClient != null) && mqttClient.isConnected();
    if (!connected) {
        // Sanity check: never subscribe without a live connection
        log("Connection error" + "No connection");
        return;
    }
    mqttClient.subscribe(new String[] { topicName }, MQTT_QUALITIES_OF_SERVICE);
}
// Schedule application level keep-alives using the AlarmManager: a repeating RTC_WAKEUP
// alarm delivers an ACTION_KEEPALIVE intent to PushService every KEEP_ALIVE_INTERVAL,
// with the first delivery one interval from now.
private void startKeepAlives() {
    final Intent keepAliveIntent = new Intent();
    keepAliveIntent.setClass(this, PushService.class);
    keepAliveIntent.setAction(ACTION_KEEPALIVE);
    final PendingIntent pending = PendingIntent.getService(this, 0, keepAliveIntent, 0);

    final AlarmManager alarms = (AlarmManager) getSystemService(ALARM_SERVICE);
    final long firstTrigger = System.currentTimeMillis() + KEEP_ALIVE_INTERVAL;
    alarms.setRepeating(AlarmManager.RTC_WAKEUP, firstTrigger, KEEP_ALIVE_INTERVAL, pending);
}
// Sends a keep-alive over the current connection when the service is started and a
// connection exists. If sending fails with an MqttException, the failure is logged, the
// connection is disconnected and cleared, and cancelReconnect() is called.
private synchronized void keepAlive() {
    try {
        if (mStarted && mConnection != null) {
            mConnection.sendKeepAlive();
        }
    } catch (MqttException e) {
        final String detail = e.getMessage();
        log("MqttException: " + (detail == null ? "NULL" : detail), e);

        mConnection.disconnect();
        mConnection = null;
        cancelReconnect();
    }
}
// Logs the attempt and publishes the stored device id (empty when unset) to the
// MQTT_CLIENT_ID + "/keepalive" topic.
public void sendKeepAlive() throws MqttException {
    log("Sending keep alive");
    final String keepAliveTopic = MQTT_CLIENT_ID + "/keepalive";
    final String deviceId = mPrefs.getString(PREF_DEVICE_ID, "");
    publishToTopic(keepAliveTopic, deviceId);
}
/*
 * Asks the message broker to publish the given message to the given topic.
 * When there is no client or the client is not connected, nothing is
 * published and only a log entry is written.
 */
private void publishToTopic(String topicName, String message) throws MqttException {
    final boolean connected = (mqttClient != null) && mqttClient.isConnected();
    if (!connected) {
        // Sanity check: never publish without a live connection
        log("No connection to public to");
        return;
    }
    final byte[] payload = message.getBytes();
    mqttClient.publish(topicName, payload, MQTT_QUALITY_OF_SERVICE, MQTT_RETAINED_PUBLISH);
}
// Disconnect: stops the keep-alives and then disconnects the MQTT client.
// A MqttPersistenceException is caught and logged here rather than propagated.
public void disconnect() {
try {
stopKeepAlives();
mqttClient.disconnect();
} catch (MqttPersistenceException e) {
// Log the exception message, or " NULL" when the exception carries none
log("MqttException" + (e.getMessage() != null? e.getMessage():" NULL"), e);
}
}这几个方法就包含了,建立连接,订阅,发送心跳,发布,断开连接等主要操作,这也正是MQTT协议的正常工作模式。tokudu.com/post/50024574938/how-to-implement-push-notifications-for-android
参考:http://blog.csdn.net/aa2650/article/details/17027845
最后就是大公司的行为,比如腾讯,自有协议实现,这里就不讨论了,里面水很深,基本上他们的协议都是基于TCP/UDP来自己实现的。基于SMS这里提一下:工作模式就是当服务器有新内容时,发送一条类似短信的信令给客户端,客户端收到后从服务器中下载新内容,SMS的及时性由通信网络和手机的MODEM模块来保证。
当然,在Android平台上还有一个课题,就是如何让长连接的进程保持存活:遇到内存不足的情况时,这个进程的生存能力是存疑的。第三方应用各有各的招,抓住一切机会让自己重启以保持运行;而OEM厂商则完全有将其设计为系统进程的杀手锏。
原文:http://blog.csdn.net/hehui1860/article/details/44063957