某马头条——day06

自媒体文章上下架

使用消息队列在自媒体下架时通知文章微服务。 

kafka概述

 

kafka环境搭建

docker pull zookeeper:3.4.14
docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14

安装kafka

docker pull wurstmeister/kafka:2.12-2.3.1
docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--net=host wurstmeister/kafka:2.12-2.3.1

kafka入门案例

 

依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId>
</dependency>

生产者类 

package com.heima.kafka.sample;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/*** 生产者*/
public class ProducerQuickStart {public static void main(String[] args) {//1.kafka的配置信息Properties properties = new Properties();//kafka的连接地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");//发送失败,失败的重试次数properties.put(ProducerConfig.RETRIES_CONFIG,5);//消息key的序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//消息value的序列化器properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//2.生产者对象KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);//封装发送的消息ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima-topic","100001","hello kafka");//3.发送消息producer.send(record);//4.关闭消息通道,必须关闭,否则消息发送不成功producer.close();}}

 消费者类

package com.heima.kafka.sample;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;/*** 消费者*/
public class ConsumerQuickStart {public static void main(String[] args) {//1.添加kafka的配置信息Properties properties = new Properties();//kafka的连接地址properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");//消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");//消息的反序列化器properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//2.消费者对象KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);//3.订阅主题consumer.subscribe(Collections.singletonList("itheima-topic"));//当前线程一直处于监听状态while (true) {//4.获取消息ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord.key());System.out.println(consumerRecord.value());}}}}

存在一个消费者组和多个消费者组的区别

 分区机制

相当于数据分片?redis也有这个机制。

高可用设计方案

kafka生产者详解

发送类型

参数详解

acks=0和UDP很像

kafka消费者详解

多个分区无法保证消息的有序性,相当于,a先后发了两条消息给b,两条消息进了不同的分区,结果因为网络原因导致b先接收到了第二条消息。

提交和偏移量

这里因为不是每次消费都提交就会出现丢失和重复的问题。

偏移量提交方式

SpringBoot集成kafka收发消息

1.导入spring-kafka依赖信息

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- kafkfa --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId></dependency>
</dependencies>

2.在resources下创建文件application.yml

server:port: 9991
spring:application:name: kafka-demokafka:bootstrap-servers: 192.168.200.130:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: ${spring.application.name}-testkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

 3.消息生产者

package com.heima.kafka.controller;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class HelloController {@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate;@GetMapping("/hello")public String hello(){kafkaTemplate.send("itcast-topic","黑马程序员");return "ok";}
}

4.消息消费者

package com.heima.kafka.listener;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;@Component
public class HelloListener {@KafkaListener(topics = "itcast-topic")public void onMessage(String message){if(!StringUtils.isEmpty(message)){System.out.println(message);}}
}

传递消息为对象时

发送

@GetMapping("/hello")
public String hello(){User user = new User();user.setUsername("xiaowang");user.setAge(18);kafkaTemplate.send("user-topic", JSON.toJSONString(user));return "ok";
}

接受

@Component
public class HelloListener {@KafkaListener(topics = "user-topic")public void onMessage(String message){if(!StringUtils.isEmpty(message)){User user = JSON.parseObject(message, User.class);System.out.println(user);}}
}

自媒体文章上下架功能实现

  • 已发表且已上架的文章可以下架

  • 已发表且已下架的文章可以上架

流程说明

 文章表存在一个属性字段是否上架

功能接口开发

在heima-leadnews-wemedia工程下的WmNewsController新增方法

@PostMapping("/down_or_up")
public ResponseResult downOrUp(@RequestBody WmNewsDto dto){return wmNewsService.downOrUp(dto);
}

在WmNewsDto中新增enable属性 

    /*** 上下架 0 下架  1 上架*/private Short enable;

在WmNewsService新增方法

/*** 文章的上下架* @param dto* @return*/
public ResponseResult downOrUp(WmNewsDto dto);

实现方法

/*** 文章的上下架* @param dto* @return*/
@Override
public ResponseResult downOrUp(WmNewsDto dto) {//1.检查参数if(dto.getId() == null){return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);}//2.查询文章WmNews wmNews = getById(dto.getId());if(wmNews == null){return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,"文章不存在");}//3.判断文章是否已发布if(!wmNews.getStatus().equals(WmNews.Status.PUBLISHED.getCode())){return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,"当前文章不是发布状态,不能上下架");}//4.修改文章enableif(dto.getEnable() != null && dto.getEnable() > -1 && dto.getEnable() < 2){update(Wrappers.<WmNews>lambdaUpdate().set(WmNews::getEnable,dto.getEnable()).eq(WmNews::getId,wmNews.getId()));}return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}

测试通过

消息通知article数据同步

 

在heima-leadnews-common模块下导入kafka依赖

<!-- kafkfa -->
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId>
</dependency>

在自媒体端的nacos配置中心配置kafka的生产者

spring:kafka:bootstrap-servers: 192.168.200.130:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer

在自媒体端文章上下架后发送消息

//发送消息,通知article端修改文章配置
if(wmNews.getArticleId() != null){Map<String,Object> map = new HashMap<>();map.put("articleId",wmNews.getArticleId());map.put("enable",dto.getEnable());kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONString(map));
}

在article端的nacos配置中心配置kafka的消费者

spring:kafka:bootstrap-servers: 192.168.200.130:9092consumer:group-id: ${spring.application.name}key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

在article端编写监听,接收数据

import com.alibaba.fastjson.JSON;
import com.heima.article.service.ApArticleConfigService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;import java.util.Map;@Component
@Slf4j
public class ArtilceIsDownListener {@Autowiredprivate ApArticleConfigService apArticleConfigService;@KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)public void onMessage(String message){if(StringUtils.isNotBlank(message)){Map map = JSON.parseObject(message, Map.class);apArticleConfigService.updateByMap(map);log.info("article端文章配置修改,articleId={}",map.get("articleId"));}}
}

新建ApArticleConfigService

public interface ApArticleConfigService extends IService<ApArticleConfig> {/*** 修改文章配置* @param map*/public void updateByMap(Map map);
}

实现

@Service
@Slf4j
@Transactional
public class ApArticleConfigServiceImpl extends ServiceImpl<ApArticleConfigMapper, ApArticleConfig> implements ApArticleConfigService {/*** 修改文章配置* @param map*/@Overridepublic void updateByMap(Map map) {//0 下架 1 上架Object enable = map.get("enable");boolean isDown = true;if(enable.equals(1)){isDown = false;}//修改文章配置update(Wrappers.<ApArticleConfig>lambdaUpdate().eq(ApArticleConfig::getArticleId,map.get("articleId")).set(ApArticleConfig::getIsDown,isDown));}
}

测试成功tmd

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/636071.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

代码随想录算法训练营第四十五天| 70.爬楼梯(进阶)、322.零钱兑换、279.完全平方数

代码随想录算法训练营第四十五天| 70.爬楼梯&#xff08;进阶&#xff09;、322.零钱兑换、279.完全平方数 题目 70.爬楼梯&#xff08;进阶&#xff09; 57.爬楼梯&#xff08;第八期模拟笔试&#xff09; https://kamacoder.com/problempage.php?pid1067 题目描述 假设…

【C++PCL】点云处理DBSCAN点云聚类分割

作者:迅卓科技 简介:本人从事过多项点云项目,并且负责的项目均已得到好评! 公众号:迅卓科技,一个可以让您可以学习点云的好地方 本专栏特色:根据经验和大家分享每个参数的调试规范,解决大家因为参数的问题而产生的苦恼。 目录 1.原理介绍 2.代码效果 3.源码展示

后台管理系统: spu管理模块

spu管理模块业务 spu 可以理解为类 例如 people类【spu】 sku可以理解为实例 例如&#xff1a;小明 18 男 spu跟sku可以理解为类跟多个实例的关系 spu管理模块静态 <template><div><el-card style"margin: 20px 0px"><CategorySelect get…

ARM安装与项目结构

1. 安装环境 参考E:\peixunQianrushi\arm\ziliao\FS4412新版&#xff08;学生资料&#xff09;\环境相关资料 这边建议全部默认路径 安装注意事项&#xff1a; 1、在接下来的安装过程中&#xff0c;对于使用win10、win8的操作系统的用户&#xff0c;所有的安装请均以管理员身份…

POKT Network (POKT) :进军百亿美元市场规模的人工智能推理市场

POKT Network&#xff08;又称 Pocket Network&#xff09;是一个去中心化的物理基础设施网络&#xff08;DePIN&#xff09;&#xff0c;它能够协调并激励对任何开放数据源的访问&#xff0c;最初专注于向应用程序和服务提供商提供区块链数据。 自 2020 年主网上线以来&#x…

第十五章 : Spring Cloud全链路监控(Pinpoint实战)

第十五章 : Spring Cloud全链路监控(Pinpoint实战) 前言 本章知识点: Pinpoint的发展历程、特点、优势以及整体架构;数据结构以及对集成框架的兼容性以及Pinpoint实战。 Springboot 2.3.12.RELEASE,spring cloud Hoxton.SR12,spring cloud alibaba 2.2.9.RELEASE发展历…

【GitHub项目推荐--推荐 5 个炫炫炫的可视化项目】【转载】

数据可视化就是将抽象的数据通过视觉的方式进行展示&#xff0c;能让用户直观的看到数据中蕴含的信息和规律。 本篇文章&#xff0c;整理了 5 个可视化开源项目&#xff0c;其中包括可视化制作低代码平台、大屏可视化、地图可视化、热图、图标可视化等等。 00. 数据大屏可视化…

Rust基础语法1

所有权转移&#xff0c;Rust中没有垃圾收集器&#xff0c;使用所有权规则确保内存安全&#xff0c;所有权规则如下&#xff1a; 1、每个值在Rust中都有一个被称为其所有者&#xff08;owner&#xff09;的变量&#xff0c;值在任何时候只能有一个所有者。 2、当所有者离开作用域…

WordPress微信一键关注免认证登录插件

插件介绍 WordPress微信免认证快捷登录插件&#xff1a;订阅号也能一键通行 这款WordPress插件专为个人用户打造&#xff0c;无需繁琐的服务号申请与认证流程。即使您只有未认证的订阅号&#xff0c;也能轻松实现关注公众号后一键登录网站的功能&#xff01; 配置步骤简单明…

【算法详解】力扣88.合并两个有序数组

一、题目介绍 给你两个按 非递减顺序 排列的整数数组 nums1 和 nums2&#xff0c;另有两个整数 m 和 n &#xff0c;分别表示 nums1 和 nums2 中的元素数目。 请你 合并 nums2 到 nums1 中&#xff0c;使合并后的数组同样按 非递减顺序 排列。 注意&#xff1a;最终&#xf…

动态规划基础(二)最长公共子序列 LCS

讲解求两个串中最长的公共的子序列长度或输出子序列等 poj1458 题目大意 给定两个字符串&#xff0c;要求输出两个字符串中最长公共子序列长度 思路 我们定义 a [ i ] [ j ] a[i][j] a[i][j]为&#xff0c;当字串 s t r 1 str1 str1到 i i i位置&#xff0c;字串 s t r 2 s…

采埃孚: 优化1/4员工;苹果Vision Pro开抢,黄牛9万一台 ;招商银行:2023年净利润1466亿元

今日精选 • 采埃孚: 优化1/4员工• 苹果Vision Pro开抢&#xff0c;黄牛9万一台• 招商银行&#xff1a;2023年净利润1466亿元&#xff0c;同比增长6.22% 科技动态 • OpenAI CEO 拟募集百亿级美元&#xff0c;建立 AI 芯片工厂网络以满足需求• 中西医结合“数智岐黄”大模…

【接上篇】二、Flask学习之CSS(下篇)

上篇&#xff1a;二、Flask学习之CSS 3.8hover hover是用来美化鼠标悬停的效果的&#xff0c;当鼠标停放在某个区域&#xff0c;就会执行对应的hover操作。可以操作本标签的内容&#xff0c;也可以操作本标签下某一个标签的内容 3.9after <!DOCTYPE html> <html l…

Unity3d C#实现场景编辑/运行模式下3D模型XYZ轴混合一键排序功能(含源码工程)

前言 在部分场景搭建中需要整齐摆放一些物品&#xff08;如仓库中的货堆、货架等&#xff09;&#xff0c;因为有交互的操作在单个模型上&#xff0c;每次总是手动拖动模型操作起来也是繁琐和劳累。 在这背景下&#xff0c;我编写了一个在运行或者编辑状态下都可以进行一键排序…

C#设计模式教程(10):装饰器模式

装饰器模式的定义 装饰器模式(Decorator Pattern)是一种结构型设计模式,它允许用户在不修改现有对象结构的情况下,动态地给一个对象添加额外的职责。这种模式创建了一个装饰类,用来包装原有的类。 这种模式创建了一个装饰类,用于包装原有的类,并在保持类方法签名完整性…

JS-元素尺寸与位置

通过js的方式&#xff0c;得到元素在页面中的位置 获取宽高 元素.offsetWidth 元素.offsetHeight 1&#xff09;获取元素的自身宽高、包括元素自身设置的宽高paddingborder 2&#xff09;获取出来的是数值&#xff0c;方便计算 3&#xff09;注意&#xff1a;获取的是可视…

C++中的指针、引用;左值、右值;左值引用、右值引用

一、指针、引用 引用指的是为已经创建的对象重新起一个名字。创建引用的时候&#xff0c;编译器只是将这个别名绑定到引用的对象上。 对象名提供了一种直接访问数据的方式&#xff0c;因为对象名本质上是数据所在的内存地址空间的一个地址映射。 引用提供了一种简介访问数据…

在 Python 中实现语音合成的四种方法

1 离线合成 pytts 配置环境 $ apt-get update $ apt-get install espeak $ pip install pyttsx3 $ apt-get install ffmpeg $ apt-get install alsa-utils运行程序 import pyttsx3engine pyttsx3.init() engine.setProperty(rate, 150) engine.setProperty(volume, 0.7)tex…

Python项目——搞怪小程序(PySide6+Pyinstaller)

1、介绍 使用python编写一个小程序&#xff0c;回答你是猪吗。 点击“是”提交&#xff0c;弹窗并退出。 点击“不是”提交&#xff0c;等待5秒&#xff0c;重新选择。 并且隐藏了关闭按钮。 2、实现 新建一个项目。 2.1、设计UI 使用Qt designer设计一个UI界面&#xff0c…

C#,入门教程(20)——列表(List)的基础知识

上一篇&#xff1a; C#&#xff0c;入门教程(19)——循环语句&#xff08;for&#xff0c;while&#xff0c;foreach&#xff09;的基础知识https://blog.csdn.net/beijinghorn/article/details/124060844 List顾名思义就是数据列表&#xff0c;区别于数据数组&#xff08;arr…