Note: this post skips the basics. Please rule out the trivial problems yourself first.
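I was producing data with the Java Kafka Producer when it threw this exception: kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
I was baffled, because the firewall, ZooKeeper, and Kafka configuration all looked fine. It cost me a full day before I finally found the cause.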
The host in Kafka's server.properties file cannot be a hostname: it has to be the IP address itself, not the hostname mapped to that IP.
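If you write a hostname there, for example bigdata, and then run a Producer program, you are rewarded with the exception.
Change it to the IP address and the program runs normally.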
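One possible explanation, which the post itself does not confirm: the old producer uses metadata.broker.list only to fetch metadata and then connects to whatever host each broker registered, so a hostname such as bigdata that the client machine cannot resolve would make every send attempt fail. Under that assumption, a quick check to run on the machine where the producer runs might look like this. The class and method names are hypothetical.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper (not from the original post): checks whether this
// machine can turn a broker host string into an address.
final class BrokerHostCheck {

    // Returns true if the host resolves on this machine. IP literals such as
    // "192.168.29.132" are parsed directly, without a name lookup.
    static boolean canResolve(String host) {
        try {
            InetAddress.getByName(host);
            return true;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // "bigdata" is the example hostname from the post; the IP is the
        // broker address used in the producer code.
        String[] hosts = args.length > 0
                ? args
                : new String[] {"bigdata", "192.168.29.132"};
        for (String host : hosts) {
            System.out.println(host + " resolvable from this machine: " + canResolve(host));
        }
    }
}
```

If the hostname does not resolve on the client, either adding a mapping for it on that machine or keeping the IP address in server.properties would be consistent with the behavior described above.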
I was on Kafka 0.8. Old versions really do have too many bugs, and this one wasted a lot of my time.
My producer code:
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Date;
import java.util.Properties;
import java.util.Random;

public class TestProducer {
    public static void main(String[] args) {
        long events = 10;
        Random rnd = new Random();

        // Configuration for the old (0.8) producer API
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.168.29.132:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "SimplePartitioner");
        props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);

        // Send a few fake click events, keyed by a random client IP
        for (long nEvents = 0; nEvents < events; nEvents++) {
            long runtime = new Date().getTime();
            String ip = "192.168.2." + rnd.nextInt(255);
            String msg = runtime + ",www.example.com," + ip;
            KeyedMessage<String, String> data =
                    new KeyedMessage<String, String>("advClickStreamTopic", ip, msg);
            producer.send(data);
        }
        producer.close();
    }
}
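The "3 tries" in the exception message appears to correspond to the old producer's message.send.max.retries setting, whose default is 3, with retry.backoff.ms controlling the pause between attempts. The sketch below only builds the Properties object and uses no Kafka classes. The helper name and the values passed in are illustrative assumptions, not part of the original code.

```java
import java.util.Properties;

// Hypothetical helper (not from the original post): builds old-producer
// properties with the retry settings spelled out explicitly.
final class ProducerRetrySettings {

    static Properties withRetries(String brokerList, int maxRetries, long backoffMs) {
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        // Number of send attempts reported in "Failed to send messages after N tries"
        props.put("message.send.max.retries", Integer.toString(maxRetries));
        // Wait between attempts, in milliseconds
        props.put("retry.backoff.ms", Long.toString(backoffMs));
        return props;
    }

    public static void main(String[] args) {
        // Values chosen to mirror the documented defaults (3 retries, 100 ms)
        Properties props = withRetries("192.168.29.132:9092", 3, 100L);
        System.out.println(props.getProperty("message.send.max.retries"));
        System.out.println(props.getProperty("retry.backoff.ms"));
    }
}
```

Raising the retry count would only change the number in the message. If the broker address is the real problem, every attempt fails the same way.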
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class SimplePartitioner implements Partitioner {

    // Kafka instantiates the partitioner through this constructor
    public SimplePartitioner(VerifiableProperties props) {
    }

    // Pick the partition from the last octet of the IP address used as the key
    public int partition(Object key, int a_numPartitions) {
        int partition = 0;
        String stringKey = (String) key;
        int offset = stringKey.lastIndexOf('.');
        if (offset > 0) {
            partition = Integer.parseInt(stringKey.substring(offset + 1)) % a_numPartitions;
        }
        return partition;
    }
}
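To make the partitioning rule concrete, here is a minimal sketch of the same arithmetic with the Kafka types removed, so it runs on its own. The class name, the sample keys, and the partition count of 4 are assumptions made for illustration.

```java
// Standalone copy of the SimplePartitioner arithmetic, without Kafka classes.
final class PartitionDemo {

    // Takes the text after the last '.' in the key and reduces it modulo
    // the number of partitions; keys without a '.' fall into partition 0.
    static int partitionFor(String key, int numPartitions) {
        int partition = 0;
        int offset = key.lastIndexOf('.');
        if (offset > 0) {
            partition = Integer.parseInt(key.substring(offset + 1)) % numPartitions;
        }
        return partition;
    }

    public static void main(String[] args) {
        int numPartitions = 4; // hypothetical topic with 4 partitions
        System.out.println(partitionFor("192.168.2.37", numPartitions));  // 37 % 4 = 1
        System.out.println(partitionFor("192.168.2.254", numPartitions)); // 254 % 4 = 2
        System.out.println(partitionFor("nodots", numPartitions));        // no '.', so 0
    }
}
```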
I could not find a related bug in JIRA. All I can say is that this one is a bit of a trap.
Version information:
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-reflect</artifactId>
    <version>2.10.0</version>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.9.2-RC3</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.9.2</artifactId>
    <version>0.8.1.1</version>
    <scope>compile</scope>
    <exclusions>
        <exclusion>
            <artifactId>jmxri</artifactId>
            <groupId>com.sun.jmx</groupId>
        </exclusion>
        <exclusion>
            <artifactId>jms</artifactId>
            <groupId>javax.jms</groupId>
        </exclusion>
        <exclusion>
            <artifactId>jmxtools</artifactId>
            <groupId>com.sun.jdmk</groupId>
        </exclusion>
    </exclusions>
</dependency>
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries. The most exasperating configuration
Original post: https://www.cnblogs.com/yosql473/p/10722687.html