如何用Java向kafka发送json数据
问题描述
在网上都只看到一些Java生产STRING类型的消息。 按照Java的producer类来看,是可以自定义发送消息的类型,比如 producer.send(new KeyedMessage<String, HashMap<String , String>>(topic,message); 可是这样运行会报错,报错如下,请求高人解答:Exception in thread 'Thread-4' java.lang.ClassCastException: java.util.HashMap cannot be cast to java.lang.String
at kafka.serializer.StringEncoder.toBytes(Encoder.scala:46)at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:130)at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:125)at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)at kafka.producer.async.DefaultEventHandler.serialize(DefaultEventHandler.scala:125)at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:52)at kafka.producer.Producer.send(Producer.scala:77)at kafka.javaapi.producer.Producer.send(Producer.scala:33)at com.Model.Producer.kafkaProducer.run(kafkaProducer.java:35)
问题解答
回答1:文档,序列化成String就行了
相关文章:
1. mysql - SQL操作时间的函数?2. python - 求一个在def中可以实现调用本def满足特定条件continue效果的方法(标题说不太清楚,请见题内描述)3. javascript - 用表单提交两个时间段请求后台返回对应数据时出现的一些问题!4. docker内创建jenkins访问另一个容器下的服务器问题5. MYSQL新建用户设置可以远程访问的问题6. angular.js - angularjs的自定义过滤器如何给文字加颜色?7. java - mybatis怎么实现在数据库中有就修改,没有就添加8. 正则表达式 - python pandas的sep参数问题9. javascript - ionic run android报错10. node.js - nodejs和前端JavaScript 字符串处理结果不一样是什么原因?
