使用kafka-clients操作数据(java)

一、添加依赖

     <!--    kafka-clients--><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.5.1</version></dependency>

二、生产者

自定义分区,可忽略

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;import java.util.Map;public class MyPatitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {String msgStr = value.toString();if(msgStr.contains("a")){return 1;}return 0;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

1、普通消息

 public static void main(String[] args) throws ExecutionException, InterruptedException {//配置Properties properties = new Properties();//连接参数properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.157.130:9092");//序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//关联自定义分区器 可选properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.minos.kafka.producer.MyPatitioner");//优化参数 可选//缓冲器大小 32Mproperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 30 * 1024 * 1024);//批次大小properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);//Linger.msproperties.put(ProducerConfig.LINGER_MS_CONFIG, 5);//压缩properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");//acksproperties.put(ProducerConfig.ACKS_CONFIG, "-1");//重试次数properties.put(ProducerConfig.RETRIES_CONFIG, 3);//创建生产者KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);//异步发送数据for (int i = 0; i < 10; i++) {//给first主题发消息kafkaProducer.send(new ProducerRecord<String, String>("first", "hello" + i));//回调异步发送kafkaProducer.send(new ProducerRecord<String, String>("first", "hello2" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e == null) {System.out.println("主题:" + recordMetadata.topic() + "分区:" + recordMetadata.partition());}}});kafkaProducer.send(new ProducerRecord<String, String>("first", "a" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e == null) {System.out.println("主题:" + recordMetadata.topic() + "分区" + recordMetadata.partition() + "a");}}});Thread.sleep(500);}//同步for (int i = 0; i < 10; i++) {//给first主题发消息kafkaProducer.send(new ProducerRecord<String, String>("first", "sync_hello" + i)).get();}//关闭资源kafkaProducer.close();}
root@ubuntu2203:/usr/local/kafka_2.12-3.5.1/bin# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
a0
hello0
hello20
a1
hello1
hello21
a2
hello2
hello22
a3
hello3
hello23
a4
hello4
hello24
a5
hello5
hello25
a6
hello6
hello26
a7
hello7
hello27
a8
hello8
hello28
a9
hello9
hello29
sync_hello0
sync_hello1
sync_hello2
sync_hello3
sync_hello4
sync_hello5
sync_hello6
sync_hello7
sync_hello8
sync_hello9

2、事务消息

 public static void main(String[] args) throws ExecutionException, InterruptedException {//配置Properties properties = new Properties();//连接参数properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.157.130:9092");//序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//关联自定义分区器 可选properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.minos.kafka.producer.MyPatitioner");//优化参数 可选//缓冲器大小 32Mproperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 30 * 1024 * 1024);//批次大小properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);//Linger.msproperties.put(ProducerConfig.LINGER_MS_CONFIG, 5);//压缩properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");//acksproperties.put(ProducerConfig.ACKS_CONFIG, "-1");//重试次数properties.put(ProducerConfig.RETRIES_CONFIG, 3);//指定事务IDproperties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transactional_id_01");properties.put("enable.idempotence", "true");//创建生产者KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);//事务消息 初始化kafkaProducer.initTransactions();//开始事务kafkaProducer.beginTransaction();try {kafkaProducer.send(new ProducerRecord<String, String>("first", "Transactions")).get();//提交事务kafkaProducer.commitTransaction();} catch (Exception e) {//终止事务kafkaProducer.abortTransaction();} finally {//关闭资源kafkaProducer.close();}}
root@ubuntu2203:/usr/local/kafka_2.12-3.5.1/bin# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
Transactions

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/13804.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于光子实验的指数级加速的量子同态加密理论

前言 量子计算机不仅有望在某些重要任务上超越经典计算机&#xff0c;而且还能保护计算的隐私。例如&#xff0c;盲量子计算协议支持安全委托量子计算&#xff0c;其中客户端可以保护其数据和算法的隐私&#xff0c;不受分配来运行计算的量子服务器的影响。然而&#xff0c;这…

【雕爷学编程】MicroPython动手做(14)——掌控板之OLED屏幕2

知识点&#xff1a;什么是掌控板&#xff1f; 掌控板是一块普及STEAM创客教育、人工智能教育、机器人编程教育的开源智能硬件。它集成ESP-32高性能双核芯片&#xff0c;支持WiFi和蓝牙双模通信&#xff0c;可作为物联网节点&#xff0c;实现物联网应用。同时掌控板上集成了OLED…

Visual Studio2022报错 无法打开 源 文件 “openssl/conf.h“解决方式

目录 问题起因问题解决临时解决方案 问题起因 近一段时间有了解到Boost 1.82.0新添加了MySQL库&#xff0c;最近一直蠢蠢欲动想要试一下这个库 所以就下载了源码并进行了编译&#xff08;过程比较简单&#xff0c;有文档的&#xff09; 然后在VS2022中引入了Boost环境&#xf…

vue vite ts electron ipc arm64

初始化 npm init vue # 全选 yes npm i # 进入项目目录后使用 npm install electron electron-builder -D npm install commander -D # 额外组件增加文件 新建 plugins 文件夹 src/background.ts 属于主进程 ipcMain.on、ipcMain.handle 都用于主进程监听 ipc&#xff0c;…

nodejs + express 调用本地 python程序

假设已经安装好 nodejs ; cd /js/node_js ; 安装在当前目录的 node_modules/ npm install express --save 或者 cnpm install express --save web 服务器程序 server.js const http require(http); const express require(express); const path require(path); const …

软件外包开发的需求分析

需求分析是软件开发中的关键步骤&#xff0c;其目的是确定用户需要什么样的软件&#xff0c;以及软件应该完成哪些任务。需求分析是软件工程的早期工作&#xff0c;也是软件项目成功的基础&#xff0c;因此花费大量精力和时间去做好需求分析是值得的。今天和大家分享软件需求分…

【开发问题】flink-cdc不用数据库之间的,不同类型的转化

不同的数据库之期间数据类型转化 问题来源与原因解决过程&#xff0c;思路错误&#xff0c;导致各种错误错误思路是什么 正确解决方式&#xff0c;找官网对应的链接器&#xff0c;数据转化 问题来源与原因 我一开始是flink-cdc&#xff0c;oracle2Mysql&#xff0c;sql 我一开…

idea中设置maven本地仓库和自动下载依赖jar包

1.下载maven 地址&#xff1a;maven3.6.3 解压缩在D:\apache-maven-3.6.3-bin\apache-maven-3.6.3\目录下新建文件夹repository打开apache-maven-3.6.3-bin\apache-maven-3.6.3\conf文件中的settings.xml编辑&#xff1a;新增本地仓库路径 <localRepository>D:\apache-…

[元带你学: eMMC协议 29] eMMC 断电通知(PON) | 手机平板电脑断电通知

依JEDEC eMMC及经验辛苦整理,原创保护,禁止转载。 专栏 《元带你学:eMMC协议》 内容摘要 全文 2000 字, 主要内容 前言 断电通知是什么? 断电通知过程

【Kafka】常用操作

1、基本概念 1. 消息&#xff1a; Kafka是一个分布式流处理平台&#xff0c;它通过消息进行数据的传输和存储。消息是Kafka中的基本单元&#xff0c;可以包含任意类型的数据。 2. 生产者&#xff08;Producer&#xff09;&#xff1a; 生产者负责向Kafka主题发送消息。它将消息…

Vue3项目中没有配置 TypeScript 支持,使用 TypeScript 语法

1.安装 TypeScript&#xff1a;首先&#xff0c;需要在项目中安装 TypeScript。在终端中运行以下命令 npm install typescript --save-dev2.创建 TypeScript 文件&#xff1a;在 Vue 3 项目中&#xff0c;可以创建一个以 .ts 后缀的文件&#xff0c;例如 MyComponent.ts。在这…

103、Netty是什么?和Tomcat有什么区别?特点是什么?

Netty是什么&#xff1f;和Tomcat有什么区别&#xff1f;特点是什么&#xff1f; 一、Netty是什么二、Netty和Tomcat有什么区别三、Netty的特点 一、Netty是什么 Netty是一个基于NIO的异步网络通信框架&#xff0c;性能高&#xff0c;封装了原生NIO编码的复杂度&#xff0c;开…

Python web实战 | Docker+Nginx部署python Django Web项目详细步骤【干货】

概要 在这篇文章中&#xff0c;我将介绍如何使用 Docker 和 Nginx 部署 Django Web 项目。一步步讲解如何构建 Docker 镜像、如何编写 Docker Compose 文件和如何配置 Nginx。 1. Docker 构建 Django Web 项目 1.1 配置 Django 项目 在开始之前&#xff0c;我们需要有一个 D…

QT自定义控件实现并导入

QT自定义控件 介绍 QT Creator自定义控件和designer控件导入 1.安装QT5.7.1 2.将QT编译器目录、lib目录、include目录导入path 使用说明 使用说明按照 1.创建QtDesigner自定义控件工程&#xff0c;打开Qt Creator,创建一个Qt 设计师自定义控件&#xff0c;如下图所示&#xf…

CK_03靶机详解

CK_03靶机详解 靶场下载地址&#xff1a;https://download.vulnhub.com/ck/MyFileServer_3.zip 这个靶机开放的端口特别多&#xff0c;所以给我们的误导也很多&#xff0c;我直接按照正确的思路来。 因为开着445所以就枚举了一下靶机上共享的东西&#xff0c;发现两个share的…

c# 创建一个未定义类的临时对象列表

使用场景&#xff1a;要使用的数据太多&#xff0c;列表/字典无法满足需求&#xff0c;需要传入对象&#xff0c;但是又不想创建模型 new[] 是一种用于创建匿名类型数组的写法。它是 C# 中的一种语法糖&#xff0c;用于简化数组的初始化过程。 在下面代码示例中&#xff0c;ne…

elment-ui的侧边栏 开关及窗口联动

<template><div class"asders"><el-aside width"200px"><div class"boxbody"><div>源码外卖</div><el-switch v-model"isCollapse" :active-value"true" :inactive-value"fals…

通过Filebeat进行日志监控

对系统的日志监控&#xff0c;通用做法是使用ELK&#xff08;Elasticsearch、Logstash、Kibana&#xff09;进行监控和搜索日志&#xff0c;这里给出另一种方案&#xff1a;通过Filebeat接收日志到Kafka&#xff0c;监控平台接收Kafka&#xff0c;并通过WebSocket实时展示。 这…

CAD Voronoi3D V1.0.1 版本更新说明

更新说明 CAD Voronoi3D V1.0.1版本对泰森多边形晶格进行进一步的优化。 采用新算法大幅度减少形体边界出现小晶格的可能性&#xff0c;使区块更均匀&#xff1a; 优化曲边边界晶格曲率问题&#xff0c;消除曲边形体晶格边界曲率过大现象&#xff1a; 优化生成算法&#xff…

Bash编程

目录&#xff1a; bash编程语法bash脚本编写 1.bash编程语法 Bash 编程基础 变量引号数组控制语句函数 Bash 变量 语法&#xff1a; Variable_namevalue Bash 变量定义的规则 变量名区分大小写&#xff0c;a和A为两个不同的变量。变量名可以使用大小写字母混编的形式进行…