首页 > Windows开发 > 详细

Kafka命令行操作及常用API

时间:2019-02-28 22:54:29      阅读:268      评论:0      收藏:0      [点我收藏+]

一、Kafka命令行操作

  1.查看当前集群已存在的主题

  bin/kafka-topic.sh --zookeeper hd09-01:2181 --list

  2.创建主题

  bin/kafka-topic.sh --zookeeper hd09-01:2181 --create --relication-factor 3 \

  >--partition 1 \

  >---topic xinnian

  3.删除主题

  bin/kafka-topic.sh --zookeeper hd09-01:2181 --delete --topic xinnian

  4.启动生产者发送消息(相当于直接创建主题)

  bin/kafka-console-producer.sh --broker-list hd09-01:9092 --topic xinnian

  5.启动消费者接收消息

  bin/kafka-console-consumer.sh --bootstrap-server hd09-01:9092 \

  >--topic xinnian

  >--from-beginning

  6.查看主题的详细信息

  bin/kafka-topic.sh --zookeeper hd09-01:2181 --describe --topic xinnian

 

二、Kafka的常用API(生产者和消费者)

  注意!!!在本地编译器上编写kafka的生产者消费者API时,要在本地的hosts文件中添加映射!!

  1.生产者

  1)配置生产者属性(kafka节点地址,是否等待应答,发送消息失败是否重试,批处理消息大小,批处理数据延迟,内存缓冲,序列化等)生产者发送数据,只需序列化
     Properties prop = new Properties();
     prop.put("","");
     2)实例化生产者
     KafkaProducer<String,String> producer = new KafkaProducer<String,String>(prop);
     3)生产者发送消息(使用只含有ProducerRecord的send方法)
     for(~){
       producer.send(new ProducerRecord<String,String>("topic","value"));
     }
     4)关闭生产者资源
     producer.close();

 

/**
 * @author: PrincessHug
 * @date: 2019/2/28, 16:36
 * @Blog: https://www.cnblogs.com/HelloBigTable/
 */
public class Prodecer {
    public static void main(String[] args) {
        Properties prop = new Properties();

        //参数配置
        //kafka节点的地址
        prop.put("bootstrap.servers", "192.168.126.128:9092");
        //发送消息是否等待应答
        prop.put("acks", "all");
        //配置发送消息失败重试
        prop.put("retries", "0");
        //配置批量处理消息大小
        prop.put("batch.size", "10240");
        //配置批量处理数据延迟
        prop.put("linger.ms","5");
        //配置内存缓冲大小
        prop.put("buffer.memory", "12341234");
        //消息在发送前必须序列化
        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);

        for (int i=10;i<100;i++){
            producer.send(new ProducerRecord<String, String>("xinnian", "20" + i + "年新年好!"));
        }

        producer.close();

    }
}

 

  2.消费者

  1)配置消费者属性(服务器地址,消费者组,自动确认偏移量,反序列化)
      Properties prop = new Properties();
     2)实例化消费者(线程安全,把consumer定义为常量)
     final KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(prop);
     3)释放资源,线程安全
      public class Runtime extends Object
      每个Java应用程序都有一个Runtime类的Runtime ,允许应用程序与运行应用程序的环境进行接口。
      void addShutdownHook(Thread hook) =>注册一个新的虚拟机关机挂钩。
      static Runtime getRuntime() =>返回与当前Java应用程序关联的运行时对象。   
     4)订阅消息主题,消费者拉取消息poll
      while(true){
       ConsumerRecords<String,String> records = consumer.poll(1000);
        //遍历消息
        for(ConsumerRecord<String,String> r:records){
           System.out.println(r.key() + "----------" + r.value());
        }
     }

/**
 * @author: PrincessHug
 * @date: 2019/2/28, 20:07
 * @Blog: https://www.cnblogs.com/HelloBigTable/
 */
public class ConsumerDemo {
    public static void main(String[] args) {
        //配置消费者属性
        Properties prop = new Properties();
        prop.put("bootstrap.servers","192.168.126.129:9092");
        prop.put("group.id","g1");
        prop.put("enable.auto.commit","true");
        prop.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

        //初始化消费的对象
        final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);

        //线程安全,释放资源
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            public void run() {
                if (consumer!=null){
                    consumer.close();
                }
            }
        }));

        //消费者订阅主题
        consumer.subscribe(Arrays.asList("xinnian"));

        //消费者拉取消息
        while (true){
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record:records){
                System.out.println(record.topic()+"-----"+record.value());
            }
        }

    }
}

 

Kafka命令行操作及常用API

原文:https://www.cnblogs.com/HelloBigTable/p/10453546.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!