一、Caching
Spring框架支持向应用程序透明地添加缓存。抽象的核心是将缓存应用于方法,从而减少了基于缓存中可用信息的执行次数。缓存逻辑是透明地应用的,对调用程序没有任何干扰。只要通过@EnableCaching注释启用了缓存支持,Spring Boot就会自动配置缓存基础设施。
有关更多细节,请参阅Spring框架参考的相关部分。https://docs.spring.io/spring/docs/5.2.2.RELEASE/spring-framework-reference/integration.html#cache
简而言之,将缓存添加到服务的操作就像将相关注释添加到其方法一样容易,如以下示例所示:
import org.springframework.cache.annotation.Cacheable; import org.springframework.stereotype.Component; @Component public class MathService { @Cacheable("piDecimals") public int computePiDecimal(int i) { // ... } }
这个示例演示了在一个可能开销很大的操作上使用缓存。在调用computePiDecimal之前,该抽象将在piDecimals缓存中查找与i参数匹配的条目。如果找到一个条目,缓存中的内容将立即返回给调用者,而不会调用该方法。否则,将调用方法,并在返回值之前更新缓存。
注:您还可以透明地使用标准JSR-107 (JCache)注释(比如@CacheResult)。但是,我们强烈建议您不要混合使用Spring缓存和JCache注释。
如果您没有添加任何特定的缓存库,Spring Boot将自动配置在内存中使用并发映射的简单提供程序。当需要缓存时(如前面示例中的piDecimals),此提供程序将为您创建缓存。简单提供程序实际上并不推荐用于生产环境,但是它对于入门和确保您了解特性非常有用。当您决定使用缓存提供程序时,请务必阅读它的文档,以确定如何配置应用程序使用的缓存。几乎所有提供程序都要求您显式地配置应用程序中使用的每个缓存。有些提供了定制spring.cache.cache-names属性定义的默认缓存的方法。
也可以从缓存中透明地更新或删除数据。https://docs.spring.io/spring/docs/5.2.2.RELEASE/spring-framework-reference/integration.html#cache-annotations-put
1.1支持缓存提供者
缓存抽象并不提供实际的存储,而是依赖于org.springframe.cache.Cache和org.springframework.cache.CacheManager接口。
如果您还没有定义CacheManager类型的bean或名为CacheResolver的CacheResolver(请参阅CachingConfigurer), Spring Boot将尝试检测以下提供程序(按照指定的顺序):
JCache (JSR-107) (EhCache 3, Hazelcast, Infinispan, and others)
如果CacheManager是通过Spring Boot自动配置的,那么您可以通过公开实现CacheManagerCustomizer接口的bean来进一步调优它的配置,然后才能完全初始化它。下面的例子设置了一个标志,说明null值应该传递给底层映射:
@Bean public CacheManagerCustomizer<ConcurrentMapCacheManager> cacheManagerCustomizer() { return new CacheManagerCustomizer<ConcurrentMapCacheManager>() { @Override public void customize(ConcurrentMapCacheManager cacheManager) { cacheManager.setAllowNullValues(false); } }; }
在前面的示例中,需要一个自动配置的ConcurrentMapCacheManager。如果不是这样(要么您提供了自己的配置,要么自动配置了不同的缓存提供程序),则根本不会调用customizer。您可以拥有任意数量的定制器,还可以使用@Order或Ordered对它们进行排序。
1.1通用类
如果上下文定义了至少一个org.springframe .cache.Cache,则使用通用缓存。创建一个包装该类型所有bean的CacheManager。
1.2JCache(JSR-107)
通过存在类路径下的一个javax.cache.spi.CachingProvider来引导JCache。(即类路径上存在一个兼容JSR-107的缓存库),而JCacheCacheManager是由spring-boot-starter-cache“Starter”提供的。有各种兼容的库可用,Spring Boot为Ehcache 3、Hazelcast和Infinispan提供了依赖管理。也可以添加任何其他兼容的库。
能存在多个提供程序,在这种情况下,必须显式地指定提供程序。即使JSR-107标准没有强制采用标准化的方式来定义配置文件的位置,Spring Boot也会尽其所能来设置带有实现细节的缓存,如下例所示:
# Only necessary if more than one provider is present spring.cache.jcache.provider=com.acme.MyCachingProvider spring.cache.jcache.config=classpath:acme.xml
注:当缓存库同时提供本地实现和JSR-107支持时,Spring Boot更喜欢JSR-107支持,因此如果切换到不同的JSR-107实现,也可以使用相同的特性。
有两种方法可以定制底层的javax.cache.cacheManager:
如果是标准的javax.cache.CacheManager bean被定义,它被自动包装在org.springframe.cache.CacheManager的实现。没有进一步的定制应用到它。
1.3EjCache 2.x
如果可以在类路径的根目录中找到一个名为ehcache.xml的文件,则EhCache 2.x被使用。如果EhCache 2.x被发现,则使用spring-boot-starter-cache“Starter”提供的EhCacheCacheManager来引导缓存管理器。也可以提供一个备用的配置文件,如下例所示:
spring.cache.ehcache.config=classpath:config/another-config.xml
1.4Hazelcast
Spring Boot has general support for Hazelcast.。如果HazelcastInstance已经被自动配置,它将被自动包装在CacheManager中。
1.5Infinispan
Infinispan没有默认的配置文件位置,因此必须显式地指定它。否则,将使用默认的引导程序。
spring.cache.infinispan.config=infinispan.xml
通过设置spring.cache.cache-names属性,缓存可以在启动时创建缓存。如果定义了自定义ConfigurationBuilder bean,则使用它自定义缓存。
在Spring Boot中对Infinispan的支持仅限于嵌入式模式,非常基础。如果你想要更多的选项,你应该使用官方的Infinispan Spring Boot starter。更多细节见Infinispan的文档。
1.6Couchbase
如果Couchbase Java客户机和Couchbase -spring-cache实现可用,并且配置了Couchbase,则自动配置CouchbaseCacheManager。还可以通过设置spring.cache.cache-names属性在启动时创建额外的缓存。这些缓存在自动配置的Bucket上进行操作。您还可以使用customizer在另一个Bucket上创建额外的缓存。假设在“主”桶上需要两个缓存(cache1和cache2),在“另一个”桶上需要一个缓存(cache3),自定义时间为2秒。您可以通过配置创建前两个缓存,如下所示:
spring.cache.cache-names=cache1,cache2
然后可以定义一个@Configuration类来配置额外的Bucket和cache3缓存,如下所示:
@Configuration(proxyBeanMethods = false) public class CouchbaseCacheConfiguration { private final Cluster cluster; public CouchbaseCacheConfiguration(Cluster cluster) { this.cluster = cluster; } @Bean public Bucket anotherBucket() { return this.cluster.openBucket("another", "secret"); } @Bean public CacheManagerCustomizer<CouchbaseCacheManager> cacheManagerCustomizer() { return c -> { c.prepareCache("cache3", CacheBuilder.newInstance(anotherBucket()) .withExpiration(2)); }; } }
此示例配置重用通过自动配置创建的集群。
1.7Redis
如果Redis可用并已配置,则自动配置RedisCacheManager。通过设置spring.cache.cache-names属性,可以在启动时创建额外的缓存。可以使用spring.cache.redis.*配置缓存默认值。例如,下面的配置创建了cache1和cache2缓存,缓存的存活时间为10分钟:
spring.cache.cache-names=cache1,cache2
spring.cache.redis.time-to-live=600000
默认情况下,添加一个键前缀,这样,如果两个单独的缓存使用相同的键,那么Redis没有重叠的键,并且不能返回无效的值。如果您创建自己的RedisCacheManager,我们强烈建议保持启用此设置。
您可以通过添加自己的RedisCacheConfiguration @Bean来完全控制配置。如果您正在寻找自定义序列化策略,那么这将非常有用。
1.8Caffeine
Caffeine是Java 8对Guava缓存的重写,取代了对Guava的支持。如果有Caffeine,则自动配置Caffeine管理器(由spring-boot-starter-cache“Starter”提供)。通过设置spring.cache.cache-names属性,可以在启动时创建缓存。可以通过以下方式之一(按指定的顺序)进行自定义:
例如,下面的配置创建了最大大小为500、存活时间为10分钟的cache1和cache2缓存
spring.cache.cache-names=cache1,cache2
spring.cache.caffeine.spec=maximumSize=500,expireAfterAccess=600s
如果一个com.github.benmanes.caffeine.cache.CacheLoader bean被定义了,它自动关联到CaffeineCacheManager。因为CacheLoader将与由缓存管理器管理的所有缓存相关联,所以它必须被定义为CacheLoader<Object, Object>。自动配置将忽略任何其他泛型类型。
1.9Simple
如果找不到其他提供程序,则配置一个使用ConcurrentHashMap作为缓存存储的简单实现。如果应用程序中没有缓存库,那么这是默认设置。默认情况下,缓存是根据需要创建的,但是您可以通过设置cache-names属性来限制可用缓存的列表。例如,如果你只想要cache1和cache2缓存,设置cache-names属性如下:
spring.cache.cache-names=cache1,cache2
如果您这样做了,而您的应用程序使用了未列出的缓存,那么当需要缓存时,它会在运行时失败,但在启动时不会失败。这与使用未声明的缓存时“真正的”缓存提供程序的行为类似。
1.10None
当@EnableCaching出现在您的配置中时,也需要一个合适的缓存配置。如果需要在某些环境中完全禁用缓存,则强制将缓存类型设置为none以使用无操作实现,如下面的示例所示:
spring.cache.type=none
二、消息
Spring框架为与消息传递系统的集成提供了广泛的支持,从使用JmsTemplate简化JMS API到使用完整的基础设施异步接收消息。Spring AMQP为高级消息队列协议提供了类似的特性集。Spring Boot还为RabbitTemplate和RabbitMQ提供了自动配置选项。Spring WebSocket本身就支持STOMP消息传递,而Spring Boot通过启动器和少量的自动配置来支持这一点。Spring Boot还支持Apache Kafka。
1.JMS
javax.jms.ConnectionFactory接口提供了创建javax.jms.Connection的标准方法来用于与JMS代理交互。尽管Spring需要一个ConnectionFactory来使用JMS,但是您通常不需要直接使用它,而是可以依赖于更高级别的消息传递抽象。(详细信息请参阅Spring框架参考文档的相关部分。)Spring Boot还自动配置发送和接收消息所需的基础设施。
1.1ActiveMQ支持
当ActiveMQ在类路径上可用时,Spring Boot还可以配置ConnectionFactory。如果代理存在,则会自动启动和配置嵌入式代理(假设没有通过配置指定代理URL)。
注:如果使用Spring -boot-starter- ActiveMQ,就会提供连接或嵌入ActiveMQ实例所需的依赖项,以及与JMS集成的Spring基础结构。
ActiveMQ配置由spring.activemq.*中的外部配置属性控制。例如,您可以在application.properties中声明以下部分:
spring.activemq.broker-url=tcp://192.168.1.210:9876 spring.activemq.user=admin spring.activemq.password=secret
默认情况下,CachingConnectionFactory会用一些合理的设置来包装本机ConnectionFactory,您可以通过spring.jms.*中的外部配置属性来控制这些设置。
spring.jms.cache.session-cache-size=5
如果希望使用本机池,可以通过向org.messaginghub:pooled-jms 添加一个依赖项来实现,并相应地配置JmsPoolConnectionFactory,如下面的示例所示:
spring.activemq.pool.enabled=true spring.activemq.pool.max-connections=50
有关更多受支持的选项,请参见ActiveMQProperties。您还可以注册实现ActiveMQConnectionFactoryCustomizer的任意数量的bean,以实现更高级的定制。
默认情况下,ActiveMQ创建一个目的地(如果它还不存在),以便根据它们提供的名称解析目的地。
1.2Artemis支持
当检测到类路径上有Artemis可用时,Spring Boot可以自动配置ConnectionFactory。如果存在代理,则会自动启动和配置嵌入式代理(除非已显式设置了mode属性)。受支持的模式是嵌入式的(为了明确说明需要嵌入式代理,并且如果代理在类路径上不可用,则应该出现错误)和本机的(为了使用netty传输协议连接到代理)。当配置了后者后,Spring Boot将配置一个ConnectionFactory,它将使用默认设置连接到本地机器上运行的代理。
如果使用spring-boot-starter-Artemis,就会提供连接现有Artemis实例所需的依赖项,以及与JMS集成的Spring基础设施。添加org.apache.activemq:artemis-jms-server到您的应用程序来允许您使用嵌入式模式。
Artemis配置由spring.artemis.*中的外部配置属性控制。例如,您可以在application.properties中声明以下部分:
spring.artemis.mode=native spring.artemis.host=192.168.1.210 spring.artemis.port=9876 spring.artemis.user=admin spring.artemis.password=secret
在嵌入代理时,您可以选择是否启用持久性并列出应该提供的目的地。可以将它们指定为逗号分隔的列表,以使用默认选项创建它们,也可以定义org.apache.activemq.artemis.jms.server.config.JMSQueueConfiguration类型的bean或org.apache.activemq.artemis.jms.server.config.TopicConfiguration,分别用于高级队列和主题配置。
默认情况下,CachingConnectionFactory会用一些合理的设置来包装本机ConnectionFactory,您可以通过spring.jms.*中的外部配置属性来控制这些设置。
spring.jms.cache.session-cache-size=5
如果希望使用本机池,可以通过添加一个依赖项来配置org.messaginghub:pooled-jms实现,并相应地配置JmsPoolConnectionFactory,如下面的示例所示:
spring.artemis.pool.enabled=true spring.artemis.pool.max-connections=50
有关更多受支持的选项,请参见ArtemisProperties。https://github.com/spring-projects/spring-boot/blob/v2.2.2.RELEASE/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/jms/artemis/ArtemisProperties.java
使用Artemis配置中的name属性或通过配置提供的名称,不涉及JNDI查找,目的地根据其名称解析。
1.3使用JNDI ConnectionFactory
如果您在应用程序服务器上运行应用程序,Spring Boot将尝试使用JNDI来定位JMS ConnectionFactory。默认情况下,会检查java:/JmsXA和java:/XAConnectionFactory位置。您可以使用spring.jms.jndi-name,如果你需要指定一个替代位置,如下例所示:
spring.jms.jndi-name=java:/MyConnectionFactory
1.4发送消息
Spring的JmsTemplate是自动配置的,您可以将其自动装配到您自己的bean中,如下例所示:
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Component; @Component public class MyBean { private final JmsTemplate jmsTemplate; @Autowired public MyBean(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } // ... }
注:可以以类似的方式注入JmsMessagingTemplate。如果定义了DestinationResolver或MessageConverter bean,它将自动关联到自动配置的JmsTemplate。
1.5接收消息
当JMS基础结构存在时,任何bean都可以使用@JmsListener进行注释以创建侦听器端点。如果没有定义JmsListenerContainerFactory,则自动配置默认的JmsListenerContainerFactory。如果定义了DestinationResolver或MessageConverter bean,它将自动关联到默认工厂。
默认情况下,默认工厂是事务型的。如果在JtaTransactionManager存在的基础设施中运行,默认情况下它与侦听器容器相关联。如果没有,则启用sessionTransacted标志。在后一种场景中,可以通过在侦听器方法(或其委托)上添加@Transactional,将本地数据存储事务与传入消息的处理关联起来。这将确保在本地事务完成后确认传入消息。这还包括发送在同一JMS会话上执行的响应消息。
下面的组件在someQueue目的地创建一个侦听器端点:
@Component public class MyBean { @JmsListener(destination = "someQueue") public void processMessage(String content) { // ... } }
有关更多细节,请参见@EnableJms的Javadoc。
如果您需要创建多个JmsListenerContainerFactory实例或者你想覆盖默认的,弹簧引导提供了一个可以用来初始化一个的DefaultJmsListenerContainerFactoryConfigurer DefaultJmsListenerContainerFactory相同的设置一个自动配置。
例如,下面的示例展示了另一个使用特定MessageConverter的工厂:
@Configuration(proxyBeanMethods = false) static class JmsConfiguration { @Bean public DefaultJmsListenerContainerFactory myFactory( DefaultJmsListenerContainerFactoryConfigurer configurer) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); configurer.configure(factory, connectionFactory()); factory.setMessageConverter(myMessageConverter()); return factory; } }
然后你可以使用工厂在任何@ jmslistener注释的方法如下:
@Component public class MyBean { @JmsListener(destination = "someQueue", containerFactory="myFactory") public void processMessage(String content) { // ... } }
2.AMQP
高级消息队列协议(AMQP)是一个与平台无关的、面向消息中间件的线级别协议。Spring AMQP项目将核心Spring概念应用于基于AMQP的消息传递解决方案的开发。Spring Boot为通过RabbitMQ使用AMQP提供了一些便利,包括spring-boot-starter-amqp“Starter”。
2.1RabbitMQ支持
RabbitMQ是一种轻量级、可靠、可伸缩、可移植的基于AMQP协议的消息代理。Spring使用RabbitMQ通过AMQP协议进行通信。
RabbitMQ配置由spring.rabbitmq.*中的外部配置属性控制。例如,您可以在application.properties中声明以下部分:
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=secret
或者,您可以使用地址属性配置相同的连接:
spring.rabbitmq.addresses=amqp://admin:secret@localhost
有关详细信息,请参阅RabbitMQ使用的协议AMQP。https://spring.io/blog/2010/06/14/understanding-amqp-the-protocol-used-by-rabbitmq/
2.2发送一条信息
Spring的AmqpTemplate和AmqpAdmin是自动配置的,您可以将它们自动装配到您自己的bean中,如下面的示例所示:
import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class MyBean { private final AmqpAdmin amqpAdmin; private final AmqpTemplate amqpTemplate; @Autowired public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) { this.amqpAdmin = amqpAdmin; this.amqpTemplate = amqpTemplate; } // ... }
可以以类似的方式注入RabbitMessagingTemplate。如果定义了MessageConverter bean,它将自动关联到自动配置的AmqpTemplate。
如果需要,任何org.springframework.amqp.core.Queue都可被定义为bean的队列将自动用于在RabbitMQ实例上声明相应的队列。
要重试操作,您可以在AmqpTemplate上启用重试(例如,在代理连接丢失的情况下):
spring.rabbitmq.template.retry.enabled=true spring.rabbitmq.template.retry.initial-interval=2s
默认情况下禁用重试。您还可以通过声明RabbitRetryTemplateCustomizer bean以编程方式自定义RetryTemplate。
2.3接收一条信息
当Rabbit基础设施存在时,任何bean都可以使用@RabbitListener进行注释,以创建侦听器端点。如果没有定义RabbitListenerContainerFactory,则自动配置默认的SimpleRabbitListenerContainerFactory,您可以使用spring.rabbitmq.listener.type切换到直接容器。如果定义了MessageConverter或MessageRecoverer bean,它将自动与默认工厂关联。
下面的示例组件在someQueue队列上创建一个侦听器端点:
@Component public class MyBean { @RabbitListener(queues = "someQueue") public void processMessage(String content) { // ... } }
如果您需要创建多个RabbitListenerContainerFactory实例或者你想覆盖默认的,弹簧引导提供SimpleRabbitListenerContainerFactoryConfigurer和DirectRabbitListenerContainerFactoryConfigurer,您可以用它来初始化一个SimpleRabbitListenerContainerFactory和DirectRabbitListenerContainerFactory相同的设置工厂使用的自动配置。
您选择哪种容器类型并不重要。这两个bean由自动配置公开。
例如,下面的配置类暴露了另一个使用特定MessageConverter的工厂:
@Configuration(proxyBeanMethods = false) static class RabbitConfiguration { @Bean public SimpleRabbitListenerContainerFactory myFactory( SimpleRabbitListenerContainerFactoryConfigurer configurer) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); configurer.configure(factory, connectionFactory); factory.setMessageConverter(myMessageConverter()); return factory; } }
然后你可以在任何@ rabbitlistener注释的方法中使用工厂,如下所示:
@Component public class MyBean { @RabbitListener(queues = "someQueue", containerFactory="myFactory") public void processMessage(String content) { // ... } }
您可以启用重试来处理侦听器抛出异常的情况。默认情况下,使用RejectAndDontRequeueRecoverer,但是您可以定义自己的MessageRecoverer。当重试耗尽时,消息将被拒绝,如果将代理配置为这样做,则消息将被丢弃或路由到死信交换器。默认情况下,重试是禁用的。您还可以通过声明RabbitRetryTemplateCustomizer bean以编程方式自定义RetryTemplate。
默认情况下,如果禁用了重试并侦听器抛出异常,则将无限期重试传递。您可以通过两种方式修改此行为:将defaultrequeuere属性设置为false,以便尝试零次重新传递,或者抛出AmqpRejectAndDontRequeueException来表示应该拒绝消息。后者是在启用重试并达到最大交付尝试次数时使用的机制。
3.3Apache Kafka支持
Apache Kafka支持spring-kafka项目的自动配置。
Kafka配置由spring.kafka.*中的外部配置属性控制。例如,您可以在application.properties中声明以下部分:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
要在启动时创建主题,请添加NewTopic类型的bean。如果主题已经存在,则忽略bean。
3.1发送消息
Spring的KafkaTemplate是自动配置的,你可以直接在你自己的bean中自动装配它,如下面的例子所示:
@Component public class MyBean { private final KafkaTemplate kafkaTemplate; @Autowired public MyBean(KafkaTemplate kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } // ... }
如果属性是spring.kafka.producer.transaction-id-prefix被定义,将自动配置KafkaTransactionManager。另外,如果定义了RecordMessageConverter bean,它将自动关联到自动配置的KafkaTemplate。
3.2接收消息
当Apache Kafka基础设施出现时,任何bean都可以使用@KafkaListener进行注释来创建侦听器端点。如果没有定义KafkaListenerContainerFactory,那么默认的将会自动配置spring.kafka.listener.*中定义的键。
下面的组件在someTopic主题上创建一个侦听器端点:
@Component public class MyBean { @KafkaListener(topics = "someTopic") public void processMessage(String content) { // ... } }
如果定义了KafkaTransactionManager bean,它将自动关联到容器工厂。类似地,如果定义了ErrorHandler、AfterRollbackProcessor或ConsumerAwareRebalanceListener bean,它将自动关联到默认工厂。
根据侦听器类型,RecordMessageConverter或BatchMessageConverter bean与默认工厂相关联。如果仅为批处理侦听器提供了RecordMessageConverter bean,则将其包装在BatchMessageConverter中。
注:自定义的ChainedKafkaTransactionManager必须标记为@Primary,因为它通常引用自动配置的KafkaTransactionManager bean。
3.3Kafka数据流
Apache Kafka的Spring提供了一个工厂bean来创建一个StreamsBuilder对象并管理其流的生命周期。只要Kafka流在类路径上,且Kafka流通过@EnableKafkaStreams注释启用,Spring引导就会自动配置所需的KafkaStreamsConfiguration bean。
启用Kafka流意味着必须设置应用程序id和引导服务器。前者可以使用spring.kafka.streams.application-id配置。应用程序id,如果没有设置,默认为spring.application.name。
使用专用属性可以获得几个附加属性;可以使用sspring.kafka.streams.properties名称空间来设置其他任意Kafka属性。有关更多信息,请参见附加的Kafka属性。
要使用工厂bean,只需将StreamsBuilder连接到您的@Bean中,如下面的示例所示:
@Configuration(proxyBeanMethods = false) @EnableKafkaStreams public static class KafkaStreamsExampleConfiguration { @Bean public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) { KStream<Integer, String> stream = streamsBuilder.stream("ks1In"); stream.map((k, v) -> new KeyValue<>(k, v.toUpperCase())).to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>())); return stream; } }
默认情况下,由它创建的StreamBuilder对象管理的流将自动启动。您可以使用spring.kafka.streams.auto-startup自定义此行为。
3.4附加的Kafka属性
自动配置支持的属性显示在 appendix-application-properties.html中。请注意,在大多数情况下,这些属性(连字符或camelCase)直接映射到Apache Kafka虚线属性。有关详细信息,请参阅Apache Kafka文档。https://docs.spring.io/spring-boot/docs/2.2.2.RELEASE/reference/html/appendix-application-properties.html#common-application-properties
前几个属性适用于所有组件(生产者、消费者、管理员和流),但是如果希望使用不同的值,可以在组件级别指定。Apache Kafka指定属性的重要性为高、中、低。Spring Boot自动配置支持所有重要属性、某些选定的中、低属性以及任何没有默认值的属性。
只有Kafka支持的属性的一个子集可以通过KafkaProperties类直接使用。如果希望为生产者或消费者配置不受直接支持的附加属性,请使用以下属性:
spring.kafka.properties.prop.one=first spring.kafka.admin.properties.prop.two=second spring.kafka.consumer.properties.prop.three=third spring.kafka.producer.properties.prop.four=fourth spring.kafka.streams.properties.prop.five=fifth
通用的设置prop.one kafka属性到first(适用于producers,consumers和admins),prop.two admin 属性到second,prop.three consumer属性到third,prop.four producer到fourth,strean适用于fifth。
您还可以配置Spring Kafka JsonDeserializer如下:
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer spring.kafka.consumer.properties.spring.json.value.default.type=com.example.Invoice spring.kafka.consumer.properties.spring.json.trusted.packages=com.example,org.acme
类似地,你可以禁用JsonSerializer默认的发送类型信息的行为:
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer spring.kafka.producer.properties.spring.json.add.type.headers=false
以这种方式设置的属性覆盖Spring Boot显式支持的任何配置项。
3.5使用嵌入式Kafka进行测试
Spring for Apache Kafka提供了一种使用嵌入式Apache Kafka代理测试项目的方便方法。要使用这个特性,使用来自spring-kafka测试模块的@EmbeddedKafka注释一个测试类。有关更多信息,请参阅Apache Kafka参考手册的Spring。
要使Spring引导自动配置与前面提到的嵌入式Apache Kafka代理一起工作,您需要将嵌入式代理地址的系统属性(由EmbeddedKafkaBroker填充)重新映射到Apache Kafka的Spring引导配置属性中。有几种方法可以做到这一点:
在测试类中提供一个系统属性来将嵌入式代理地址映射到spring.kafka.bootstrap-servers中:
static { System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers"); }
在@EmbeddedKafka注释上配置属性名:
@EmbeddedKafka(topics = "someTopic",
bootstrapServersProperty = "spring.kafka.bootstrap-servers")
在配置属性中使用占位符:
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
SpringBoot学习(七)Caching和Messaging
原文:https://www.cnblogs.com/muxi0407/p/12103923.html