python - 如何使用pykafka consumer进行数据处理并保存?
问题描述
使用本地kafka bin/kafka-console-producer.sh --broker-list kafkaIP:port --topic topicName创建命令行生产数据,然后打开python
from pykafka import KafkaClientclient = KafkaClient(hosts='192.168.x.x:9092')topic = client.topics[’wr_test’]consumer = topic.get_balanced_consumer(consumer_group=’test-consumer-group’,auto_commit_enable=True,zookeeper_connect=’192.168.x.x:2121’)
然后自己编写了简单的一套处理函数,从外部引用。将数据处理后存入elasticsearch 或者 数据库比如for msg in consumer:
if msg is not None: 外部引入的处理函数(msg.value)
在python命令行for msg in consumer:
print msg.offset, msg.value
这时候使用生产者敲入一些数据,在消费端就会就会立即打印出来但是写成py文件之后,每次运行只会处理最近的生产的一次内容,在生产者中再进行输入一些内容,py文件就不会再进行数据处理了。所以向问下如何编写能运行后能一直对消费者数据进行处理的函数?要注意哪些地方?
另外,get_balanced_consumer的方法,是连接zookeeper消费使用topic.get_simple_consumer是直接消费kafka,使用这种方式就提示No handler for...的错误
还有一个疑问,就是实际生产环境日志产生量很快,应该如何编写一个多线程处理方法?
问题解答
回答1:在别人的博客看到一种替代的解决方案http://www.cnblogs.com/castle...从consumer中将msg.value读取到一个列表当中,然后从列表中读取数据进行数据处理,当这个流程结束后,再把列表中获取的数据pop掉。另外也要用try: ... except :... continue
相关文章:
1. python - 求一个在def中可以实现调用本def满足特定条件continue效果的方法(标题说不太清楚,请见题内描述)2. MYSQL新建用户设置可以远程访问的问题3. $fields = $values = [];这条代码一直定义不了,一直报错,老师的源码也是被报错的,执行不了,请问该怎么解决这个问题4. java - mybatis怎么实现在数据库中有就修改,没有就添加5. node.js - nodejs和前端JavaScript 字符串处理结果不一样是什么原因?6. linux - 为什么我在mysql的my.cnf下找不到bind-address?7. 数据库设计 - MySQL数据库主键问题8. mysql同步数据到elasticsearch用什么工具?9. mysql 5.7单表300万数据,性能严重下降,如何破?10. 数据库 - mysql 远程可以连接,但是本地连接拒绝?
![$fields = $values = [];这条代码一直定义不了,一直报错,老师的源码也是被报错的,执行不了,请问该怎么解决这个问题](http://www.haobala.com/attached/image/news/202205/093622cb60.png)