python异步消费kafka_Kafka 通过python简单的生产消费实现

使用CentOS6.5、python3.6、kafkaScala 2.10 - kafka_2.10-0.8.2.2.tgz (asc, md5)

一、下载kafka

下载地址

https://kafka.apache.org/downloads

里面包含zookeeper

1530782-20191022172310374-616895087.png

二、安装Kafka

1、安装zookeeper

mkdir /root/kafka/

tar -vzxf kafka_2.10-0.8.2.2

1530782-20191022172430910-1464690358.png

cd /root/kafka/kafka_2.10-0.8.2.2

cat config/zookeeper.properties | grep -v '#' >> config/zk.properties

mkdir -p /home/kafka/zk

vi zk.properties

dataDir=/home/kafka/zk #因为zookeeper变更为zk,所以需要在这里修改一下

启动zookeeper(后台启动)

/root/kafka/kafka_2.10-0.8.2.2/bin/zookeeper-server-start.sh /root/kafka/kafka_2.10-0.8.2.2/config/zk.properties &

2、安装Kafka

cd /root/kafka/kafka_2.10-0.8.2.2

cat config/server.properties | grep -v '#' >> config/kafka_01.properties

启动Kafka(后台启动)

/root/kafka/kafka_2.10-0.8.2.2/bin/kafka-server-start.sh /root/kafka/kafka_2.10-0.8.2.2/config/kafka_01.properties &

三、新建Kafka topic

1、新建topic

cd /root/kafka/kafka_2.10-0.8.2.2

./bin/kafka-topics.sh --create --zookeeper 192.168.50.33:2181 --replication-factor 1 --partitions 1 --topic test

2、查看topic

./bin/kafka-topics.sh --list --zookeeper 192.168.50.33:2181

四、kafka生产者脚本

1、安装python的Kafka模块

pip3 install kafka-python(之前已安装)

1530782-20191022173731120-1338541834.png

2、kafka生产者脚本

cat kafka_pro.py

from kafka import KafkaProducer

from kafka import KafkaConsumer

from kafka.errors import KafkaError

import json

import time

class Kafka_producer():

'''

使用kafka的生产模块

'''

def __init__(self, kafkahost, kafkaport, kafkatopic):

self.kafkaHost = kafkahost

self.kafkaPort = kafkaport

self.kafkatopic = kafkatopic

self.producer = KafkaProducer(bootstrap_servers='{kafka_host}:{kafka_port}'.format(

kafka_host=self.kafkaHost,

kafka_port=self.kafkaPort

))

def sendjsondata(self, params):

try:

parmas_message = json.dumps(params)

producer = self.producer

producer.send(self.kafkatopic, parmas_message.encode('utf-8'))

producer.flush()

except KafkaError as e:

print(e)

def main():

'''

测试consumer和producer

:return:

'''

# 测试生产模块

producer = Kafka_producer("127.0.0.1",9092,"test")

for i in range(1000000000000):

params = 'test---' + str(i)

print(params)

producer.sendjsondata(params)

time.sleep(1)

if __name__ == '__main__':

main()

import os

print(os.uname)

1530782-20191022173522191-117471226.png

五、kafka消费者脚本

cat kafka_cust.py

from kafka import KafkaProducer

from kafka import KafkaConsumer

from kafka.errors import KafkaError

import json

import time

class Kafka_consumer():

'''

使用Kafka—python的消费模块

'''

def __init__(self, kafkahost, kafkaport, kafkatopic, groupid):

self.kafkaHost = kafkahost

self.kafkaPort = kafkaport

self.kafkatopic = kafkatopic

self.groupid = groupid

self.consumer = KafkaConsumer(self.kafkatopic, group_id=self.groupid,

bootstrap_servers='{kafka_host}:{kafka_port}'.format(

kafka_host=self.kafkaHost,

kafka_port=self.kafkaPort))

def consume_data(self):

try:

for message in self.consumer:

# print json.loads(message.value)

yield message

except KeyboardInterrupt as e:

print(e)

def main():

'''

测试consumer和producer

:return:

'''

# 测试消费模块

# 消费模块的返回格式为ConsumerRecord(topic=u'ranktest', partition=0, offset=202, timestamp=None,

# \timestamp_type=None, key=None, value='"{abetst}:{null}---0"', checksum=-1868164195,

# \serialized_key_size=-1, serialized_value_size=21)

consumer = Kafka_consumer('127.0.0.1',

9092,

"test",

'test-python-test')

message = consumer.consume_data()

for i in message:

print(i.value)

if __name__ == '__main__':

main()

1530782-20191022173459008-326944044.png

整理自:

https://www.cnblogs.com/hunttown/p/9041036.html

https://gitee.com/jalright/scriptstodo/blob/master/kafka/producer.py

https://gitee.com/jalright/scriptstodo/blob/master/kafka/cunsumer.py

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/488564.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【技术趋势】2020 五大技术趋势:无人驾驶发展、机器视觉崛起、区块链实用化、人类增强技术、超自动化...

图源:https://unsplash.com/来源:AI开发者原标题:Here Is A Rundown of 5 Major Tech Trends Hitting 2020作者:| Richard Liu链接:https://medium.com/swlh/here-is-a-rundown-of-5-major-tech-trends-hitting-2020-6…

正则不能输入特殊字符_正则表达式语法学习和在线练习

标题: 正则表达式语法学习和在线练习作者: 梦幻之心星 sky-seekerqq.com标签: [#正则表达式,#语法,#学习,#练习]目录: [语法]日期: 2021-01-26背景说明正则表达式使搜索和替换操作更加灵活高效。许多程序设计语言都支持使用正则表达式进行字符串操作。正则表达式是由普通字符…

非接触物体尺寸形态测量_检修人必备的测量常识

小编今天给大家分享一下测量的常识,咱们搞机械的一定要懂点测量知识!一、测量器具的分类测量器具是一种具有固定形态、用以复现或提供一个或多个已知量值的器具。按用途的不同量具可分为以下几类:1. 单值量具只能体现一个单一量值的量具。可用…

生物战教训、生物安全问题以及未来监控军民两用生物技术扩散的手段

来源:美国空军国家安全研究学会研究报告2005年9月【知远导读】本篇推送编辑节选自美国空军国家安全研究学会2005年9月发布的一份题为《生物战教训、生物安全问题以及未来监控军民两用生物技术扩散的手段》的研究报告。该报告直接来源于作者海伦普凯特(He…

如何避免字符串混淆加密_iOS-代码混淆加固方案

对于iOS来说,由于系统是封闭的,APP上架需要通过App Store,安全性来说相当高。但是对于大厂和知名APP而言,别人给的安全保障永远没有自己做的来得踏实。所以对于大厂、少部分企业级和金融支付类应用来说加固是相当重要的。下面是目…

DARPA计划在2021年开展多种新型武器概念研究

来源:中国指挥与控制学会“远射”项目DARPA计划于2021年投资2200万美元启动“远射”项目,演示一种空射武器系统。该系统由速度较慢的远程飞行器搭载至战区,在战区上空发射多枚空空导弹来实施作战。该武器可由现有战斗机外部挂载,或…

python算法实现源码_Python实现七个基本算法

1.顺序查找当数据存储在诸如列表的集合中时,我们说这些数据具有线性或顺序关系。 每个数据元素都存储在相对于其他数据元素的位置。 由于这些索引值是有序的,我们可以按顺序访问它们。 这个过程产实现的搜索即为顺序查找。顺序查找原理剖析:从…

python词云设计实例_python词云库wordcloud的使用方法与实例详解

wordcloud是优秀的词云展示第三方库一、基本使用 import jieba import wordcloud txt open("1.txt", "r", encodingutf-8).read() words jieba.lcut(txt) txt_1 " ".join(words) # print(txt1) w wordcloud.WordCloud(font_path"msyh.t…

细胞因子风暴与新冠肺炎

来源:陈辉科学网博客链接地址:http://blog.sciencenet.cn/blog-3426569-1219679.html 2020年2月15日下午,在国务院联发联控机制新闻发布会上,周琪院士介绍说“炎症因子风暴”[作者注释:即是细胞因子风暴(Cy…

mysql 启动 修改密码_基础的启动/停止/重启/密码修改MySQL

如何启动/停止/重启MySQL一、启动方式1、使用 service 启动:service mysqld start2、使用 mysqld脚本启动:/etc/inint.d/mysqld start3、使用 safe_mysqld启动:safe_mysqld&二、停止1、使用 service 启动:service mysqldstop2…

预编译对象解决SQL注入问题

转载于:https://www.cnblogs.com/suanshun/p/6739454.html

eclipse中添加jar包后运行时提示noclassdeffounderror_一看你就懂,超详细 java 中的 ClassLoader 详解,耐心看~...

备注:本文篇幅比较长,但内容简单,大家不要恐慌,安静地耐心翻阅就是Class文件的认识我们都知道在Java中程序是运行在虚拟机中,我们平常用文本编辑器或者是IDE编写的程序都是.java格式的文件,这是最基础的源码…

寻找人机之间的中间地带-评述3本人机协作的书

来源: 混沌巡洋舰1 AI 错觉知其然,更要知其所以然,了解数据挖掘的算法的基础原理,可以在这个人工智能和大数据可能比工业革命更能改变人的一生的历史时期中,更有智慧的应用人工智能。AI错觉这本书18年在美国出版&#…

TypeError: HashUpdate fail

关于crypto的md5加密报错: 代码: var crypto require(crypto); var md5 crypto.createHash(md5); //crypto模块功能是加密并生成各种散列 var oldpass md5.update(oldpass).digest(hex); var newpass md5.update(newpass).digest(hex);如果md5.updat…

mysql 与gemfire的同步_(转)分布式缓存GemFire架构介绍

1什么是GemFireGemFire是一个位于应用集群和后端数据源之间的高性能、分布式的操作数据(operational data)管理基础架构。它提供了低延迟、高吞吐量的数据共享和事件分发。GemFire充分利用网络中的内存和磁盘资源,形成一个实时的数据网格(data fabric or grid)。Gem…

混合云关键技术能力和发展趋势

来源:IDC圈为满足成本、按需、隐私、合规、避免供应商锁定等目的,企业常常会采用多个公有云或私有云,这会造成基础设施资源池多样化,还要面临同时管理物理机、虚拟化等异构资源环境。多云管理由于面临同时管理物理机、虚拟化等异构…

文字竖着写怎么设置_微信置顶文字怎么设置 微信置顶文字的方法介绍|微信|置顶软硬件资讯川北在线...

阅读本文前,请您先点击上面的“蓝色字体”,再点击“关注”,这样您就可以继续免费收到文章了。每天都会有分享,都是免费订阅,请您放心关注。注:本文转载自网络,不代表本平台立场,仅供…

DOM查找元素的方法总结

按HTML查找:优点:范围可大可小,可设置条件;包括五种方式:1.按id查找;2.按标签名查找:var elems parent.getElementsByTagName();3.按name属性查找:要回传给服务器的元素需要name属性…

2019 NLP大全:论文、博客、教程、工程进展全梳理(长文预警)

来源:机器学习研究会订阅号在整个2019年,NLP领域都沉淀了哪些东西?有没有什么是你错过的?如果觉得自己梳理太费时,不妨看一下本文作者整理的结果。2019 年对自然语言处理(NLP)来说是令人印象深刻…

collection集合 地址_java集合系列(5)LinkedList

这篇文章开始介绍LinkList。他和ArrayList有一些相似,在上一篇文章讲解 ArrayList时,我们知道ArrayList是以数组实现,它的优势是查询性能高,劣势是按顺序增删性能差。如果在不确定元素数量的情况时,不建议使用ArrayLis…