spring:
mqtt:
client:
username: 用户名
password: 密码
serverURIs: tcp://ip:port # 客户端地址,多个使用都好隔开
clientId: client0001 # ${random.value}
keepAliveInterval: 30
connectionTimeout: 30
producer:
defaultQos: 1
defaultRetained: true
defaultTopic: defaultTopicName
consumer:
defaultQos: 1
completionTimeout: 30
consumerTopics: topic1,topic2 # 监听的 topic,多个使用逗号隔开
/* 客户端 */
@Bean
public MqttConnectOptions getMqttConnectOptions() {
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setUserName(username);
mqttConnectOptions.setPassword(password.toCharArray());
mqttConnectOptions.setServerURIs(serverURIs);
mqttConnectOptions.setKeepAliveInterval(keepAliveInterval);
mqttConnectOptions.setConnectionTimeout(connectionTimeout);
return mqttConnectOptions;
}
@Bean
public MqttPahoClientFactory getMqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getMqttConnectOptions());
return factory;
}
@Bean
public MessageChannel outboundChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = OUTBOUND_CHANNEL)
public MessageHandler getMqttProducer() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, getMqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(defaultTopic);
messageHandler.setDefaultRetained(defaultRetained);
messageHandler.setDefaultQos(defaultProducerQos);
return messageHandler;
}
@MessagingGateway(defaultRequestChannel = MqttConfig.OUTBOUND_CHANNEL)
public interface MqttSender {
void sendToMqtt(String data);
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
@RestController
public class TestController {
@Autowired
private MqttSender mqttSender;
@RequestMapping("/send")
private void send(String data){
mqttSender.sendToMqtt(data);
}
}
@Bean
public MessageChannel inboundChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer getMqttConsumer() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(clientId, getMqttClientFactory(), consumerTopics);
adapter.setCompletionTimeout(completionTimeout);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(defaultConsumerQos);
adapter.setOutputChannel(inboundChannel());
return adapter;
}
@Component
public class MqttConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(MqttConsumer.class);
@Bean
@ServiceActivator(inputChannel = MqttConfig.INBOUND_CHANNEL)
public MessageHandler handler() {
return message -> {
String topic = message.getHeaders().get(MqttConfig.RECEIVED_TOPIC_KEY).toString();
LOGGER.info("[{}]主题接收到消息:{}", topic, message.getPayload().toString());
};
}
}
注意事项
@ServiceActivator 和 @MessagingGateway 中绑定的 Channel 名,需与返回 MessageChannel 的 Bean 的方法名一样:
如发布者绑定的 Channel 名为 outboundChannel,则需要有对应的方法,如下:
@Bean
public MessageChannel outboundChannel() {
return new DirectChannel();
}
参考
完整代码:GitHub
原文:https://www.cnblogs.com/victorbu/p/11978107.html