刘伟 360云计算
Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。小编会给大家带来几期 Kafka 相关的源码分析文章。这一系列文章是基于kafka 0.9.1版本,今天先来网络层的第一部分-概述和网络层模型实现。
PS:丰富的一线技术、多元化的表现形式,尽在“HULK一线技术杂谈”,点关注哦!
这个模型其实一点也不神秘,很质朴,很清晰,也很好用,引用源码中的一句话:
The threading model is 1 Acceptor thread that handles new connections Acceptor has N Processor threads that each have their own selector and read requests from socketsM Handler threads that handle requests and produce responses back to the processor threads for writing
再来张图:
总结起来就是个半同步半异步模型(https://github.com/DavidLiuXh/lightningserver).
数据传输层实现:clients/src/main/java/org/apache/kafka/common/network,里面包括了Channel,TransportLayer,Authenticator等.
(1) 根据配置的若干endpoint创建相应的Acceptor及相关联的一组Processor线程;
for (i <- processorBeginIndex until processorEndIndex) {
processors(i) = newProcessor(i,
time,
maxRequestSize,
requestChannel,
connectionQuotas,
connectionsMaxIdleMs,
protocol,
config.values,
metrics
)
}
val acceptor = newAcceptor
(endpoint, sendBufferSize, recvBufferSize, brokerId,
processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
(2) 创建Acceptor运行的线程并启动;
Utils
.newThread(
"kafka-socket-acceptor-%s-%d"
.format(protocol.toString,
endpoint.port), acceptor,
false
).start()
acceptor.awaitStartup()
val serverChannel =
ServerSocketChannel
.open()
serverChannel.configureBlocking(
false
)
serverChannel.socket().setReceiveBufferSize(recvBufferSize)
try
{
serverChannel.socket.bind(socketAddress)
info(
"Awaiting socket connections on %s:%d."
.format(socketAddress.getHostName, serverChannel.socket.getLocalPort))
}
catch
{
case
e:
SocketException
=>
throw
new
KafkaException
(
"Socket server failed to bind to %s:%d: %s."
.format(socketAddress.getHostName, port, e.getMessage), e)
}
(2) 开启分配到的若干Processor:
this.synchronized { processors.foreach { processor =>
Utils.newThread("kafka-network-thread-%d-%s-%d".format(brokerId, endPoint.protocolType.toString, processor.id), processor,
false
).start()
}
}
(3) run()-利用NIO的selector来接收网络连接:
varcurrentProcessor = 0
while(isRunning) {
try{val ready = nioSelector.select(500)
if (ready > 0) { val keys = nioSelector.selectedKeys()
val iter = keys.iterator()
while (iter.hasNext && isRunning) {
tr {val key = iter.
nextiter.remove()
if
(key.isAcceptable)
accept(key, processors(currentProcessor))
else
throw
new
IllegalStateException
(
"Unrecognized key state for acceptor thread."
)
// round robin to the next processor thread
currentProcessor = (currentProcessor +
1
) % processors.length
}
catch
{
case
e:
Throwable
=> error(
"Error while accepting connection"
, e)
}
}
}
}
这里面最主要的就是 accept(key,processors(currentProcessor))
(4) accept: 设置新连接socket的参数后交由Processor处理:
socketChannel.configureBlocking(
false
)
socketChannel.socket().setTcpNoDelay(
true
)
socketChannel.socket().setKeepAlive(
true
)
socketChannel.socket().setSendBufferSize(sendBufferSize)
processor.accept(socketChannel)
run()是核心, 包裹在一个循环里,直接线程退出, while(isRunning){},里面依次调用如下函数:
(1) configureNewConnections():从并发队列Q1里取出SocketChannel,添加到自身的nio selector中,监听读事件;
(2) processNewResponses():处理当前所有处理完成的request相应的response, 这些response都是从RequestChannel获得( requestChannel.receiveResponse),根据request的类型来决定从当前连接的nio selector中暂时删除读事件监听/添加写事件/关闭当前连接;
(3) selector.poll(300): 这个就不用解释了, 这个selector是对nio selector的又一封装,我们后面一章会讲到,它完成具体的数据接收和发送;
(4) selector.completedReceives.asScala.foreach: 处理当前所有的从selector返回的完整request,将其put到RequestChannel的一个阻塞队列里,供应用层获取并处理;同时会暂时删除些连接上的读事件监听: selector.mute(receive.source);
(5) selector.completedSends.asScala.foreach: 处理当前所有的从selector返回的写操作,重新将读事件添加到连接的selector监听中 selector.unmute(send.destination);
(6) selector.disconnected.asScala.foreach: 处理当前所有将关闭的连接;
processNewResponses() 处理RequestChannel中所有的response;
下一篇咱们来讲在Processor中使用的nio selector的又一封装,负责具体数据的接收和发送。
原文:https://blog.51cto.com/15127564/2669081