xshell 设置消费权限
./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Brent --consumer --allow-host 192.168.239.146 --topic AUTHTEST --group Brent_group
package com.chinalife.kafka2demo.java;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
/**
 * Demo Kafka consumer that connects with {@code SASL_PLAINTEXT} and the
 * {@code SCRAM-SHA-256} mechanism, subscribes to a single topic, prints every
 * record it receives and commits offsets manually after each poll.
 *
 * <p>No credentials are configured in this class; the JVM is pointed at a JAAS
 * file through the {@code java.security.auth.login.config} system property
 * (presumably that file holds the SCRAM user name and password - verify).
 */
public class ConsumerScramJavaSubscribe {

    /** Topic the consumer subscribes to. */
    private static final String TOPIC = "AUTHTEST";

    /** Maximum time a single poll waits for records, in milliseconds. */
    private static final long POLL_TIMEOUT_MS = 3000L;

    /** Client configuration used to build the consumer. */
    private static final Properties kfkProperties = new Properties();

    static {
        kfkProperties.put("bootstrap.servers", "192.168.239.146:9094,192.168.239.147:9094");
        // The JAAS login configuration is supplied JVM-wide via this system property.
        System.setProperty("java.security.auth.login.config",
                "/opt/cloudera/parcels/KAFKA-4.0.0-1.4.0.0.p0.1/etc/kafka/conf.dist/kafka_client_jaas.conf");
        kfkProperties.put("group.id", "Brent_group");
        kfkProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kfkProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kfkProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        kfkProperties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
        // Alternative to the JAAS file: set the "sasl.jaas.config" client property with a
        // ScramLoginModule entry. Keep the credentials out of source control if you do.
    }

    /**
     * Consumer 2: manual offset commit.
     *
     * <p>Disables auto-commit, subscribes to {@link #TOPIC} and then loops forever:
     * poll, print each record as
     * {@code timestamp,topic,partition,offset key,value}, and synchronously commit
     * the offsets returned by that poll. A {@link CommitFailedException} is reported
     * on standard output and the loop continues.
     *
     * <p>The loop has no exit condition, so this method only returns by throwing;
     * the consumer is closed on the way out so its resources are released.
     */
    private static void generalConsumerMessageManualCommitSync() {
        // Offsets are committed explicitly below, so auto-commit must be off.
        kfkProperties.put("enable.auto.commit", "false");

        // try-with-resources guarantees close() if anything escapes the loop.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kfkProperties)) {
            consumer.subscribe(Collections.singletonList(TOPIC));
            while (true) {
                // poll(Duration) replaces the deprecated poll(long) overload.
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.timestamp() + "," + record.topic() + ","
                            + record.partition() + "," + record.offset() + " " + record.key()
                            + "," + record.value());
                }
                try {
                    consumer.commitSync();
                } catch (CommitFailedException e) {
                    // Best effort: report the failed commit and keep consuming.
                    System.out.println("commit failed msg" + e.getMessage());
                }
            }
        }
    }

    /**
     * Entry point: runs the manual-commit consumer loop.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        ConsumerScramJavaSubscribe.generalConsumerMessageManualCommitSync();
    }
}
原文:https://www.cnblogs.com/BrentBoys/p/11211678.html