网站图片相册代码/推广普通话手抄报内容简短

网站图片相册代码,推广普通话手抄报内容简短,wordpress如何把菜单加入导航栏,怎样办网站文章目录 1.定义2.生产者拦截器2.1 示例 3.消费者拦截器3.1 示例 1.定义 拦截器主要用于实现clients端的定制化需求,包括消息在生产者发送到 Kafka 或者在消费者接收消息之前进行一些定制化的操作。用于在消息发送和接收的关键步骤中进行拦截和处理。可以修改消息&…

文章目录

  • 1.定义
  • 2.生产者拦截器
    • 2.1 示例
  • 3.消费者拦截器
    • 3.1 示例


1.定义

拦截器主要用于实现clients端的定制化需求,包括消息在生产者发送到 Kafka 或者在消费者接收消息之前进行一些定制化的操作。用于在消息发送和接收的关键步骤中进行拦截和处理。可以修改消息,日志记录,统计等。由生产者拦截器和消费者拦截器组成。生产者的拦截器是在发送前和确认后调用,而消费者的则是在接收后和提交前。

2.生产者拦截器

对于 producer 而言,interceptor使得用户在消息发送前以及 producer 回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer允许用户指定多个interceptor 按序作用于同一条消息从而形成一个拦截链(imterceptorchain)。intercetpor 的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法如下。

  • onSend(ProducerRecord):该方法封装进KafkaProducer.send 方法中,即它运行在用户主线程中。producer确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算。
  • onAcknowledgement(RecordMetadata,Exception):该方法会在消息成功提交被应答之前或消息发送失败时调用,并且通常都是在 producer 回调 callback 逻辑触发之前。值得注意的是,这个方法和 onSend 不是在同一个线程中被调用的,因此如果你在这两个方法中调用了某个共享可变对象,一定要保证线程安全哦。onAcknowledgement 运行在producer的I/O线程中,因此不要在该方法中放入很“重”的逻辑,否则会拖慢producer的消息发送效率。当exception为null时可以获取到消息的确认信息,包括分区、偏移量等。当exception不为null时可以执行一些异常处理逻辑。
  • close:关闭interceptor,主要用于执行一些资源清理工作。

interceptor可能运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外,若指定了多个interceptor,则producer 将按照指定顺序调用它们,同时把每个interceptor 中捕获的异常记录到错误日志中而不是向上传递。

2.1 示例

下面实现一个简单的双 interceptor 组成的拦截链。第一个 interceptor 会在消息发送前将时间戳信息加到消息 value 的最前部;第二个 interceptor 会在消息发送后更新成功发送消息数或失败发送消息数。
首先创建TimeStampPrependerInterceptor,代码如下:

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
public class TimeStampPrependerInterceptor implements ProducerInterceptor<String,String> {@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {return new ProducerRecord<>(record.topic(),record.partition(), record.timestamp(), record.key(),System.currentTimeMillis()+","+record.value().toString());}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

下面定义第二个interceptor,代码如下:

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
public class CounterInterceptor implements ProducerInterceptor<String,String> {private int errorCounter =0;private int successCounter =0;@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {return record;}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {if(exception==null){successCounter++;}else{errorCounter++;}}@Overridepublic void close() {System.out.println("Successful sent:"+successCounter);System.out.println("Failed sent:"+errorCounter);}@Overridepublic void configure(Map<String, ?> configs) {}
}

生产者代码如下:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class ProducerTest {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");//必须指定props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");//必须指定props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//必须指定props.put("acks", "-1");props.put("retries", 3);props.put("batch.size", 323840);props.put("linger.ms", 10);props.put("buffer.memory", 33554432);props.put("max.block.ms", 3000);props.put("partitioner.class","com.exm.collectcodenew.kafka.producer.customPartitioner.AuditPartitioner");//构建拦截链List<String> interceptors =new ArrayList<>();interceptors.add("com.exm.collectcodenew.kafka.producer.customInterceptor.TimeStampPrependerInterceptor");interceptors.add("com.exm.collectcodenew.kafka.producer.customInterceptor.CounterInterceptor");props.put("interceptor.classes",interceptors);Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {ProducerRecord record = new ProducerRecord("topic-test1","test_"+i);producer.send(record).get();}producer.close();}
}

3.消费者拦截器

消费者拦截器(Consumer Interceptor)是一种允许你在消息到达消费者客户端之前或之后对其进行拦截并执行特定操作的功能。不过,如果你的目的是在消费消息的确认阶段做一些操作,可以考虑使用消费者回调或者在消费者代码中显式处理确认逻辑。
要使用消费者拦截器,你需要实现org.apache.kafka.clients.consumer.ConsumerInterceptor接口。其定义的方法如下。

  • onConsume(ConsumerRecords<K, V> records): 在消费者拉取poll到消息之后,但还没被返回给用户之前被调用。可以根据需要修改消息集。
  • onCommit(Map<TopicPartition, OffsetAndMetadata> offsets): 在消费者成功提交偏移量之后调用。可以用来记录偏移量提交的详细信息。如果提交过程中出现异常,可能不会触发这个方法。
  • close(): 拦截器关闭时调用,可以用来释放资源。

3.1 示例

定义interceptor,代码如下:

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Map;
public class LoggingOffsetInterceptor implements ConsumerInterceptor<String,String> {@Overridepublic ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {System.out.println("Before consuming records: " + records);// 在这里可以进行一些预处理操作,例如修改消息内容等// 例如:修改每条消息的内容records.forEach(record -> System.out.println("Original message: " + new String(record.value())));// 你可以在这里返回一个新的ConsumerRecords对象来改变原始的记录集return records;}@Overridepublic void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {System.out.println("Committing offsets: " + offsets);// 在这里可以执行一些在偏移量提交之后的操作,例如记录偏移量信息等}@Overridepublic void close() {// 清理资源等操作}@Overridepublic void configure(Map<String, ?> configs) {}
}

消费者代码如下:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class ConsumerTest {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");//必须指定props.put("group.id","test-group");//必须指定props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//必须指定props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//必须指定props.put("enable.auto.commit","true");props.put("auto.commit.interval.ms","1000");props.put("auto.offset.reset","earliest");//从最早的消息开始读取// 添加拦截器配置props.put("interceptor.classes", "com.exm.collectcodenew.kafka.producer.customInterceptor.LoggingOffsetInterceptor");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//创建consumer实例consumer.subscribe(Arrays.asList("topic-test1"));while(true){ConsumerRecords<String,String> records=consumer.poll(1000);for (ConsumerRecord<String, String> record: records){System.out.printf("offset=%d,key=%s,value=%s%n",record.offset(),record.key(),record.value());}}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/74464.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

进程间通信(匿名管道) ─── linux第22课

目录 进程间通信 进程间通信目的 进程间通信的发展 进程间通信分类 1. 管道 2. System V IPC 3. POSIX IPC 管道 什么是管道 站在文件描述符角度-深度理解管道 站在内核角度-管道本质 ​编辑 匿名管道 测试匿名管道的读写 匿名管道的四大现象&#xff1a; 匿…

架构思维:通用系统设计方法论_从复杂度分析到技术实现指南

文章目录 Question订单履约原始架构痛点目标架构架构图说明关键设计点优点 设计方法论复杂来源解决方案评估标准从设计原则出发 技术实现 &#xff08;以选型Redis为例&#xff09;Redis消息队列的实现细节高可用设计 总结 Question 我们经常聊如何设计一个比较完善的系统&…

Excel(实战):INDEX函数和MATCH函数、INDEX函数实战题

目录 经典用法两者嵌套查值题目解题分析 INDEX巧妙用法让数组公式&#xff0c;自动填充所有、有数据的行/列INDEX函数和SEQUENCE函数 经典用法两者嵌套查值 题目 根据左表查询这三个人的所有数据 解题分析 INDEX函数的参数&#xff1a;第1个参数是选定查找范围&#xff0c…

ECharts仪表盘-仪表盘25,附视频讲解与代码下载

引言&#xff1a; ECharts仪表盘&#xff08;Gauge Chart&#xff09;是一种类似于速度表的数据可视化图表类型&#xff0c;用于展示单个或多个变量的指标和状态&#xff0c;特别适用于展示指标的实时变化和状态。本文将详细介绍如何使用ECharts库实现一个仪表盘&#xff0c;…

清华大学.智灵动力-《DeepSeek行业应用实践报告》附PPT下载方法

导 读INTRODUCTION 今天分享是由清华大学.智灵动力&#xff1a;《DeepSeek行业应用实践报告》&#xff0c;主要介绍了DeepSeek模型的概述、优势、使用技巧、与其他模型的对比&#xff0c;以及在多个行业中的应用和未来发展趋势。为理解DeepSeek模型的应用和未来发展提供了深入的…

VSCODE上ckg_server_linux进程占用CPU过多

现象描述 每次一打开VSCODE&#xff0c;ckg_server_linux进程就启动了&#xff0c;并且一直运行&#xff0c;且占用CPU过高&#xff1b; 推测应该是某个插件的问题导致的&#xff1b; 问题处理 本地搜索了一下&#xff0c;发现是 marscode插件影响的&#xff1b; 禁用marsc…

【大模型理论篇】CogVLM:多模态预训练语言模型

1. 模型背景 前两天我们在《Skywork R1V: Pioneering Multimodal Reasoning with Chain-of-Thought》中介绍了将ViT与推理模型结合构造多模态推理模型的案例&#xff0c;其中提到了VLM的应用。追溯起来就是两篇前期工作&#xff1a;Vision LLM以及CogVLM。 今天准备回顾一下Cog…

2021年蓝桥杯第十二届CC++大学B组真题及代码

目录 1A&#xff1a;空间&#xff08;填空5分_单位转换&#xff09; 2B&#xff1a;卡片&#xff08;填空5分_模拟&#xff09; 3C&#xff1a;直线&#xff08;填空10分_数学排序&#xff09; 4D&#xff1a;货物摆放&#xff08;填空10分_质因数&#xff09; 5E&#xf…

Python入门基础

python基础类型转换 str()与int()类型转换 name 张三 age 20 print(type(name),type(age))print(我叫name 今年&#xff0c; str(age)岁 )a10 b198.8 cFalse print(type(a),type(b),type(c)) print(str(a),str(b),str(c))s1 128 f198.7 s276.77 ffTrue s3hello print(type(s…

OPENCV数字识别(非手写数字/采用模板匹配)

这篇文章的重点在于 模板匹配 的使用。模板匹配是计算机视觉中的一项基本技术&#xff0c;它通过比对输入图像与模板图像的相似度&#xff0c;来进行目标识别。对于数字识别&#xff0c;特别是标准数字的识别&#xff0c;模板匹配非常有效。 请看效果&#xff1a; 文章结构 …

Cursor安装注册+基础配置+入门实操

一、安装注册 官网地址&#xff1a;https://www.cursor.com/ 下载按钮会根据电脑系统来匹配&#xff0c;点击对应「Download」按钮进行下载。完成后&#xff0c;按步骤安装即可。 安装完成后&#xff0c;即可点击图标打开软件。 基础设置完成后&#xff0c;就需要选择注册账号…

秒杀业务优化之从分布式锁到基于消息队列的异步秒杀

一、业务场景介绍 优惠券、门票等限时抢购常常出现在各类应用中&#xff0c;这样的业务一般为了引流宣传而降低利润&#xff0c;所以一旦出现问题将造成较大损失&#xff0c;那么在业务中就要求我们对这类型商品严格限时、限量、每位用户限一次、准确无误的创建订单&#xff0c…

MiniMax GenAI 可观测性分析:基于阿里云 SelectDB 构建 PB 级别日志系统

“阿里云SelectDB作为MiniMax日志存储服务的核心支撑&#xff0c;为在线和离线业务提供了高效、稳定的查询与聚合分析能力。其支持实时物化视图、租户资源隔离、冷热分离等企业级特性&#xff0c;不仅有效解决了日志场景下PB级别数据查询的性能瓶颈&#xff0c;还通过智能化的资…

【YOLO V3】目标检测 Darknet 训练自定义模型

【YOLO V3】目标检测 Darknet 训练自定义模型 前言整体思路环境检查与依赖配置克隆 YOLOv3 Darknet 并编译Clone Darknet 项目文件修改 Makefile 文件修改模型保存频率项目编译 准备数据集配置训练文件数据集&#xff1a;datasets &#xff08;自制&#xff09;权重文件 yolov3…

DeepSeek 助力 Vue3 开发:打造丝滑的表格(Table)之添加导出数据功能

前言:哈喽,大家好,今天给大家分享一篇文章!并提供具体代码帮助大家深入理解,彻底掌握!创作不易,如果能帮助到大家或者给大家一些灵感和启发,欢迎收藏+关注哦 💕 目录 DeepSeek 助力 Vue3 开发:打造丝滑的表格(Table)之添加导出数据功能📚页面效果📚指令输入�…

《Python实战进阶》第31集:特征工程:特征选择与降维技术

第31集&#xff1a;特征工程&#xff1a;特征选择与降维技术 摘要 特征工程是机器学习和数据科学中不可或缺的一环&#xff0c;其核心目标是通过选择重要特征和降低维度来提升模型性能并减少计算复杂度。本集聚焦于特征选择与降维技术&#xff0c;涵盖过滤法、包裹法、嵌入法等…

Excel第41套全国人口普查

2. 导入网页中的表格&#xff1a;数据-现有链接-考生文件夹&#xff1a;网页-找到表格-点击→变为√-导入删除外部链接关系&#xff1a;数据-点击链接-选中连接-删除-确定&#xff08;套用表格格式-也会是删除外部链接&#xff09;数值缩小10000倍&#xff08;除以10000即可&am…

WPS宏开发手册——使用、工程、模块介绍

目录 系列文章前言1、开始1.1、宏编辑器使用步骤1.2、工程1.3、工程 系列文章 使用、工程、模块介绍 JSA语法 第三篇练习练习题&#xff0c;持续更新中… 前言 如果你是开发人员&#xff0c;那么wps宏开发对你来说手拿把切。反之还挺吃力&#xff0c;需要嘻嘻&#xf…

EtherCAT转CANopen配置CANopen侧的PDO映射

EtherCAT转CANopen配置CANopen侧的PDO映射 在工业自动化领域&#xff0c;EtherCAT和CANopen是两种广泛应用的通信协议。它们各自具有独特的优势&#xff0c;但在某些应用场景下&#xff0c;需要将这两种协议进行转换以实现设备间的高效数据交换。本文将详细介绍如何在使用Ethe…

【QT】Qt creator快捷键

Qt creator可以通过以下步骤快捷键査看调用关系&#xff1a; 1.打开代码文件。 2.将光标放在你想要查看调用关系的函数名上。 3.按下键盘快捷键 CtrlshiftU。 4.弹出菜单中选择“调用路径”或“被调用路径” 5.在弹出的窗口中可以查看函数的调用关系 折叠或展开代码快捷键&…