消息中间件 --- Kafka快速入门

消息中间件 --- Kafka 快速入门

消息中间件:https://blog.51cto.com/u_9291927/category33

GitHub: GitHub - scorpiostudio/HelloKafka: HelloKafka

  • Kafka快速入门(一)--- Kafka简介:https://blog.51cto.com/9291927/2493953
  • Kafka快速入门(二)--- Kafka架构:https://blog.51cto.com/9291927/2497814
  • Kafka快速入门(三)--- Kafka核心技术:https://blog.51cto.com/9291927/2497820
  • Kafka快速入门(四)--- Kafka高级功能:https://blog.51cto.com/9291927/2497828
  • Kafka快速入门(五)--- Kafka管理:https://blog.51cto.com/9291927/2497842
  • Kafka快速入门(六)--- Kafka集群部署:https://blog.51cto.com/9291927/2498428
  • Kafka快速入门(七)--- Kafka监控:https://blog.51cto.com/9291927/2498434
  • Kafka快速入门(八)--- Confluent Kafka简介:https://blog.51cto.com/9291927/2499090
  • Kafka快速入门(九)--- C客户端:https://blog.51cto.com/9291927/2502001
  • Kafka快速入门(十)--- C++客户端:https://blog.51cto.com/9291927/2502063
  • Kafka快速入门(十一)--- RdKafka源码分析:https://blog.51cto.com/9291927/2504489
  • Kafka快速入门(十二)--- Python客户端:https://blog.51cto.com/9291927/2504495

Python3 学习(五十四):confluent-kafka 模块的使用

From:https://blog.csdn.net/liao392781/article/details/90487438

coufluent-kafka 是 Python 模块,是对 librdkafka 的轻量级封装,librdkafka 又是基于 c/c++ 的kafka 库,性能上不必多说。使用上要优于 kafka-python。confluent-kafka-python 是 Confluent 用于 Apache Kafka( Apache Kafka ) 和 Confluent Platform( Data in Motion Platform for the Enterprise | Confluent ) 的 Python 客户端。

特征:

  • 高性能 : confluent-kafka-python 是 librdkafka( https://github.com/edenhill/librdkafka ) 的一个轻量级包装器,librdkafka是一个 经过精心调优的C客户端。
  • 可靠性 : 在编写Apache Kafka客户端时,有很多细节要做。我们将它们放在一个地方(librdkafka)并在我们所有客户中利用这项工作(也是汇合 - kafka-go ( https://github.com/confluentinc/confluent-kafka-go ) 和 confluent-kafka-dotnet ( GitHub - confluentinc/confluent-kafka-dotnet: Confluent's Apache Kafka .NET client ))

示例代码:

# -*- coding: utf-8 -*-
# @Author  : 
# @Date    :
# @File    : kafka_operate.py
# @description : XXXimport time
import datetime
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer, AvroConsumer
from confluent_kafka.avro.serializer import SerializerError
from confluent_kafka import Producer, Consumer, KafkaErrordef delivery_report(err, msg):""" Called once for each message produced to indicate delivery result.Triggered by poll() or flush(). """if err is not None:print('Message delivery failed: {}'.format(err))else:print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))def kafka_producer():p = Producer({'bootstrap.servers': 'mybroker1,mybroker2'})while True:try:current_date = str(datetime.datetime.now().replace(microsecond=0))data = current_date# Trigger any available delivery report callbacks from previous produce() callsp.poll(0)# Asynchronously produce a message, the delivery report callback# will be triggered from poll() above, or flush() below, when the message has# been successfully delivered or failed permanently.p.produce('my_topic', data.encode('utf-8'), callback=delivery_report)time.sleep(1)except BaseException as be:print(be)break# Wait for any outstanding messages to be delivered and delivery report# callbacks to be triggered.p.flush()def kafka_consumer():c = Consumer({'bootstrap.servers': 'mybroker','group.id': 'mygroup','auto.offset.reset': 'earliest'})c.subscribe(['my_topic'])while True:msg = c.poll(1.0)if msg is None:continueif msg.error():print("Consumer error: {}".format(msg.error()))continueprint('Received message: {}'.format(msg.value().decode('utf-8')))c.close()def kafka_avro_producer():value_schema_str = """{"namespace": "my.test","name": "value","type": "record","fields" : [{"name" : "name","type" : "string"}]}"""key_schema_str = """{"namespace": "my.test","name": "key","type": "record","fields" : [{"name" : "name","type" : "string"}]}"""value_schema = avro.loads(value_schema_str)key_schema = avro.loads(key_schema_str)value = {"name": "Value"}key = {"name": "Key"}avro_producer = AvroProducer({'bootstrap.servers': 'mybroker,mybroker2','schema.registry.url': 'http://schem_registry_host:port'}, default_key_schema=key_schema, default_value_schema=value_schema)avro_producer.produce(topic='my_topic', value=value, key=key)avro_producer.flush()def kafka_avro_consumer():c = AvroConsumer({'bootstrap.servers': 'mybroker,mybroker2','group.id': 'groupid','schema.registry.url': 'http://127.0.0.1:8081'})c.subscribe(['my_topic'])while True:try:msg = c.poll(10)except SerializerError as e:print("Message deserialization failed for {}: {}".format(msg, e))breakif msg is None:continueif msg.error():print("AvroConsumer error: {}".format(msg.error()))continueprint(msg.value())c.close()if __name__ == '__main__':pass

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/495142.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

asp.net中jQuery $post用法

函数原型:$.post(url, params, callback) url是提交的地址,eg: "sample.ashx" params是参数,eg: { name:"xxx" , id:"001" } callback是回调函数,eg: function…

美研究人员公布“盲动”机器人技术细节

来源:新华网摘要:7月7日美国麻省理工学院近日发布公报称,该校研究人员最新公布了一种“盲动”机器人的技术细节。这种机器人不需要借助视觉系统,可在崎岖地形中穿行跳跃,有望在危险工作环境中得…

使IE6下PNG背景图片透明的七种方法

PNG图像格式介绍: PNG是20世纪90年代中期开始开发的图像文件存储格式,其目的是企图替代GIF和TIFF文件格式,同时增加一些GIF文件格式所不具备的特性。流式 网络图形格式(Portable Network Graphic Format,PNG)名称来源于非官方的“…

AutoJs 4.1.1 实战教程

Auto.js 中文文档:https://hyb1996.github.io/AutoJs-Docs/#/?id综述 pro 版本支持 Node.js AutoJs Pro 7.0.4-1 实战教程---史上最全快手、抖音极速版 :https://blog.csdn.net/zy0412326/article/details/107180887/:https://blog.csdn.n…

人工智能军备竞赛:一文尽览全球主要国家AI战略

来源:网络大数据摘要:人工智能的迅速发展将深刻改变人类社会和世界的面貌,为了抓住 AI 发展的战略机遇,越来越多的国家和组织已争相开始制定国家层面的发展规划。人工智能的迅速发展将深刻改变人类社会和世界的面貌,为…

EJB3与EJB2的差别

1、Annotation替代了配置文件   凡是EJB2中使用配置文件定义的;EJB3一般都可以使用 annotations定义(当然EJB3也支持配置文件定义);   凡是EJB2通过JNDI寻找的资源(调用容器中其他EJB、调用环境变量等Resource资源…

Android 读取、接收、发送 手机短信

&#xff1a;https://www.cnblogs.com/ycclmy/tag/android/ 1、Android 读取手机短信 From&#xff1a;https://www.cnblogs.com/ycclmy/p/3193075.html 获取 android 手机短信需要在 AndroidManifest.xml 加权限&#xff1a; <uses-permission android:name"android.…

flex和js进行参数传递

来着&#xff1a;http://www.cnblogs.com/Cnol/archive/2009/09/20/1570365.html 方法一&#xff1a;flex接收网页传值&#xff01;~ 1<?xml version"1.0" encoding"utf-8"?> 2<mx:Application xmlns:mx"http://www.adobe.com/2006/mxml&q…

师法自然,仿生技术是如何改变世界的?

来源&#xff1a;36Kr摘要&#xff1a;“向自然学习”&#xff0c;这并非是句空话。本文介绍了科学家如何借鉴大自然&#xff0c;在材料科学&#xff0c;信息技术等领域实现创新。希望能为您带来启发。当今世界最伟大的创新者&#xff0c;非大自然莫属。大自然经过45亿年的演变…

Auto.JS 开发

From&#xff1a;https://blog.csdn.net/a6892255/article/details/107302369 autojs 代码大全(实战演练)&#xff1a;https://blog.csdn.net/qq_30931547/article/details/106459765 &#xff1a;https://github.com/snailuncle/autojsCommonFunctions/blob/master/autojsCo…

【研究】大脑如何在“知道”与“无知”之间做出决定

来源&#xff1a;中国生物技术网摘要&#xff1a;我们时而会对“求知欲”如饥似渴&#xff0c;时而又会觉得“无知是福”而享受放空&#xff0c;那么问题来了&#xff0c;在特定的时间里&#xff0c;我们是如何在这两种心态之间进行选择的呢&#xff1f;英国伦敦大学学院(UCL)的…

js 逆向分析的神器 --- v_jstools

From&#xff1a;https://mp.weixin.qq.com/s/LisYhDKK_6ddF-19m1gvzg 1、下载和安装插件 这是一款浏览器插件&#xff0c;功能非常的nice 工具地址&#xff1a;https://github.com/cilame/v_jstools 浏览器打开上面的网站后&#xff0c;点击 code 按钮&#xff0c;选择 Down…

网站运营

一个站点要有对用户有用&#xff0c;比如亚马逊&#xff0c;卖书的网站多得是&#xff0c;但是亚马逊除了卖书&#xff0c;还提供了很多其他的对客户有用的东西。首先一点&#xff0c;要考虑对自己有用&#xff0c;如果对自己有用&#xff0c;那么必然会对很多其他人有用。 The…

《中国人工智能开源软件发展白皮书(2018)》(附下载及解读PPT)

来源&#xff1a;走向智能论坛摘要&#xff1a;近日&#xff0c;中国人工智能开源软件发展联盟召集中国电子技术标准化研究院等企事业单位&#xff0c;编撰并正式发布《中国人工智能开源软件发展白皮书&#xff08;2018&#xff09;》&#xff0c;白皮书研究梳理人工智能开源软…

把 charles,Fiddler 证书安装到安卓根目录,解决安卓微信 7.0 版本以后安装证书也无法抓包问题,需要 root

From&#xff1a;https://testerhome.com/topics/21956 OpenSSL &#xff1a;https://slproweb.com/products/Win32OpenSSL.html 谷歌在安卓7.0修改了安全策略&#xff0c;安卓系统 大于 7.0 时&#xff0c; 应用不在信任用户安装的证书文件。用户添加的 CA 证书不能再用于安全…

nfs配置小结

nfs服务器端配置文件&#xff1a;vim /etc/exports共享目录 允许访问主机(访问权限)/mnt/share 192.168.17.152(rw)/mnt/share *(rw)载入nfs配置信息&#xff1a;/etc/init.d/nfs reloadexportfs -a查看本地nfs共享信息&#xff1a;exportfs -v查看远程n…

科学家发现跨越生命的重要门槛或许没那么难

来源&#xff1a;中国科学报将团藻&#xff08;拥有数百个细胞的藻类&#xff09;与其相对简单的亲缘物种——单细胞衣藻&#xff08;左上&#xff09;和拥有4~16个细胞的盘藻&#xff08;右上&#xff09;作对比&#xff0c;揭示了向多细胞生命发展的步骤。数十亿年前&#xf…

windows 的 wsl 命令

​wsl 文档&#xff1a;https://docs.microsoft.com/zh-cn/windows/wsl/ From &#xff1a;https://blog.csdn.net/weixin_34101784/article/details/88729575 From &#xff1a;https://www.cnblogs.com/Flat-White/p/13501639.html 玩转 WLS&#xff1a;Windows 10 Ubuntu子系…

C++ Primer 第10章 习题10.24

//10.24.cpp //建立一个单词排除集 //用于识别以s借位、但这个结尾的s,又不能删除的单词 //使用这个排除集删除输入单词尾部的s&#xff0c;生成该单词的非复数版本 //如果输入的是排除集中的单词&#xff0c;则保持该单词不变 #include<iostream> #include<set> #…

人机工程学/人因工程学的定义

来源&#xff1a;人机与认知实验室摘要&#xff1a;人机工程学&#xff08;Human Machine Environment&#xff09;和人因工程学&#xff08;Human Factor Environment&#xff09;国际百科全书的标题表明&#xff0c;人机工程学和人因工程学可能是两个独立的学科领域。人机工程…