互联网全景消息(10)之Kafka深度剖析(中)

一、深入应用

1.1 SpringBoot集成Kafka

        引入对应的依赖。

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency><!--swagger2增强,官方ui太low , 访问地址: /doc.html  --><dependency><groupId>com.github.xiaoymin</groupId><artifactId>swagger-bootstrap-ui</artifactId><version>1.8.8</version></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.9.2</version></dependency></dependencies>

        添加配置文件:

spring:application:name: demokafka:bootstrap-servers: 这里换成自己的kafka信息producer: # producer 生产者retries: 0 # 重试次数acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)batch-size: 16384 # 批量大小buffer-memory: 33554432 # 生产端缓冲区大小key-serializer: org.apache.kafka.common.serialization.StringSerializer
#      value-serializer: com.itheima.demo.config.MySerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer: # consumer消费者group-id: javagroup # 默认的消费组IDenable-auto-commit: true # 是否自动提交offsetauto-commit-interval: 100  # 提交offset延时(接收到消息后多久提交offset)# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据# none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常auto-offset-reset: latestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#      value-deserializer: com.itheima.demo.config.MyDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

        启动服务:

@SpringBootApplication
@RestController
public class Demo {public static void main(String[] args) {new SpringApplicationBuilder(Demo.class).run(args);}}

        启动信息如下:

1.2 消息发送 

 1.2.1 异步发送

        KafkaTemplate调用的send默认采用的是异步发送,如果需要同步发送获取发送结果,则需要调用get方法。 

@RestController
public class KafkaProducer {@Resourceprivate KafkaTemplate<String, Object> kafkaTemplate;@GetMapping("/kafka/test/{msg}")public void sendMessage(@PathVariable("msg") String msg) {Message message = new Message();message.setMessage(msg);kafkaTemplate.send("test", JSON.toJSONString(message));}
}@KafkaListener(topics = {"test"})public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("message:{}", msg);}}

        效果如下:

        同步发送代码如下:

    @GetMapping("/kafka/sync/{msg}")public void sync(@PathVariable("msg") String msg) throws Exception {Message message = new Message();message.setMessage(msg);ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("test", JSON.toJSONString(message));SendResult<String, Object> result = future.get(3,TimeUnit.SECONDS);logger.info("send result:{}",result.getProducerRecord().value());}

1.2.2 序列化 

        序列化详解:

  • 前面我们用到的是Kafka自带的字符串序列化器,
    org.apache.kafka.common.serialization.StringDeserializer
  • 除此之外还有:ByteArray、ByteBuffer、Bytes、Double、Integer、Long等;
  • 这些序列化接器都实现了org.apache.kafka.common.serialization.Serializer 

        自定义序列化,自己实现序列化对应的接口即可,如下:

public class MySerializer implements Serializer {@Overridepublic byte[] serialize(String s, Object o) {String json = JSON.toJSONString(o);return json.getBytes();}}

        然后在yaml配置自己的编辑器:

value-serializer: com.itheima.demo.config.MySerializer

         对应的我们在消费者消费消息的时候也需要按照我们自定义的方式进行解码,具体代码如下:

package com.itheima.demo.config;import com.alibaba.fastjson.JSON;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.UnsupportedEncodingException;
import java.util.Iterator;
import java.util.Map;public class MyDeserializer implements Deserializer {private final static Logger logger = LoggerFactory.getLogger(MyDeserializer.class);@Overridepublic Object deserialize(String s, byte[] bytes) {try {String json = new String(bytes,"utf-8");return JSON.parse(json);} catch (UnsupportedEncodingException e) {e.printStackTrace();}return null;}}

 1.2.3 分区策略

         分区策略决定了消息根据key投放到那个分区,也是顺序消费保障的基石。

  • 给定了分区号,直接将数据发送到指定分区里面去;
  • 没有给定了分区号,给定数据的key值,通过key取上hashcode进行分区;
  • 既没有给定分区号,也没有给定key值,直接轮询进行分区;
  • 自定义分区策略,按照自定义需求选择分区号;

        生产者代码如下:

//测试分区发送
@RestController
public class PartitionProducer {@Resourceprivate KafkaTemplate<String, Object> kafkaTemplate;//    指定分区发送
//    不管你key是什么,到同一个分区@GetMapping("/kafka/partitionSend/{key}")public void setPartition(@PathVariable("key") String key) {kafkaTemplate.send("test", 0,key,"key="+key+",msg=指定0号分区");}//    指定key发送,不指定分区
//    根据key做hash,相同的key到同一个分区@GetMapping("/kafka/keysend/{key}")public void setKey(@PathVariable("key") String key) {kafkaTemplate.send("test", key,"key="+key+",msg=不指定分区");}// 什么也不指定@GetMapping("/kafka/test/{msg}")public void sendMessage(@PathVariable("msg") String msg) {Message message = new Message();message.setMessage(msg);kafkaTemplate.send("test", JSON.toJSONString(message));}
}

        消费者代码如下:

//指定消费组消费
@Component
public class PartitionConsumer {private final Logger logger = LoggerFactory.getLogger(PartitionConsumer.class);//分区消费@KafkaListener(topics = {"test"},topicPattern = "0")public void onMessage(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("partition=0,message:[{}]", msg);}}@KafkaListener(topics = {"test"},topicPattern = "1")public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("partition=1,message:[{}]", msg);}}
}

        1)测试默认分区策略,发送什么也不指定的消息

        可以发现发送的是同一条的数据,他是跟partition轮询发送的;

         2)测试指定分区

         3)按照key的hashcode来分区

1.3 消息消费 

 1.3.1 消费者分组

public class GroupConsumer {private final Logger logger = LoggerFactory.getLogger(GroupConsumer.class);//组1,消费者1@KafkaListener(topics = {"test"},groupId = "group1")public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("group:group1-1 , message:{}", msg);}}//组1,消费者2@KafkaListener(topics = {"test"},groupId = "group1")public void onMessage2(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("group:group1-2 , message:{}", msg);}}//组2,只有一个消费者@KafkaListener(topics = {"test"},groupId = "group2")public void onMessage3(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("group:group2 , message:{}", msg);}}
}

         启动:

        需要注意的是,注意分区数与消费者数的搭配,如果(消费者数 > 分区数量),消费者将会出现闲置,浪费资源!

1.3.2 位移提交

        1)自动提交,我们在前面设置了以下两个选项,则kafka会延时设置自动提交:

        2)手动提交,有些时候我们需要手动控制偏移量的提交时机,比如确保消息严格消费后再提交,以防止丢失或重复,如下我们配置手动提交:

@Configuration
public class MyOffsetConfig {private final Logger logger = LoggerFactory.getLogger(this.getClass());@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Beanpublic KafkaListenerContainerFactory<?> manualKafkaListenerContainerFactory() {Map<String, Object> configProps = new HashMap<>();configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);// 注意这里!!!设置手动提交configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(configProps));// ack模式://          AckMode针对ENABLE_AUTO_COMMIT_CONFIG=false时生效,有以下几种:////          RECORD//          每处理一条commit一次////          BATCH(默认)//          每次poll的时候批量提交一次,频率取决于每次poll的调用频率////          TIME//          每次间隔ackTime的时间去commit(跟auto commit interval有什么区别呢?)////          COUNT//          累积达到ackCount次的ack去commit////          COUNT_TIME//          ackTime或ackCount哪个条件先满足,就commit////          MANUAL//          listener负责ack,但是背后也是批量上去////          MANUAL_IMMEDIATE//          listner负责ack,每调用一次,就立即commitfactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/67788.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

G1原理—3.G1是如何提升垃圾回收效率

大纲 1.G1为了提升GC的效率设计了哪些核心机制 2.G1中的记忆集是什么 3.G1中的位图和卡表 4.记忆集和卡表有什么关系 5.RSet记忆集是怎么更新的 6.DCQ机制的底层原理是怎样的 7.DCQS机制及GC线程对DCQ的处理 提升G1垃圾回收器GC效率的黑科技 G1设计了一套TLAB机制 快速…

Elastic-Job相关

文档参考视频&#xff1a;09_SpringBoot案例演示_哔哩哔哩_bilibili 一、Elastic-Job介绍 Elastic-Job 是一个轻量级、分布式的任务调度框架&#xff0c;旨在解决分布式环境下的定时任务调度问题。 1.1. Elastic-Job 的核心组件 Elastic-Job 是由多个核心组件构成的&#x…

Spring Boot3 配合ProxySQL实现对 MySQL 主从同步的读写分离和负载均衡

将 ProxySQL 配合 Spring Boot 使用&#xff0c;主要的目的是在 Spring Boot 应用程序中实现对 MySQL 主从同步的读写分离和负载均衡。这样&#xff0c;你可以利用 ProxySQL 自动将写操作路由到主库&#xff0c;而将读操作路由到从库。 1. 准备工作 确保你的 MySQL 主从同步环…

TypeScript语言的并发编程

TypeScript语言的并发编程 引言 随着现代应用程序的复杂性不断增加&#xff0c;性能和用户体验的重要性显得尤为突出。在这种背景下&#xff0c;并发编程应运而生&#xff0c;成为提升应用程序效率的重要手段。在JavaScript及其超集TypeScript中&#xff0c;尽管语言本身是单…

【Linux】设备驱动中的ioctl详解

在Linux设备驱动开发中&#xff0c;ioctl&#xff08;输入输出控制&#xff09;是一个非常重要的接口&#xff0c;用于用户空间应用程序与内核空间设备驱动之间进行通信。通过ioctl&#xff0c;应用程序可以发送命令给设备驱动&#xff0c;控制设备的行为或获取设备的状态信息。…

再次梳理ISP的大致流程

前言&#xff1a; 随着智能手机的普及&#xff0c;相机与我们的生活越来越紧密相关。在日常生活中&#xff0c;我们只需要轻轻按下手机上的拍照按钮&#xff0c;就能记录下美好时刻。那么问题来了&#xff1a;从我们指尖按下拍照按钮到一张色彩丰富的照片呈现在我们面前&#x…

基于R语言森林生态系统的结构、功能与稳定性

在生态学研究中&#xff0c;森林生态系统的结构、功能与稳定性是核心研究内容之一。这些方面不仅关系到森林动态变化和物种多样性&#xff0c;还直接影响森林提供的生态服务功能及其应对环境变化的能力。森林生态系统的结构主要包括物种组成、树种多样性、树木的空间分布与密度…

nacos学习笔记(一)

1.前言 何为nacos&#xff0c;nacos是一个更易于构建云原生应用的动态服务发现、配置管理和服务管理平台。可以说集注册中心&#xff0c;配置中心&#xff0c;服务管理于一起的平台。注册中心&#xff1a;相当于我们可以把服务注册到注册中心上&#xff0c;我们以后可以通过服…

在Java中实现集合排序

使用字面量的方式创建一个集合 //使用字面量的方式初始化一个List集合List<User> userList Arrays.asList(new User("小A",5),new User("小鑫",18),new User("小昌",8),new User("小鑫",8));注意&#xff1a;使用Arrays.asLis…

logback日志

一、使用两个以上spring环境变量做三目操作 <springProperty name"application_name" scope"context" source"spring.application.name"/><springProperty name"trace_app_name" scope"context" source"sprin…

mysql和redis的最大连接数

平时我们要评估mysql和redis的最大连接数&#xff0c;可以选择好环境&#xff08;比如4核8G&#xff09;,定好压测方法&#xff08;没有索引的mysql单表&#xff0c;redis单key&#xff09;进行压测&#xff0c;评估其最大并发量。 也可以查看各大云厂商的规格进行评估。 mys…

QEMU通过OVS实现联网

这篇笔记也是记录了一下自己的辛酸历程&#xff0c;仅供有需要的人参考。 首先关于qemu虚拟机的搭建&#xff0c;这不多赘述了&#xff0c;大家应该都会&#xff0c;这里可以给大家提供一个链接和一些命令。 QEMU搭建X86_64 Ubuntu虚拟系统环境https://blog.csdn.net/m0_531…

seleniun 自动化程序,python编程 我监控 chrome debug数据后 ,怎么获取控制台的信息呢

python 好的&#xff0c;使用 Python 来监控 Chrome 的调试数据并获取控制台信息&#xff0c;可以使用 websocket-client 库来连接 Chrome 的 WebSocket 接口。以下是一个详细的示例&#xff1a; 1. 安装必要的库 首先&#xff0c;你需要安装 websocket-client 库。可以使用…

IT面试求职系列主题-Jenkins

想成功求职&#xff0c;必要的IT技能一样不能少&#xff0c;先说说Jenkins的必会知识吧。 1) 什么是Jenkins Jenkins 是一个用 Java 编写的开源持续集成工具。它跟踪版本控制系统&#xff0c;并在发生更改时启动和监视构建系统。 2&#xff09;Maven、Ant和Jenkins有什么区别…

WEBRTC前端播放 播放器组件封装

组件封装 <template><div><div class"option"><input v-model"useStun" type"checkbox" /><label for"use-stun">Use STUN server</label></div><button click"startPlay"&g…

(五)ROS通信编程——参数服务器

前言 参数服务器在ROS中主要用于实现不同节点之间的数据共享&#xff08;P2P&#xff09;。参数服务器相当于是独立于所有节点的一个公共容器&#xff0c;可以将数据存储在该容器中&#xff0c;被不同的节点调用&#xff0c;当然不同的节点也可以往其中存储数据&#xff0c;关…

攻防靶场(34):隐蔽的计划任务提权 Funbox1

目录 1. 侦查 1.1 收集目标网络信息&#xff1a;IP地址 1.2 主动扫描&#xff1a;扫描IP地址段 1.3 搜索目标网站 2. 初始访问 2.1 有效账户&#xff1a;默认账户 2.2 利用面向公众的应用 2.3 有效账户&#xff1a;默认账户 3. 权限提升 3.1 计划任务/作业&#xff1a;Cron 靶场…

嵌入式入门Day38

C Day1 第一个C程序C中的输入输出输出操作coutcin练习 命名空间使用方法自定义命名空间冲突问题 C对字符串的扩充C风格字符串的使用定义以及初始化C风格字符串与C风格字符串的转换C风格的字符串的关系运算常用的成员变量输入方法 布尔类型C对堆区空间使用的扩充作业 第一个C程序…

kubernetes第七天

1.影响pod调度的因素 nodeName 节点名 resources 资源限制 hostNetwork 宿主机网络 污点 污点容忍 Pod亲和性 Pod反亲和性 节点亲和性 2.污点 通常是作用于worker节点上&#xff0c;其可以影响pod的调度 语法&#xff1a;key[value]:effect effect:[ɪˈfek…

docker minio镜像arm64架构

minio版本为RELEASE.2021-09-03T03-56-13Z 原项目信创改造&#xff0c;服务器资源改为了arm64架构&#xff0c;统信uos docker镜像库内没有对应的minio镜像&#xff0c;当前镜像为拉取源码后&#xff0c;自编译打包镜像&#xff0c;亲测可用。 使用方式 将tar包导入到服务器…