首页 > 其他 > 详细

SparkStreaming+Kafka

时间:2021-03-08 09:25:51      阅读:36      评论:0      收藏:0      [点我收藏+]

技术分享图片

 

 

kafka集群安装:

版本是2.7版本

https://mirrors.bfsu.edu.cn/apache/kafka/2.7.0/kafka_2.12-2.7.0.tgz

sudo tar -zxf /home/hadoop/下载/kafka_2.12-2.7.0.tgz -C /usr/local/
cd /usr/local
sudo chown -R hadoop:hadoop ./kafka_2.12-2.7.0
cd ~
sudo vim /etc/profile

编辑添加

export KAFKA_HOME=/usr/local/kafka_2.12-2.7.0
export PATH=$PATH:$KAFKA_HOME/bin

新建目录:

mkdir /usr/local/kafka_2.12-2.7.0/logs

编辑:

sudo vim /usr/local/kafka_2.12-2.7.0/config/server.properties

如下:

broker.id=0
log.dirs=/usr/local/kafka_2.12-2.7.0/logs/
zookeeper.connect=master:2181,slave1:2181,slave2:2181,slave3:2181

 

分发:

cd /usr/local
sudo scp -r kafka_2.12-2.7.0 hadoop@slave3:$PWD
sudo scp -r kafka_2.12-2.7.0 hadoop@slave1:$PWD
sudo scp -r kafka_2.12-2.7.0 hadoop@slave2:$PWD

修改拥有者同时设置环境变量

修改子节点的server.properties

broker.id
分别为
broker.id=1
broker.id=2
broker.id=3

每台节点启动zookeeper集群

cd /usr/local/zookeeper/bin/
./zkServer.sh start

在每台节点上

新开一个终端,启动kafka

cd /usr/local/kafka_2.12-2.7.0/bin
kafka-server-start.sh ../config/server.properties

通过jps可查看到kafka

master新开一个终端

创建一个 topic

cd /usr/local/kafka_2.12-2.7.0/bin
kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181,slave3:2181 --replication-factor 4 --partitions 4 --topic spark-kafka
##--zookeeper zookeeper服务列表配置项,使用zk集群中任意节点即可
##--create --topic 进行topic创建指令,创建的topic名称为spark-kafka
##--partitions 指定该topic下的partition数量
##--replication-factor 指定该topic的副本数量

获取当前集群的所有 topic:

kafka-topics.sh --list --zookeeper master:2181,slave1:2181,slave2:2181,slave3:2181
生产者向指定的topic发送数据
##生产者向指定的topic发送数据
kafka-console-producer.sh --broker-list master:9092 --topic spark-kafka
消费者从指定的topic拉取数据
##消费者从指定的topic拉取数据
kafka-console-consumer.sh --botrap-server master:9092 --topic spark-kafka --from-beginning
删除指定topic
##删除指定topic
kafka-topics.sh --zookeeper master:2181 --delete --topic spark-kafka

测试:

自动提交偏移量

package cn.itcast.streaming

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Author itcast
 * Desc 演示使用spark-streaming-kafka-0-10_2.12中的Direct模式连接Kafka消费数据
 */
object SparkStreaming_Kafka_Demo01 {
  def main(args: Array[String]): Unit = {
    //TODO 0.准备环境
    val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")
    //the time interval at which streaming data will be divided into batches
    val ssc: StreamingContext = new StreamingContext(sc,Seconds(5))//每隔5s划分一个批次
    ssc.checkpoint("./ckp")

    //TODO 1.加载数据-从Kafka
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "master:9092",//kafka集群地址
      "key.deserializer" -> classOf[StringDeserializer],//key的反序列化规则
      "value.deserializer" -> classOf[StringDeserializer],//value的反序列化规则
      "group.id" -> "sparkdemo",//消费者组名称
      //earliest:表示如果有offset记录从offset记录开始消费,如果没有从最早的消息开始消费
      //latest:表示如果有offset记录从offset记录开始消费,如果没有从最后/最新的消息开始消费
      //none:表示如果有offset记录从offset记录开始消费,如果没有就报错
      "auto.offset.reset" -> "latest",
      "auto.commit.interval.ms"->"1000",//自动提交的时间间隔
      "enable.auto.commit" -> (true: java.lang.Boolean)//是否自动提交
    )
    val topics = Array("spark-kafka")//要订阅的主题
    //使用工具类从Kafka中消费消息
    val kafkaDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent, //位置策略,使用源码中推荐的
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams) //消费策略,使用源码中推荐的
    )

    //TODO 2.处理消息
    val infoDS: DStream[String] = kafkaDS.map(record => {
      val topic: String = record.topic()
      val partition: Int = record.partition()
      val offset: Long = record.offset()
      val key: String = record.key()
      val value: String = record.value()
      val info: String = s"""topic:${topic}, partition:${partition}, offset:${offset}, key:${key}, value:${value}"""
      info
    })

    //TODO 3.输出结果
    infoDS.print()

    //TODO 4.启动并等待结束
    ssc.start()
    ssc.awaitTermination()//注意:流式应用程序启动之后需要一直运行等待手动停止/等待数据到来

    //TODO 5.关闭资源
    ssc.stop(stopSparkContext = true, stopGracefully = true)//优雅关闭
  }
}
//测试:
//1.准备kafka
// /export/server/kafka/bin/kafka-topics.sh --list --zookeeper node1:2181
// /export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 3 --topic spark_kafka
// /export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic spark_kafka
//2.启动程序
//3.发送数据
//4.观察结果

技术分享图片

 

 技术分享图片

 

 在IDEA手动停掉程序

然后再虚拟机上再次输入数据后重启程序,可以看到在那之后的数据

技术分享图片

 

 技术分享图片

 

 

关闭:

先关闭kafka再关闭zookeeper

kafka-server-stop.sh stop
cd /usr/local/zookeeper/bin/
./zkServer.sh stop

 

SparkStreaming+Kafka

原文:https://www.cnblogs.com/a155-/p/14497498.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!