(1)首先通过在服务器上使用命令行来模拟生产、消费数据,发现生产、消费数据正常。测试如下:
#生产者
bin/kafka-console-consumer.sh --bootstrap-server ***.***.***.***:9092 --topic test
其中 ***.***.***
代表的是 kafka broker的地址.
再开一个窗口启动消费者:
#消费者
bin/kafka-console-consumer.sh --bootstrap-server ***.***.***.***:9092 --topic test
在生产端随机输入数据,在消费者端接收正常,如下图所示:
(2)那么问题可能出现在client和服务端的连接上了,经过查资料分析,看到网上有如下解决方案:
将kafka/config/server.properties文件中advertised.listeners改为如下属性。192.168.75.137是我虚拟机的IP。改完后重启,OK了。Java端的代码终于能通信了
advertised.listeners=PLAINTEXT://192.168.75.137:9092
advertised.listeners上的注释是这样的:
#Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
意思就是说:hostname、port都会广播给producer、consumer。如果你没有配置了这个属性的话,则使用listeners的值,如果listeners的值也没有配置的话,则使用
java.net.InetAddress.getCanonicalHostName()返回值(这里也就是返回localhost了,也就是你的broker的hostname)。
这么说也没毛病,但是我本身也做了hostname的映射,即使返回hostname,我这边也能解析的,不用这样配置也ok啊,问题肯定不是这里。然后我继续排查。
(3) 通过getCanonicalHostName()方法排查
找到的测试语句如下:
import java.net.*;
public class TestDemo {
public static void outHostName(InetAddress address, String s)
{
System.out.println("通过" + s + "创建InetAddress对象");
System.out.println("主 机 名:" + address.getCanonicalHostName());
System.out.println("主机别名:" + address.getHostName());
System.out.println("");
}
public static void main(String[] args) throws Exception
{
// outHostName(InetAddress.getLocalHost(), "getLocalHost方法");
outHostName(InetAddress.getByName("***.***.***.***"),"***.***.***.***");
// outHostName(InetAddress.getByName("www.baidu.com"), "www.baidu.com");
}
}
***.***.***.***
处为我的kafka broker的地址,结果输出结果为映射的其他的hostname,果然是配的时候大意了,在客户端查找对应的ip的映射,果然有问题,把对应的ip和hostname进行了更正,再次运行java代码生产消息后,kafka端成功消费,问题解决。
关于Java客户端连接虚拟机中的Kafka时,无法发送、接收消息的问题
原文:https://www.cnblogs.com/zhqin/p/13871362.html