<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency>
官方提供了Sink(输入通道)、Source(输出通道)、Processor(集成Sink和Source通道),我们也可以自定义我们自己的信息通道。
@Input注解标识一个输入通道
@Output注解标识一个输出通道
通道名称作为参数,如果未提供参数,默认使用方法名称作为通道名称。
如下我们自定义信息通道EsChannel
/** * 自定义信息通道 * @author 47Gamer * @date 2019/9/26 14:54 */ public interface EsChannel { /** * 缺省发送消息通道名称 */ String ES_DEFAULT_OUTPUT = "es_default_output"; /** * 缺省接收消息通道名称 */ String ES_DEFAULT_INPUT = "es_default_input"; /** * 告警发送消息通道名称 */ String ES_ALARM_OUTPUT = "es_alarm_output"; /** * 告警接收消息通道名称 */ String ES_ALARM_INPUT = "es_alarm_input"; /** * 缺省发送消息通道 * @return channel 返回缺省信息发送通道 */ @Output(ES_DEFAULT_OUTPUT) MessageChannel sendEsDefaultMessage(); /** * 告警发送消息通道 * @return channel 返回告警信息发送通道 */ @Output(ES_ALARM_OUTPUT) MessageChannel sendEsAlarmMessage(); /** * 缺省接收消息通道 * @return channel 返回缺省信息接收通道 */ @Input(ES_DEFAULT_INPUT) MessageChannel recieveEsDefaultMessage(); /** * 告警接收消息通道 * @return channel 返回告警信息接收通道 */ @Input(ES_ALARM_INPUT) MessageChannel recieveEsAlarmMessage(); }
@EnableDiscoveryClient @SpringBootApplication @EnableFeignClients @EnableHystrix @MapperScan(basePackages = "com.es.mapper") @EnableBinding(EsChannel.class) public class EsOnenetApplication { public static void main(String[] args) { SpringApplication.run(EsOnenetApplication.class, args); } }
#============================================================== #spring-cloud-stream-Kafka配置 开始 #============================================================== #是否开启kafka(非spring-cloud-stream配置) spring.kafka.enabled=false #缺省的输入、输出通道 spring.cloud.stream.bindings.es_default_input.destination=es_default_topic spring.cloud.stream.bindings.es_default_input.binder=kafka spring.cloud.stream.bindings.es_default_input.group=es_default_group spring.cloud.stream.bindings.es_default_output.destination=es_default_topic spring.cloud.stream.bindings.es_default_output.binder=kafka #入站消费者的并发性 spring.cloud.stream.bindings.es_default_input.consumer.concurrency=2 #告警的输入、输出通道(多主题、分组测试用,实际开发中根据业务需求定义) spring.cloud.stream.bindings.es_alarm_input.destination=es_alarm_topic spring.cloud.stream.bindings.es_alarm_input.binder=kafka spring.cloud.stream.bindings.es_alarm_input.group=es_alarm_group spring.cloud.stream.bindings.es_alarm_output.destination=es_alarm_topic spring.cloud.stream.bindings.es_alarm_output.binder=kafka #kafka配置 spring.cloud.stream.kafka.binder.brokers=172.*.*.6:9092,172.*.*.7:9092,172.*.*.8:9092 spring.cloud.stream.kafka.binder.zkNodes=172.*.*.6:2181,172.*.*.7:2181,172.*.*.8:2181 spring.cloud.stream.kafka.binder.requiredAcks=1 #============================================================== #spring-cloud-stream-Kafka配置 结束 #==============================================================
/** * kafka消息发送器 * @author 47Gamer * @date 2019/9/26 17:50 */ @Component public class EsKafkaMessageSender { @Autowired private EsChannel channel; /** * 消息发送到默认通道:缺省通道对应缺省主题 * @param message */ public void sendToDefaultChannel(String message){ channel.sendEsDefaultMessage().send(MessageBuilder.withPayload(message).build()); } /** * 消息发送到告警通道:告警通道对应告警主题 * @param message */ public void sendToAlarmChannel(String message){ channel.sendEsAlarmMessage().send(MessageBuilder.withPayload(message).build()); } }
@EnableBinding(value = EsChannel.class) public class EsStreamListener { /** * 从缺省通道接收消息 * @param message */ @StreamListener(EsChannel.ES_DEFAULT_INPUT) public void receive(Message<String> message){ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss"); System.out.println(sdf.format(new Date())+"------start--------安全用电默认消息:" + message); try { Thread.sleep(1000*10); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(sdf.format(new Date())+"------end--------安全用电默认消息"); } /** * 从告警通道接收消息 * @param message */ @StreamListener(EsChannel.ES_ALARM_INPUT) public void receiveAlarm(Message<String> message){ System.out.println("订阅告警消息:" + message); } }
从不同的通道实现消息的订阅。
@ApiOperation(value = "test1", httpMethod = "POST") @PostMapping(value = "/test1", produces = "application/json;charset=UTF-8") public void test1(String message, HttpServletRequest request, HttpServletResponse response) { sender.sendToDefaultChannel(message); sender.sendToDefaultChannel(message); sender.sendToDefaultChannel(message); sender.sendToDefaultChannel(message); } @ApiOperation(value = "test", httpMethod = "POST") @PostMapping(value = "/test2", produces = "application/json;charset=UTF-8") public void test2(String message, HttpServletRequest request, HttpServletResponse response) { sender.sendToAlarmChannel(message); }
test1:发送消息的缺省消息通道
test2:发送消息到告警消息通道
如七中所示,一次发送4条消息到缺省消息通道中,并启动两个实例(即两个微服务组成一个小型集群),在并发性配置为1的情况下,即spring.cloud.stream.bindings.es_default_input.consumer.concurrency=1
2019-09-30 11:13:14------start--------默认消息... 2019-09-30 11:13:24------end--------默认消息
实例2:
2019-09-30 11:13:14------start--------默认消息:... 2019-09-30 11:13:24------end--------默认消息 2019-09-30 11:13:24------start--------默认消息:... 2019-09-30 11:13:34------end--------默认消息 2019-09-30 11:13:34------start--------默认消息:... 2019-09-30 11:13:44------end--------默认消息
通过打印日志(日志做了简化处理)可以看出,两个实例之间是做到了并发消费,但是在1个实例内部,并没有并发消费。
如果将concurrency修改为2.
日志如下
实例1:
2019-09-30 11:31:13------start--------:... 2019-09-30 11:31:13------start--------默认消息:... 2019-09-30 11:31:23------end--------默认消息 2019-09-30 11:31:23------end--------默认消息 2019-09-30 11:31:23------start--------默认消息:... 2019-09-30 11:31:33------end--------默认消息
实例2:
2019-09-30 11:31:13------start--------默认消息:... 2019-09-30 11:31:23------end--------
从日志可以看出,实例1中实现了两个线程的并发消费。
原文:https://www.cnblogs.com/47Gamer/p/13751035.html