kafka生产者

1.原理

2.普通异步发送

引入pom:

    <dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.25</version></dependency></dependencies>
package com.atguigu.kafka.producer;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class CustomProducer {public static void main(String[] args) {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = newKafkaProducer<String, String>(properties);// 4. 调用 send 方法,发送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(newProducerRecord<>("first","atguigu " + i));}// 5. 关闭资源kafkaProducer.close();}
}

测试效果:

3.带回调的异步发送

回调的信息实际是从队列返回的

 

4.同步发送 

只需在异步发送的基础上,再调用一下 get()方法即可。

5.分区

6.分区策略 

指定key的值:对key的hashcode做分配

希望将订单表的数据全部发到kafka的一个分区上,怎么处理?

将该表的名称作为key值然后发送即可 
如:

7.自定义分区 (脏数据的处理)

如果研发人员可以根据企业需求,自己重新实现分区器。 1)需求 例如我们实现一个分区器实现,发送过来的数据中如果包含 atguigu,就发往 0 号分区, 不包含 atguigu,就发往 1 号分区。 

自定义分区器:

package com.atguigu.kafka.producer;import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;import java.util.Map;public class MyPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {String s = value.toString();int partion;if(s.contains("atguigu")) {partion = 1;}else {partion=0;}return partion;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}

 拷贝全类名,产生关联

测试结论:

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/696644.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

“errcode“:40163,“errmsg“:“code been used

{"errcode":40163,"errmsg":"code been used, rid: 65d6fa01-6ae8fecc-3a2f4bf8"} 通过微信静默授权方式&#xff0c;获得当前微信用户 openid 时&#xff0c;重复使用 code 造成的。 不是腾讯的问题&#xff0c;自己的代码逻辑没有遵循腾讯请…

2024022202-查询优化

查询优化 概述 关系系统和关系模型是两个密切相关而有不同的概念。支持关系模型的数据库管理系统称为关系系统。但是关系模型中并非每一部分都是同等重要的&#xff0c;所以我们不苛求完全支持关系模型的系统才能称为关系系统。因此&#xff0c;我们给出一个关系系统的最小要求…

excel数据处理——一列数据转换为n列多行

按行抽取 如果只希望保留第一行的标题&#xff0c;然后将其他奇数行删除&#xff0c;可以选择一个空白列&#xff0c;为不同的行赋值&#xff0c;函数为“mod(row(),2)”&#xff1b; 这个是0,1 数列&#xff0c;如果是0,1&#xff0c;2就是“mod(row(),3)”。 行列转换 复制…

【学习总结】慢SQL治理经验总结

一、慢SQL定义 执行超过1s的SQL为慢SQL 三、慢SQl的风险 系统的响应时间延迟&#xff0c;影响用户体验 资源占用增加&#xff0c;增高了系统的负载&#xff0c;其他请求响应时间也可能会收到影响。 慢SQL占用数据库连接的时间长,如果有大量慢SQL查询同时执行&#xff0c;可能…

Waline评论服务端转移至Deta

旧文首发地址 问题 前阵子评论系统又挂了&#xff0c;原因是*.vercel.app域名被污染。 解决方法 法一&#xff1a;服务端换个域名 法二&#xff1a;换个服务端部署 我选法二。 步骤 DETA官网&#xff1a;https://www.deta.sh/ Deta is free for ever. 这句话很不错有木有…

C语言中的assert.h:调试助手与断言详解

在C语言编程中&#xff0c;assert.h头文件提供了非常有用的断言&#xff08;Assertion&#xff09;功能&#xff0c;它主要用于开发和调试阶段&#xff0c;确保程序在运行时满足某些预期条件。如果这些条件未得到满足&#xff0c;则程序会立即停止执行&#xff0c;并打印出有关…

【MySQL】解决在join表时一对多的情况下重复数据的问题

在MySQL中进行JOIN操作&#xff0c;特别是在处理一对多关系的表时&#xff0c;可能会出现重复的记录&#xff0c;这是因为左表&#xff08;或右表&#xff09;中的每一项在与右表&#xff08;或左表&#xff09;连接时&#xff0c;如果对应有多条匹配记录&#xff0c;则会生成多…

冷链物流追踪:Java与MySQL的协同实践

✍✍计算机编程指导师 ⭐⭐个人介绍&#xff1a;自己非常喜欢研究技术问题&#xff01;专业做Java、Python、微信小程序、安卓、大数据、爬虫、Golang、大屏等实战项目。 ⛽⛽实战项目&#xff1a;有源码或者技术上的问题欢迎在评论区一起讨论交流&#xff01; ⚡⚡ Java实战 |…

第三百六十一回

文章目录 1. 概念介绍2. 实现方法2.1 环绕效果2.2 立体效果 3. 示例代码4. 内容总结 我们在上一章回中介绍了"自定义SlideImageSwitch组件"相关的内容&#xff0c;本章回中将介绍两种阴影效果.闲话休提&#xff0c;让我们一起Talk Flutter吧。 1. 概念介绍 我们在本…

Gson 库的使用

Gson 是由 Google 开发的一个流行的 Java 库,用于处理 JSON 数据的序列化和反序列化。它提供了简单易用的 API,使得在 Java 应用程序中操作 JSON 数据变得非常方便。 以下是 Gson 库的一些主要特点和用法 简单易用 Gson 提供了一个简单而直观的 API,使得在 Java 应用程序中…

谷歌seo推广怎么做?

除了常规的优化之外&#xff0c;还可以针对特定垂直搜索进行优化&#xff0c;比如图片的以及视频的搜索优化&#xff0c;这对于贩卖自己产品的网站来说也是挺重要的一点 图片需要确保您的图片文件名包含相关关键词&#xff0c;并为每张图片添加描述性的ALT文本&#xff0c;以帮…

经济学-信用货币初始发行与派生

由于黄金美元的种种缺陷&#xff0c;经济学家找到了一种替代黄金的方案&#xff0c;这种替代品就是债务&#xff0c;它可以解决黄金有限的问题&#xff0c;并且债务这种抵押品耗费的人力物力远远低于其他抵押品&#xff08;例如黄金还得需要开采&#xff09; 假设一个国家刚刚…

调用 Python 函数遗漏括号 ( )

调用 Python 函数遗漏括号 1. Example - error2. Example - correctionReferences 1. Example - error name "Forever Strong" print(name.upper()) print(name.lower)FOREVER STRONG <built-in method lower of str object at 0x0000000002310670>---------…

Swift基础知识:22.Swift构造过程

在 Swift 中&#xff0c;构造过程是实例化一个类、结构体或枚举实例的过程&#xff0c;它包括设置实例的初始状态和执行其他必要的设置。构造过程通过定义构造器&#xff08;initializer&#xff09;来实现&#xff0c;构造器是一种特殊的方法&#xff0c;用于创建和初始化实例…

SqlServer2016离线安装--Microsoft R Open 和 Microsoft R Server安装文件位置

问题 SQL SERVE 2016离线安装&#xff0c;会出现“Microsoft R Open 和 Microsoft R Server 脱机安装”的界面&#xff0c; 无法点击下一步的情况&#xff0c;如下图&#xff1a; 原因 离线安装时需要下载两个文件 解决方案 1、访问路径下载文件 https://go.microsoft.c…

Python 实现 OBV 指标计算:股票技术分析的利器系列(7)

Python 实现 OBV 指标计算&#xff1a;股票技术分析的利器系列&#xff08;7&#xff09; 介绍算法解释 代码rolling函数介绍核心代码计算 VA 列计算 OBV 列计算 MAOBV 完整代码 介绍 OBV 指标是“On-Balance Volume”的缩写&#xff0c;意为“量价平衡指标”。它是一种用于衡…

《游戏引擎架构》 -- 学习4

资源及文件系统 文件系统 游戏引擎的文件系统API通常提供以下功能&#xff1a; 搜需路径&#xff1a;是含一串路径的字符串&#xff0c;各路径之间以特殊字符&#xff08;如冒号或分号&#xff09;分隔&#xff0c;找文件时就会从这些路径进行搜寻。例如在命令行下执行程序&a…

Code Composer Studio (CCS) - 全局搜索功能

Code Composer Studio [CCS] - 全局搜索功能 1. Ctrl H&#xff0c;全局搜索功能References 1. Ctrl H&#xff0c;全局搜索功能 References [1] Yongqiang Cheng, https://yongqiang.blog.csdn.net/

VS和QT联合开发

提示:本文为学习记录,若有疑问,请联系作者,谦虚受教。 文章目录 前言一、安装二、新建项目1.VS打不开UI文件2.VS找不到QT对应的版本号三、其他问题1.vs无法识别加载ui新添加的控件2.UI界面出现中文乱码3.修改VS字体颜色4.自动代码补全功能5.添加<QtSerialPort/qserialpo…

【AI大模型】ChatGPT在地学、GIS、气象、农业、生态、环境等领域中的高级应用

以ChatGPT、LLaMA、Gemini、DALLE、Midjourney、Stable Diffusion、星火大模型、文心一言、千问为代表AI大语言模型带来了新一波人工智能浪潮&#xff0c;可以面向科研选题、思维导图、数据清洗、统计分析、高级编程、代码调试、算法学习、论文检索、写作、翻译、润色、文献辅助…