依赖
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </dependency>
代码
package com.perfect.kafka;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Arrays;
import java.util.Map;

/**
 * Demonstration partitioner: dumps every argument it is handed to standard output
 * and sends each record to partition 0.
 */
public class CustomPartiton implements Partitioner {

    /**
     * Logs the call arguments and picks partition 0 unconditionally.
     *
     * @param topic      first argument, printed under the label {@code s}
     * @param key        second argument, printed under the label {@code o}
     * @param keyBytes   third argument, printed under the label {@code bytes}
     * @param value      fourth argument, printed under the label {@code o1}
     * @param valueBytes fifth argument, printed under the label {@code bytes1}
     * @param cluster    sixth argument, printed under the label {@code cluster}
     * @return always {@code 0}
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // The labels in the emitted line are runtime output and are kept verbatim.
        StringBuilder line = new StringBuilder();
        line.append("s = ").append(topic);
        line.append(", o = ").append(key);
        line.append(", bytes = ").append(Arrays.toString(keyBytes));
        line.append(", o1 = ").append(value);
        line.append(", bytes1 = ").append(Arrays.toString(valueBytes));
        line.append(", cluster = ").append(cluster);
        System.out.println(line);
        return 0;
    }

    /** Nothing to release. */
    @Override
    public void close() {
    }

    /** No configuration is read. */
    @Override
    public void configure(Map<String, ?> map) {
    }
}
package com.perfect.kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.Cluster;
import org.junit.jupiter.api.Test;

import java.util.*;

/**
 * Sends a handful of keyed messages through a producer configured with
 * {@code com.perfect.kafka.CustomPartiton} and prints where each one landed.
 */
public class PartitonerTest {

    /**
     * Publishes ten messages to {@code topic2}, cycling through the keys
     * {@code a}, {@code b}, {@code c}, and prints {@code partition---offset}
     * for every acknowledged record. Failed sends are reported on standard error.
     * Expects a broker reachable at {@code localhost:9092}.
     */
    @Test
    public void sendmessagetest() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.perfect.kafka.CustomPartiton");

        List<String> keys = Arrays.asList("a", "b", "c");

        // try-with-resources closes the producer even if a send call throws.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> message =
                        new ProducerRecord<>("topic2", keys.get(i % keys.size()), "message" + i);
                producer.send(message, (recordMetadata, e) -> {
                    if (e == null) {
                        System.out.println(recordMetadata.partition() + "---" + recordMetadata.offset());
                    } else {
                        // Surface delivery failures instead of dropping them silently.
                        System.err.println("send failed: " + e);
                    }
                });
            }
        }
    }
}
原文:https://www.cnblogs.com/abuduri/p/13342139.html