Kafka-多线程消费及分区设置

目录

  • 一、Kafka是什么?
    • 消息系统:Publish/subscribe(发布/订阅者)模式
    • 相关术语
  • 二、初步使用
    • 1.yml文件配置
    • 2.生产者类
    • 3.消费者类
    • 4.发送消息
  • 三、减少分区数量
    • 1.停止业务服务进程
    • 2.停止kafka服务进程
    • 3.重新启动kafka服务
    • 4.重新启动业务服务
  • 参考文章

一、Kafka是什么?

Kafka是一种高吞吐量、分布式、基于发布/订阅的消息系统。可满足每秒百万级的消息生产和消费;有一套完善的消息存储机制,确保数据高效安全且持久化;Kafka作为一个集群运行在一个或多个服务器上,可以跨多个机房,当某台故障时,生产者和消费者转而使用其他的Kafka。

消息系统:Publish/subscribe(发布/订阅者)模式

1.消息发布者发布消息到主题中,有多个订阅者消费该消息。
2.当发布者发布消息时,不管是否有订阅者都不会报错。
3.一定要先有消息发布者,后有消息订阅者。

相关术语

1.Broker:Kafka服务器,负责创建topic、消息存储和转发。
2.Topic:消息类别(主题),用于区分消息。
3.Partition:分区,真正的存储数据单元。每个Topic包含一个或多个分区,用于保存消息和维护偏移量。(一般为kafka节点数CPU的总核心数量)
4.offset:分区消息此时被消费的位置。分区中消息的唯一id。
5.Producer:消息生产者。
6.Consumer:消息消费者。
7.Consumer Group:消费者组。由消费不同的分区的多个消费者实例组成,共用同一个Group-id。
8.Message:消息,由offset(分区上的消息id)、MessageSize(消息内容data大小)、data(消息具体内容)组成。

二、初步使用

1.yml文件配置

spring:kafka:bootstrap-servers: http://127.0.0.1:9002properties:security:protocol: SASL_PLAINTEXTsasl:mechanism: PLAINjaas:config: org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="123456";producer:# 发生错误后,消息重发的次数。retries: 0#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。batch-size: 16384# 设置生产者内存缓冲区的大小。buffer-memory: 33554432# 键的序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializer# 值的序列化方式value-serializer: org.apache.kafka.common.serialization.StringSerializer# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。acks: 1consumer:# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5Dauto-commit-interval: 1S# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录auto-offset-reset: earliest# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量enable-auto-commit: false# 键的反序列化方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 值的反序列化方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 消费者超时时间 6properties:max:poll:interval:ms: 6000listener:# 在侦听器容器中运行的线程数。消费者组中的实例数量。 【本次重点】concurrency: 5#listner负责ack,每调用一次,就立即commitack-mode: manual_immediatemissing-topics-fatal: false

2.生产者类

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;@Component
@Slf4j
public class KafkaProducer {// 消费者组public static final String TOPIC_GROUP2 = "topic.group2";@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;public void send(String topic,Object obj) {String obj2String = JSONObject.toJSONString(obj);log.info("准备发送消息为:{}", obj2String);//发送消息ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, obj);future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {@Overridepublic void onFailure(Throwable throwable) {//发送失败的处理log.info(topic + " - 生产者 发送消息失败:" + throwable.getMessage());}@Overridepublic void onSuccess(SendResult<String, Object> stringObjectSendResult) {//成功的处理log.info(topic + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());}});}
}

3.消费者类

使用注解的方式来创建主题和分区。

package com.lezhi.szxy.oa.core.kafka;import com.alibaba.fastjson.JSON;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.ServiceException;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.poi.ss.formula.functions.T;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConsumerRecordRecoverer;
import org.springframework.kafka.listener.RetryingBatchErrorHandler;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.util.backoff.FixedBackOff;import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;@Component
@Slf4j
public class KafkaConsumer {@Resourceprivate addService addService;@Resourceprivate RedisLockUtil redisLockUtil;@ResourceRedissonClient redissonClient;@ResourceRedisTemplate<String,String> redisTemplate;private static final String ADD_LOCK_PREFIX = "ADD_LOCK_PREFIX";ObjectMapper objectMapper = new ObjectMapper();/*** 初始化主题分区* @return*/@Beanpublic NewTopic batchTopic() {log.info("初始化主题分区batchTopic : add_topic,分区:5,副本数:1 >>>>>>>>>>>>>>>>>>>>>>>>>>>>> ");return new NewTopic("add_topic", 5, (short) 1);}/*** 添加消息* @param ack*/@KafkaListener(topics = "add_topic"C,groupId = KafkaProducer.TOPIC_GROUP2)public void handleAddMessage(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {log.info("add_topic-队列消费端 topic:{}, 收到消息>>>>>>>>>>>>>>>>>", topic);Optional message = Optional.ofNullable(record.value());if (message.isPresent()) {Object msg = message.get();try {ParamImport param =  objectMapper.readValue(String.valueOf(msg) , ParamImport .class);String fullKey = redisLockUtil.getFullKey(ADD_LOCK_PREFIX , String.valueOf(msg));if(redisLockUtil.getLock(fullKey , 10000)){// 业务代码...log.info("add_topic 消费了: Topic:" + topic + ",Message:" + String.valueOf(msg));}else {log.info("add_topic 已经被消费: Topic:" + topic + ",Message:" + String.valueOf(msg));}ack.acknowledge();} catch (Exception e) {e.printStackTrace();log.error("解析 <"+OaConstant.SALARY_SEND_MESSAGE_KAFKA_TOPIC+"> 数据异常");}}}
}

配置消费端主题分区启动后,查看kafka,add_topic主题生成五个分区实例
kafka配置
注意:一个消费线程,可以对应若干分区。但是为了保证数据的一致性,同一个分区同时只能备一个消费者实例消费,所以超过分区数量的消费者实例个数是多余的,会被闲置。

将消费者实例(消费线程)比为一个人,分区消息相当于一个办公位。办公位数>人数时,哪个办公位有消息待消费,人就到哪一个工位处理消息。当办公位数<人数时,后面的人数需要排队等待前面的人离开,才可以进入办公位消费。
当人再多时,只有一个办公位,人也得排队办公,属于同步消费;当办公位有多个时,才能实现多人同时操作。

单机kafka分区最好不超过5。默认使用轮询策略。

4.发送消息

public void addTopicMsg(ParamImport param) throws ServiceException {String json;try {json = objectMapper.writeValueAsString(param);} catch (JsonProcessingException e) {log.error("addTopicMsg-发送消息,kafka消息转换失败:{}", e);throw new ServiceException("发送失败");}log.info("addTopicMsg-发送消息,发送kafka请求>>>>>>>>>>>>>>>>>>>>>>>");kafkaTemplate.send("add_topic", json);}

三、减少分区数量

上文中,我们使用了new NewTopic()的方式创建分区,分区数量只能动态增加不能减少。所以我们需要根据以下步骤来重新生成分区,达成减少分区的目的。

1.停止业务服务进程

停止业务服务进程,使得不会重复生成分区。修改代码内配置的new NewTopic()配置分区数。

2.停止kafka服务进程

停止kafka服务进程,清空分区、主题等数据。

3.重新启动kafka服务

4.重新启动业务服务

此时就会根据修改后的分区设置重新生成分区。

参考文章

【SpringBoot】在Springboot中怎么设置Kafka自动创建Topic
SpringBoot+Kafka之如何优雅的创建topic
想弄明白Kafka到底是什么吗?看完这篇你就知道了!(概念、数据存储、生产者、消费者)
图解Kafka,看本篇就足够啦!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/633079.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Leetcode刷题笔记题解(C++):200. 岛屿数量

思路&#xff1a;利用深度优先搜索的思路来查找1身边的1&#xff0c;并且遍历之后进行0替换防止重复dfs&#xff0c;代码如下所示 class Solution { public:int numIslands(vector<vector<char>>& grid) {int row grid.size();int col grid[0].size();int n…

滚动菜单+图片ListView

目录 Fruit.java FruitAdapter MainActivity activity_main.xml fruit.xml 整体结构 Fruit.java public class Fruit {private String name;private int imageId;public Fruit(String name, int imageId) {this.name name;this.imageId imageId;}public String getNam…

【从零开始学习Redis | 第七篇】利用Redis构造全局唯一ID(含其他构造方法)

目录 前言&#xff1a; 什么是全局唯一ID&#xff1f; 尝试构造全局唯一ID&#xff1a; 其他构造全局唯一ID的方法 1.基于数据库自增构造全局唯一ID&#xff1a; 2.基于UUID构造全局唯一ID&#xff1a; 3.基于雪花算法构造全局唯一ID&#xff1a; 总结&#xff1a; 前…

在客户端访问远程Linux服务器的私有IP地址的URL

文章目录 环境背景SSH tunnel和正向/反向代理步骤第一步第二步效果考一考 其它多次跳转另一种方法&#xff1a;正向代理 参考 环境 服务器&#xff1a;Ubuntu 22.04客户端&#xff1a;Mac 14.2.1 背景 在远程Linux服务器上搭建了minikube环境。minikube提供了dashboard功能&…

ChatGPT付费创作系统V2.5.8独立版+前端

小狐狸ChatGPT付费创作系统V2.5.8版本最大特点新增PC端绘画功能全新升级。该版本为编译版无开源&#xff0c;本版本同样处理了后台弹窗、暗链网址。单独制作了2.5.5升级至2.5.8数据库升级包及升级文件&#xff0c;直接导入即可使用。本版本升级后唯一BUG后台绘画功能新增的翻译…

酷开科技将AR技术多方应用 打造全能酷开系统

酷开系统AR技术的核心是通过计算机视觉、图形渲染和深度感知等技术&#xff0c;将虚拟物体或信息精确地叠加到现实世界的场景中。通过智能摄像头捕捉真实环境的图像和视频&#xff0c;结合3D渲染技术&#xff0c;生成与现实场景相融合的虚拟图像&#xff0c;实现虚实结合的视觉…

最新ChatGPT/GPT4科研应用与AI绘图及论文高效写作

详情点击链接&#xff1a;最新ChatGPT/GPT4科研应用与AI绘图及论文高效写作 一OpenAI 1.最新大模型GPT-4 Turbo 2.最新发布的高级数据分析&#xff0c;AI画图&#xff0c;图像识别&#xff0c;文档API 3.GPT Store 4.从0到1创建自己的GPT应用 5. 模型Gemini以及大模型Clau…

编译FFmpeg4.3.1 、x264并移植到Android

1、前言 FFmpeg 既是一款音视频编解码工具&#xff0c;同时也是一组音视频编解码开发套件。 2、准备工作 系统&#xff1a;LinuxNDK&#xff1a;android-ndk-r21b-linux-x86_64.zipFFmpeg&#xff1a;ffmpeg-snapshot.tar.bz2x264&#xff1a;x264 3、下载NDK 在linux环境中…

hanlp,pkuseg,jieba,cutword分词实践

总结&#xff1a;只有jieba,cutword,baidu lac成功将色盲色弱成功分对,这两个库字典应该是最全的 hanlp[持续更新中] https://github.com/hankcs/HanLP/blob/doc-zh/plugins/hanlp_demo/hanlp_demo/zh/tok_stl.ipynb import hanlp # hanlp.pretrained.tok.ALL # 语种见名称最…

RabbitMQ交换机(3)-Topic

1.Topic模式 RabbitMQ的Topic模式是一种基于主题的消息传递模式。它允许发送者向一个特定的主题&#xff08;topic&#xff09;发布消息&#xff0c;同时&#xff0c;订阅者也可以针对自己感兴趣的主题进行订阅。 在Topic模式中&#xff0c; 主题通过一个由单词和点号组成的字…

Git教程学习:07 打标签

文章目录 0 前言1 列出标签2 创建标签3 附注标签4 轻量标签5 后期打标签6 共享标签7 删掉标签8 检查标签 0 前言 像其他版本控制系统&#xff08;VCS&#xff09;一样&#xff0c;Git 可以给仓库历史中的某一个提交打上标签&#xff0c;以示重要。 比较有代表性的是人们会使用…

Unity使用Protobuf

1.下载Protobuf ProtoBuf 2.打开它并且编译 如果有报错下载相应的.net版本即可 这里默认是6.0.100 由于我本机是8.0.100所以我改了这个文件 3.编译后的文件复制到Unity Assets/Plugins下 4.写个测试的proto文件 5.然后使用protoc生成 这里实现了一个简单的bat批量生成 Protos C…

微软与沃达丰签订10年合作,提供Copilot等生成式AI服务

1月16日&#xff0c;微软在官网宣布&#xff0c;与全球最大电信公司之一沃达丰&#xff08;Vodafone&#xff09;签订10年合作协议&#xff0c;将为3亿多企业、消费者提供生成式AI、云和数字服务等。 通过此次合作&#xff0c;沃达丰将利用微软的Copilot等生成式AI来改变客户、…

新版AndroidStudio dependencyResolutionManagement出错

在新版AndroidStudio中想像使用4.2版本或者4.3版本的AndroidStudio来构造项目&#xff1f;那下面这些坑我们就需要来避免了&#xff0c;否则会出各种各样的问题。 一.我们先来看看新旧两个版本的不同。 1.jdk版本的不同 新版默认是jdk17 旧版默认是jdk8 所以在新版AndroidSt…

FlinkAPI开发之状态管理

案例用到的测试数据请参考文章&#xff1a; Flink自定义Source模拟数据流 原文链接&#xff1a;https://blog.csdn.net/m0_52606060/article/details/135436048 Flink中的状态 概述 有状态的算子 状态的分类 托管状态&#xff08;Managed State&#xff09;和原始状态&…

如何用GPT进行数据处理?

详情点击链接&#xff1a;如何用GPT进行数据处理&#xff1f; 一OpenAI 1.最新大模型GPT-4 Turbo 2.最新发布的高级数据分析&#xff0c;AI画图&#xff0c;图像识别&#xff0c;文档API 3.GPT Store 4.从0到1创建自己的GPT应用 5. 模型Gemini以及大模型Claude2二定制自己…

线性代数基础【5】特征值和特征向量

第五章 特征值和特征向量 第一节、特征值和特征向量的基本概念 一、特征值和特征向量的理论背景 在一个多项式中,未知数的个数为任意多个,且每一项次数都是2的多项式称为二次型,二次型分为两种类型:即非标准二次型及标准二次型 注意: ①二次型X^T AX为非标准二次型的充分必…

docker部署项目,/var/lib/docker/overlay2目录满了如何清理?

docker部署项目&#xff0c;/var/lib/docker/overlay2目录满了如何清理&#xff1f; 一、问题二、解决1、查看 /var/lib/docker 目录&#xff08;1&#xff09;、containers 目录&#xff08;2&#xff09;、volumes 目录&#xff08;3&#xff09;、overlay2 目录 2、清理&…

总结1094

昨天又摆烂了&#xff0c;总结后面补的。 记录一个as中的错误&#xff1a; 一年没碰android了&#xff0c;下载安装AS,发现出现Gradle问题&#xff1a; ERROR: Connection timed out: connect&#xff1a;错误:连接超时:连接 这个错误 一般来说是因为.gradle引起的 搞了半…

Python编辑开发---pycharm pro 2023 中文

PyCharm Pro 2023是一款功能强大的Python集成开发环境&#xff08;IDE&#xff09;&#xff0c;旨在提高Python开发人员的生产力。它提供了智能代码编辑、实时代码分析和调试工具&#xff0c;支持版本控制和数据库工具&#xff0c;以及可扩展的插件系统。PyCharm Pro 2023可在多…