答:
5.消费者数据的不丢失通过 offset commit 来保证数据的不丢失,kafka 自己记录了每次消费的 offset 数值,下次继续消费的时候,接着上次的 offset 进行消费即可。
答:
①kafka producer 发送消息的时候,可以指定 key,这个 key 的作用是为消息选择存储分区,key 可以为空,当指定 key 且不为空的时候,kafka 是根据 key 的 hash 值与分区数取模来决定数据存储到那个分区
②当 key=null 时,kafka 是先从缓存中取分区号,然后判断缓存的值是否为空,如果不为空,就将消息存到这个分区,否则重新计算要存储的分区,并将分区号缓存起来,供下次使用。
③ kafka 定 义 了 一 个 全 局 变 量 , 这 个 变 量 值 是 配 置 参 数 中 的topic.metadata.refresh.interval.ms 设置的值,也就是说在这个时间内,key=null 的消息都会往缓存起来的这个分区存储,当时缓存过时之后,就会重新计算分区号,将计算结果缓存起来。也就是说在key为null的情况下,Kafka并不是每条消息都随机选择一个Partition;而是每隔 topic.metadata.refresh.interval.ms 才会随机选择一次!
答:Kafka 可以将主题划分为多个分区(Partition),会根据分区规则选择把消息存储到哪个分区中,只要如果分区规则设置的合理,那么所有的消息将会被均匀的分布到不同的分区中,这样就实现了负载均衡和水平扩展。另外,多个订阅者可以从一个或者多个分区中同时消费数据,以支撑海量数据处理能力:
Kafka 中的 message 是以 topic 为基本单位进行组织的,不同的 topic 之间是相互独立的。每个 topic 又可以分为不同的 partition,每个 partition 存储一部分的 message 信息。 Partition 是以文件的形式存储在文件系统中,比如,创建了一个名为 test 的 topic,其有 5 个 partition,那么在 Kafka 的数据目录中(由配置文件中的 log.dirs 指定的)中就会有这样 5 个 目 录 : test-0 , test-1 , test-2 , test-3 , test-4 , 其 命 名 规 则 为,,里面存储的分别就是这 5 个 partition 的数据。Partition中的每条 Message 由 offset 来表示它在这个 partition 中的偏移量,这个 offset 不是该Message 在 partition 数据文件中的实际存储位置,而是逻辑上一个值,它唯一确定了partition 中的一条 Message。因此,可以认为 offset 是 partition 中 Message 的 id。partition 中的每条 Message 包含了以下三个属性:offset / MessageSize / data。
(1)kafka 和 flume 都是日志系统。kafka 是分布式消息中间件,自带存储,提供 push 和 pull 存取数据功能,是一个非常通用的系统,可以有许多生产者和很多的消费者共享多个主题 Topics。flume 分为 agent(数据采集器)[source channel sink]。是管道流方式,提供了很多的默认实现,让用户通过参数部署,及扩展 API.
(2)kafka 做日志缓存应该是更为合适的,但是 flume 的数据采集部分做的很好,可以定制很多数据源,减少开发量。所以比较流行 flume+kafka 模式,如果为了利用 flume 写hdfs 的能力,也可以采用 kafka+flume 的方式。
Kafka 的数据最终是落地到磁盘上的。存数据会先存到一个 Topic 主题之下,一个 Topic 可以认为是一类消息,每个 topic 将被分成多个 partition(区),每个 partition 在存储层面是append log 文件。任何发布到此 partition 的消息都会被直接追加到 log 文件的尾部,每条消息在文件中的位置称为 offset(偏移量),offset 为一个 long 型数字,它是唯一标记一条消息。为了保证数据不丢失,需要将存储到每个 partion 上的数据进行备份,并且备份的数据其他的多台服务器上,每个 partition 都有一个 server 为"leader";leader 负责所有的读写操作,如果 leader 失效,那么将会有其他 follower 来接管(成为新的 leader);follower 只是单调的和 leader 跟进,同步消息即可..由此可见作为 leader 的 server 承载了全部的请求压力,因此从集群的整体考虑,有多少个 partitions 就意味着有多少个"leader",kafka 会将"leader"均衡的分散在每个实例上,来确保整体的性能稳定.
kafka 的可以接受的数据来源非常广泛,可以是服务器日志,也可以是数据库中的数据,甚至你可以自己编写 python 脚本往 kafka 发送自己构造的数据。kafka 是一个开源产品,提供 api 接口,常见的就是 post 到 kafka 所在机器的 ip+port/url。 他是一个产品,不是一个库。
Kafka 是分布式的消息队列系统,数据放在分区中,可并发执行,先进先出,数据有序,有producers 生产和 consumers 消费顺序kafka 集群有多个 Broker 服务器组成,每个类型的消息被定义为 topic。先有 topic 主题后有分区,分区散列在不同的节点上同一 topic 内部的消息按照一定的 key 和算法被分区(partition)存储在不同的 Broker 上。消息生产者 producer 和消费者 consumer 可以在多个 Broker 上生产/消费 topicKafka 遵循主从架构由 Zookeeper 维护,只保证单分区内的数据有序(offset 保证数据的有序性)对于每个 partition 都有副本
(1) 解耦
在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
(2) 冗余
有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。
(3) 扩展性
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。
(4) 灵活性 & 峰值处理能力
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
(5) 顺序保证
在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。Kafka 保证一个 Partition 内的消息的有序性。
(6) 缓冲
在任何重要的系统中,都会有需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率的执行———写入队列的处理会尽可能的快速。该缓冲有助于控制和优化数据流经过系统的速度。
1、kafka消费消息的offffset是定义在zookeeper中的
2、如果想重复消费kafka的消息,可以在redis中自己记录offffset的checkpoint点(n个)
3、当想重复消费消息时,通过读取redis中的checkpoint点进行zookeeper的offffset重设
4、这样就可以达到重复消费消息的目的了
1 请求元数据,(开始的时候metadata中没有元数据信息)
2 Sender networkclient 向kafkacluster 中的 brokerserver中获取元数据的信息
3 对metadata更新topic的元数据 ,topic的元数据及相关分区,每个分区对应所在的机器
对record进行分区,Metadata的元数据中有topic topic中的有分区 用来维护topic的元数据信
4 Record进行分区,partitioner分区,默认default的分区,如果record有分区信号, 则按照分区信号进行分区,如果没有分区信号,record中的value为 (key: value)类型,以key的hash值进行分区, 如果非key、value类型的,则进行轮询分区
5 讲分区完的数据进入到recordaccumulator往后进行追加,根据分区类型。会有一定的判断,当数据量达到一定的数量的时候,将数据发往kafka集群中去,(为了使得性能更好一点,不是来一条数据就发,先将数据放入到recordaccumulator内存中,网络有开销。来一条数据先放入produce的内存中去,——
6 根据规则拿到需要发送的records ( 规则: 数据量的大小,以及拿到分区所在的node中。)
7 将相应的分区数据发送到node。由sender进行。
1.生产者数据的不丢失
kafka的ack机制:在kafka发送数据的时候,每次发送消息都会有一个确认反馈机制,确保消息正常的能够被收到,其中状态有0,1,-1。
如果是同步模式:ack机制能够保证数据的不丢失,如果ack设置为0,风险很大,一般不建议设置为0。即使设置为1,也会随着leader宕机丢失数据。
producer.type=sync
request.required.acks=1
如果是异步模式:也会考虑ack的状态,除此之外,异步模式下的有个buffffer,通过buffffer来进行控制数据的发送,有两个值来进行控制,时间阈值与消息的数量阈值,如果buffffer满了数据还没有发送出去,有个选项是配置是否立即清空buffffer。可以设置为-1,永久阻塞,也就数据不再生产。
异步模式下,即使设置为-1。也可能因为程序员的不科学操作,操作数据丢失,比如kill -9,但这
是特别的例外情况。
producer.type=async
request.required.acks=1
queue.buffering.max.ms=5000
queue.buffering.max.messages=10000
queue.enqueue.timeout.ms = -1
batch.num.messages=200
结论:producer有丢数据的可能,但是可以通过配置保证消息的不丢失。
2.消费者数据的不丢失
通过offffset commit 来保证数据的不丢失,kafka自己记录了每次消费的offffset数值,下次继续消费的时候,会接着上次的offffset进行消费。
而offffset的信息在kafka0.8版本之前保存在zookeeper中,在0.8版本之后保存到topic中,即使消费者在运行过程中挂掉了,再次启动的时候会找到offffset的值,找到之前消费消息的位置,接着消费,由于offffset的信息写入的时候并不是每条消息消费完成后都写入的,所以这种情况有可能会造成重复消费,
但是不会丢失消息。
唯一例外的情况是,我们在程序中给原本做不同功能的两个consumer组设置KafkaSpoutConfifig.bulider.setGroupid的时候设置成了一样的groupid,这种情况会导致这两个组共享同一份数据,就会产生组A消费partition1,partition2中的消息,组B消费partition3的消息,这样每个组消费的消息都会丢失,都是不完整的。 为了保证每个组都独享一份消息数据,groupid一定不要重复才行。
3.kafka集群中的broker的数据不丢失
每个broker中的partition我们一般都会设置有replication(副本)的个数,生产者写入的时候首先根据分发策略(有partition按partition,有key按key,都没有轮询)写入到leader中,follower(副本)再跟leader同步数据,这样有了备份,也可以保证消息数据的不丢失。
原文:https://www.cnblogs.com/tesla-turing/p/11959994.html