天津手机网站开发/网站是怎么做出来的

天津手机网站开发,网站是怎么做出来的,做爰在线网站,非凡免费建网站平台1. 生产消息 1.1 生产消息的基本步骤 (一)创建Map类型的配置对象,根据场景增加相应的配置属性: 参数名参数作用类型默认值推荐值bootstrap.servers集群地址,格式为:brokerIP1:端口号,brokerIP2:端口号必…

1. 生产消息

1.1 生产消息的基本步骤

(一)创建Map类型的配置对象,根据场景增加相应的配置属性:

参数名参数作用类型默认值推荐值
bootstrap.servers集群地址,格式为:brokerIP1:端口号,brokerIP2:端口号必须
key.serializer对生产数据Key进行序列化的类完整名称必须Kafka提供的字符串序列化类:StringSerializer
value.serializer对生产数据Value进行序列化的类完整名称必须Kafka提供的字符串序列化类:StringSerializer
interceptor.classes拦截器类名,多个用逗号隔开可选
batch.size数据批次字节大小。此大小会和数据最大估计值进行比较,取大值。估值=61+21+(keySize+1+valueSize+1+1)可选16K
retries重试次数可选整型最大值0或整型最大值
request.timeout.ms请求超时时间可选30s
linger.ms数据批次在缓冲区中停留时间可选
acks请求应答类型:all(-1), 0, 1可选all(-1)根据数据场景进行设置
retry.backoff.ms两次重试之间的时间间隔可选100ms
buffer.memory数据收集器缓冲区内存大小可选32M64M
max.in.flight.requests.per.connection每个节点连接的最大同时处理请求的数量可选5小于等于5
enable.idempotence幂等性,可选true根据数据场景进行设置
partitioner.ignore.keys是否放弃使用数据key选择分区可选false
partitioner.class分区器类名可选null

(二)创建待发送数据

在 Kafka 中传递的数据我们称之为消息(message)或记录(record),所以Kafka发送数据前,需要将待发送的数据封装为指定的数据类型:

在这里插入图片描述

在这里插入图片描述

相关属性必须在构建数据模型时指定,其中主题和value的值时必须要传递的。如果配置中开启了自动创建主题,那么 Topic 主题可以不存在。value 就是我们需要真正传递的数据了,而 Key 可以用于数据的分区定位。

(三)创建生产者对象,发送生产的数据:

根据前面提供的配置信息创建生产者对象,通过这个生产者对象向 Kafka 服务器节点发送数据,而具体的发送是由生产者对象创建时,内部构件的多个组件实现的,多个组件的关系类似与生产者消费者模式。

在这里插入图片描述

(1)数据生产者(KafkaProducer):生产者对象,用于对我们的数据进行必要的转换和处理,将处理后的数据放入到数据收集器中,类似于生产者消费者模式下的生产者。

  • 如果配置拦截器栈(interceptor.classes),那么将数据进行拦截处理。某一个拦截器出现异常并不会影响后续的拦截器处理。
  • 因为发送的数据为 KV 数据,所以需要根据配置信息中的序列化对象对数据中 Key 和 Value 分别进行序列化处理。
  • 计算数据嗦发送的分区位置。
  • 将数据追加到数据收集器中。

(2)数据收集器(RecordAccumulator):用于收集,转换我们生产的数据,蕾西与生产者消费者模式下的缓冲区。为了优化数据的传输,Kafka 并不是生产一条数据就向 Broker 发送一条数据,而是通过合并单条消息,进行批量(批次)发送,提高吞吐量,减少带宽消耗。

  • 默认情况下,一个发送批次的数据容量为 16k,这个可以通过参数 batch.size进行改善。
  • 批次是和分区进行绑定的。也就是说发往同一个分区的数据会进行合并,形成一个批次。
  • 如果当前批次能容纳数据,那么直接将数据追加到批次中即可,如果不能容纳数据,那么会产生新的批次放入到当前分区的批次队列中,这个队列使用的是 Java 双端队列 Deque。旧的批次关闭不再接收新的数据,等待发送。

(3)数据发送器(Sender):线程对象,用于从收集器中获取数据,向服务节点发送。类似于生产者消费者模式下的消费者。因为是线程对象,所以启动后会不断轮询获取数据收集器中已经关闭的批次数据。对批次进行整合后再发送到 Broker 节点中

  • 因为数据真正发送的地方是 Broker 节点,不是分区。所以需要将从数据收集器中收集到的批次数据按照可用 Broker 节点重新组合成List集合。
  • 将组合后的<节点,List<批次>>的数据封装成客户端请求(请求键为:Produce)发送到网络客户端对象的缓冲区,由网络客户端对象通过网络发送给 Broker 节点。
  • Broker 节点获取客户端请求,并根据请求键进行后续的数据处理:向分区中增加数据。

在这里插入图片描述

1.2 生产消息的基本代码

// TODO 配置属性集合
Map<String, Object> configMap = new HashMap<>();
// TODO 配置属性:Kafka服务器集群地址
configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// TODO 配置属性:Kafka生产的数据为KV对,所以在生产数据进行传输前需要分别对K,V进行对应的序列化操作
configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
// TODO 创建Kafka生产者对象,建立Kafka连接
//      构造对象时,需要传递配置参数
KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);
// TODO 准备数据,定义泛型
//      构造对象时需要传递 【Topic主题名称】,【Key】,【Value】三个参数
ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key1", "value1"
);
// TODO 生产(发送)数据
producer.send(record);
// TODO 关闭生产者连接
producer.close();

1.3 发送消息

1.3.1 拦截器

生产者 API 在数据准备好发送给 Kafka 服务器之前,允许我们对生产的数据进行统一的处理,比如校验,整合数据等等。这些处理我们是可以通过 Kafka 提供的拦截器完成。

这里的拦截器是可以配置多个的。执行时,会按照声明顺序执行完一个后,再执行下一个。并且某一个拦截器如果出现异常,只会跳出当前拦截器逻辑,并不会影响后续拦截器的处理。所以开发时,需要将拦截器的这种处理方法考虑进去。

在这里插入图片描述

1.3.1.1 增加拦截器类

(1)实现生产者拦截器接口 ProducerInterceptor

/*** TODO 自定义数据拦截器*      1. 实现Kafka提供的生产者接口ProducerInterceptor*      2. 定义数据泛型 <K, V>*      3. 重写方法*         onSend*         onAcknowledgement*         close*         configure*/
public class KafkaInterceptorMock implements ProducerInterceptor<String, String> {@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {return record;}@Override	public void onAcknowledgement(RecordMetadata metadata, Exception exception) {}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

(2)实现接口中的方法,根据业务功能重写具体的方法

方法名作用
onSend数据发送前,会执行此方法,进行数据发送前的预处理
onAcknowledgement数据发送后,获取应答时,会执行此方法
close生产者关闭时,会执行此方法,完成一些资源回收和释放的操作
configure创建生产者对象的时候,会执行此方法,可以根据场景对生产者对象的配置进行统一修改或转换。
1.3.1.2 配置拦截器
public class ProducerInterceptorTest {public static void main(String[] args) {Map<String, Object> configMap = new HashMap<>();configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");configMap.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());configMap.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());configMap.put( ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, KafkaInterceptorMock.class.getName());KafkaProducer<String, String> producer = null;try {producer = new KafkaProducer<>(configMap);for ( int i = 0; i < 1; i++ ) {ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key" + i, "value" + i);final Future<RecordMetadata> send = producer.send(record);}} catch ( Exception e ) {e.printStackTrace();} finally {if ( producer != null ) {producer.close();}}}
}
1.3.2 回调方法

Kafka 发送数据时,可以同时传递回调对象(Callback)用于对数据的发送结果进行对应处理,具体代码实现采用匿名类或 Lambda 表达式都可以。

public class KafkaProducerASynTest {public static void main(String[] args) {Map<String, Object> configMap = new HashMap<>();configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);// TODO 循环生产数据for ( int i = 0; i < 1; i++ ) {// TODO 创建数据ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key" + i, "value" + i);// TODO 发送数据producer.send(record, new Callback() {// TODO 回调对象public void onCompletion(RecordMetadata recordMetadata, Exception e) {// TODO 当数据发送成功后,会回调此方法System.out.println("数据发送成功:" + recordMetadata.timestamp());}});}producer.close();}
}
1.3.3 异步发送

Kafka 发送数据时,底层的实现类似于生产者消费者模式。对应的,底层会由主线程代码作为生产者向缓冲区中放数据,而数据发送线程会从缓冲区中获取数据进行发送。Broker 接收到数据后进行后续处理。

如果 Kafka 通过主线程代码将一条数据放入到缓冲区后,无需等待数据的后续发送过程,就直接发送下一条数据的场合,我们就称之为异步发送。

在这里插入图片描述

public class KafkaProducerASynTest {public static void main(String[] args) {Map<String, Object> configMap = new HashMap<>();configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);// TODO 循环生产数据for ( int i = 0; i < 10; i++ ) {// TODO 创建数据ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key" + i, "value" + i);// TODO 发送数据producer.send(record, new Callback() {// TODO 回调对象public void onCompletion(RecordMetadata recordMetadata, Exception e) {// TODO 当数据发送成功后,会回调此方法System.out.println("数据发送成功:" + recordMetadata.timestamp());}});// TODO 发送当前数据System.out.println("发送数据");}producer.close();}
}
1.3.4 同步发送

Kafka 发送数据时,底层的实现类似于生产者消费者模式。对应的,底层会由主线程代码作为生产者向缓冲区中放数据,而数据发送线程会从缓冲区中获取数据进行发送。Broker 接收到数据后进行后续处理。

如果 Kafka 通过主线程代码将一条数据放入到缓冲区后,需等待数据的后续发送操作的应答状态,才能发送下一条数据的场合,我们就称之为同步发送。所以这里的所谓同步,就是生产数据的线程需要等待线程的应答(响应)结果。

代码实现上,采用的是 JDK1.5 增加的JUC 并发编程的 Future 接口的 get 方法实现。

在这里插入图片描述

public class KafkaProducerASynTest {public static void main(String[] args) throws Exception {Map<String, Object> configMap = new HashMap<>();configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);// TODO 循环生产数据for ( int i = 0; i < 10; i++ ) {// TODO 创建数据ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key" + i, "value" + i);// TODO 发送数据producer.send(record, new Callback() {// TODO 回调对象public void onCompletion(RecordMetadata recordMetadata, Exception e) {// TODO 当数据发送成功后,会回调此方法System.out.println("数据发送成功:" + recordMetadata.timestamp());}}).get();// TODO 发送当前数据System.out.println("发送数据");}producer.close();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/899407.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

新手村:逻辑回归-理解04:熵是什么?

新手村&#xff1a;逻辑回归04&#xff1a;熵是什么? 熵是什么? 前置条件 在开始学习逻辑回归中的熵理论之前&#xff0c;需要掌握以下基础知识&#xff1a; 概率论与统计学&#xff1a; 概率分布&#xff08;如伯努利分布、正态分布&#xff09;。条件概率和贝叶斯定理。期…

STM32通用定时器结构框图

STM32单片机快速入门 通用定时器框图 TIM9和TIM12 通用定时器框图 TIM9和TIM12 &#xff08;二&#xff09; 通用定时器框图

3.28-2 jmeter读取mysql

jmeter操作mysql 1.下载数据驱动&#xff0c;安装数据驱动 &#xff08;1&#xff09;存放四个路径 a.jre下的lib C:\Program Files\Java\jre1.8.0_60\lib &#xff08;2&#xff09;存放在jre 下的lib 中的ext 路径&#xff1a; C:\Program Files\Java\jre1.8.0_60\lib\…

Postman CORS 测试完全指南:轻松模拟跨域请求,排查 CORS 相关问题

在使用 Postman 进行 API 测试时&#xff0c;通常不会遇到跨域问题&#xff0c;因为 Postman 是一个独立的客户端应用程序&#xff0c;不同于在浏览器中运行的 JavaScript 代码&#xff0c;它没有同源策略&#xff08;SOP&#xff09;的限制。跨域资源共享&#xff08;CORS&…

基于SpringBoot和Vue的SQL TO API平台的设计与实现

文章目录 前言一、系统功能模块二、数据库设计1. 实体属性图1. 实体属性图1.1 职员表1.2 数据源配置表1.3 接口配置表1.4 请求记录表 2. E-R图 三、系统实现1. 登录页面2. 职员管理页面1.1 创建用户1.2 编辑用户 2. 数据源管理2.1 创建数据源2.2 编辑数据源 3. 接口管理3.1 创建…

【Portainer】Docker可视化组件安装

Portainer Portainer 是用于管理容器化环境的一体化平台工程解决方案&#xff0c;提供广泛的定制功能&#xff0c;以满足个人开发人员和企业团队的需求。 官方地址: https://www.portainer.io/ 安装 在 WSL / Docker Desktop 上使用 Docker 安装 Portainer CE 通过命令或UI页…

unity中Xcharts图表鼠标悬浮表现异常

鼠标悬浮在面板附近&#xff0c;只显示单独的一个项目 而且无论鼠标如何移动&#xff0c;根本没有效果。 解决方案&#xff1a; 需要在对应的Canvas上绑定主相机才可以 鼠标移动到项目上就有信息展示了

CentOS 安装LAMP全过程 - 完整步骤与最佳实践

在开始搭建 LAMP 环境之前&#xff0c;需要确保系统已经满足以下条件&#xff1a; 1、操作系统&#xff1a;CentOS 7 或 CentOS 8 2、网络连接&#xff1a;系统必须能够访问互联网以下载所需的软件包 3、权限&#xff1a;需要 root 权限或者通过sudo 提权执行命令 先更新系…

Java基础关键_031_反射(一)

目 录 一、概述 二、获取 Class 的四种方式 1.Class.forName("完整全限定类名") 2.getClass() 3.class 属性 4.通过类加载器获取 三、通过反射机制实例化对象 1.newInstance()&#xff08;已过时&#xff09; 2.配置文件利用反射机制实例化对象 四、反射 Cla…

33.[前端开发-JavaScript基础]Day10-常见事件-鼠标事件-键盘事件-定时器-案例

1 window定时器 window定时器方法 setTimeout的使用 setInterval的使用 2 轮播消息提示 案例实战一 – 轮播消息提示 3 关闭隐藏消息 案例实战二 – 关闭隐藏消息 4 侧边栏展示 案例实战三 – 侧边栏展示 5 tab切换实现 案例实战四 – 登录框&#xff08;作业&#xff09;…

react ant design树穿梭框实现搜索并展开到子节点、同级节点选择及同时选择数量限制功能

功能点&#xff1a; 点击节点前的箭头&#xff0c;可以手动展开或折叠该节点的子节点。在搜索框中输入关键词&#xff0c;匹配的节点及其父节点会自动展开。清空搜索框后&#xff0c;恢复到用户手动控制的展开状态。勾选节点时仍然遵守 "最多勾选 6 个节点" 和 &quo…

阿里云云效 Maven

阿里云云效 Maven 官网&#xff1a;https://developer.aliyun.com/mvn/guide 阿里云Maven中央仓库为 阿里云云效 提供的公共代理仓库&#xff0c;帮助研发人员提高研发生产效率&#xff0c;使用阿里云Maven中央仓库作为下载源&#xff0c;速度更快更稳定。 阿里云云效 是企业…

如何在 Postman 中正确设置 Session 以维持用户状态?

在 Postman 里面设置有 session 的请求。如果你还不知道什么是 session&#xff0c;那么请看这里—— session 是一种记录客户端和服务器之间状态的机制&#xff0c;用于保持用户的登录状态或者其他数据&#xff0c;从而让用户在不同页面之间保持一致的体验。 Postman 设置带 …

DQN与PPO在算法层面的核心区别

DQN与PPO在算法层面的核心区别 1. 学习目标不同 DQN(基于价值): 核心:学习动作价值函数 Q ( s , a ) Q(s, a)

解析 HTML 网站架构规范

2025/3/28 向全栈工程师迈进&#xff01; 一、网页基本的组成部分 网页的外观多种多样&#xff0c;但是除了全屏视频或游戏&#xff0c;或艺术作品页面&#xff0c;或只是结构不当的页面以外&#xff0c;都倾向于使用类似的标准组件。 1.1页眉 通常横跨于整个页面顶部有一…

Three.js 快速入门教程【二十】3D模型加载优化实战:使用gltf-pipeline与Draco对模型进行压缩,提高加载速度和流畅性

系列文章目录 Three.js 快速入门教程【一】开启你的 3D Web 开发之旅 Three.js 快速入门教程【二】透视投影相机 Three.js 快速入门教程【三】渲染器 Three.js 快速入门教程【四】三维坐标系 Three.js 快速入门教程【五】动画渲染循环 Three.js 快速入门教程【六】相机控件 Or…

基于51单片机的速度检测报警器proteus仿真

地址&#xff1a; https://pan.baidu.com/s/1I7roZEjrk349Is_YdMcsxQ 提取码&#xff1a;1234 仿真图&#xff1a; 芯片/模块的特点&#xff1a; AT89C52/AT89C51简介&#xff1a; AT89C51 是一款常用的 8 位单片机&#xff0c;由 Atmel 公司&#xff08;现已被 Microchip 收…

DeepSeek 本地化部署教程

1 概述 1.1 配置参考图 科普&#xff1a; B&#xff0c;Billion&#xff08;十亿&#xff09;&#xff0c;是 “参数量” 的单位。 模型量超过 一亿&#xff0c;可称之为 “大模型”。 2 软件安装 2.1 下载 Ollama 官方主页&#xff1a;https://ollama.com/download主页截图…

matlab打开两个工程

1、问题描述 写代码时&#xff0c;需要实时参考别人的代码&#xff0c;需要同时打开2个模型&#xff0c;当模型在同一个工程内时&#xff0c;这是可以直接打开的&#xff0c;如图所示 2、解决方案 再打开一个MATLAB主窗口 这个时候就可以同时打开多个模型了 3、正确的打开方…

mac 下配置flutter 总是失败,请参考文章重新配置flutter 环境MacOS Flutter环境配置和安装

一、安装和运行Flutter的系统环境要求 想要安装并运行 Flutter&#xff0c;你的开发环境需要最低满足以下要求&#xff1a; 操作系统:macOS磁盘空间:2.8 GB(不包括IDE/tools的磁盘空间)。工具:Flutter使用git进行安装和升级。我们建议安装Xcode&#xff0c;其中包括git&#x…