kafka集群安装:
版本是2.7版本
https://mirrors.bfsu.edu.cn/apache/kafka/2.7.0/kafka_2.12-2.7.0.tgz
sudo tar -zxf /home/hadoop/下载/kafka_2.12-2.7.0.tgz -C /usr/local/ cd /usr/local sudo chown -R hadoop:hadoop ./kafka_2.12-2.7.0 cd ~ sudo vim /etc/profile
编辑添加
export KAFKA_HOME=/usr/local/kafka_2.12-2.7.0 export PATH=$PATH:$KAFKA_HOME/bin
新建目录:
mkdir /usr/local/kafka_2.12-2.7.0/logs
编辑:
sudo vim /usr/local/kafka_2.12-2.7.0/config/server.properties
如下:
broker.id=0 log.dirs=/usr/local/kafka_2.12-2.7.0/logs/ zookeeper.connect=master:2181,slave1:2181,slave2:2181,slave3:2181
分发:
cd /usr/local sudo scp -r kafka_2.12-2.7.0 hadoop@slave3:$PWD sudo scp -r kafka_2.12-2.7.0 hadoop@slave1:$PWD sudo scp -r kafka_2.12-2.7.0 hadoop@slave2:$PWD
修改拥有者同时设置环境变量
修改子节点的server.properties
broker.id
分别为
broker.id=1 broker.id=2 broker.id=3
每台节点启动zookeeper集群
cd /usr/local/zookeeper/bin/
./zkServer.sh start
在每台节点上
新开一个终端,启动kafka
cd /usr/local/kafka_2.12-2.7.0/bin kafka-server-start.sh ../config/server.properties
通过jps可查看到kafka
master新开一个终端
创建一个 topic
cd /usr/local/kafka_2.12-2.7.0/bin kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181,slave3:2181 --replication-factor 4 --partitions 4 --topic spark-kafka
##--zookeeper zookeeper服务列表配置项,使用zk集群中任意节点即可 ##--create --topic 进行topic创建指令,创建的topic名称为spark-kafka ##--partitions 指定该topic下的partition数量 ##--replication-factor 指定该topic的副本数量
获取当前集群的所有 topic:
kafka-topics.sh --list --zookeeper master:2181,slave1:2181,slave2:2181,slave3:2181
生产者向指定的topic发送数据
##生产者向指定的topic发送数据 kafka-console-producer.sh --broker-list master:9092 --topic spark-kafka
消费者从指定的topic拉取数据
##消费者从指定的topic拉取数据 kafka-console-consumer.sh --botrap-server master:9092 --topic spark-kafka --from-beginning
删除指定topic
##删除指定topic kafka-topics.sh --zookeeper master:2181 --delete --topic spark-kafka
测试:
package cn.itcast.streaming import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.streaming.dstream.{DStream, InputDStream} import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies} import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.streaming.{Seconds, StreamingContext} /** * Author itcast * Desc 演示使用spark-streaming-kafka-0-10_2.12中的Direct模式连接Kafka消费数据 */ object SparkStreaming_Kafka_Demo01 { def main(args: Array[String]): Unit = { //TODO 0.准备环境 val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]") val sc: SparkContext = new SparkContext(conf) sc.setLogLevel("WARN") //the time interval at which streaming data will be divided into batches val ssc: StreamingContext = new StreamingContext(sc,Seconds(5))//每隔5s划分一个批次 ssc.checkpoint("./ckp") //TODO 1.加载数据-从Kafka val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "master:9092",//kafka集群地址 "key.deserializer" -> classOf[StringDeserializer],//key的反序列化规则 "value.deserializer" -> classOf[StringDeserializer],//value的反序列化规则 "group.id" -> "sparkdemo",//消费者组名称 //earliest:表示如果有offset记录从offset记录开始消费,如果没有从最早的消息开始消费 //latest:表示如果有offset记录从offset记录开始消费,如果没有从最后/最新的消息开始消费 //none:表示如果有offset记录从offset记录开始消费,如果没有就报错 "auto.offset.reset" -> "latest", "auto.commit.interval.ms"->"1000",//自动提交的时间间隔 "enable.auto.commit" -> (true: java.lang.Boolean)//是否自动提交 ) val topics = Array("spark-kafka")//要订阅的主题 //使用工具类从Kafka中消费消息 val kafkaDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String]( ssc, LocationStrategies.PreferConsistent, //位置策略,使用源码中推荐的 ConsumerStrategies.Subscribe[String, String](topics, kafkaParams) //消费策略,使用源码中推荐的 ) //TODO 2.处理消息 val infoDS: DStream[String] = kafkaDS.map(record => { val topic: String = record.topic() val partition: Int = record.partition() val offset: Long = record.offset() val key: String = record.key() val value: String = record.value() val info: String = s"""topic:${topic}, partition:${partition}, offset:${offset}, key:${key}, value:${value}""" info }) //TODO 3.输出结果 infoDS.print() //TODO 4.启动并等待结束 ssc.start() ssc.awaitTermination()//注意:流式应用程序启动之后需要一直运行等待手动停止/等待数据到来 //TODO 5.关闭资源 ssc.stop(stopSparkContext = true, stopGracefully = true)//优雅关闭 } } //测试: //1.准备kafka // /export/server/kafka/bin/kafka-topics.sh --list --zookeeper node1:2181 // /export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 3 --topic spark_kafka // /export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic spark_kafka //2.启动程序 //3.发送数据 //4.观察结果
在IDEA手动停掉程序
然后再虚拟机上再次输入数据后重启程序,可以看到在那之后的数据
关闭:
先关闭kafka再关闭zookeeper
kafka-server-stop.sh stop
cd /usr/local/zookeeper/bin/
./zkServer.sh stop
原文:https://www.cnblogs.com/a155-/p/14497498.html