kafka集群测试正常,但是Java连接kafka出现异常,急求大神解答!!!!!!!!!!!

1533 [Thread-0] WARN kafka.client.ClientUtils$ - Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [id:1,host:node3,port:6667] failed
java.nio.channels.ClosedChannelException: null
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) ~[kafka_2.9.2-0.8.2.2.jar:na]
[kafka_2.9.2-0.8.2.2.jar:na]
at kafka.utils.Utils$.swallowError(Utils.scala:45) [kafka_2.9.2-0.8.2.2.jar:na]

集群: bin/kafka-console-producer.sh --broker-list node2:6667 --topic test,可以正常产生数据,也能正常消费》
Java中代码:
public class DataProducerInsert {
// TODO Auto-generated method stub
private static Producer<Integer,String> producer;
Properties props=new Properties();
public DataProducerInsert(){
//定义连接的broker list
props.put("metadata.broker.list", "node1:6667,node3:6667,node2:6667");
//定义序列化类 Java中对象传输之前要序列化
props.put("serializer.class", "kafka.serializer.StringEncoder");
//props.put("advertised.host.name", "192.168.1.216");
producer = new Producer<Integer, String>(new ProducerConfig(props));
}
public static void main(String[] args) {
DataProducerInsert sp=new DataProducerInsert();
//定义topic
String topic="topic1";
//开始时间统计
long startTime = System.currentTimeMillis();
//定义要发送给topic的消息
String messageStr = "This is a message";
List<KeyedMessage<Integer, String>> datalist = new ArrayList<KeyedMessage<Integer, String>>();

//构建消息对象
KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(topic, messageStr);
datalist.add(data);

//结束时间统计
long endTime = System.currentTimeMillis();
KeyedMessage<Integer, String> data1 = new KeyedMessage<Integer, String>(topic, "用时" + (endTime-startTime)/1000.0);
datalist.add(data1);

//推送消息到broker
producer.send(data);
producer.close();
}

}
windows 平台的hosts文件:(外网地址)
106.14.47.114 node1
106.14.76.118 node2
106.14.39.127 node3
Linux服务器上的hosts文件:(内网地址)
10.27.147.94 node1
10.27.146.93 node2
10.27.148.109 node3,
kafka的地址是node1:6667,node2:6667,node3:6667,这是ambari平台下集成的kafka。

第1个回答  2018-04-12
首先你在链接时候检查是否代码里的IP 和端口是不是对的,端口是broker 端口,默认9092 ;
其次查看代码是生产者,看Kafka 集群里这个主题是否存在(如果不存在,默认是配置可以自动创建,看是非将该配置修改);然后检测防火墙,相应端口是否开放(防火墙直接关也可以);检测 server.properties 文件的 listeners 是否配置,若没有将其配置好
相似回答