python-kafka 常用 api 汇总

简介

     python连接kafka的标准库,kafka-python和pykafka。kafka-python使用的人多是比较成熟的库,kafka-python并没有zk的支持。pykafka是Samsa的升级版本,使用samsa连接zookeeper,生产者直接连接kafka服务器列表,消费者才用zookeeper。

安装

# PyPI安装
pip install kafka-python# conda安装
conda install -c conda-forge kafka-python# anaconda自带pip安装
/root/anaconda3/bin/pip install kafka-python

官方链接

  • 官网:https://kafka-python.readthedocs.io/en/master/index.html
  • git:https://github.com/dpkp/kafka-python

注意:1.4.0 以上的 kafka-python 版本使用了独立的心跳线程去上报心跳

生产者

API:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html。

生产者代码是线程安全的,支持多线程,而消费者则不然。

类 KafkaProducer

class kafka.KafkaProducer(**configs)

  • bootstrap_servers –'host[:port]'字符串,或者由'host[:port]'组成的字符串,形如['10.202.24.5:9096', '10.202.24.6:9096', '10.202.24.7:9096']),其中,host为broker(Broker:缓存代理,Kafka集群中的单台服务器)地址,默认值为 localhost, port默认值为9092,这里可以不用填写所有broker的host和port,但必须保证至少有一个broker)
  • key_serializer (可调用对象) –用于转换用户提供的key值为字节,必须返回字节数据。 如果为None,则等同调用f(key)。 默认值: None.
  • value_serializer(可调用对象) – 用于转换用户提供的value消息值为字节,必须返回字节数据。 如果为None,则等同调用f(value)。 默认值: None.

方法

send(topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None)

函数返回FutureRecordMetadata类型的RecordMetadata数据

  • topic(str) – 设置消息将要发布到的主题,即消息所属主题
  • value(可选) – 消息内容,必须为字节数据,或者通过value_serializer序列化后的字节数据。如果为None,则key必填,消息等同于“删除”。( If value is None, key is required and message acts as a ‘delete’)
  • partition (int, 可选) – 指定分区。如果未设置,则使用配置的partitioner
  • key (可选) – 和消息对应的key,可用于决定消息发送到哪个分区。如果partition为None,则相同key的消息会被发布到相同分区(但是如果key为None,则随机选取分区)(If partition is None (and producer’s partitioner config is left as default), then messages with the same key will be delivered to the same partition (but if key is None, partition is chosen randomly)). 必须为字节数据或者通过配置的key_serializer序列化后的字节数据.
  • headers (可选) – 设置消息header,header-value键值对表示的list。list项为元组:格式 (str_header,bytes_value)
  • timestamp_ms (int, 可选) –毫秒数 (从1970 1月1日 UTC算起) ,作为消息时间戳。默认为当前时间

flush(timeout=None)

发送所有可以立即获取的缓冲消息(即时linger_ms大于0),线程block直到这些记录发送完成。当一个线程等待flush调用完成而block时,其它线程可以继续发送消息。

注意:flush调用不保证记录发送成功

metrics(raw=False)

获取生产者性能指标。

#-*- encoding:utf-8 -*-
from kafka import KafkaProducer
import jsonproducer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
for i in range(0, 100):producer.send('MY_TOPIC1', value=b'lai zi shouke de msg', key=None, headers=None, partition=None, timestamp_ms=None)# Block直到单条消息发送完或者超时
future = producer.send('MY_TOPIC1', value=b'another msg',key=b'othermsg')
result = future.get(timeout=60)
print(result)
# future.get函数等待单条消息发送完成或超时,经测试,必须有这个函数,不然发送不出去,或用time.sleep代替,待验证# Block直到所有阻塞的消息发送到网络
# 注意: 该操作不保证传输或者消息发送成功,仅在配置了linger_ms的情况下有用。(It is really only useful if you configure internal batching using linger_ms# 序列化json数据
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))
producer.send('MY_TOPIC1', {'shouke':'kafka'})# 序列化字符串key
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', key_serializer=str.encode)
producer.send('MY_TOPIC1', b'shouke', key='strKey')# 压缩
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092',compression_type='gzip')
for i in range(2):producer.send('MY_TOPIC1', ('msg %d' % i).encode('utf-8'))# 消息记录携带header
producer.send('MY_TOPIC1', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64'),])# 获取性能数据(注意,实践发现分区较多的情况下,该操作比较耗时
metrics = producer.metrics()
print(metrics)
producer.flush()

实践中遇到错误: kafka.errors.NoBrokersAvailable: NoBrokersAvailable,解决方案如下:

进入到配置目录(config),编辑server.properties文件,查找并设置listener,配置监听端口,格式:listeners = listener_name://host_name:port,供kafka客户端连接用的ip和端口,例中配置如下:

listeners=PLAINTEXT://127.0.0.1:9092

消费者

参考API:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

消费者代码不是线程安全的,最好不要用多线程

类KafkaConsumer

class kafka.KafkaConsumer(*topics, **configs)

*topics (str) – 可选,设置需要订阅的topic,如果未设置,需要在消费记录前调用subscribe或者assign。

  • bootstrap_servers –'host[:port]'字符串,或者由'host[:port]'组成的字符串,形如['10.202.24.5:9096', '10.202.24.6:9096', '10.202.24.7:9096']),其中,host为broker(Broker:缓存代理,Kafka集群中的单台服务器)地址,默认值为 localhost, port默认值为9092,这里可以不用填写所有broker的host和port,但必须保证至少有一个broker)
  • client_id (str) – 客户端名称,默认值: ‘kafka-python-{version}’
  • group_id (str or None) – 消费组名称。如果为None,则通过group coordinator auto-partition分区分配,offset提交被禁用。默认为None
  • auto_offset_reset (str) – 重置offset策略: 'earliest'将移动到最老的可用消息, 'latest'将移动到最近消息。 设置为其它任何值将抛出异常。默认值:'latest'。
  • enable_auto_commit (bool) – 如果为True,将自动定时提交消费者offset。默认为True。
  • auto_commit_interval_ms (int) – 自动提交offset之间的间隔毫秒数。如果enable_auto_commit 为true,默认值为: 5000。
  • value_deserializer(可调用对象) - 携带原始消息value并返回反序列化后的value
  • consumer_timeout_ms – 毫秒数,若不指定 consumer_timeout_ms,默认一直循环等待接收,若指定,则超时返回,不再等待
  • max_poll_interval_ms – 毫秒数,它表示最大的poll数据间隔,如果超过这个间隔没有发起pool请求,但heartbeat仍旧在发,就认为该 consumer 处于 livelock 状态,进行 reblancing
  • session_timout_ms – 毫秒数,控制心跳超时时间。在分布式系统中,由于网络问题你不清楚没接收到心跳,是因为对方真正挂了还是只是因为负载过重没来得及发生心跳或是网络堵塞。所以一般会约定一个时间,超时即判定对方挂了
  • heartbeat_interval_ms – 毫秒数,控制心跳发送频率,频率越高越不容易被误判,但也会消耗更多资源。
  • max_pool_record(int),kafka 每次 pool 拉取消息的最大数量

subscribe(topics=(), pattern=None, listener=None)

订阅需要的主题

  • topics (list) – 需要订阅的主题列表
  • pattern (str) – 用于匹配可用主题的模式,即正则表达式。注意:必须提供topics、pattern两者参数之一,但不能同时提供两者。

metrics(raw=False)

获取消费者性能指标。

#-*- encoding:utf-8 -*-
from kafka import KafkaConsumer
from kafka import TopicPartition
import json
consumer = KafkaConsumer('MY_TOPIC1',bootstrap_servers=['127.0.0.1:9092'],auto_offset_reset='latest',   # 消费 kafka 中最近的数据,如果设置为 earliest 则消费最早的未被消费的数据enable_auto_commit=True,      # 自动提交消费者的 offsetauto_commit_interval_ms=3000, # 自动提交消费者 offset 的时间间隔group_id='MY_GROUP1',consumer_timeout_ms= 10000,   # 如果 10 秒内 kafka 中没有可供消费的数据,自动退出client_id='consumer-python3'
)for msg in consumer:print (msg)print('topic: ', msg.topic)print('partition: ', msg.partition)print('key: ', msg.key, 'value: ', msg.value)print('offset:', msg.offset)print('headers:', msg.headers)# Get consumer metrics
metrics = consumer.metrics()
print(metrics)# 通过assign、subscribe两者之一为消费者设置消费的主题
consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'],auto_offset_reset='latest',enable_auto_commit=True,    # 自动提交消费数据的 offsetconsumer_timeout_ms= 10000, # 如果 10 秒内 kafka 中没有可供消费的数据,自动退出value_deserializer=lambda m: json.loads(m.decode('ascii')), #消费json 格式的消息client_id='consumer-python3'
)# consumer.assign([TopicPartition('MY_TOPIC1', 0)])
# msg = next(consumer)
# print(msg)
consumer.subscribe('MY_TOPIC1')
for msg in consumer:print (msg)

客户端

  • 参考API: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaClient.html
    • 用于异步请求/响应网络I / O的网络客户端。
    • 这是一个内部类,用于实现面向用户的生产者和消费者客户端。
    • 此类不是线程安全的!
  • 参考API: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaAdminClient.html 
    • 管理Kafka集群

类 KafkaClient

class kafka.client.KafkaClient(**configs)

  • bootstrap_servers –'host[:port]'字符串,或者由'host[:port]'组成的字符串,形如['10.202.24.5:9096', '10.202.24.6:9096', '10.202.24.7:9096']),其中,host为broker(Broker:缓存代理,Kafka集群中的单台服务器)地址,默认值为 localhost, port默认值为9092,这里可以不用填写所有broker的host和port,但必须保证至少有一个broker)
  • client_id (str) – 客户端名称,默认值: ‘kafka-python-{version}’
  • request_timeout_ms (int) – 客户端请求超时时间,单位毫秒。默认值: 30000.

方法

brokers()

获取所有broker元数据

available_partitions_for_topic(topic)

返回主题的所有分区

#-*- encoding:utf-8 -*-
from kafka.client import KafkaClientclient = KafkaClient(bootstrap_servers=['127.0.0.1:9092'], request_timeout_ms=3000)# 获取所有broker
brokers = client.cluster.brokers()
for broker in brokers:print('broker: ', broker)  # broker:  BrokerMetadata(nodeId=0, host='127.0.0.1', port=9092, rack=None)print('broker nodeId: ', broker.nodeId)  # broker nodeId:  0# 获取主题的所有分区
topic = 'MY_TOPIC1'
partitions = client.cluster.available_partitions_for_topic(topic)
print(partitions)  # {0}partition_dict = {}
partition_dict[topic] = [partition for partition in partitions]
print(partition_dict)  # {'MY_TOPIC1': [0]}

 

类 KafkaAdminClient

class kafka.client.KafkaAdminClient(**configs)

  • bootstrap_servers –'host[:port]'字符串,或者由'host[:port]'组成的字符串,形如['10.202.24.5:9096', '10.202.24.6:9096', '10.202.24.7:9096']),其中,host为broker(Broker:缓存代理,Kafka集群中的单台服务器)地址,默认值为 localhost, port默认值为9092,这里可以不用填写所有broker的host和port,但必须保证至少有一个broker)
  • client_id (str) – 客户端名称,默认值: ‘kafka-python-{version}’
  • request_timeout_ms (int) – 客户端请求超时时间,单位毫秒。默认值: 30000.

方法

list_topics()

获取所有的 topic

create_partitions(topic_partitions,timeout_ms = None,validate_only = False )

为现有主题创建其他分区。返回值:合适版本的CreatePartitionsResponse类。

  • topic_partitions –主题名称字符串到NewPartition对象的映射。
  • timeout_ms –代理返回之前等待创建新分区的毫秒数。
  • validate_only –如果为True,则实际上不创建新分区。默认值:False

create_topics(new_topics,timeout_ms = None,validate_only = False )

在集群中创建新主题。返回值:合适版本的CreateTopicResponse类。

  • new_topics – NewTopic对象的列表。
  • timeout_ms –代理返回之前等待创建新主题的毫秒。
  • validate_only –如果为True,则实际上不创建新主题。并非所有版本都支持。默认值:False

delete_topics(主题,timeout_ms =无)

从集群中删除主题。返回值:合适版本的DeleteTopicsResponse类。

  • 主题-主题名称的字符串列表。
  • timeout_ms –代理返回之前等待删除主题的毫秒数。

describe_consumer_groups(group_ids,group_coordinator_id = None,include_authorized_operations = False)

描述一组消费者group。返回值:组说明列表。目前,组描述是DescribeGroupsResponse的原始结果。

  • group_ids –消费者组ID的列表。这些通常是作为字符串的组名。
  • group_coordinator_id –组的协调器代理的node_id。如果设置为None,它将查询群集中的每个组以找到该组的协调器。如果您已经知道组协调器,则明确指定此选项对于避免额外的网络往返很有用。这仅在所有group_id具有相同的协调器时才有用,否则会出错。默认值:无。
  • include_authorized_operations –是否包括有关允许组执行的操作的信息。仅在API版本> = v3上受支持。默认值:False。

list_consumer_group_offsets(group_id,group_coordinator_id = None,partitions = None)

获取单个消费者组的消费者offset。注意:这不会验证group_id或分区在集群中是否实际存在。一旦遇到任何错误,就会立即报错。   返回字典:具有TopicPartition键和OffsetAndMetada值的字典。省略未指定且group_id没有记录偏移的分区。偏移值-1表示group_id对于该TopicPartition没有偏移。一个-1只能发生于显式指定的分区。 

  • group_id –要获取其偏移量的消费者组ID名称。
  • group_coordinator_id –组的协调代理的node_id。如果设置为None,将查询群集以查找组协调器。如果您已经知道组协调器,则明确指定此选项对于防止额外的网络往返很有用。默认值:无。
  • partitions –要获取其偏移量的TopicPartitions列表。在> = 0.10.2上,可以将其设置为“无”以获取使用者组的所有已知偏移量。默认值:无。

list_consumer_groups(broker_ids = None)

列出集群已知的所有消费者组。这将返回消费者组元组的列表。元组由使用者组名称和使用者组协议类型组成。仅返回将偏移量存储在Kafka中的消费者组。对于使用Kafka <0.9 API创建的群组,协议类型将为空字符串,因为尽管它们将偏移量存储在Kafka中,但它们并不使用Kafka进行群组协调。对于使用Kafka> = 0.9创建的群组,协议类型通常为“消费者”。

  • broker_ids –用于查询使用者组的代理节点ID的列表。如果设置为None,将查询集群中的所有代理。明确指定经纪人对于确定哪些消费者组由这些经纪人进行协调很有用。默认值:无
from kafka.admin import KafkaAdminClient, NewTopic
client = KafkaAdminClient(bootstrap_servers="localhost:9092")
topic_list = []# 创建自定义分区的topic 可以使用以下方法创建名称为test,12个分区3份副本的topic
topic_list.append(NewTopic(name="test", num_partitions=12, replication_factor=3))
client.create_topics(new_topics=topic_list, validate_only=False)# 获取所有的 topic
client.list_topics()# 删除 topic
client.delete_topics(['test', 'ssl_test'])  # 传入要删除的 topic 列表# list_consumer_groups()的返回值是一个元组(消费者组的名称,消费组协议类型)组成的列表。
client.list_consumer_groups()
#  [('xray', 'consumer'), ('awvs', 'consumer')]# 返回值是一个字典,字典的key是TopicPartition,值是OffsetAndMetada 
client.list_consumer_group_offsets('awvs')
#  {TopicPartition(topic='scan, partition=0): OffsetAndMetadata(offset=17, metadata='')

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/453910.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

scp选择二进制_二进制传输与文本传输区别

Ftp&#xff0c;winscp等工具下载文件时候有选项&#xff0c;可选的有二进制方式和文本方式。文本方式又称为ASCII方式两者区别如下。ASCII 方式和BINARY方式的区别是回车换行的处理&#xff0c;binary方式不对数据执行任何处理&#xff0c;ASCII 方式将回车换行转换为本机的回…

在ffmpeg中加入x264模块

引言&#xff1a;最近一直致力于多媒体应用开发&#xff0c;一说起编码解码就不得不说下FFmpeg。FFmpeg是一个集录制、转换、音/视频编码解码功能为一体的完整的开源解决方案。FFmpeg的开发是基于Linux操作系统&#xff0c;但是可以在大多数操作系统中编译和使用。下面就详细介…

RabbitMQ实例教程:发布/订阅者消息队列

消息交换机&#xff08;Exchange&#xff09; RabbitMQ消息模型的核心理念是生产者永远不会直接发送任何消息给队列&#xff0c;一般的情况生产者甚至不知道消息应该发送到哪些队列。 相反的&#xff0c;生产者只能发送消息给交换机&#xff08;Exchange&#xff09;。交换机的…

OAuth 2.0(网转)

&#xff08;一&#xff09;背景知识 OAuth 2.0很可能是下一代的“用户验证和授权”标准&#xff0c;目前在国内还没有很靠谱的技术资料。为了弘扬“开放精神”&#xff0c;让业内的人更容易理解“开放平台”相关技术&#xff0c;进而长远地促进国内开放平台领域的发展&#xf…

kafka 自动提交 和 手动提交

Consumer 需要向 Kafka 汇报自己的位移数据&#xff0c;这个汇报过程被称为提交位移&#xff08;Committing Offsets&#xff09;。因为 Consumer 能够同时消费多个分区的数据&#xff0c;所以位移的提交实际上是在分区粒度上进行的&#xff0c;即 Consumer 需要为分配给它的每…

axios vue 回调函数_vue中ajax请求与axios包完美处理

这次给大家带来vue中ajax请求与axios包完美处理&#xff0c;vue中ajax请求与axios包处理的注意事项有哪些&#xff0c;下面就是实战案例&#xff0c;一起来看一下。在vue中&#xff0c;经常会用到数据请求&#xff0c;常用的有&#xff1a;vue-resourse、axios今天我说的是axio…

用int还是用Integer?

昨天例行code review时大家有讨论到int和Integer的比较和使用。 这里做个整理&#xff0c;发表一下个人的看法。【int和Integer的区别】int是java提供的8种原始类型之一&#xff0c;java为每个原始类型提供了封装类&#xff0c;Integer是int的封装类。int默认值是0&#xff0c;…

前端之 JavaScript 常用数据类型和操作

JavaScript 常用数据类型有&#xff1a;数字、字符串、布尔、Null、Undefined、对象 JavaScript 拥有动态类型 JavaScript 拥有动态类型。这意味着相同的变量可用作不同的类型 var x; // 此时x是undefined var x 1; // 此时x是数字 var x "Alex" …

mysql备份还原(视图、存储过程)

最近在备份还原mysql的时候发现&#xff0c;视图还原报错&#xff0c;无法创建视图&#xff0c;在网上查了下资料&#xff0c;找到以下信息&#xff1a;1、如果备份的数据库含有视图,还原时需要把my.ini中的character-set改为latin1,才能够还原视图。2、还原后,需要把latin1改为…

有关javabean的说法不正确的是_关于 JavaBean, 下列叙述中不正确的是 ( ) 。_学小易找答案...

【填空题】在使用 URL 传值时传输的数据只能是 类型。【简答题】陶器是人类最伟大的发明,比四大发明更有意义,你如何认为?(手机上直接回答提交)【单选题】对于 ( ) 作用范围的 Bean, 当客户离开这个页面时 JSP 引擎取消为客户的该页 面分配的 Bean, 释放他所占的内存空间。【填…

Postgres中tuple的组装与插入

1.相关的数据类型 我们先看相关的数据类型&#xff1a; HeapTupleData(src/include/access/htup.h) typedef struct HeapTupleData {uint32 t_len; /* length of *t_data */ItemPointerData t_self; /* SelfItemPointer */Oid t_tableOid; /* ta…

Python 自动生成环境依赖包 requirements

一、生成当前 python 环境 安装的所有依赖包 1、命令 # cd 到项目路径下&#xff0c;执行以下命令 pip freeze > requirements.txt# 或者使用如下命令 pip list --formatfreeze > requirements.txt 2、常见问题 1、中使用 pip freeze > requirements.txt 命令导出…

DenyHosts 加固centos系统安全

DenyHosts是Python语言写的一个程序&#xff0c;它会分析sshd的日志文件&#xff08;/var/log/secure&#xff09;&#xff0c;当发现重 复的攻击时就会记录IP到/etc/hosts.deny文件&#xff0c;从而达到自动屏IP的功能 DenyHosts官方网站 http://denyhosts.sourceforge.net 下…

在windows xp下编译出ffmpeg.exe

找了好多资料&#xff0c;把自己的编译成功过程详细叙述&#xff0c;以避免后来者可以少浪费点时间。 1.安装MSys 到http://sourceforge.net/project/showfiles.php?group_id2435下载文件&#xff1a;   bash-3.1-MSYS-1.0.11-tar.bz2   msysCORE-1.0.11-2007.01.19-1.ta…

手机uc怎么放大页面_手机网站怎样做可以提高用户体验度?——竹晨网络

目前&#xff0c;手机已经占据了人们大多数的闲暇时间&#xff0c;互联网的流量开始逐渐向移动端倾斜&#xff0c;重视移动端的用户体验&#xff0c;就可以给客户端增加很多意想不到的功能。但是还是有很多公司和站长不知道手机网站应该怎么建才能符合用户的使用习惯。下面&…

科技申报项目总结

这个项目分为三大模块&#xff0c;管理员&#xff0c;专家以及单位模块&#xff0c;具体页面有&#xff1a;1单位信息&#xff1b;2项目申报&#xff1b;3专家信息&#xff1b;4项目评审&#xff1b;5 项目信息&#xff1b;6申报设置&#xff1b;7专家信息。 —-项目框架SSM&am…

kafka 异常:ERROR Failed to clean up log for __consumer_offsets-30 in dir /tmp/kafka-logs due to IOExce

问题概述 kafka进程不定期挂掉。ERROR Failed to clean up log for __consumer_offsets-30 in dir /tmp/kafka-logs due to IOException (kafka.server.LogDirFailureChannel)&#xff0c;报错如下 [2020-12-07 16:12:36,803] ERROR Failed to clean up log for __consumer_o…

树形控件(CTreeCtrl和CTreeView)

如何插入数据项目&#xff1f;如何添加鼠标右击事件&#xff1f;插入数据项 通过InsertItem()方法&#xff0c;有四种重载样式: HTREEITEM InsertItem(LPTVINSERTSTRUCT lpInsertStruct); HTREEITEM InsertItem(UINT nMask, LPCTSTR lpszItem, int nImage,int nSelectedImage, …

ffmpeg编译(生成Windows或Win32平台dll, lib)

ffmpeg编译(生成Windows或Win32平台dll, lib) 介绍&#xff1a;本文简要介绍通过cygwin环境来编译生成ffmpeg。 包括解码组件libfaad与libopencore-amrnb的编译。 1)安装msys mingw环境 具体安装过程可以看网上教程 我用的是&#xff1a;http://code.google.com/p/msys-cn/ 假…

2019python课件_2019版经典Python学习路线分享

Python有三大神器&#xff0c;包括numpy,scipy,matplotlib,因此适合用于数据处理。spark&#xff0c;Hadoop都开了Python的接口&#xff0c;所以使用Python做Python的mapreduce也非常简单。因此它也备受欢迎&#xff0c;python学习大纲分享给大家。一、Python基础1.2数据的存储…