三、Kafka生产者

目录

    • 3.1 生产者消息发送流程
      • 3.1.1 发送原理
    • 3.2 异步发送 API
    • 3.3 同步发送数据
    • 3.4 生产者分区
      • 3.4.1 kafka分区的好处
      • 3.4.2 生产者发送消息的分区策略
      • 3.4.3 自定义分区器
    • 3.5 生产者如何提高吞吐量
    • 3.6 数据可靠性

3.1 生产者消息发送流程

3.1.1 发送原理

3.2 异步发送 API

3.3 同步发送数据

3.4 生产者分区

3.4.1 kafka分区的好处

  • 便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果
  • 提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。

3.4.2 生产者发送消息的分区策略

在这里插入图片描述

3.4.3 自定义分区器

1、需求:
例如我们实现一个分区器实现,发送过来的数据中如果包含 atguigu,就发往 0 号分区,不包含 atguigu,就发往 1 号分区

2、定义类实现 Partitioner 接口,重写 partition()方法。

public class MyPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 获取数据 atguigu  helloString msgValues = value.toString();int partition;if (msgValues.contains("atguigu")){partition = 0;}else {partition = 1;}return partition;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

3、使用分区器的方法,在生产者的配置中添加分区器参数

public class CustomProducerCallbackPartitions {public static void main(String[] args) throws InterruptedException {// 0 配置Properties properties = new Properties();// 连接集群 bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.239.11:9092");// 指定对应的key和value的序列化类型 key.serializer
//        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 关联自定义分区器properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.atguigu.kafka.producer.MyPartitioner");// 1 创建kafka生产者对象// "" helloKafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);// 2 发送数据for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("first", "atguigu" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("主题: " + metadata.topic() + " 分区: " + metadata.partition());}}});Thread.sleep(2);}// 3 关闭资源kafkaProducer.close();}
}

在这里插入图片描述



3.5 生产者如何提高吞吐量

  • 分批次发送消息
  • 对生产者消息采用压缩

四个重要参数:
在这里插入图片描述

public class CustomProducerParameters {public static void main(String[] args) {// 0 配置Properties properties = new Properties();// 连接kafka集群properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.239.11:9092");// 序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 缓冲区大小properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);// 批次大小properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);// linger.msproperties.put(ProducerConfig.LINGER_MS_CONFIG, 1);// 压缩properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");// 1 创建生产者KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);// 2 发送数据for (int i = 0; i < 50; i++) {kafkaProducer.send(new ProducerRecord<>("first","atguigu"+i));}// 3 关闭资源kafkaProducer.close();}
}

3.6 数据可靠性

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/45713.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

30.Netty源码服务端启动主要流程

highlight: arduino-light 服务端启动主要流程 •创建 selector •创建 server socket channel •初始化 server socket channel •给 server socket channel 从 boss group 中选择一个 NioEventLoop •将 server socket channel 注册到选择的 NioEventLoop 的 selector •…

Ubuntu20.04安装Nvidia显卡驱动教程

1、禁用nouveau 1、创建文件&#xff0c;如果没有下载vim编辑器&#xff0c;将vim换成gedit即可 $ sudo vim /etc/modprobe.d/blacklist-nouveau.conf 2、在文件中插入以下内容&#xff0c;将nouveau加入黑名单&#xff0c;默认不开启 blacklist nouveau options nouveau m…

计算机技术与软件专业技术资格(水平)考试----系统架构设计师

【原文链接】计算机技术与软件专业技术资格&#xff08;水平&#xff09;考试----系统架构设计师 考试简介 计算机软件资格考试是由国家人力资源和社会保障部、工业和信息化部领导下的国家级考试。计算机软件资格考试既是职业资格考试&#xff0c;又是职称资格考试。考试合格…

在线HmacSHA256加密工具--在线获取哈希值又称摘要

具体请前往&#xff1a; 在线计算HmacSha256工具

unity 之 Input.GetMouseButtonDown 的使用

文章目录 Input.GetMouseButtonDown Input.GetMouseButtonDown 当涉及到处理鼠标输入的时候&#xff0c;Input.GetMouseButtonDown 是一个常用的函数。它可以用来检测鼠标按键是否在特定帧被按下。下面我会详细介绍这个函数&#xff0c;并举两个例子说明如何使用它。 函数签名…

C语言——通讯录详解(文件版)

文件版通讯录 前言&#xff1a;一、保存通讯录二、读取通讯录2.1 通讯录初始化2.2 将文件的信息加载到通讯录 三、代码展示3.1通讯录的声明和定义(contct.h)3.2通讯录函数的实现&#xff08;contact.c&#xff09;3.2 通讯录的测试(test.c) 前言&#xff1a; 我们已经掌握了通…

Oracle19c-补丁升级报错合集(一)

前言: 本文主要介绍Oracle19c补丁升级遇到的问题&#xff0c;涉及安装补丁prepatch步骤&#xff0c;apply应用报错以及datapatch -verbose数据字典更新报错 问题一: 在执行补丁rootcrs.sh -prepatch操作时&#xff0c;发生执行检查命令cluutil -chkshare报错 CLSRSC-180: An …

记录首次面试2023-08-18

人生第一次面试&#xff0c;大概一个小时左右。没有问我C的&#xff0c;上来一个数据库事务&#xff0c;虽然没有复习&#xff0c;但是还是能够记住一些&#xff0c;主要问的一些事务的隔离级别&#xff0c;以及都有什么作用&#xff0c;我是举例回答的&#xff0c;客户端A和客…

14.pod控制器

文章目录 pod控制器概述有状态和无状态DeploymentDaemonSetSatefulSet配置 JobCronJob总结 pod控制器 概述 Pod控制器及其功用 Pod控制器&#xff0c;又称之为工作负载&#xff08;workload&#xff09;&#xff0c;是用于实现管理pod的中间层&#xff0c;确保pod资源符合预期…

设计模式笔记

工厂模式&#xff1a; 1.Simple Factory Pattern : 是指由一个工厂对象决定创建出哪一种产品类的实例&#xff0c;简单工厂是产品的工厂&#xff0c;工厂类负责创建的对象较少&#xff0c;客户端需要传入工厂类的参数&#xff0c;对于如何创建对象的逻辑不关心。 缺点&#xf…

使用 Node.js 生成优化的图像格式

使用 Node.js 生成优化的图像格式 图像是任何 Web 应用程序的重要组成部分&#xff0c;但如果优化不当&#xff0c;它们也可能成为性能问题的主要根源。在本文中&#xff0c;我们将介绍如何使用 Node.js 自动生成优化的图像格式&#xff0c;并以最适合用户浏览器的格式显示它们…

Eureka:服务注册-信息配置-自我保护机制

首先在提供者服务下&#xff0c;添加一个依赖 <!-- Eureka --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-eureka</artifactId><version>1.4.6.RELEASE</version><…

科大讯飞星火模型申请与chatgpt 3.5模型以及new bing的对比

科大讯飞星火模型 申请科大讯飞星火认知大模型账号科大讯飞星火认知大模型使用1.界面介绍2. 在编程能力上与chatgpt 3.5对比科大讯飞星火模型chatgpt 3.5模型 3. 在图片生成能力上与new bing对比 总结 申请科大讯飞星火认知大模型账号 注册网址&#xff1a; 科大讯飞星火认知大…

回归预测 | MATLAB实现BO-SVM贝叶斯优化支持向量机多输入单输出回归预测(多指标,多图)

回归预测 | MATLAB实现BO-SVM贝叶斯优化支持向量机多输入单输出回归预测&#xff08;多指标&#xff0c;多图&#xff09; 目录 回归预测 | MATLAB实现BO-SVM贝叶斯优化支持向量机多输入单输出回归预测&#xff08;多指标&#xff0c;多图&#xff09;效果一览基本介绍程序设计…

22-组件通信

一. 什么是组件通信 组件通信&#xff0c;就是指 组件与组件 之间的数据传递 1. 组件的数据是独立的&#xff0c;无法直接访问其他组件的数据。 2. 想用其他组件的数据 -> 组件通信 二. 不同的组件关系 和 组件通信方案分类 组件关系分类: 1. 父子关系 2. 非父子关系 组件…

基于web的旅游管理系统/旅游网站的设计与实现

摘要 随着计算机技术&#xff0c;网络技术的迅猛发展&#xff0c;Internet 的不断普及&#xff0c;网络在各个领域里发挥了越来越重要的作用。特别是随着近年人民生活水平不断提高&#xff0c;在线旅游给景区商家的业务带来了更大的发展机遇。 在经济快速发展的带动下&#xff…

ICLR2020 Query2Box:基于BOX嵌入的向量空间知识推理8.15+8.16+8.17+8.18

Query2Box&#xff1a;基于BOX嵌入的向量空间知识推理 摘要介绍相关工作Query2Box&#xff1a;向量空间中KG的逻辑推理知识图谱与合取查询基于box嵌入的实体集推理Box嵌入源节点的初始box集合投影运算符几何相交运算符实体到box的距离训练目标使用析取范式处理析取向DNF转换聚合…

构建 NodeJS 影院微服务并使用 docker 部署它(02/4)

一、说明 构建一个微服务的电影网站&#xff0c;需要Docker、NodeJS、MongoDB&#xff0c;这样的案例您见过吗&#xff1f;如果对此有兴趣&#xff0c;您就继续往下看吧。 图片取自网络 — 封面由我制作 这是✌️“构建 NodeJS 影院微服务”系列的第二篇文章。 二、对第一部分的…

商城-学习整理-高级-商城业务-异步线程池(十三)

目录 一、线程1、初始化线程的 4 种方式2、线程池的七大参数3、线程池的运行流程&#xff1a;4、例子5、常见的 4 种线程池6、开发中为什么使用线程池 二、CompletableFuture 异步编排0、业务场景&#xff1a;1、创建异步对象2、计算完成时回调方法3、handle 方法4、线程串行化…

CSS 字体修饰属性

前言 字体修饰属性 属性说明font-family指定文本显示字体font-size设置字体的大小font-weight设置字体的粗细程度font-style设置字体的倾斜样式text-decoration给文本添加装饰线text-indent设置文本的缩进text-align设置文本的对齐方式line-height设置行高color设置文本的颜色…