1:kafka需要zookeeper管理,所以需要先安装zookeeper。 下载docker pull wurstmeister/zookeeper:latest版本
1 安装docker zookeeper
docker pull wurstmeister/zookeeper
2. 启动镜像生成容器
docker run -d --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime wurstmeister/zookeeper
1、查询kafaka镜像
docker search kafka
2、拉取镜像
docker pull wurstmeister/kafka
4:启动kafka镜像生成容器
docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.155.56:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.155.56:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /etc/localtime:/etc/localtime wurstmeister/kafka
-e KAFKA_BROKER_ID=0 在kafka集群中,每个kafka都有一个BROKER_ID来区分自己
-e KAFKA_ZOOKEEPER_CONNECT=192.168.155.56:2181/kafka 配置zookeeper管理kafka的路径192.168.155.56:2181/kafka
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.155.56:9092 把kafka的地址端口注册给zookeeper
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 配置kafka的监听端口
-v /etc/localtime:/etc/localtime 容器时间同步虚拟机的时间
5:验证kafka是否可以使用
进入容器
docker exec -it kafka /bin/sh
进入路径:/opt/kafka_2.11-2.0.0/bin下
运行kafka生产者发送消息
./kafka-console-producer.sh --broker-list localhost:9092 --topic sun
发送消息
{"datas":[{"channel":"","metric":"temperature","producer":"ijinus","sn":"IJA0101-00002245","time":"1543207156000","value":"80"}],"ver":"1.0"}
重新打开一个终端 运行kafka消费者接收消息
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sun --from-beginning
查看zookeeper容器内,可以看到kafka注册信息
docker exec -it zookeeper /bin/sh
运行zkCli.sh进入zookeeper客户端
./zkCli.sh
ls / 可以查看zookeeper根节点下都挂了那些目录
熟悉kafka的封装技巧
熟悉阿里审核图片和文本内容审核
完成自媒体文章审核代码
完成自媒体端发布文章发送消息
完成admin端接收消息并自动审核
消息对于现代软件项目来说,占有很重要的地位;同时市场上也发展处ActiveMq、RabbitMQ、Kafka、RocketMQ、Pulsar等众多优秀的框架;这些优秀的框架都由自身的特点和擅长的业务领域,在大数据领域中Kafka目前是使用较多的框架,Pulsar是一个后起之秀,目前处于一个快速发展的状态,有望能够成为下一代中间件的黑马。在本案例中我们选择使用Kafka作为内部消息通知的框架,以适应项目中大数据量的高吞吐、实时流计算等功能实现。
(1)Topic命名约束
Topic分为单类和混合类消息,不同类的消息命名约束如下:
KafkaProducerConfig自动配置Kafka消费者
KafkaConsumerConfig自动配置Kafka消费者
RetryErrorHandler实现消费者处理消息失败后重新发送到消息队列
KafkaMessage实现对发送的消息包装,提供重试次数、分类等信息
KafkaSender实现消息的统一发送入口功能
KafkaTopicConfig自动装载topic名称信息
KafkaListener提供自动注册消息消费监听接口类
KafkaListenerFactory提供启动时自动注册实现了KafkaListener的消息消费者
Kafka功能有独立的配置文件,放置在src\main\resources\kafka.properties,相关的值在maven_*.properties中配置。
#kafka config
kafka.hosts=localhost:9092
kafka.group=heima.${profiles.name}.${spring.application.name}
# 单消息通道,需要以sigle结尾
kafka.topic.admin-test=${kafka.topic.admin-test}
创建类com.heima.common.kafka.KafkaMessage。KafkaMessage是一个抽象类包含记录当前消息重发处理的次数retry、消息类型type、第一次创建消息的时间time信息。
/**
* Kafka消息
*/
public abstract class KafkaMessage<T> {
// 尝试次数
@Getter
int retry;
// 生成时间
@Getter
long time = System.currentTimeMillis();
// 消息类型
String type;
// 消息实体数据
@Setter
@Getter
T data;
public KafkaMessage(){}
public KafkaMessage(T data){
this.data = data;
}
public void addRetry(){
this.retry++;
}
// 获取消息类型
protected abstract String getType();
}
创建类com.heima.common.kafka.KafkaListener。KafkaListener是一个接口,继承ConsumerAwareMessageListener(提供Consumer信息和自动提交offsets功能)接口。
topic方法用于返回监听器监听的topic名称
factory方法用于指定监听器容器的创建工厂
group方法用于指定监听器的groupid,目前没用
/**
* 消息监听实现接口
*/
public interface KafkaListener<K,V> extends ConsumerAwareMessageListener<K,V> {
String topic();
default String factory(){
return "defaultKafkaListenerContainerFactory";
}
default String group(){ return "default";}
}
创建类:com.heima.common.kafka.KafkaTopicConfig。KafkaTopicConfig用于自动装入kafka.properties文件中的kafka.topic.*信息
@Data
@Configuration
@ConfigurationProperties(prefix="kafka.topic")
@PropertySource("classpath:kafka.properties")
public class KafkaTopicConfig {
String userLogin;
String userLogout;
String userRefresh;
String userRegister;
String hotArticle;
}
创建类com.heima.common.kafka.KafkaProducerConfig。KafkaProducerConfig类是自动化配置类,定义了默认的Producer工厂,以及KafkaTemplate,并约束了消息的类型为String,大小不超过16M。
@Data
@Configuration
@EnableKafka
@ConfigurationProperties(prefix="kafka")
@PropertySource("classpath:kafka.properties")
public class KafkaProducerConfig {
private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
private String hosts;
@Autowired(required = false)
private ProducerListener<String, String> producerListener;
@Bean
public DefaultKafkaProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.getHosts());
props.put(ProducerConfig.RETRIES_CONFIG, 10);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 5_000);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,3*MAX_MESSAGE_SIZE);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,3*MAX_MESSAGE_SIZE);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 256 * 1024);
return new DefaultKafkaProducerFactory<>( props);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory producerFactory) {
KafkaTemplate<String, String> t = new KafkaTemplate<>(producerFactory);
if (this.producerListener != null) {
t.setProducerListener(this.producerListener);
}
return t;
}
}
创建类com.heima.common.kafka.KafkaSender。KafkaSender类是所有发送消息的方法统一管理器,其实现通过kafkaTemplate发送。
@Component
public class KafkaSender {
Logger logger = LoggerFactory.getLogger(KafkaSender.class);
@Autowired
KafkaTemplate<String, String> kafkaTemplate;
@Autowired
ObjectMapper mapper;
@Autowired
KafkaTopicConfig kafkaTopicConfig;
/**
* 发送一个消息
* @param topic
* @param key
* @param message
*/
public void sendMesssage(String topic,String key,KafkaMessage<?> message){
try {
this.kafkaTemplate.send(topic, key, mapper.writeValueAsString(message));
}catch (Exception e){
logger.error("send message to [{}] error:",topic,e);
}
}
/**
* 发送一个不包装的消息
* 只能是内部使用,拒绝业务上使用
* @param topic
* @param key
* @param message
*/
public void sendMesssageNoWrap(String topic,String key,String message){
try {
this.kafkaTemplate.send(topic, key, message);
}catch (Exception e){
logger.error("send message to [{}] error:",topic,e);
}
}
}
创建类com.heima.common.kafka.RetryErrorHandler。RetryErrorHandler类用于在消费者解析消息出现错误时,重新放回消息到队列中,并设置超过一个小时或者超过10次处理错误的消息丢弃,避免消息无限滚动;然后这类消息可以通过日志搜索查找出数据补偿重试。
@Component
public class RetryErrorHandler extends LoggingErrorHandler {
private static Logger logger = LoggerFactory.getLogger(RetryErrorHandler.class);
private static final int RETRY_COUNT = 10;
private static final int TIME_OUT = 3_600_000;//1个小时超时
@Autowired
KafkaSender sender;
@Autowired
ObjectMapper mapper;
@Override
public void handle(Exception thrownException, ConsumerRecord<?, ?> record) {
super.handle(thrownException, record);
if (record != null) {
try{
KafkaMessage<?> message = mapper.readValue((String)record.value(),KafkaMessage.class);
message.addRetry();
long time = System.currentTimeMillis()-message.getTime();
if(message.getRetry()>RETRY_COUNT||time>TIME_OUT){
logger.info("超时或者尝试{}次后,抛弃消息[topic:{}][{}]",RETRY_COUNT,record.topic(),record.value());
}else{
this.sender.sendMesssage(record.topic(),(String)record.key(),message);
logger.info("处理失败重新回滚到队列[retry:{}][topic:{}][key:{}]",message.getRetry(),record.topic(),record.key());
}
}catch (Exception e){
sender.sendMesssageNoWrap(record.topic(),(String) record.key(),(String) record.value());
}
}
}
}
创建类com.heima.common.kafka.KafkaProducerConfig。KafkaProducerConfig主要配置消费者监听器,配置重试器、错误处理器等信息,同时设置group消费者。
@Data
@Configuration
@EnableKafka
@ConfigurationProperties(prefix="kafka")
@PropertySource("classpath:kafka.properties")
public class KafkaConsumerConfig {
private static final int CONCURRENCY = 8;
public final static Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerConfig.class);
String hosts;
String group;
@Bean("defaultKafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(RetryErrorHandler retryErrorHandler) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setRetryTemplate(this.buildRetryTemplate());
factory.setErrorHandler(retryErrorHandler);
factory.getContainerProperties().setAckOnError(false);
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(buildComsumerConfig()));
factory.setConcurrency(KafkaConsumerConfig.CONCURRENCY);
return factory;
}
protected Map<String, Object> buildComsumerConfig() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.getHosts());
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, this.group);
propsMap.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,8 * 1024 * 1024);
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 90_000);
return propsMap;
}
private RetryTemplate buildRetryTemplate() {
RetryTemplate t = new RetryTemplate();
ExponentialBackOffPolicy backOff = new ExponentialRandomBackOffPolicy();
backOff.setInitialInterval(1000L);
t.setBackOffPolicy(backOff);
t.setRetryPolicy(new SimpleRetryPolicy(5));
t.registerListener(new RetryListenerSupport() {
@Override
public <T, E extends Throwable> void onError(RetryContext context,
RetryCallback<T, E> callback, Throwable throwable) {
KafkaConsumerConfig.LOGGER.warn("Retry processing Kafka message "
+ context.getRetryCount() + " times", throwable);
}
});
return t;
}
}
创建类com.heima.common.kafka.KafkaListenerFactory。KafkaListenerFactory类实现在构造之后扫描实现了的KafkaListener接口的Bean,并自动注册成消费者监听器。
@Component
public class KafkaListenerFactory implements InitializingBean {
Logger logger = LoggerFactory.getLogger(KafkaListenerFactory.class);
@Autowired
DefaultListableBeanFactory defaultListableBeanFactory;
@Override
public void afterPropertiesSet() {
Map<String,KafkaListener> map = defaultListableBeanFactory.getBeansOfType(KafkaListener.class);
for (String key : map.keySet()) {
KafkaListener k = map.get(key);
AbstractKafkaListenerContainerFactory factory = (AbstractKafkaListenerContainerFactory)defaultListableBeanFactory.getBean(k.factory());
AbstractMessageListenerContainer container = factory.createContainer(k.topic());
container.setupMessageListener(k);
String beanName = k.getClass().getSimpleName()+"AutoListener" ;
defaultListableBeanFactory.registerSingleton(beanName,container);
logger.info("add auto listener [{}]",beanName);
}
}
}
/**
* 扫描所有的kafkamessage类
*/
@Log4j2
@Component
public class MessagesRegister implements InitializingBean {
Map<String,Class> messages = Maps.newConcurrentMap();
@Override
public void afterPropertiesSet() throws Exception {
Reflections reflections = new Reflections("com.heima");
Set<Class<? extends KafkaMessage>> ms = reflections.getSubTypesOf(KafkaMessage.class);
if(ms!=null){
ms.forEach(cla->{
try {
Constructor<?>[] cs = cla.getConstructors();
KafkaMessage mess = null;
if (cs != null && cs.length > 0) {
Class[] temp = cs[0].getParameterTypes();
Object[] parms = new Object[temp.length];
for (int i = 0; i < temp.length; i++) {
if(temp[i].isPrimitive()){
if(temp[i].getName().contains("boolean")){
parms[i]=false;
}else {
parms[i] = 0;
}
}else{
parms[i]=null;
}
}
mess = (KafkaMessage) cs[0].newInstance(parms);
} else {
mess = (KafkaMessage) cla.newInstance();
}
String type = mess.getType();
messages.put(type,cla);
}catch (Exception e){
System.out.println(cla+"====================:"+cla.getConstructors()[0].getParameterCount());
e.printStackTrace();
}
});
}
log.info("=================================================");
log.info("scan kafka message resultt[{}]",messages);
log.info("=================================================");
}
/**
* 通过消息的类型名称,查找对应的class定义
* @param type
* @return
*/
public Class<? extends KafkaMessage> findClassByType(String type){
return this.messages.get(type);
}
}
@SpringBootTest
@RunWith(SpringRunner.class)
public class KafkaTest {
@Autowired
KafkaTemplate<String, String> kafkaTemplate;
@Test
public void test(){
try {
this.kafkaTemplate.send("topic.test", "123key","123value");
System.out.println("=================================");
Thread.sleep(500000);// 休眠等待消费者接收消息
} catch (Exception e) {
e.printStackTrace();
}
}
}
@Component
public class TestKafkaListener implements KafkaListener<String,String> {
@Override
public String topic () {
return "topic.test";
}
@Override
public void onMessage (ConsumerRecord< String, String > data, Consumer< ?, ?> consumer){
System.out.println("===========receive test message:" + data);
}
}
原文:https://www.cnblogs.com/jianjunliu/p/14532806.html