Attempted to decrease connection count for address with no connections
[2017-09-20 19:37:05,265] ERROR Found invalid messages during fetch for partition [xxxx,87] offset 1503297 error Message is corrupt (stored crc = 286782282, computed crc = 400317671) (kafka.server.ReplicaFetcherThread)
[2017-09-20 19:37:05,458] ERROR Found invalid messages during fetch for partition [xxxx,75] offset 1501373 error Message found with corrupt size (0) in shallow iterator (kafka.server.ReplicaFetcherThread)
[2017-09-20 19:37:07,455] ERROR [ReplicaFetcherThread-0-5], Error due to (kafka.server.ReplicaFetcherThread)
kafka.common.KafkaException: error processing data for partition [xxxx,87] offset 1503346
at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:147)
at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:122)
at scala.Option.foreach(Option.scala:257)
at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:122)
at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:120)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbractFeherThread.scala:120)
at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:120)
at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:120)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:93)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Caused by: java.lang.RuntimeException: Offset mismatch: fetched offset = 1503346, log end offset = 1503297.
at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:110)
at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:42)
at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:138)
ERROR Found invalid messages during fetch for partition [qssnews_download,87] offset 1503297 error Message is corrupt (stored crc = 286782282, computed crc = 400317671) (kafka.server.ReplicaFetcherThread)
[2016-02-25 00:29:39,236] FATAL [ReplicaFetcherThread-0-1], Halting because log truncation is not allowed for topic test, Current leader 1‘s latest offset 0 is less than replica 2‘s latest offset 151 (kafka.server.ReplicaFetcherThread)
unclean.leader.election.enable=false
, 然后走到了代码里下面这段逻辑 if (leaderEndOffset < replica.logEndOffset.messageOffset) {
// Prior to truncating the follower‘s log, ensure that doing so is not disallowed by the configuration for unclean leader election.
// This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
// we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchEntityConfig(replicaMgr.zkUtils,
ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
// Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur.
fatal("...")
Runtime.getRuntime.halt(1)
}
调用Runtime.getRuntime.halt(1)
直接暴力退出了.
可参考Kafka issue: Unclean leader election and "Halting because log truncation is not allowed"
WARN [Replica Manager on Broker 3]: While recording the replica LEO, the partition [orderservice.production,0] hasn‘t been created. (kafka.server.ReplicaManager)
和
ERROR [ReplicaFetcherThread-0-58], Error for partition [reptest,0] to broker 58:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
replica
从错误的partition leader
上去同步数据了, 这理论上不应该啊;/brokers/[topic]
节点的内容里直接去掉了这个partiton的信息, 但是kafka controller
并不会处理partiton减少的情况, 可参考KafkaController分析
/controller
临时节点;[2017-09-30 10:49:36,126] ERROR [kafka-log-cleaner-thread-0], Error due to (kafka.log.LogCleaner)
java.lang.IllegalArgumentException: requirement failed: 138296566648 messages in segment __consumer_offsets-5/00000000000000000000.log but offset map can fit only 5033164. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads
at scala.Predef$.require(Predef.scala:219)
at kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:584)
at kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:580)
at scala.collection.immutable.Stream$StreamWithFilter.foreach(Stream.scala:570)
at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:580)
at kafka.log.Cleaner.clean(LogCleaner.scala:322)
at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:230)
at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:208)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
LogCleaner
的源码可知,是00000000000000000000.log
这个logSegment
的segment.nextOffset() - segment.baseOffset
大于了maxDesiredMapSize
, 导致了LogClean
线程的终止, 从而无法清理, 这不应该啊?! val segmentSize = segment.nextOffset() - segment.baseOffset
require(segmentSize <= maxDesiredMapSize, "%d messages in segment %s/%s but offset map can fit only %d. You can in了crease log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads".format(segmentSize, log.name, segment.log.file.getName, maxDesiredMapSize))
if (map.size + segmentSize <= maxDesiredMapSize)
offset = buildOffsetMapForSegment(log.topicAndPartition, segment, map)
else
full = true
00000000000000000000.log
和00000000000000000000.index
, 然后删掉了cleaner-offset-checkpoint
中相关的项,重启broker, 日志开始了压缩清理logSegment
的segment.nextOffset() - segment.baseOffset
大于了maxDesiredMapSize
, 猜测是有个业务是手动提交offset到这个partition, 没有控制好,导致每秒能提交8,9MByte上来;stop the world
,broker所有线程都停止工作, 自然也无法与zk保持心跳;GC
是个大麻烦, 网上也搜了一圈, 没找到有效的解决方案, 个人水平有限, 哪位大神有什么好的方法, 可以留言给我,谢谢~zookeeper.forceSync=no
, 降低写盘IO, 这个配置有其副作用, 在线上使用时还需慎重;Grafana
来显示和报警;Attempted to decrease connection count for address with no connections
[2016-10-13 00:00:00,495] ERROR Processor got uncaught exception. (kafka.network.Processor)
java.lang.IllegalArgumentException: Attempted to decrease connection count for address with no connections, address: /xxx.xxx.xxx.xxx
at kafka.network.ConnectionQuotas$$anonfun$9.apply(SocketServer.scala:565)
at kafka.network.ConnectionQuotas$$anonfun$9.apply(SocketServer.scala:565)
at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
at scala.collection.AbstractMap.getOrElse(Map.scala:59)
at kafka.network.ConnectionQuotas.dec(SocketServer.scala:564)
at kafka.network.Processor$$anonfun$run$13.apply(SocketServer.scala:450)
at kafka.network.Processor$$anonfun$run$13.apply(SocketServer.scala:445)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.network.Processor.run(SocketServer.scala:445)
at java.lang.Thread.run(Thread.java:745)
[2017-10-12 16:52:38,141] ERROR Processor got uncaught exception. (kafka.network.Processor)
java.lang.ArrayIndexOutOfBoundsException: 18
at org.apache.kafka.common.protocol.ApiKeys.forId(ApiKeys.java:68)
at org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:39)
at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:79)
at kafka.network.Processor$$anonfun$run$11.apply(SocketServer.scala:426)
at kafka.network.Processor$$anonfun$run$11.apply(SocketServer.scala:421)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.network.Processor.run(SocketServer.scala:421)
at java.lang.Thread.run(Thread.java:745)
SocketServer
中: try {
val channel = selector.channel(receive.source)
val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
channel.socketAddress)
val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
requestChannel.sendRequest(req)
} catch {
case e @ (_: InvalidRequestException | _: SchemaException) =>
// note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
error("Closing socket for " + receive.source + " because of error", e)
isClose = true
close(selector, receive.source)
}
在处理Request时并未处理这个异常,导致这个异常被其外层的try...catch...
处理, 直接进入了下一轮的selector.poll(300)
, 而在这个selector.poll(300)
中会清理之前所有的接收到的Requests, 这就导致在这种情况下,可能会漏处理一些Request, 这样看起来还是个比较严重的问题;
selector.completedReceives.asScala.foreach { receive =>
var isClose = false
try {
val channel = selector.channel(receive.source)
val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
channel.socketAddress)
val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
requestChannel.sendRequest(req)
} catch {
case e @ (_: InvalidRequestException | _: SchemaException) =>
// note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
error("Closing socket for " + receive.source + " because of error", e)
isClose = true
close(selector, receive.source)
case e : ArrayIndexOutOfBoundsException =>
error("NotSupport Request | Closing socket for " + receive.source + " because of error", e)
isClose = true
close(selector, receive.source)
}
if (!isClose) {
selector.mute(receive.source)
}
}
inflightResponses
会缓存住需要发送但还没有发送完成的response, 这个response又同时持有其对应的request的引用, 访问请求量大的时候其内存占用不少.inflightResponses
0.9.0.1代码中只在completedSends中作了remove, 在disconnected
和close
中没有处理;inflightResponses
变量去掉, 但这会有个副作用,会影响到Metrics的统计;disconnected
和close
也加入移除的操作;原文:https://www.cnblogs.com/wangbin/p/10647997.html