Kafka权威指南(第2版)读书笔记

目录

  • Kafka生产者——向Kafka写入数据
    • 生产者概览
    • 创建Kafka生产者
      • bootstrap.servers
      • key.serializer
      • value.serializer
    • 发送消息到Kafka
      • 同步发送消息

Kafka生产者——向Kafka写入数据

不管是把Kafka作为消息队列、消息总线还是数据存储平台,总是需要一个可以往Kafka写入数据的生产者、一个可以从Kafka读取数据的消费者,或者一个兼具两种角色的应用程序。

生产者概览

一个应用程序会在很多情况下向Kafka写入消息:记录用户的活动(用于审计和分析)​、记录指标、记录日志、记录从智能家电收集到的信息、与其他应用程序进行异步通信、缓冲即将写入数据库的数据,等等。不同的应用场景直接影响如何使用和配置生产者API。尽管生产者API使用起来很简单,但消息的发送过程还是有点儿复杂。下图展示了向Kafka发送消息的主要步骤:
在这里插入图片描述
先从创建一个ProducerRecord对象开始,其中需要包含目标主题和要发送的内容。另外,还可以指定键、分区、时间戳或标头。在发送ProducerRecord对象时,生产者需要先把键和值对象序列化成字节数组,这样才能在网络上传输。

接下来,如果没有显式地指定分区,那么数据将被传给分区器。分区器通常会基于ProducerRecord对象的键选择一个分区。选好分区以后,生产者就知道该往哪个主题和分区发送这条消息了。紧接着,该消息会被添加到一个消息批次里,这个批次里的所有消息都将被发送给同一个主题和分区。有一个独立的线程负责把这些消息批次发送给目标broker。

broker在收到这些消息时会返回一个响应。如果消息写入成功,就返回一个RecordMetaData对象,其中包含了主题和分区信息,以及消息在分区中的偏移量。如果消息写入失败,则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,重试几次之后如果还是失败,则会放弃重试,并返回错误信息。

创建Kafka生产者

要向Kafka写入消息,首先需要创建一个生产者对象,并设置一些属性。Kafka生产者有3个必须设置的属性。

bootstrap.servers

broker的地址。可以由多个host:port组成,生产者用它们来建立初始的Kafka集群连接。它不需要包含所有的broker地址,因为生产者在建立初始连接之后可以从给定的broker那里找到其他broker的信息。不过还是建议至少提供两个broker地址,因为一旦其中一个停机,则生产者仍然可以连接到集群。

key.serializer

一个类名,用来序列化消息的键。broker希望接收到的消息的键和值都是字节数组。生产者可以把任意Java对象作为键和值发送给broker,但它需要知道如何把这些Java对象转换成字节数组。key.serializer必须被设置为一个实现了org.apache.kafka.common.serialization.Serializer接口的类,生产者会用这个类把键序列化成字节数组。Kafka客户端默认提供了ByteArraySerializer、StringSerializer和IntegerSerializer等,如果你只使用常见的几种Java对象类型,就没有必要实现自己的序列化器。需要注意的是,必须设置key.serializer这个属性,尽管你可能只需要将值发送给Kafka。如果只需要发送值,则可以将Void作为键的类型,然后将这个属性设置为VoidSerializer。

value.serializer

一个类名,用来序列化消息的值。与设置key.serializer属性一样,需要将value.serializer设置成可以序列化消息值对象的类。

发送消息到Kafka

同步发送消息

同步发送消息很简单,当Kafka返回错误或重试次数达到上限时,生产者可以捕获到异常。这里需要考虑性能问题。根据Kafka集群繁忙程度的不同,broker可能需要2毫秒或更长的时间来响应请求。如果采用同步发送方式,那么发送线程在这段时间内就只能等待,什么也不做,甚至都不发送其他消息,这将导致糟糕的性能。因此,同步发送方式通常不会被用在生产环境中​。

KafkaProducer一般会出现两种错误。一种是可重试错误,这种错误可以通过重发消息来解决。例如,对于连接错误,只要再次建立连接就可以解决。对于“not leader for partition”​(非分区首领)错误,只要重新为分区选举首领就可以解决,此时元数据也会被刷新。可以通过配置启用KafkaProducer的自动重试机制。如果在多次重试后仍无法解决问题,则应用程序会收到重试异常。另一种错误则无法通过重试解决,比如“Message size too large”​(消息太大)​。对于这种错误,KafkaProducer不会进行任何重试,而会立即抛出异常。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/892769.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

类模板的使用方法

目录 类模板的使用方法 1.类模板语法 2.类模板和函数模板区别 3.类模板中成员函数创建时机 4.类函数对象做函数参数 5.类模板和继承 6.类模板成员函数类外实现 7.类模板分文件编写 person.hpp 实现cpp文件: 8.类模板与友元 9.类模板案例 MyArray.hpp …

Android SystemUI——使用Dagger2加载组件(四)

SystemUI 是 Android 系统中的一个重要模块,负责绘制系统栏(如状态栏、导航栏)、锁屏、快捷设置等用户界面元素。由于其复杂性,良好的架构设计和依赖管理对于保持代码的可维护性和扩展性至关重要。这就是 Dagger2 在此发挥重要作用的地方。 一、Dagger2介绍 Dagger2 是一个…

python识别图片中指定颜色的图案并保存为图片

示例代码: def chuli(color):import cv2import numpy as np# 定义颜色名称到HSV阈值范围的映射color_thresholds {red: ([0, 100, 100], [10, 255, 255], [160, 100, 100], [180, 255, 255]),yellow: ([20, 100, 100], [30, 255, 255]),blue: ([90, 100, 100], [1…

golang 环境变量配置

一、GoLand显示环境如下 修改环境变量 新建系统变量 GOROOT: D:\ENSPACE\golandsdk\1.23.1\go1.23.1新建系统变量 GOPATH:工作目录(在下面目录下新建目录:src,项目工程目录都要建在src下如:demo1 demo2) D…

当自动包布机遇上Profinet转ModbusTCP网关,“妙啊”,工业智能“前景无限

在自动化控制技术日新月异的当下,Profinet与ModbusTCP这两种协议在工业通信领域占据着举足轻重的地位。ModbusTCP是基于以太网的串行通信协议,而Profinet则是依托工业以太网的现场总线协议。它们在数据传输速度、实时性表现以及兼容性等方面各具特色。不…

Redis的过期策略、内存淘汰机制

Redis只能存5G数据,可是你写了10G,那会删5G的数据。怎么删的?还有,你的数据已经设置了过期时间,但是时间到了,为什么内存占用率还是比较高? 一、Redis的过期策略 Redis采用的是定期删除惰性删除策略。 1…

大数据技术实训:Zookeeper集群配置

一、本地模式安装部署 1)安装前准备 (1)安装jdk (2)拷贝Zookeeper安装包到Linux系统下 (3)解压到指定目录 tar -zxvf zookeeper-3.5.7.tar.gz -C /opt/module/ 2)配置修改 &am…

一文通透OpenVLA及其源码剖析——基于Prismatic VLM(SigLIP、DinoV2、Llama 2)及离散化动作预测

前言 当对机器人动作策略的预测越来越成熟稳定之后(比如ACT、比如扩散策略diffusion policy),为了让机器人可以拥有更好的泛化能力,比较典型的途径之一便是基于预训练过的大语言模型中的广泛知识,然后加一个policy head(当然,一开…

MySQL数据库(SQL分类)

SQL分类 分类全称解释DDLData Definition Language数据定义语言,用来定义数据库对象(数据库,表,字段)DMLData Manipulation Language数据操作语言,用来对数据库表中的数据进行增删改DQLData Query Languag…

202309 青少年软件编程等级考试C/C++ 二级真题答案及解析(电子学会)

第 1 题 数组指定部分逆序重放 将一个数组中的前k项按逆序重新存放。例如,将数组8,6,5,4,1前3项逆序重放得到5,6,8,4,1。 时间限制:1000 内存限制:65536 输入 输入为两行: 第一行两个整数,以空格分隔,分别为数组元素的个数n(1<n<100)以及指定的k(1<=k<= n…

网管平台(进阶篇):路由器的管理实践

在当今数字化时代&#xff0c;路由器作为网络连接的核心设备&#xff0c;其管理对于确保网络的稳定、高效和安全至关重要。本文旨在深入探讨路由器管理的重要性、基本设置步骤、高级功能配置以及日常维护&#xff0c;帮助读者构建一个高效且安全的网络环境。 一、路由器管理的…

word-break控制的几种容器换行行为详解

word-break 属性在控制换行行为时需要根据语言判断&#xff0c;对于中文 一个字符就是一个单词&#xff0c;字符换行不影响阅读理解&#xff0c;而对于英文来说&#xff0c;多个连续的字符才会是一个单词&#xff0c;例如中文的 早 英文为 morning。 morning7个字符才算一个单词…

国内源快速在线安装qt5.15以上版本。(10min安装好)(图文教程)

参考文章&#xff1a;Qt6安装教程——国内源-CSDN博客 1、在国内源上下载qt在线安装工具 NJU Mirror 2、 将下载好的在线安装工具&#xff0c;放到C盘根目录&#xff0c; 2.1 打开windows Powershell&#xff08;WinX&#xff09;&#xff0c;下边那个最好。 输入两条指令&a…

环境搭建——Mysql、Redis、Rocket MQ部署

前言 在搭建分布式系统时&#xff0c;MySQL、Redis 和 RocketMQ 是常用的基础服务。每个服务各自的功能不同&#xff0c;但它们在数据存储、缓存、消息队列等方面不可或缺。如果你是初学者&#xff0c;别担心&#xff0c;本文会一步步详细教你如何在服务器上通过 Docker 部署这…

Kafka——两种集群搭建详解 k8s

1、简介 Kafka是一个能够支持高并发以及流式消息处理的消息中间件&#xff0c;并且Kafka天生就是支持集群的&#xff0c;今天就主要来介绍一下如何搭建Kafka集群。 Kafka目前支持使用Zookeeper模式搭建集群以及KRaft模式&#xff08;即无Zookeeper&#xff09;模式这两种模式搭…

【SQL】进阶知识 -- 删除表的几种方法(包含表内单个字段的删除方法)

大家好&#xff01;欢迎来到本篇SQL进阶博客。如果你已经掌握了基础的SQL操作&#xff0c;接下来就让我们一起探索删除表的几种方法。删除表可能听起来有点危险&#xff0c;事实也是如此&#xff0c;所以在我们实际开发过程中&#xff0c;大多数时候我们都有数据的使用权限&…

MYSQL5.7 全文检索中文无返回数据

在MySQL 5.7.6之前&#xff0c;全文索引只支持英文全文索引&#xff0c;不支持中文全文索引&#xff0c;需要利用分词器把中文段落预处理拆分成单词&#xff0c;然后存入数据库。 从MySQL 5.7.6开始&#xff0c;MySQL内置了ngram全文解析器&#xff0c;用来支持中文、日文、韩文…

JVM虚拟机的组成 笼统理解 六大部分 类加载子系统 运行时数据区 执行引擎 本地接口 垃圾回收器 线程工具

目录 JVM虚拟机的组成&#xff1a;概述 JVM虚拟机的组成&#xff1a;详细解析 1. 类加载子系统 2. 运行时数据区 3. 执行引擎 4. 本地接口 5. 垃圾回收器 6. 线程管理与调试工具 概述 JVM&#xff08;Java Virtual Machine&#xff09;是一个虚拟计算机&#xff0c;执行…

2025 年 UI 大屏设计新风向

在科技日新月异的 2025 年&#xff0c;UI 大屏设计领域正经历着深刻的变革。随着技术的不断进步和用户需求的日益多样化&#xff0c;新的设计风向逐渐显现。了解并掌握这些趋势&#xff0c;对于设计师打造出更具吸引力和实用性的 UI 大屏作品至关重要。 一、沉浸式体验设计 如…

虚拟拨号技术(GOIP|VOIP)【基于IP的语音传输转换给不法分子的境外来电披上一层外衣】: Voice over Internet Protocol

文章目录 引言I 虚拟拨号技术(GOIP|VOIP)原理特性:隐蔽性和欺骗性II “GOIP”设备原理主要功能III 基于IP的语音传输 “VOIP” (Voice over Internet Protocol)IV “断卡行动”“断卡行动”目的电信运营商为打击电诈的工作V 知识扩展虚拟号保护隐私虚拟运营商被用于拨打骚扰…