当连接向一个mqtt服务器时,clientId必须是唯一的。设置一样,导致client.setCallback总是走到 connectionLost回调。报connection reset。调查一天才发现是clientid重复导致。
client = new MqttAsyncClient(serverURIString, "client-id");
clientId是用来保存会话信息。
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
当服务器将message发布向所有订阅过的客户端,就会清除这个message。如果当前客户端不在线,等它连接时发送。
这样的,比如你一个板子,作为客户端,发起mqtt的连接请求connect到mqtt服务器,比如说就是emqtt服务吧,emqtt服务端收到这个板子的连接请求之后,在tcp层上会和板子建立一个tcp的连接,在emqtt内部,会产生一个进程,和这个板子做数据通讯,同时还会产生一个进程,叫session,这个sessoin是专门管理这个板子订阅的主题,其它板子如果发布了这个板子感兴趣的主题的时候,也会发到这个板子对应的这个session里面,如果这个session收到订阅的主题之后,发现对用的client还活着,就通过这个client把数据经过tcp发到这个板子上,如果发现client已经没有了,就是说板子和服务端断掉了,那么session就会把收到的订阅的主题,先保存在session里面,下次板子连接上了,而且cleansession=false,那么这个session就不会清除,在这次连接时,就会把以前收到的订阅消息,发给板子,大概就是这个意思。
参考:
http://www.blogjava.net/yongboy/archive/2014/02/15/409893.html
http://www.cnblogs.com/znlgis/p/4930990.html
http://blog.csdn.net/ljf10010/article/details/51424506
paho客户端示例
https://github.com/eclipse/paho.mqtt.java/tree/master/org.eclipse.paho.client.mqttv3.test/src/test/java/org/eclipse/paho/client/mqttv3/test
http://www.eclipse.org/paho/files/javadoc/index.html api文档
ibm客户端paho示例:
http://www.programcreek.com/java-api-examples/index.php?source_dir=streamsx.messaging-master/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/mqtt/MqttAsyncClientWrapper.java
1 package com.xxx.mqtt; 2 3 import java.net.URI; 4 5 import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions; 6 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; 7 import org.eclipse.paho.client.mqttv3.IMqttToken; 8 import org.eclipse.paho.client.mqttv3.MqttAsyncClient; 9 import org.eclipse.paho.client.mqttv3.MqttCallback; 10 import org.eclipse.paho.client.mqttv3.MqttClient; 11 import org.eclipse.paho.client.mqttv3.MqttConnectOptions; 12 import org.eclipse.paho.client.mqttv3.MqttException; 13 import org.eclipse.paho.client.mqttv3.MqttMessage; 14 import org.eclipse.paho.client.mqttv3.MqttPersistenceException; 15 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; 16 17 18 public class MyMqttClient implements MqttCallback { 19 20 private static final MemoryPersistence DATA_STORE = new MemoryPersistence(); 21 private static final String topic = "mytopic"; 22 23 private String HOST = "127.0.0.1"; 24 private int PORT = 1883; 25 private String USERNAME = "user"; 26 private String PASSWORD = "password"; 27 private String serverURIString = "tcp://" + HOST + ":" + PORT; 28 29 String clientId = "client-1"; 30 31 MqttAsyncClient client; 32 // Tokens 33 IMqttToken connectToken; 34 IMqttDeliveryToken pubToken; 35 36 37 public static void main(String[] args) { 38 MyMqttClient app = new MyMqttClient(); 39 app.asyncClient(); 40 try { 41 Thread.sleep(20000); 42 app.disconnect(); 43 } catch (Exception e) { 44 e.printStackTrace(); 45 } 46 System.out.println("end"); 47 } 48 49 public void blockingClient() { 50 51 try { 52 MqttClient sampleClient = new MqttClient(serverURIString, clientId); 53 MqttConnectOptions connOpts = new MqttConnectOptions(); 54 connOpts.setCleanSession(true); 55 connOpts.setUserName(USERNAME); 56 connOpts.setPassword(PASSWORD.toCharArray()); 57 System.out.println("Connecting to broker: " + serverURIString); 58 sampleClient.connect(connOpts); 59 sampleClient.subscribe("#", 1); 60 System.out.println("Connected"); 61 // System.out.println("Publish message: " + content); 62 // MqttMessage message = new MqttMessage(content.getBytes()); 63 // message.setQos(qos); 64 sampleClient.setCallback(this); 65 // sampleClient.publish(topic, message); 66 // System.out.println("Message published"); 67 try { 68 Thread.sleep(10000000); 69 System.out.println("Disconnected"); 70 sampleClient.disconnect(); 71 } catch (Exception e) { 72 e.printStackTrace(); 73 } 74 75 } catch (MqttException me) { 76 System.out.println("reason " + me.getReasonCode()); 77 System.out.println("msg " + me.getMessage()); 78 System.out.println("loc " + me.getLocalizedMessage()); 79 System.out.println("cause " + me.getCause()); 80 System.out.println("except " + me); 81 me.printStackTrace(); 82 } 83 } 84 85 public void asyncClient() { 86 info(" MQTT init start."); 87 88 // Tokens 89 IMqttToken connectToken; 90 IMqttDeliveryToken pubToken; 91 92 // Client Options 93 MqttConnectOptions options = new MqttConnectOptions(); 94 options.setCleanSession(false); 95 options.setAutomaticReconnect(true); 96 97 options.setUserName(USERNAME); 98 options.setPassword(PASSWORD.toCharArray()); 99 100 try { 101 client = new MqttAsyncClient(serverURIString, clientId); 102 103 DisconnectedBufferOptions disconnectedOpts = new DisconnectedBufferOptions(); 104 disconnectedOpts.setBufferEnabled(true); 105 client.setBufferOpts(disconnectedOpts); 106 107 connectToken = client.connect(options); 108 connectToken.waitForCompletion();//异步变成了同步。可以用IMqttCallbackListen..在connect时候设置回调。 109 boolean isConnected = client.isConnected(); 110 info("Connection isConnected: " + isConnected); 111 112 if (connectToken.isComplete() && connectToken.getException() == null && client.isConnected()) { 113 info("[Connect:] Success: "); //$NON-NLS-1$ //$NON-NLS-2$ 114 client.setCallback(this); 115 116 } else { 117 info("[Connect:] faild: "); //$NON-NLS-1$ //$NON-NLS-2$ 118 } 119 120 // MqttTopic topic = client.getTopic(topic); 121 // topic. 122 //setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息 123 // options.setWill(topic, "close".getBytes(), 2, true); 124 125 IMqttToken subToken = client.subscribe("#", 1); 126 127 subToken.waitForCompletion(1000); 128 129 if (subToken.isComplete()) { 130 info("subToken complete."); 131 if (subToken.getException() != null) { 132 info("Topics Subscription failed." + subToken.getException()); //$NON-NLS-1$ 133 } 134 } else { 135 info("subToken not complete."); 136 if (subToken.getException() != null) { 137 info("Topics Subscription failed." + subToken.getException()); //$NON-NLS-1$ 138 } 139 } 140 141 info("init end"); 142 143 } catch (MqttException e) { 144 // TODO Auto-generated catch block 145 e.printStackTrace(); 146 } 147 148 } 149 150 //String clientId, String topic, String message 151 public void send() { 152 String topic; 153 String message; 154 info("===Send Message start.==="); 155 message = "Hello, boy."; 156 157 158 boolean isConnected = client.isConnected(); 159 if (!isConnected) { 160 //no need. it will auto reconnect and send. 161 } 162 163 // Publish Message 164 try { 165 pubToken = client.publish(topic, new MqttMessage(message.getBytes())); 166 167 info("Publish attempted: isComplete:" + pubToken.isComplete()); 168 169 pubToken.waitForCompletion(); 170 } catch (MqttPersistenceException e) { 171 // TODO Auto-generated catch block 172 e.printStackTrace(); 173 } catch (MqttException e) { 174 // TODO Auto-generated catch block 175 e.printStackTrace(); 176 } 177 178 // Check that Message has been delivered 179 info("Message Delivered: " + pubToken.isComplete()); 180 info("=== send end.===="); 181 } 182 183 void disconnect() { 184 IMqttToken disconnectToken; 185 try { 186 disconnectToken = client.disconnect(); 187 disconnectToken.waitForCompletion(); 188 client.close(); 189 } catch (MqttException e) { 190 // TODO Auto-generated catch block 191 e.printStackTrace(); 192 } 193 client = null; 194 } 195 196 void info(String s) { 197 System.out.println(s); 198 } 199 200 public void connectionLost(Throwable thrwbl) { 201 // TODO Auto-generated method stub 202 info("connectionLost"); 203 204 info("MQTT is disconnected from topic: {}. Message: {}. Cause: {}" + topic + thrwbl.getMessage() + thrwbl.getCause().getMessage()); 205 thrwbl.printStackTrace(); 206 207 } 208 209 public void deliveryComplete(IMqttDeliveryToken arg0) { 210 // TODO Auto-generated method stub 211 info("deliveryComplete"); 212 213 } 214 215 public void messageArrived(String arg0, MqttMessage arg1) throws Exception { 216 // TODO Auto-generated method stub 217 String message = new String(arg1.getPayload()); 218 String topic = arg0; 219 220 info("xxx Receive : topic=" + topic + "; message=" + message); 221 222 } 223 }
1 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 2 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 3 <modelVersion>4.0.0</modelVersion> 4 5 <groupId>com.italktv.mqtt.client</groupId> 6 <artifactId>mqttclient</artifactId> 7 <version>0.0.1-SNAPSHOT</version> 8 <packaging>jar</packaging> 9 10 <name>mqttclient</name> 11 <url>http://maven.apache.org</url> 12 13 <properties> 14 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 15 </properties> 16 17 <dependencies> 18 <dependency> 19 <groupId>junit</groupId> 20 <artifactId>junit</artifactId> 21 <version>3.8.1</version> 22 <scope>test</scope> 23 </dependency> 24 25 26 <dependency> 27 <groupId>org.eclipse.paho</groupId> 28 <artifactId>org.eclipse.paho.client.mqttv3</artifactId> 29 <version>1.1.0</version> 30 </dependency> 31 32 </dependencies> 33 </project>
上面是maven管理项目的pom.xml
原文:http://www.cnblogs.com/bigben0123/p/5985562.html