Kafka usage (1)

Contents:
1. ConsumerRecord
2. Creating topics
   2.1 Annotation approach
   2.2 KafkaAdmin and AdminClient
   2.3 Why partition
   2.4 Getting topic information
3. Custom send-message listener
4. Kafka transaction management
   4.1 Result of using a transaction
A simple annotated consumer prints each received record's metadata:

```java
@Component
public class ConsumerDemo {

    @KafkaListener(topics = "topicTEST")
    public void listen(ConsumerRecord<?, ?> record) {
        System.out.printf("topic is %s, offset is %d, timestamp is %s, value is %s \n",
                record.topic(), record.offset(), record.timestamp(), record.value());
    }
}
```
Suppose we have three Kafka brokers: brokerA, brokerB, and brokerC.

If we create a topic with 3 partitions and a replication-factor of 1, each broker typically hosts one partition. If one broker goes down, the topic is effectively broken, because only two of its three partitions are still available.

If we instead create the topic with 3 partitions and a replication-factor of 2, the partition data might be distributed like this:
Broker | Partition | Partition replica
---|---|---
brokerA | partition0 | partition1
brokerB | partition1 | partition2
brokerC | partition2 | partition0
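Which partition a keyed record lands on is what ties a record to a broker. As a simplified sketch of "hash the key, mod the partition count" (Kafka's real DefaultPartitioner uses murmur2 over the serialized key, not `String.hashCode`, and uses sticky/round-robin assignment for null keys — `PartitionSketch` below is illustrative only):

```java
// Simplified sketch of keyed partition selection.
// Assumption: this is NOT Kafka's implementation; it only illustrates that the
// same key always maps to the same partition, preserving per-key ordering.
public class PartitionSketch {

    static int partitionFor(String key, int numPartitions) {
        // Mask off the sign bit so negative hash codes still map into [0, numPartitions)
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int p = partitionFor("order-42", 3);
        System.out.println("order-42 -> partition " + p);
        // Same key, same partition, every time
        System.out.println(p == partitionFor("order-42", 3)); // prints true
    }
}
```

Because the mapping is deterministic, all records for one key go to one partition, which is why per-key ordering survives even with many partitions.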
```java
// Create a topic named topic.quick.initial with 8 partitions and a replication factor of 1
@Bean
public NewTopic initialTopic() {
    return new NewTopic("topic.quick.initial", 8, (short) 1);
}
```
```java
@Bean
public KafkaAdmin kafkaAdmin() {
    Map<String, Object> props = new HashMap<>();
    // Connection address of the Kafka instance
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.228.128:9090");
    return new KafkaAdmin(props);
}
```
```java
@Bean
public AdminClient adminClient() {
    return AdminClient.create(kafkaAdmin().getConfigurationProperties());
}
```
```java
@Autowired
private AdminClient adminClient;

@Test
public void testCreateTopic() throws InterruptedException {
    NewTopic topic = new NewTopic("topic.quick.initial2", 1, (short) 1);
    adminClient.createTopics(Arrays.asList(topic));
    // createTopics is asynchronous; wait briefly so the request completes before the test exits
    Thread.sleep(1000);
}
```
Partitioning raises the system's throughput, but note that the partition count can only be increased, never decreased.
```java
/**
 * Get information about a topic
 */
@Test
public void testSelectTopicInfo() throws ExecutionException, InterruptedException {
    DescribeTopicsResult topicTest = adminClient.describeTopics(Arrays.asList("topic.quick.initial"));
    topicTest.all().get().forEach((k, v) -> System.out.println("k: " + k + " ,v: " + v + "\n"));
}
```
Output:

```
k: topic.quick.initial ,v: (name=topic.quick.initial, internal=false, partitions=(partition=0, leader=kafka:9090 (id: 1 rack: null), replicas=kafka:9090 (id: 1 rack: null), isr=kafka:9090 (id: 1 rack: null)),(partition=1, leader=kafka:9090 (id: 1 rack: null), replicas=kafka:9090 (id: 1 rack: null), isr=kafka:9090 (id: 1 rack: null)),(partition=2, leader=kafka:9090 (id: 1 rack: null), replicas=kafka:9090 (id: 1 rack: null), isr=kafka:9090 (id: 1 rack: null)),(partition=3, leader=kafka:9090 (id: 1 rack: null), replicas=kafka:9090 (id: 1 rack: null), isr=kafka:9090 (id: 1 rack: null)),(partition=4, leader=kafka:9090 (id: 1 rack: null), replicas=kafka:9090 (id: 1 rack: null), isr=kafka:9090 (id: 1 rack: null)),(partition=5, leader=kafka:9090 (id: 1 rack: null), replicas=kafka:9090 (id: 1 rack: null), isr=kafka:9090 (id: 1 rack: null)),(partition=6, leader=kafka:9090 (id: 1 rack: null), replicas=kafka:9090 (id: 1 rack: null), isr=kafka:9090 (id: 1 rack: null)),(partition=7, leader=kafka:9090 (id: 1 rack: null), replicas=kafka:9090 (id: 1 rack: null), isr=kafka:9090 (id: 1 rack: null)), authorizedOperations=null)
```
```java
@Component
public class KafkaSendListenerHandler implements ProducerListener {

    private static final Logger log = LoggerFactory.getLogger(KafkaSendListenerHandler.class);

    @Override
    public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
        log.info("Send-success listener fired...");
        String key = (String) producerRecord.key();
        log.info("key : " + key);
        log.info("Message send success : " + producerRecord);
        log.info("-----------------------------");
    }

    @Override
    public void onError(ProducerRecord producerRecord, Exception exception) {
        log.error("Message send failed : " + producerRecord, exception);
    }
}
```
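For the listener to fire, it must be registered on the template. In Spring Boot, a `ProducerListener` bean is usually injected into the auto-configured `KafkaTemplate` automatically; with a manually built template, the wiring can be sketched like this (`producerFactory()` is assumed to be the bean defined elsewhere in this configuration):

```java
// Sketch: attach the custom listener to a manually created template.
// producerFactory() is assumed from the surrounding configuration class.
KafkaTemplate<String, Object> template = new KafkaTemplate<>(producerFactory());
template.setProducerListener(new KafkaSendListenerHandler());
```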
Use KafkaTemplate's executeInTransaction method to declare a transaction:
```java
@Test
public void testExecuteInTransaction() throws InterruptedException {
    kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback() {
        @Override
        public Object doInOperations(KafkaOperations kafkaOperations) {
            kafkaOperations.send("topicTEST", "test executeInTransaction");
            // Throwing here aborts the transaction, so the send above is never committed
            throw new RuntimeException("fail");
            //return true;
        }
    });
}
```
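The guarantee executeInTransaction gives can be illustrated without a broker: sends are buffered inside the transaction, a normal return commits them, and an exception aborts them all, so consumers never see the message. A minimal pure-Java sketch of that semantics (`TxSketch` and `TxBuffer` are hypothetical stand-ins, not Kafka API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class TxSketch {

    // Hypothetical stand-in for a transactional producer session
    static class TxBuffer {
        private final List<String> pending = new ArrayList<>();
        void send(String msg) { pending.add(msg); }               // buffered, not yet visible
        List<String> commit() { return new ArrayList<>(pending); } // now visible to consumers
    }

    // Mirrors executeInTransaction: commit on normal return, abort on exception
    static List<String> inTransaction(Consumer<TxBuffer> work) {
        TxBuffer tx = new TxBuffer();
        try {
            work.accept(tx);
            return tx.commit();
        } catch (RuntimeException e) {
            return new ArrayList<>(); // aborted: nothing is delivered
        }
    }

    public static void main(String[] args) {
        List<String> delivered = inTransaction(tx -> {
            tx.send("test executeInTransaction");
            throw new RuntimeException("fail");
        });
        System.out.println(delivered.isEmpty()); // prints true: the aborted send never reaches consumers
    }
}
```

This is exactly why the test above throws after sending: the stack trace at the end of this post shows the real producer failing the batch because the transaction was aborted.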
Using the @Transactional annotation. To enable transactions this way, first configure a KafkaTransactionManager: this is the transaction-management class that Spring's Kafka support provides, and it is created from the producer factory. Note that transaction support must be enabled on the producerFactory, and a TransactionIdPrefix must be set; the TransactionIdPrefix is used as the prefix when generating each producer's transactional.id.
```java
@Bean
public ProducerFactory<String, Object> producerFactory() {
    DefaultKafkaProducerFactory<String, Object> factory = new DefaultKafkaProducerFactory<>(producerConfigs());
    factory.transactionCapable();
    // Setting the transaction-id prefix is what enables transactions on this factory
    factory.setTransactionIdPrefix("tran-");
    return factory;
}
```
```java
@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.228.128:9090");
    // Transactional producers require acks=all
    props.put("acks", "all");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    return props;
}
```
```java
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}
```
```java
@Bean
public KafkaTransactionManager transactionManager(ProducerFactory producerFactory) {
    return new KafkaTransactionManager(producerFactory);
}
```
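With the prefix set to "tran-", each cached transactional producer gets a transactional.id built from the prefix plus a sequence number (e.g. tran-0, tran-1, ...). A simplified sketch of that derivation (`TxIdSketch` is illustrative, not the spring-kafka implementation):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch: derive transactional.id values from a configured prefix.
// Assumption: spring-kafka appends an incrementing suffix to the prefix; the
// exact internal bookkeeping differs from this minimal version.
public class TxIdSketch {

    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger();

    TxIdSketch(String prefix) { this.prefix = prefix; }

    // Each new transactional producer gets prefix + next sequence number
    String nextTransactionalId() {
        return prefix + counter.getAndIncrement();
    }

    public static void main(String[] args) {
        TxIdSketch ids = new TxIdSketch("tran-");
        System.out.println(ids.nextTransactionalId()); // tran-0
        System.out.println(ids.nextTransactionalId()); // tran-1
    }
}
```

Keeping the prefix stable across restarts matters, because the broker uses transactional.id to fence zombie producers and resume interrupted transactions.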
Result of using a transaction: the aborted send fails and the test transaction is rolled back:

```
org.apache.kafka.common.KafkaException: Failing batch since transaction was aborted
	at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:422) [kafka-clients-2.5.1.jar:na]
	at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312) [kafka-clients-2.5.1.jar:na]
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) [kafka-clients-2.5.1.jar:na]
	at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_152]

2020-09-15 18:50:54.064 INFO 14636 --- [ main] o.s.t.c.transaction.TransactionContext : Rolled back transaction for test: ....
```
Original post: https://www.cnblogs.com/Lambquan/p/13675100.html