kafka及异步通知文章上下架

1)自媒体文章上下架

需求分析

2)kafka概述

消息中间件对比

特 性

ActiveMQ

RabbitMQ

RocketMQ

Kafka

开 发 语 言

java

erlang

java

scala

单 机 吞 吐 量

万级

万级

10万级

100万级

时 效 性

ms

us

ms

ms级以内

可 用 性

高(主从)

高(主从)

非常高(分布 式)

非常高(分布 式)

功 能 特 性

成熟的产品、较全的 文档、各种协议支持 好

并发能力强、 性能好、延迟 低

MQ功能比较 完善,扩展性 佳

只支持主要的MQ功能, 主要应用于大数据领域

消息中间件对比-选择建议

消息中间 件

建议

Kafka

追求高吞吐量,适合产生大量数据的互联网服务的数据收集业务

RocketMQ

靠性要求很高的金融互联网领域,稳定性高,经历了多次阿里双11考验

RabbitMQ

性能较好,社区活跃度高,数据量没有那么大,优先选择功能比较完备的 RabbitMQ

kafka介绍

Kafka 是一个分布式流媒体平台,类似于消息队列或企业消息传递系统。kafka官网:http://kafka.apach e.org/

kafka介绍-名词解释

  • producer:发布消息的对象称之为主题生产者(Kafka topic producer)
  • topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)
  • consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)
  • broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。

3)kafka安装配置

Kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所以安装Kafka之前必须先安装zookeeper

https://blog.csdn.net/m0_70325779/article/details/137248462

4)kafka入门

生产者发送消息,多个消费者只能有一个消费者接收到消息生产者发送消息,多个消费者都可以接收到消息

(1)创建kafka-demo项目,导入依赖
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId>
</dependency>
(2)生产者发送消息
/*** 生产者*/
public class ProducerQuickStart {public static void main(String[] args) throws ExecutionException, 
InterruptedException {//1.kafka链接配置信息Properties prop = new Properties();//kafka链接地址prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//key和value的序列化prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.se
rialization.StringSerializer");prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.
serialization.StringSerializer");//2.创建kafka生产者对象KafkaProducer<String,String> producer = new KafkaProducer<String,String>
(prop);//3.发送消息/*** 第一个参数 :topic* 第二个参数:消息的key* 第三个参数:消息的value*/ProducerRecord<String,String> kvProducerRecord = new 
ProducerRecord<String,String>("topic-first","key-001","hello kafka");//同步发送消息RecordMetadata recordMetadata = producer.send(kvProducerRecord).get();System.out.println(recordMetadata.offset());//4.关闭消息通道 必须要关闭,否则消息发送不成功producer.close();}
}
(3)消费者接收消息
/*** 消费者*/
public class ConsumerQuickStart {public static void main(String[] args) {//1.kafka的配置信息Properties prop = new Properties();//链接地址prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");//key和value的反序列化器prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringDeserializer");prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringDeserializer");//设置消费者组prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");//2.创建消费者对象KafkaConsumer<String, String> consumer = new KafkaConsumer<String, 
String>(prop);//3.订阅主题consumer.subscribe(Collections.singletonList("topic-first"));//4.拉取消息while (true) {ConsumerRecords<String, String> consumerRecords = 
consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> consumerRecord : 
consumerRecords) {System.out.println(consumerRecord.key());System.out.println(consumerRecord.value());}}}
}
使用情景:
  • 生产者发送消息,多个消费者订阅同一个主题(多个消费者都是一个组)只能有一个消费者收到消息(一对一)
  • 生产者发送消息,多个消费者订阅同一个主题(多个消费者不是一个组)所有消费者都能收到消息(一对多)

springboot集成kafka入门

1.导入spring-kafka依赖信息
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- kafkfa --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId></dependency>
</dependencies>
2.在resources下创建文件application.yml
server:port: 9991
spring:application:name: kafka-demokafka:bootstrap-servers: 192.168.200.130:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: ${spring.application.name}-testkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: 
org.apache.kafka.common.serialization.StringDeserializer
3.消息生产者
4.消息消费者

传递消息为对象

目前springboot整合后的kafka,因为序列化器是StringSerializer,这个时候如果需要传递对象可以有两种方式

方式一:可以自定义序列化器,对象类型众多,这种方式通用性不强,本章节不介绍

方式二:可以把要传递的对象进行转json字符串,接收消息后再转为对象即可,本项目采用这种方式

  • 发送消息
  • 接收消息

自媒体文章上下架功能完成

需求分析

已发表且已下架的文章可以上架

流程说明

接口定义

说明

接口路径

/api/v1/news/down_or_up

请求方式

POST

参数

DTO

响应结果

ResponseResult

DTO
@Data
public class WmNewsDto {private Integer id;/*** 是否上架 0 下架 1 上架*/private Short enable;}
ResponseResult

自媒体文章上下架-功能实现

接口定义

water-wemedia工程

在water-wemedia工程下的WmNewsController新增方法

@PostMapping("/down_or_up")
public ResponseResult downOrUp(@RequestBody WmNewsDto dto){return null;
}

在WmNewsDto中新增enable属性,完整的代码如下:

@Data
public class WmNewsDto {private Integer id;/*** 标题*/private String title;/*** 频道id*/private Integer channelId;/*** 标签*/private String labels;/*** 发布时间*/private Date publishTime;/*** 文章内容*/private String content;/*** 文章封面类型 0 无图 1 单图 3 多图 -1 自动*/private Short type;/*** 提交时间*/private Date submitedTime; /*** 状态 提交为1 草稿为0*/private Short status;/*** 封面图片列表 多张图以逗号隔开*/private List<String> images;/*** 上下架 0 下架 1 上架*/private Short enable;
}
业务层编写

在WmNewsService新增方法

/*** 文章的上下架* @param dto* @return*/
public ResponseResult downOrUp(WmNewsDto dto);
实现方法
/*** 文章的上下架* @param dto* @return*/
@Override
public ResponseResult downOrUp(WmNewsDto dto) {//1.检查参数if(dto.getId() == null){return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);}//2.查询文章WmNews wmNews = getById(dto.getId());if(wmNews == null){return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,"文章不
存在");}//3.判断文章是否已发布if(!wmNews.getStatus().equals(WmNews.Status.PUBLISHED.getCode())){return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,"当前文章
不是发布状态,不能上下架");}//4.修改文章enableif(dto.getEnable() != null && dto.getEnable() > -1 && dto.getEnable() < 2){update(Wrappers.
<WmNews>lambdaUpdate().set(WmNews::getEnable,dto.getEnable())
.eq(WmNews::getId,wmNews.getId()));}return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}
控制器
@PostMapping("/down_or_up")
public ResponseResult downOrUp(@RequestBody WmNewsDto dto){return wmNewsService.downOrUp(dto);
}
测试
消息通知article端文章上下架
在water-common模块下导入kafka依赖
<!-- kafkfa -->
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId>
</dependency>
在自媒体端的nacos配置中心配置kafka的生产者
spring:kafka:bootstrap-servers: localhost:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer
在自媒体端文章上下架后发送消息
//发送消息,通知article端修改文章配置
if(wmNews.getArticleId() != null){Map<String,Object> map = new HashMap<>();map.put("articleId",wmNews.getArticleId());map.put("enable",dto.getEnable());kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONS
tring(map));
}
常量类:
public class WmNewsMessageConstants {public static final String 
WM_NEWS_UP_OR_DOWN_TOPIC="wm.news.up.or.down.topic";
}
在article端的nacos配置中心配置kafka的消费者
spring:kafka:bootstrap-servers: localhost:9092consumer:group-id: ${spring.application.name}key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: 
org.apache.kafka.common.serialization.StringDeserializer
在article端编写监听,接收数据
@Component
@Slf4j
public class ArtilceIsDownListener {@Autowiredprivate ApArticleConfigService apArticleConfigService;@KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)public void onMessage(String message){if(StringUtils.isNotBlank(message)){Map map = JSON.parseObject(message, Map.class);apArticleConfigService.updateByMap(map);log.info("article端文章配置修改,articleId={}",map.get("articleId"));}}
}

修改ap_article_config表的数据新建ApArticleConfigService

public interface ApArticleConfigService extends IService<ApArticleConfig> {/*** 修改文章配置* @param map*/public void updateByMap(Map map);
}
实现类:
@Service
@Slf4j
@Transactional
public class ApArticleConfigServiceImpl extends 
ServiceImpl<ApArticleConfigMapper, ApArticleConfig> implements 
ApArticleConfigService {/*** 修改文章配置* @param map*/@Overridepublic void updateByMap(Map map) {//0 下架 1 上架Object enable = map.get("enable");boolean isDown = true;if(enable.equals(1)){isDown = false;}//修改文章配置update(Wrappers.
<ApArticleConfig>lambdaUpdate().eq(ApArticleConfig::getArticleId,map.get("articl
eId")).set(ApArticleConfig::getIsDown,isDown));}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/51952.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何从 Bak 文件中恢复 SQL数据库?(3种方法)

如何从 .bak 文件恢复 SQL数据库&#xff1f; 在数据库管理和维护过程中&#xff0c;数据的安全性和完整性至关重要。备份文件&#xff08;.bak 文件&#xff09;是 SQL Server 中常用的数据库备份格式&#xff0c;它包含了数据库的完整副本&#xff0c;用于在数据丢失、系统故…

flutter与原生怎么交互的

Flutter 与原生平台(如 Android 和 iOS)之间的交互可以通过**平台通道(Platform Channels)**实现。这允许你在 Flutter 应用中调用原生代码,或者从原生代码中调用 Flutter 代码。这种机制使得你可以利用原生平台提供的特性和 API,同时保持大部分应用代码在 Flutter 中。 …

4. 第一个3D案例—创建3D场景

入门Three.js的第一步&#xff0c;就是认识场景Scene、相机Camera、渲染器Renderer三个基本概念&#xff0c;接下来&#xff0c;咱们通过三小节课&#xff0c;大家演示“第一个3D案例”完成实现过程。 学习建议&#xff1a;只要你能把第一个3D案例搞明白&#xff0c;后面学习就…

二百六十、Java——采集Kafka数据,解析成一条条数据,写入另一Kafka中(复杂JSON)

一、目的 由于部分数据类型频率为1s&#xff0c;从而数据规模特别大&#xff0c;因此完整的JSON放在Hive中解析起来&#xff0c;尤其是在单机环境下&#xff0c;效率特别慢&#xff0c;无法满足业务需求。 而Flume的拦截器并不能很好的转换数据&#xff0c;因为只能采用Java方…

SEO之网站结构优化(十四-内部链接及权重分配3)

初创企业搭建网站的朋友看1号文章&#xff1b;想学习云计算&#xff0c;怎么入门看2号文章谢谢支持&#xff1a; 1、我给不会敲代码又想搭建网站的人建议 2、“新手上云”能够为你开启探索云世界的第一步 博客&#xff1a;阿幸SEO~探索搜索排名之道 7、锚文字分布及变化 前面…

新手c语言讲解及题目分享(十四)--函数专项练习(一)

目录 前言 一.函数的定义 1.函数定义包括的内容&#xff1a; Ⅰ.指定函数类别 Ⅱ.指定函数类型 Ⅲ.指定函数名 Ⅳ.指定函数的参数名称和类型 Ⅴ.指定函数的函数体 2.函数定义的一般形式&#xff1a; Ⅰ.有参函数的定义形式&#xff1a; Ⅱ.无参函数的定义形式&#x…

C语言从头学55——学习头文件errno.h、float.h

1、头文件 errno.h 中的变量 errno 的使用 在 errno.h 定义了一个 int 类型的变量 errno&#xff08;错误码&#xff09;&#xff0c;如果发现这个变量出现非零值&#xff0c;表示已经执行的函数发生了错误。这个变量一般多用于检查数学函数运算过程中发生的错误。 …

部署 Web 项目到 Linux,可以使他人也访问项目的方法

目录 一、环境配置 二、建构项目并打包 三、上传Jar包到服务器, 并运行 3.1 上传Jar包 3.2 运行 jar 包 3.3 开放端口号 四、其他问题 4.1 运行异常问题 4.2 杀掉进程 五、总结 一、环境配置 如果本地项目是SpringBoot项目&#xff0c;使用的数据库是MySQL&#xff…

ES6 类-总结

我们现在用一段代码&#xff0c; 在注释中总结所有关于JavaScript类的所有用法 class Student extends Person {//这里的Student是子类&#xff0c;Person是父类&#xff0c;extends是实现类之间的继承&#xff0c;它可以自动设置原型university 家里蹲大学; //公共字段(类似…

APP 数据抓取 - Charles 抓包工具的使用(Charles 端口配置、CA 证书配置、Charles Android 模拟器配置)

前言说明 此文章是我在学习 Charles APP 抓包时编写&#xff0c;内容都是亲测有效&#xff0c;文章内容也有参考其他人&#xff0c;参考文章如下&#xff1a; Android 手机使用 charles 抓 https 请求&#xff08;保姆级教程&#xff09;网易 mumu 模拟器安装下载 charles 的…

计算机网络(八股文)

这里写目录标题 计算机网络一、网络分层模型1. TCP/IP四层架构和OSI七层架构⭐️⭐️⭐️⭐️⭐️2. 为什么网络要分层&#xff1f;⭐️⭐️⭐️3. 各层都有那些协议&#xff1f;⭐️⭐️⭐️⭐️ 二、HTTP【重要】1. http状态码&#xff1f;⭐️⭐️⭐️2. 从输入URL到页面展示…

XSLT 实例:掌握 XML 转换的艺术

XSLT 实例&#xff1a;掌握 XML 转换的艺术 引言 XSLT&#xff08;可扩展样式表语言转换&#xff09;是一种强大的工具&#xff0c;用于将 XML&#xff08;可扩展标记语言&#xff09;文档转换为其他格式&#xff0c;如 HTML、PDF 或纯文本。在本文中&#xff0c;我们将通过一…

从Vuex 到 Pinia,Vue 状态管理的进化

Vue.js,一个轻量级且易于上手的 JavaScript 框架,已经在全球范围内获得了广泛的应用。 Vue.js 的状态管理库 Vuex,也为开发者提供了一个统一的状态管理方案。然而,随著 Vue.js 的发展和进化,我们看到了一个新的状态管理库的诞生 — Pinia。在这篇文章中,我们将探讨 Vuex…

2024年9月3日嵌入式学习

数据结构 1定义 一组用来保存一种或者多种特定关系的数据的集合&#xff08;组织和存储数据&#xff09; 程序的设计&#xff1a;将现实中大量而复杂的问题以特定的数据类型和特定的存储结构存储在内存中&#xff0c; 并在此基础上实现某个特定的功能的操作&am…

Springboot集成WebSocket客户端,发送消息并监测心跳

jar包&#xff08;主要jar包&#xff09; <dependency><groupId>org.java-websocket</groupId><artifactId>Java-WebSocket</artifactId><version>1.5.7</version></dependency>服务类 import cn.hutool.json.JSONUtil; impor…

「Python程序设计」条件控制:if-elif-else语句

我们在进行程序设计的过程中&#xff0c;基本上遵循的过程是&#xff0c;找出变量和常量&#xff0c;通过python编程语言&#xff0c;设置变量和常量&#xff0c;以及考虑是否需要赋予初始值。 设计变量和常量&#xff0c;其实就是为了模拟和计算我们的现实世界中&#xff0c;…

学习笔记--Docker

安装 1.卸载旧版 首先如果系统中已经存在旧的Docker&#xff0c;则先卸载&#xff1a; yum remove docker \docker-client \docker-client-latest \docker-common \docker-latest \docker-latest-logrotate \docker-logrotate \docker-engine 2.配置Docker的yum库 首先要安…

深入理解 JavaScript DOM 操作

一、DOM 操作分类 &#xff08;一&#xff09;元素查找 根据 ID 值查找&#xff1a;getElementById()&#xff0c;返回符合条件的第一个对象。 var aa document.getElementById("aa");console.log(aa);根据类名查找&#xff1a;getElementsByClassName()&#xff…

IntelliJ IDEA 自定义字体大小

常用编程软件自定义字体大全首页 文章目录 前言具体操作1. 打开设置对话框2. 设置编辑器字体3. 设置编译软件整体字体 前言 IntelliJ IDEA 自定义字体大小&#xff0c;统一设置为 JetBrains Mono 具体操作 【File】>【Settings...】>【Editor】>【Font】 统一设置…

C++:list篇

前言: 观看C的list前需要对链表有一些了解&#xff0c;如C语言的链表结构。本片仅介绍list容器中常用的接口函数概念以及使用。 list的概念&#xff1a; 简而言之&#xff0c;C的list是一个双向带哨兵位的链表容器模板 list的构造&#xff1a; 1.list():默认构造 2.li…