开发者可以使用Kafka内置的客户端API开发应用程序
一个应用程序往kafka写入消息:记录用户的活动、记录度量指标、保存日志消息、记录智能家电的信息、与其他应用程序进行异步通信等
创建Producer代码
private Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
kafkaProps.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<String, String>(kafkaProps);
创建一个生产者对象必须指定序列化器。
如果发送到kafka的对象不是简单的字符串或整型,可以使用序列化框架来创建消息记录,如Avro、Thrift、Protobuf,或者使用自定义序列化器。
//简单一个客户类
public class Customer {
private int customerID;
private String customerName;
public Customer(int ID, String name) {
this.customerID = ID;
this.customerName = name;
}
public int getID() {
return customerID;
}
public String getName() {
return customerName;
}
}
//创建序列化器
import org.apache.kafka.common.errors.SerializationException;
import java.nio.ByteBuffer;
import java.util.Map;
public class CustomerSerializer implements Serializer<Customer> {
@Override
public void configure(Map configs, boolean isKey) {
// 不做任何配置
}
@Override
/**
Customer对象被序列化成:
表示customerID的4字节整数
表示customerName长度的4字节整数(如果customerName为空,则长度为0)
表示customerName的N个字节
*/
public byte[] serialize(String topic, Customer data) {
try {
byte[] serializedName;
int stringSize;
if (data == null)
return null;
else {
if (data.getName() != null) {
serializedName = data.getName().getBytes("UTF-8");
stringSize = serializedName.length;
} else {
serializedName = new byte[0];
stringSize = 0;
}
}
ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
buffer.putInt(data.getID());
buffer.putInt(stringSize);
buffer.put(serializedName);
return buffer.array();
} catch (Exception e) {
throw new SerializationException("Error when serializing Customer to
byte[] " + e);
}
}
@Override
public void close() {
// 不需要关闭任何东西
}
}
数据被序列化成二进制文件或JSON文件,Avro在读写文件时需要用到schema,schema一般会被内嵌在数据文件里。特点,当写信息使用新的schema,负责读信息可以继续使用,无需改动。
如果键位NULL,分区器使用Round Robin将消息均衡分布到各个分区,键不为空,则对键散列,映射到相应分区,同一个键总是在同一个分区。
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.utils.Utils;
public class BananaPartitioner implements Partitioner {
public void configure(Map<String, ?> configs) {}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if ((keyBytes == null) || (!(key instanceOf String))) ?
throw new InvalidRecordException("We expect all messages to have customer name as key")
if (((String) key).equals("Banana"))
return numPartitions; // Banana总是被分配到最后一个分区
// 其他记录被散列到其他分区
return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1))
}
public void close() {}
}
原文:https://www.cnblogs.com/chenshaowei/p/12513709.html