新手怎样在手机上做电商/怎么快速优化关键词

新手怎样在手机上做电商,怎么快速优化关键词,互联网网站备案流程,用ps做企业网站分辨率是多少Kafka生产者相关-CSDN博客 消费者消费数据基本流程 package com.hrui;import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache…

Kafka生产者相关-CSDN博客

消费者消费数据基本流程

package com.hrui;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;/*** @author hrui* @date 2025/2/26 17:35*/
public class KafkaConsumerTest {public static void main(String[] args) {Map<String,Object> consumerConfig=new HashMap<>();consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//消费者反序列化consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//消费者组consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test-1");//创建消费者对象KafkaConsumer<String,String> consumer=new KafkaConsumer<>(consumerConfig);//订阅主题consumer.subscribe(Collections.singletonList("test"));//从Kafka主题中获取数据while (true){ConsumerRecords<String, String> poll = consumer.poll(100);for (ConsumerRecord<String, String> record : poll) {System.out.println(record);}}//关闭消费者对象 因上面在无线循环//consumer.close();}
}

消费数据偏移量问题

Kafka中的 偏移量(offset)是用于标识每个消费者在某个分区内消费到的位置。每个分区的消息都有一个唯一的偏移量,消费者会根据这个偏移量来读取消息。

Kafka偏移量的管理

Kafka默认提供两种方式来管理偏移量:

  1. 自动提交偏移量(默认方式)
  2. 手动提交偏移量(需要显式配置)

consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");  //默认设置

consumerConfig.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000"); //默认设置 每5秒提交一次

也就是说默认情况下   消费者如果后启动  无法读取到生产者已经发送的消息

偏移量的重置

如果需要重新消费数据,可以通过 auto.offset.reset 配置项来控制消费者的偏移量重置行为。这个配置项有几个常用的值:

  • earliest:如果没有找到偏移量(比如第一次消费),消费者会从最早的消息开始消费。
  • latest:如果没有找到偏移量,消费者会从最新的消息开始消费。
  • none:如果没有找到偏移量,消费者会抛出异常。

例如,设置为从最早的消息开始消费:

consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

如果想从指定偏移量获取消息

解决消费者重复消费的问题(不能完全解决)

以上示例,偏移量默认都是5秒一次提交

例如先启动消费者

然后生产者发送了10000条数据  不好演示的话可以在生产者那边每发送一条数据然后

Thread.sleep 1秒

如果消费者在消费到一定程度之后  突然停止   观察再次启动消费者  存在消费者重复消费的情况

原因就是消费者偏移量默认5秒提交一次的原因

那么可以将消费者默认5秒提交偏移量缩短为1秒  但是这样不能完全解决问题

示例

消费者

代码

package com.hrui;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.util.*;/*** @author hrui* @date 2025/2/26 17:35*/
public class KafkaConsumerTest {public static void main(String[] args) {Map<String,Object> consumerConfig=new HashMap<>();consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//消费者反序列化consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//消费者组consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test-3");//设置从最早的消息读取//consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//创建消费者对象KafkaConsumer<String,String> consumer=new KafkaConsumer<>(consumerConfig);//订阅主题consumer.subscribe(Collections.singletonList("test"));
//        boolean flg = true;
//        while (flg) {
//            // 拉取数据
//            consumer.poll(Duration.ofMillis(100));
//            final Set<TopicPartition> assignment = consumer.assignment();
//
//            if (assignment != null && !assignment.isEmpty()) {
//                // 检查分配的分区
//                for (TopicPartition topicPartition : assignment) {
//                    if ("test".equals(topicPartition.topic())) {
//                        // 将偏移量设置为2
//                        consumer.seek(topicPartition, 2);
//                        // 停止循环
//                        flg = false;
//                    }
//                }
//            }
//        }//从Kafka主题中获取数据while (true){//ConsumerRecords<String, String> poll = consumer.poll(100);ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : poll) {System.out.println(record);}}//关闭消费者对象 因上面在无线循环//consumer.close();}
}

生产者 

代码

package com.hrui;import com.hrui.interceptor.KafkaProducerInterceptorTest;
import com.hrui.interceptor.ValueInterceptor;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;/*** @author hrui* @date 2025/2/26 13:36*/
public class KafkaProducerTest {public static void main(String[] args) throws ExecutionException, InterruptedException {//创建配置对象Map<String,Object> configMap=new HashMap<>();//如果是集群随意指定一个configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//对Key Value进行序列化操作configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());configMap.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ValueInterceptor.class.getName());//可以配置ACKSconfigMap.put(ProducerConfig.ACKS_CONFIG,"-1");//配置幂等性configMap.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);//配置重试次数configMap.put(ProducerConfig.RETRIES_CONFIG,3);//配置超时configMap.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,3000);//配置事务 事务基于幂等性configMap.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"my-tx-id");//创建生产者对象KafkaProducer<String,String> kafkaProducer=new KafkaProducer<>(configMap);//初始化事务kafkaProducer.initTransactions();try {//开启事务kafkaProducer.beginTransaction();for(int i=0;i<10000;i++){//key的作用是通过某种算法,放到topic的某个分区中//可以不设置key 默认是按照轮询的方式ProducerRecord<String, String> record = new ProducerRecord<>("test", "key1","hello kafka" + i);//发送数据  send方法还可以接收一个参数,就是回调函数  kafkaProducer.send(record);是异步的Future<RecordMetadata> send = kafkaProducer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e != null) {// 处理发送失败的情况e.printStackTrace();} else {// 处理发送成功的情况System.out.println("发送成功:" + recordMetadata);}}});//这样变成同步了send.get();Thread.sleep(1000);}//提交事务kafkaProducer.commitTransaction();}catch (Exception e){e.printStackTrace();//中止事务kafkaProducer.abortTransaction();}finally {//关闭生产者对象kafkaProducer.close();}}
}

先启动消费者

然后启动生产者

观察消费者控制台

过会我再次启动消费者

1.缩短自动提交偏移量的时间

因为默认消费者每5秒自动提交提交

可以缩短自动提交偏移量的时间   但这样只能减少重复消费的量  并不能彻底解决重复消费的问题

2.手动提交偏移量

package com.hrui;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.util.*;/*** @author hrui* @date 2025/2/26 17:35*/
public class KafkaConsumerTest {public static void main(String[] args) {Map<String,Object> consumerConfig=new HashMap<>();consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//消费者反序列化consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//消费者组consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test-3");//默认自动提交  改成false 变成手动提交偏移量consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);//设置从最早的消息读取//consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//创建消费者对象KafkaConsumer<String,String> consumer=new KafkaConsumer<>(consumerConfig);//订阅主题consumer.subscribe(Collections.singletonList("test"));
//        boolean flg = true;
//        while (flg) {
//            // 拉取数据
//            consumer.poll(Duration.ofMillis(100));
//            final Set<TopicPartition> assignment = consumer.assignment();
//
//            if (assignment != null && !assignment.isEmpty()) {
//                // 检查分配的分区
//                for (TopicPartition topicPartition : assignment) {
//                    if ("test".equals(topicPartition.topic())) {
//                        // 将偏移量设置为2
//                        consumer.seek(topicPartition, 2);
//                        // 停止循环
//                        flg = false;
//                    }
//                }
//            }
//        }//从Kafka主题中获取数据while (true){//ConsumerRecords<String, String> poll = consumer.poll(100);ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : poll) {System.out.println(record);}//设置自动提交偏移量之后   这里要手动去保存偏移量//这里有两种方式  同步提交 和异步提交偏移量consumer.commitAsync();//异步consumer.commitSync();//同步}//关闭消费者对象 因上面在无线循环//consumer.close();}
}

说明以上方式都不能彻底解决重复消费问题

重复消费问题还是存在

如果要进行原子绑定  并非做不到,Kafka本身没有提供相关功能

例如把拉取到的数据全部处理完了,才进行事务提交

一旦出现意外,业务数据恢复  但是Kafka本身没有提供相关功能 和与其他支持事务处理的应用结合使用

消费数据-事务隔离级别

生产者事务图

这个报错 写在提交之前即可

消费者组介绍

如果两个应用都是同一个消费者组

生产者A生产消息      消费者B和C在同一个消费者组  那么A的消息如果被B消费过了那么C是消费不到的  B和C默认是竞争关系

如果生产者A生产消息  消费者B和C在不同消费者组   那么消息会被B和C都消费

第一个场景:消费者B和C在同一个消费者组

如果消费者B和消费者C在同一个消费者组内,消息会按照负载均衡的方式分配给它们。这意味着生产者A生产的消息会被消费者B或消费者C中的一个消费,而不是同时被两个消费者消费。所以,如果B已经消费了某条消息,消费者C就无法再消费到这条消息。这种行为是消费者组的基本特性,主要用于确保每条消息只被某个消费者处理一次。

第二个场景:消费者B和C在不同的消费者组

如果消费者B和消费者C在不同的消费者组中,那么生产者A生产的消息会分别被B和C都消费到。因为每个消费者组有自己独立的消费进度(每个组有独立的偏移量),所以每个消费者组都能独立消费该消息。

第三个场景:消费者B和C同时开启从头消费

如果B和C都在同一个消费者组,并且设置了从头消费,那么它们将从消息队列的最开始位置开始消费。这种情况下,B和C是共享消费队列的,它们会根据负载均衡规则交替消费消息,而不是同时消费同一条消息。因此,A的消息仍然不会同时被B和C消费。每条消息仍然只会被消费者组内的某个消费者消费,并且消息的消费是共享的,但并不是同时共享。

总结来说,在同一个消费者组内,消息的消费是竞争式的。即使B和C同时开启从头消费,它们也不会同时消费同一条消息。每条消息只会由其中一个消费者处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/70910.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【软考-架构】备战2025软考

新老教材对比 科目1&#xff08;信息系统综合&#xff09;考点详解 科目2&#xff08;系统架构设计案例&#xff09;考点详解 科目3&#xff08;系统架构设计论文&#xff09;考点详解 趋于越来越具体 学习方法推荐 第一阶段 – 基础知识阶段 建议一个半月&#xff1b; 先过…

MMW-1碳棒磨损机设计

摘 要 为了更好的测量在一定压力下碳棒的磨损量&#xff0c;提高碳棒磨损量的测量精度&#xff0c;本文设计了一种MMW-1碳棒磨损机&#xff0c;该碳棒磨损机属于柱盘式摩擦磨损试验机的一种。该机器主要用于做和碳棒有关的摩擦磨损试验&#xff0c;可以更准确的获得相关的参数…

网络运维学习笔记(DeepSeek优化版)005网工初级(HCIA-Datacom与CCNA-EI)链路层发现协议与VLAN技术

文章目录 一、链路层发现协议1.1 思科CDP协议1.2 华为LLDP协议 二、VLAN&#xff08;Virtual Local Area Network&#xff0c;虚拟局域网&#xff09;技术详解2.1 基本概念2.2 技术特性2.3 接口工作原理2.3.1 Access模式2.3.2 Trunk模式 2.4 厂商配置对比思科配置华为配置 2.5 …

SOME/IP-SD -- 协议英文原文讲解5

前言 SOME/IP协议越来越多的用于汽车电子行业中&#xff0c;关于协议详细完全的中文资料却没有&#xff0c;所以我将结合工作经验并对照英文原版协议做一系列的文章。基本分三大块&#xff1a; 1. SOME/IP协议讲解 2. SOME/IP-SD协议讲解 3. python/C举例调试讲解 5.1.2.5 S…

个人电脑小参数GPT预训练、SFT、RLHF、蒸馏、CoT、Lora过程实践——MiniMind图文版教程

最近看到Github上开源了一个小模型的repo&#xff0c;是真正拉低LLM的学习门槛&#xff0c;让每个人都能从理解每一行代码&#xff0c; 从零开始亲手训练一个极小的语言模型。开源地址&#xff1a; GitHub - jingyaogong/minimind: &#x1f680;&#x1f680; 「大模型」2小时…

Java使用ZXing库生成带有Logo的二维码图片,并去除白边动态伸缩上传到阿里云OSS

文章目录 引言二维码基本原理1、二维码概述2、QR Code结构3、错误纠正级别 QR Code生成技术1、ZXing库2、生成二维码的步骤 图像处理技术1、嵌入Logo2. 去除白边 阿里云OSS基本概念1、OSS概述2. 主要功能3. 基本概念 实战演示1、依赖库2、类结构3、生成普通二维码4. 去除白边5、…

AI工具箱最新使用教程

先克隆项目 电脑需要先安装 git &#xff0c;安装的画看这个 Git安装教程&#xff08;超详细&#xff09;。 git镜像 git clone https://github.com/Escaflowne1985/MyToolsWebBackendUser.gitgitee镜像 git clone https://gitee.com/escaflowne/MyToolsWebBackendUser.git…

数据安全管理的AI工具有哪些?

数据安全管理的AI工具在不断演进&#xff0c;它们凭借强大的算法和学习能力&#xff0c;为企业筑起了一道坚不可摧的数字防线。 在数据安全管理领域&#xff0c;AI工具的应用日益广泛&#xff0c;以下是一些常见的工具及其功能&#xff1a; AI驱动的数据分类与标记 工具: Micr…

Android 常用命令和工具解析之存储相关

1 基本概念 2 命令解读 2.1 adb shell df df 命令主要用于需要检查文件系统上已使用和可用的磁盘空间的数量。如果没有指定文件名&#xff0c;则显示在当前所有挂载的文件系统上可用的空间。其原理是从proc/mounts 或 /etc/mtab 中检索磁盘信息。 注意&#xff1a;df命令并…

使用ZFile打造属于自己的私有云系统结合内网穿透实现安全远程访问

文章目录 前言1.关于ZFile2.本地部署ZFile3.ZFile本地访问测试4.ZFile的配置5.cpolar内网穿透工具安装6.创建远程连接公网地址7.固定ZFile公网地址 前言 在数字化的今天&#xff0c;我们每个人都是信息的小能手。无论是职场高手、摄影达人还是学习狂人&#xff0c;每天都在创造…

HarmonyOS 5.0应用开发——鸿蒙接入高德地图实现POI搜索

【高心星出品】 文章目录 鸿蒙接入高德地图实现POI搜索运行结果&#xff1a;准备地图编写ArkUI布局来加载HTML地图 鸿蒙接入高德地图实现POI搜索 在当今数字化时代&#xff0c;地图应用已成为移动设备中不可或缺的一部分。随着鸿蒙系统的日益普及&#xff0c;如何在鸿蒙应用中…

idea + Docker + 阿里镜像服务打包部署

一、下载docker desktop软件 官网下载docker desktop&#xff0c;需要结合wsl使用 启动成功的画面(如果不是这个画面例如一直处理start或者是stop需要重新启动&#xff0c;不行就重启电脑) 打包成功的镜像在这里&#xff0c;如果频繁打包会导致磁盘空间被占满&#xff0c;需…

【监督学习】ARIMA预测模型步骤及matlab实现

ARIMA预测模型 ARIMA预测模型1.算法步骤2.参数选择(1)拖尾截尾判断法(2) AIC 准则(3) BIC 准则 3.MATLAB 实现参考资料 ARIMA预测模型 #mermaid-svg-mDhjwpnuA0YcEGnE {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg…

使用git管理uniapp项目

1.本地管理 1. 在项目根目录中新建 .gitignore 忽略文件&#xff0c;并配置如下&#xff1a; # 忽略 node_modules 目录 /node_modules /unpackage/dist 2. 打开终端&#xff0c;切换到项目根目录中&#xff0c;运行如下的命令&#xff0c;初始化本地 Git 仓库&#xff1…

Unity中动态切换光照贴图的方法

关键代码&#xff1a;LightmapSettings.lightmaps lightmapDatas; LightmapData中操作三张图&#xff1a;lightmapColor,lightmapDir,以及一张ShadowMap 这里只操作前两张&#xff1a; using UnityEngine; using UnityEngine.EventSystems; using UnityEngine.UI;public cl…

1.2 Kaggle大白话:Eedi竞赛Transformer框架解决方案02-GPT_4o生成训练集缺失数据

目录 0. 本栏目竞赛汇总表1. 本文主旨2. AI工程架构3. 数据预处理模块3.1 配置数据路径和处理参数3.2 配置API参数3.3 配置输出路径 4. AI并行处理模块4.1 定义LLM客户端类4.2 定义数据处理函数4.3 定义JSON保存函数4.4 定义数据分片函数4.5 定义分片处理函数4.5 定义文件名排序…

pycharm远程连接服务器运行pytorch

Linux部署pytorch 背景介绍 不同的开源代码可能需要不同的实验环境和版本&#xff0c;这时候的确体现出Anaconda管理环境的好处了&#xff0c;分别搞一个独立环境方便管理。 有的教程建议选择较旧的版本&#xff0c;但笔者建议在条件允许的情况下安装最新版&#xff0c;本次…

组件注册方式、传递数据

组件注册 一个vue组件要先被注册&#xff0c;这样vue才能在渲染模版时找到其对应的实现。有两种注册方式&#xff1a;全局注册和局部注册。&#xff08;组件的引入方式&#xff09; 以下这种属于局部引用。 组件传递数据 注意&#xff1a;props传递数据&#xff0c;只能从父…

ROS的action通信——实现阶乘运算(三)

在ROS中除了常见的话题(topic&#xff09;通信、服务(server)通信等方式&#xff0c;还有action通信这一方式&#xff0c;由于可以实时反馈任务完成情况&#xff0c;该通信方式被广泛运用于机器人导航等任务中。本文将通过三个小节的分享&#xff0c;实现基于action通信的阶乘运…

四款 AI 协作办公工具,AI工具库革新办公效率

在数字化办公时代&#xff0c;AI 技术正深刻改变着我们的工作方式。AIDH.NETAI工具库汇聚了众多先进的工具&#xff0c;今天我们来了解 AI协作办公工具&#xff0c;探索它们如何助力企业和团队在办公场景中脱颖而出。 Taskade&#xff1a;智能工作流的领航者 Taskade 是一款将…