1.非中心的架构模型
2.基于TCP的一套Kafka通信协议
3.消息中间件&存储系统
4.存储逻辑层的高并发保证
5.isr机制降低了保证分布式一致性的代价
我们知道,在分布式系统的架构类型里,既有主从式的架构,也有非中心式的架构,像hadoop和hbase都采用了主从式的架构模型,主从式的架构优点有很多,但是主从式下为了避免单点故障而采取的各种策略使得主从式架构的优点并不那么理想,kafka作为一个分布式的消息系统,它采用了非中心式的架构模型,每个节点都作为独立的Server向Client提供服务,在集群环境下,多个节点依赖zookeeper维护client在读写访问中的分布式一致性。
在早期0.8.2之前的kafka版本中,kafka对zookeeper的依赖非常大,producer、server、consumer都非常依赖zookeeper,虽然zookeeper作为一个轻量级的文件系统(已经成为分布式服务的基础服务,用以提供分布式环境下的一致性),但是大量的与其交互仍然会导致一些性能问题和不稳定的方面,在0.8.2之后的改进中,通过将一些状态保持在kafka自身中,减少与zookeeper的大量交互,为读写提供了更稳定的实现。
kafka的通信协议相当的简单,只有六类核心的客户端请求APIs。
Metadata:描述当前可用的brokers的host和port信息,并给出每个broker上分配了哪些partitions;
Send:发送messages到broker;
Fetch:从broker中获取messages,包括获取data、获取集群的元数据信息以及获取某个topic的offset信息;
Offsets:获取某个给定topic partition的可用offsets信息;
Offset Commit:提交consumer group的offsets信息;
Offset Fetch:获取某个consumer group的offsets信息集合。
这些都会在下面详细描述。另外,0.9版本的kafka对consumers和kafka connect支持一般的group management。这部分的Client Api包括五种requests:
GroupCoordinator:定位当前consumer group的coordinator;
JoinGroup:加入一个consumer group,如果没有就创建一个;
SyncGroup:同步同一个group下的所有consumer状态(partition分配到consumer的分布情况);
Heartbeat:用来检测group中的成员的存活状态;
LeaveGroup:直接离开一个group。
还有一些用于监控/管理 kafka集群的administrativeAPIs
DescribeGroups:用来检测当前的groups;
ListGroups:列出broker中管理的groups。
RequestMessage => ApiKey ApiVersionCorrelationId ClientId RequestMessage
ApiKey => int16
ApiVersion => int16
CorrelationId => int32
ClientId => string
RequestMessage => MetadataRequest | ProduceRequest | FetchRequest |OffsetRequest | OffsetCommitRequest | OffsetFetchRequest
Response => CorrelationIdResponseMessage
CorrelationId => int32
ResponseMessage => MetadataResponse |ProduceResponse | FetchResponse | OffsetResponse | OffsetCommitResponse |OffsetFetchResponse
ApiKey | 这个int数值是用来表明是哪一种请求,KafkaApis根据这个值来调用相应的处理逻辑 |
ApiVersion | 由于不同的Kafka版本支持的ApiVersion不同,因此要根据KafkaServer支持的ApiVersion来发送对应格式的Request |
CorrelationId | 客户端提供的一个整型值,在response中会原封不动的返回,它的作用主要是用来匹配client和server之间的request和response。 |
下面的列表是ApiKey的整型值对应的Request类:
API name | ApiKey Value |
ProduceRequest | 0 |
FetchRequest | 1 |
OffsetRequest | 2 |
MetadataRequest | 3 |
Non-user facing control APIs | 4-7 |
OffsetCommitRequest | 8 |
OffsetFetchRequest | 9 |
GroupCoordinatorRequest | 10 |
JoinGroupRequest | 11 |
HeartbeatRequest | 12 |
LeaveGroupRequest | 13 |
SyncGroupRequest | 14 |
DescribeGroupsRequest | 15 |
ListGroupsRequest | 16 |
Error | Code | 重试 | 描述 |
NoError | 0 | 没有错误,执行成功! | |
Unknown | -1 | 未知的server error | |
OffsetOutRange | 1 | 请求的offset值超出了server端维护的对应topic/partition的offset值(可以大于也可以小于) | |
InvalidMessage/CorruptMessage | 2 | YES | 消息内容不能通过CRC校验 |
UnknownTopicOrPartition | 3 | YES | 请求的topic或partition不再发往的broker上 |
InvalidMessageSize | 4 | 消息的大小为负值 | |
LeaderNotAvailable | 5 | YES | 请求发生在leader选举过程中时抛出这个异常,此时请求的partition没有leader无法读写 |
NotLeaderForPartition | 6 | YES | 在客户端尝试向不是leader的replica写入信息时抛出,意味着客户端的元数据信息过期了 |
RequestTimedOut | 7 | YES | request超过了用户指定的时间,一般是值socket超时 |
BrokerNotAvailable | 8 | 这个错误不是client遇到的,往往发生在工具类的请求中 | |
ReplicaNotAvailable | 9 | broker上没有期望的replica(可以被安全的忽视) | |
MessageSizeTooLarge | 10 | server有一个最大消息的配置,当client向server端写入超过配置大小的message时抛出 | |
StaleControllerEpochCode | 11 | 在broker和broker通信时发生的内部错误 | |
OffsetMetadataTooLargeCode | 12 | 如果指定了一个大于配置的offset metadata大小的string | |
GroupLoadInProgressCode | 14 | YES | 当topic partition的leader发生变化后,新的leader在load offsets时,offset fetch request请求时抛出,或者在group membership(例如heartbeat)的response中返回当coordinator在load group metadata时 |
GroupCoordinatorNotAvailableCode | 15 | YES | offsets topic还没创建或者group coordinator没有active |
NotCoordinatorForGroupCode | 16 | YES | offset fetch或commit request的请求发往一个不是coordinator的节点 |
InvalidTopicCode | 17 | 访问一个不可用的topic或者尝试对内部topic(__consumer_offset)进行写入操作时 | |
RecordListTooLargeCode | 18 | 如果produce的message batch超过了配置的segment size | |
NotEnoughReplicasCode | 19 | YES | 处于in-sync的replicas数量小于配置的produce要求的最小replicas和requiredAcks=-1 |
NotEnoughReplicasAfterAppendCode | 20 | YES | 当message被写入到log后,但是in-sync的replicas数小于需要的 |
InvalidRequiredAcksCode | 21 | 请求的requiredAcks是不可获得的 | |
IllegalGenerationCode | 22 | server端的generation id和request中的generation id不一致 | |
InconsistentGroupProtocolCode | 23 | 当前group能够接受的protocol type中不包含join group时给出的protocol type | |
InvalidGroupIdCode | 24 | 当join group时groupId为空或者null | |
UnknownMemberIdCode | 25 | 当前generation里group中不包含请求的memberId | |
InvalidSessionTimeoutCode | 26 | join group时超出了配置的request session timeout | |
RebalanceInProgressCode | 27 | 当请求发起时coodinator正在对group进行rebalance,此时client要重新join group | |
InvalidCommitOffsetSizeCode | 28 | 当offset commit超过metadata的最大限制被拒绝时 | |
TopicAuthorizationFailedCode | 29 | client没有访问请求的topic的权限时 | |
GroupAuthorizationFailedCode | 30 | ||
ClusterAuthorizationFailedCode | 31 |
kafka实现了基于tcp的一种通信协议,只要符合通信协议的规范,即可与kafka server进行通信,因而kafka client是跨语言的
kafka既可以被认为是消息中间件,也可以作为存储系统使用
由于kafka可以将producer发送的消息保存起来供consumer消费,因此既可以作为消息中间件使用,也可以作为存储系统来保存数据。
kafka在存储逻辑层的设计为高吞吐量提供了可能
kafka存储数据的逻辑单元是partition,producer和consumer的处理单元也是基于partition的,针对某个topic,可以有多个partition,而多个partition又可以分布在不同的节点上,从而在存储层保证了I/O的并发,为高吞吐量提供了可能。
kafka的isr同步机制使得保证分布式一致性的代价大大降低
kafka的isr机制,允许isr中的replica和主副本之前有一定的差距,这样做保证了响应的及时性,另一方面,由于在isr层面没有考虑严格的分布式一致性,没有使用paxos的leader选举策略,使得kafka的leader选举更加容易,没有严格的节点数要求的限制,只要有一个节点是isr中的,就不会丢数据。
本文出自 “数字科技” 博客,请务必保留此出处http://7639538.blog.51cto.com/7629538/1883631
原文:http://7639538.blog.51cto.com/7629538/1883631