示例一:
生产者
package kafka import java.util.Properties import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} object Producter { def main(args: Array[String]): Unit = { val props = new Properties () //服务器 props.put ( "bootstrap.servers", "192.168.186.150:9092" ) //acks配置控制请求被认为是完整的标准。我们指定的“all”设置将导致阻塞记录的完全提交,这是最慢但最持久的设置。 props.put ( "acks", "all" ) //如果请求失败,生产者可以自动重试,但由于我们已经将重试指定为0,所以它不会。启用重试也会增加重复的可能性(有关消息传递语义的文档中有详细信息)。 // props.put ( "retries", 0 ) //生产者维护每个分区的未发送记录的缓冲区。这些缓冲区的大小由batch.size配置指定。使得这个较大的可能导致更多的批处理, // 但需要更多的内存(因为我们通常会为每个活动分区拥有这些缓冲区之一)。 //props.put ( "batch.size", 16384 ) //默认情况下,即使缓冲区中有其他未使用的空间,也可以立即发送缓冲区。但是,如果您想减少请求的数量,可以将linger.ms设置为大于0的值。 // 这将指示生产者在发送请求之前等待这个数毫秒,希望更多的记录将到达以填满同一批。这类似于Nagle在TCP中的算法。 // 例如,在上面的代码片段中,由于我们将逗留时间设置为1毫秒,很可能所有100条记录都是在一个请求中发送的。 // 但是,如果不填充缓冲区,此设置将为我们的请求增加1毫秒的延迟,等待更多的记录到达。 // 请注意,在时间上接近到达的记录通常会批处理到一起,即使使用linger.ms=0,因此在重负载批处理下, // 无论逗留配置如何,都会发生批处理;但是,将其设置为大于0的值会导致在没有最大负载的情况下,以少量延迟为代价的更少、更高效的请求。 // props.put ( "linger.ms", 1 ) //buffer.memory控制生产者可用于缓冲的内存总量。如果记录的发送速度超过了将其传输到服务器的速度,那么这个缓冲区空间就会耗尽。 // 当缓冲区空间耗尽时,额外的发送调用将被阻塞。阻塞时间的阈值由max.block.ms确定,然后抛出一个TimeoutException。 //props.put ( "buffer.memory", 33554432 ) //key.serializer和value.serializer指示如何将用户提供的密钥和值对象转换为字节。您可以使用包含的ByteraySerializer或StringSerializer来实现简单的字符串或字节类型。 props.put ( "key.serializer", "org.apache.kafka.common.serialization.StringSerializer" ) props.put ( "value.serializer", "org.apache.kafka.common.serialization.StringSerializer" ) val producter = new KafkaProducer[String, String]( props ) /** * private val topic: String = null * private val partition: Integer = null * private val key: K = null * private val value: V = null * private val timestamp: Long = 0L */ var i = 0 while (i < 100) { val pro = new ProducerRecord[String, String]( "hello", i % 3, "" + i, "" + i ) producter.send ( pro ) //睡眠 Thread.sleep ( 2000 ) println ( "生产者数据: 分区:" + pro.partition () + " key:" + pro.key () + " value:" + pro.value () ) i += 1 } producter.close () } }
消费者
package kafka import java.util import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer} object consumerDemo01 { def main(args: Array[String]): Unit = { val map = new util.HashMap[String, Object]() //通过使用configuration bootstrap.servers指定要联系的一个或多个代理程序的列表,可以引导与群集的连接。此列表仅用于发现群集中的其余代理, // 并且不需要是群集中服务器的详尽列表(尽管在客户端连接时服务器出现故障时,您可能需要指定多个服务器)。 map.put ( "bootstrap.servers", "192.168.186.150:9092" ) //在本例中,使用者作为一组名为test的消费者的一部分订阅主题foo和bar,并配置为group.id。 map.put ( "group.id", "test" ) //设置enable.auto.commit意味着偏移以配置auto.commit.interval.ms.控制的频率自动提交 map.put ( "enable.auto.commit", "true" ) map.put ( "auto.commit.interval.ms", "1000" ) map.put ( "key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer" ) map.put ( "value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer" ) val consumer = new KafkaConsumer[String, String]( map ) consumer.subscribe ( util.Arrays.asList ( "hello" ) ) while (true) { //如果缓冲区中没有数据可用,则以100毫秒为单位的时间在轮询中等待.如果0,立即返回缓冲区中可用的任何记录,否则返回空。不得为负数。 val records = consumer.poll ( 1000 ) val result: util.Iterator[ConsumerRecord[String, String]] = records.iterator () while (result.hasNext) { val r = result.next () println ( "消费者数据是:偏移量:" + r.offset () + " 分区:" + r.partition () + " K: " + r.key () + " value: " + r.value () ) } } } }
原文:https://www.cnblogs.com/wangshuang123/p/11133273.html