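Java client operations against Kafka fail
Problem description
Every attempt to use kafka-clients against Kafka fails and I cannot work out why. The relevant code and configuration are below. I would appreciate pointers from anyone who knows this area. Thanks!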
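Environment and dependencies
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.0</version>
</dependency>
The JDK version is 1.8, the Kafka distribution is 2.12-0.10.2.0 (Kafka 0.10.2.0 built for Scala 2.12), and the server runs CentOS 7.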
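Test code
TestBase.java
// Logger imports assumed to be SLF4J; the original post omits imports.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestBase {
    protected Logger log = LoggerFactory.getLogger(this.getClass());
    // Broker bootstrap address (IP and port)
    protected String kafka_server = "192.168.60.160:9092";
    protected String topic = "zlikun_topic";
}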
ProducerTest.java
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Before;
import org.junit.Test;

public class ProducerTest extends TestBase {

    protected Properties props = new Properties();

    @Before
    public void init() {
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka_server);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // MyPartitioner is the asker's own partitioner; its source is not included in the post.
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);
    }

    @Test
    public void test() throws InterruptedException {
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // Send messages
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>(topic, Integer.toString(i), Integer.toString(i)), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.printf("offset = %d, partition = %d%n", recordMetadata.offset(), recordMetadata.partition());
                    } else {
                        log.error("send error !", e);
                    }
                }
            });
        }
        TimeUnit.SECONDS.sleep(3);
        producer.close();
    }
}
ConsumerTest.java
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.Before;
import org.junit.Test;

public class ConsumerTest extends TestBase {

    private Properties props = new Properties();

    @Before
    public void init() {
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka_server);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "zlikun");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    }

    @Test
    public void test() {
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
        // consumer.assign(Arrays.asList(new TopicPartition(topic, 1)));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}
Problem
# The test topic was created manually
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic zlikun_topic
Console output
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-3: 30042 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-3: 30042 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-2: 30042 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-2: 30042 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-1: 30043 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-1: 30043 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-1: 30043 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-0: 30046 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-0: 30046 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-0: 30046 ms has passed since batch creation plus linger time
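Answers
Answer 1: I tested this and it works fine: https://github.com/MOBX/kafka...
I suggest checking whether the connection to the Kafka cluster is healthy, since the error you are getting is a TimeoutException. If that does not help, try downgrading kafka-clients to 0.8.2.0.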
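One way to carry out the connectivity check suggested above is a plain TCP probe from the client machine. The following is a minimal sketch, not part of the original answer: the class name PortCheck and the method canConnect are hypothetical, and the address in main is the bootstrap address from the question.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {

    /**
     * Tries to open a TCP connection to host:port within timeoutMs.
     * Returns false on refusal, timeout, or an unresolvable host name.
     */
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers ConnectException, SocketTimeoutException and UnknownHostException
            return false;
        }
    }

    public static void main(String[] args) {
        // Bootstrap address taken from the question; adjust for your environment
        String host = "192.168.60.160";
        int port = 9092;
        System.out.printf("%s:%d reachable = %b%n", host, port, canConnect(host, port, 3000));
    }
}
```

A successful probe only shows that the bootstrap address is reachable. After bootstrapping, the client connects to whatever host names the brokers report in their metadata, so sends can still time out if those names are not reachable from the client.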
Answer 2: I set the log level to DEBUG and the log showed that the failure was caused by a host name that could not be resolved.
2017-04-11 13:49:46.046 [main] DEBUG org.apache.kafka.clients.NetworkClient - Error connecting to node 0 at m160:9092:
java.io.IOException: Can't resolve address: m160:9092
    at org.apache.kafka.common.network.Selector.connect(Selector.java:182)
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:629)
    at org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:57)
    at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:768)
    at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:684)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:347)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:203)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:138)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:216)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:193)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:275)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
    at com.zlikun.mq.ConsumerTest.test(ConsumerTest.java:34)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.nio.channels.UnresolvedAddressException
    at sun.nio.ch.Net.checkAddress(Net.java:107)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:649)
    at org.apache.kafka.common.network.Selector.connect(Selector.java:179)
    ... 36 more
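The trace shows the client trying to reach node 0 at m160:9092, a host name rather than the IP address used for bootstrapping. To confirm from the client machine whether such a name resolves, something like the following can be run. This is only a sketch under assumptions: ResolveCheck and isResolvable are hypothetical names, and m160 is simply the host name that appears in the log above.

```java
import java.net.InetSocketAddress;

public class ResolveCheck {

    /** Returns true if the host name can be resolved to an IP address on this machine. */
    static boolean isResolvable(String host, int port) {
        // The InetSocketAddress(String, int) constructor attempts resolution immediately.
        // On failure it does not throw; the address is flagged as unresolved instead.
        return !new InetSocketAddress(host, port).isUnresolved();
    }

    public static void main(String[] args) {
        // Defaults to the host name from the DEBUG log; pass another name as the first argument
        String host = args.length > 0 ? args[0] : "m160";
        int port = 9092;
        System.out.printf("%s:%d resolvable = %b%n", host, port, isResolvable(host, port));
    }
}
```

If this prints false for the broker's host name, the client has no way to map that name to an address, which matches the UnresolvedAddressException in the trace.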
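A blog post I found online, http://blog.sina.com.cn/s/blo..., supports this conclusion. Once I added the broker's host name to my hosts file, the test ran normally. Still, this approach does not seem right to me: requiring it in a real deployment would be a burden on operations. I wonder whether there is a better solution.
[2017/04/11 16:16] I have just found another article online, http://www.tuicool.com/articl..., and it solved the problem!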
