某马头条——day06

自媒体文章上下架

使用消息队列在自媒体下架时通知文章微服务。 

kafka概述

 

kafka环境搭建

docker pull zookeeper:3.4.14
docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14

安装kafka

docker pull wurstmeister/kafka:2.12-2.3.1
docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--net=host wurstmeister/kafka:2.12-2.3.1

kafka入门案例

 

依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId>
</dependency>

生产者类 

package com.heima.kafka.sample;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/*** 生产者*/
public class ProducerQuickStart {public static void main(String[] args) {//1.kafka的配置信息Properties properties = new Properties();//kafka的连接地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");//发送失败,失败的重试次数properties.put(ProducerConfig.RETRIES_CONFIG,5);//消息key的序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//消息value的序列化器properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//2.生产者对象KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);//封装发送的消息ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima-topic","100001","hello kafka");//3.发送消息producer.send(record);//4.关闭消息通道,必须关闭,否则消息发送不成功producer.close();}}

 消费者类

package com.heima.kafka.sample;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;/*** 消费者*/
public class ConsumerQuickStart {public static void main(String[] args) {//1.添加kafka的配置信息Properties properties = new Properties();//kafka的连接地址properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");//消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");//消息的反序列化器properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//2.消费者对象KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);//3.订阅主题consumer.subscribe(Collections.singletonList("itheima-topic"));//当前线程一直处于监听状态while (true) {//4.获取消息ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord.key());System.out.println(consumerRecord.value());}}}}

存在一个消费者组和多个消费者组的区别

 分区机制

相当于数据分片?redis也有这个机制。

高可用设计方案

kafka生产者详解

发送类型

参数详解

acks=0和UDP很像

kafka消费者详解

多个分区无法保证消息的有序性,相当于,a先后发了两条消息给b,两条消息进了不同的分区,结果因为网络原因导致b先接收到了第二条消息。

提交和偏移量

这里因为不是每次消费都提交就会出现丢失和重复的问题。

偏移量提交方式

SpringBoot集成kafka收发消息

1.导入spring-kafka依赖信息

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- kafkfa --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId></dependency>
</dependencies>

2.在resources下创建文件application.yml

server:port: 9991
spring:application:name: kafka-demokafka:bootstrap-servers: 192.168.200.130:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: ${spring.application.name}-testkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

 3.消息生产者

package com.heima.kafka.controller;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class HelloController {@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate;@GetMapping("/hello")public String hello(){kafkaTemplate.send("itcast-topic","黑马程序员");return "ok";}
}

4.消息消费者

package com.heima.kafka.listener;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;@Component
public class HelloListener {@KafkaListener(topics = "itcast-topic")public void onMessage(String message){if(!StringUtils.isEmpty(message)){System.out.println(message);}}
}

传递消息为对象时

发送

@GetMapping("/hello")
public String hello(){User user = new User();user.setUsername("xiaowang");user.setAge(18);kafkaTemplate.send("user-topic", JSON.toJSONString(user));return "ok";
}

接受

@Component
public class HelloListener {@KafkaListener(topics = "user-topic")public void onMessage(String message){if(!StringUtils.isEmpty(message)){User user = JSON.parseObject(message, User.class);System.out.println(user);}}
}

自媒体文章上下架功能实现

  • 已发表且已上架的文章可以下架

  • 已发表且已下架的文章可以上架

流程说明

 文章表存在一个属性字段是否上架

功能接口开发

在heima-leadnews-wemedia工程下的WmNewsController新增方法

@PostMapping("/down_or_up")
public ResponseResult downOrUp(@RequestBody WmNewsDto dto){return wmNewsService.downOrUp(dto);
}

在WmNewsDto中新增enable属性 

    /*** 上下架 0 下架  1 上架*/private Short enable;

在WmNewsService新增方法

/*** 文章的上下架* @param dto* @return*/
public ResponseResult downOrUp(WmNewsDto dto);

实现方法

/*** 文章的上下架* @param dto* @return*/
@Override
public ResponseResult downOrUp(WmNewsDto dto) {//1.检查参数if(dto.getId() == null){return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);}//2.查询文章WmNews wmNews = getById(dto.getId());if(wmNews == null){return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,"文章不存在");}//3.判断文章是否已发布if(!wmNews.getStatus().equals(WmNews.Status.PUBLISHED.getCode())){return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,"当前文章不是发布状态,不能上下架");}//4.修改文章enableif(dto.getEnable() != null && dto.getEnable() > -1 && dto.getEnable() < 2){update(Wrappers.<WmNews>lambdaUpdate().set(WmNews::getEnable,dto.getEnable()).eq(WmNews::getId,wmNews.getId()));}return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}

测试通过

消息通知article数据同步

 

在heima-leadnews-common模块下导入kafka依赖

<!-- kafkfa -->
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId>
</dependency>

在自媒体端的nacos配置中心配置kafka的生产者

spring:kafka:bootstrap-servers: 192.168.200.130:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer

在自媒体端文章上下架后发送消息

//发送消息,通知article端修改文章配置
if(wmNews.getArticleId() != null){Map<String,Object> map = new HashMap<>();map.put("articleId",wmNews.getArticleId());map.put("enable",dto.getEnable());kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONString(map));
}

在article端的nacos配置中心配置kafka的消费者

spring:kafka:bootstrap-servers: 192.168.200.130:9092consumer:group-id: ${spring.application.name}key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

在article端编写监听,接收数据

import com.alibaba.fastjson.JSON;
import com.heima.article.service.ApArticleConfigService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;import java.util.Map;@Component
@Slf4j
public class ArtilceIsDownListener {@Autowiredprivate ApArticleConfigService apArticleConfigService;@KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)public void onMessage(String message){if(StringUtils.isNotBlank(message)){Map map = JSON.parseObject(message, Map.class);apArticleConfigService.updateByMap(map);log.info("article端文章配置修改,articleId={}",map.get("articleId"));}}
}

新建ApArticleConfigService

public interface ApArticleConfigService extends IService<ApArticleConfig> {/*** 修改文章配置* @param map*/public void updateByMap(Map map);
}

实现

@Service
@Slf4j
@Transactional
public class ApArticleConfigServiceImpl extends ServiceImpl<ApArticleConfigMapper, ApArticleConfig> implements ApArticleConfigService {/*** 修改文章配置* @param map*/@Overridepublic void updateByMap(Map map) {//0 下架 1 上架Object enable = map.get("enable");boolean isDown = true;if(enable.equals(1)){isDown = false;}//修改文章配置update(Wrappers.<ApArticleConfig>lambdaUpdate().eq(ApArticleConfig::getArticleId,map.get("articleId")).set(ApArticleConfig::getIsDown,isDown));}
}

测试成功tmd

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/636071.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

后台管理系统: spu管理模块

spu管理模块业务 spu 可以理解为类 例如 people类【spu】 sku可以理解为实例 例如&#xff1a;小明 18 男 spu跟sku可以理解为类跟多个实例的关系 spu管理模块静态 <template><div><el-card style"margin: 20px 0px"><CategorySelect get…

ARM安装与项目结构

1. 安装环境 参考E:\peixunQianrushi\arm\ziliao\FS4412新版&#xff08;学生资料&#xff09;\环境相关资料 这边建议全部默认路径 安装注意事项&#xff1a; 1、在接下来的安装过程中&#xff0c;对于使用win10、win8的操作系统的用户&#xff0c;所有的安装请均以管理员身份…

POKT Network (POKT) :进军百亿美元市场规模的人工智能推理市场

POKT Network&#xff08;又称 Pocket Network&#xff09;是一个去中心化的物理基础设施网络&#xff08;DePIN&#xff09;&#xff0c;它能够协调并激励对任何开放数据源的访问&#xff0c;最初专注于向应用程序和服务提供商提供区块链数据。 自 2020 年主网上线以来&#x…

【GitHub项目推荐--推荐 5 个炫炫炫的可视化项目】【转载】

数据可视化就是将抽象的数据通过视觉的方式进行展示&#xff0c;能让用户直观的看到数据中蕴含的信息和规律。 本篇文章&#xff0c;整理了 5 个可视化开源项目&#xff0c;其中包括可视化制作低代码平台、大屏可视化、地图可视化、热图、图标可视化等等。 00. 数据大屏可视化…

Rust基础语法1

所有权转移&#xff0c;Rust中没有垃圾收集器&#xff0c;使用所有权规则确保内存安全&#xff0c;所有权规则如下&#xff1a; 1、每个值在Rust中都有一个被称为其所有者&#xff08;owner&#xff09;的变量&#xff0c;值在任何时候只能有一个所有者。 2、当所有者离开作用域…

WordPress微信一键关注免认证登录插件

插件介绍 WordPress微信免认证快捷登录插件&#xff1a;订阅号也能一键通行 这款WordPress插件专为个人用户打造&#xff0c;无需繁琐的服务号申请与认证流程。即使您只有未认证的订阅号&#xff0c;也能轻松实现关注公众号后一键登录网站的功能&#xff01; 配置步骤简单明…

【接上篇】二、Flask学习之CSS(下篇)

上篇&#xff1a;二、Flask学习之CSS 3.8hover hover是用来美化鼠标悬停的效果的&#xff0c;当鼠标停放在某个区域&#xff0c;就会执行对应的hover操作。可以操作本标签的内容&#xff0c;也可以操作本标签下某一个标签的内容 3.9after <!DOCTYPE html> <html l…

Unity3d C#实现场景编辑/运行模式下3D模型XYZ轴混合一键排序功能(含源码工程)

前言 在部分场景搭建中需要整齐摆放一些物品&#xff08;如仓库中的货堆、货架等&#xff09;&#xff0c;因为有交互的操作在单个模型上&#xff0c;每次总是手动拖动模型操作起来也是繁琐和劳累。 在这背景下&#xff0c;我编写了一个在运行或者编辑状态下都可以进行一键排序…

JS-元素尺寸与位置

通过js的方式&#xff0c;得到元素在页面中的位置 获取宽高 元素.offsetWidth 元素.offsetHeight 1&#xff09;获取元素的自身宽高、包括元素自身设置的宽高paddingborder 2&#xff09;获取出来的是数值&#xff0c;方便计算 3&#xff09;注意&#xff1a;获取的是可视…

Python项目——搞怪小程序(PySide6+Pyinstaller)

1、介绍 使用python编写一个小程序&#xff0c;回答你是猪吗。 点击“是”提交&#xff0c;弹窗并退出。 点击“不是”提交&#xff0c;等待5秒&#xff0c;重新选择。 并且隐藏了关闭按钮。 2、实现 新建一个项目。 2.1、设计UI 使用Qt designer设计一个UI界面&#xff0c…

C#,入门教程(20)——列表(List)的基础知识

上一篇&#xff1a; C#&#xff0c;入门教程(19)——循环语句&#xff08;for&#xff0c;while&#xff0c;foreach&#xff09;的基础知识https://blog.csdn.net/beijinghorn/article/details/124060844 List顾名思义就是数据列表&#xff0c;区别于数据数组&#xff08;arr…

WGAN损失函数解读

WGAN是Wasserstein GAN 解读

【大数据Hive】hive 行列转换使用详解

目录 一、前言 二、使用场景介绍 2.1 使用场景1 2.2 使用场景2 三、多行转多列 3.1 case when 函数 语法一 语法二 操作演示 3.2 多行转多列操作演示 四、多行转单列 4.1 concat函数 语法 4.2 concat_ws函数 语法 4.3 collect_list函数 语法 4.4 collect_set函…

dpwwn:02

靶场下载地址 https://download.vulnhub.com/dpwwn/dpwwn-02.zip 环境配置 当打开此虚拟机环境的时候&#xff0c;可能会出现&#xff1a;当前硬件版本不支持设备“sata”。然后启动失败的情况~ 解决办法参考&#xff1a;https://www.cnblogs.com/yaodun55/p/16434468.html …

x-cmd pkg | fanyi - 命令行中英文翻译工具

目录 简介首次用户功能特点竞品和相关作品进一步探索 简介 fanyi 是命令行翻译工具&#xff0c;翻译数据来源于 icba.com 和 fanyi.youdao.com&#xff0c;仅支持中英文互译。支持 ChatGPT&#xff0c;可通过设置 OpenAI API 密钥以启用 ChatGPT 翻译。 注意&#xff1a;在 L…

Flink SQL

Flink SQL 来源&#xff1a;B站尚硅谷 sql-client准备 Table API和SQL是最上层的API&#xff0c;在Flink中这两种API被集成在一起&#xff0c;SQL执行的对象也是Flink中的表&#xff08;Table&#xff09;&#xff0c;所以我们一般会认为它们是一体的。Flink是批流统一的处理…

本地运行LlaMA 2的简易指南

大家好&#xff0c;像LLaMA 2这样的新开源模型已经变得相当先进&#xff0c;并且可以免费使用。可以在商业上使用它们&#xff0c;也可以根据自己的数据进行微调&#xff0c;以开发专业版本。凭借其易用性&#xff0c;现在可以在自己的设备上本地运行它们。 本文将介绍如何下载…

numpy数组的max、min、argmax和argmin计算方法

numpy数组的max、min、argmax和argmin计算方法 官方对numpy.max和numpy.min的说明 numpy.max 参考官方的理解 数组&#xff1a; 24611529 import numpy as npif __name__ __main__:a np.array([[2, 4, 6, 1], [1, 5, 2, 9]])print(a)print(np.argmax(a, axis0)) # ax…

Java医院信息管理系统

技术框架&#xff1a; springboot shiro layui jquery thymeleaf nginx 有需要的可以联系我。 运行环境&#xff1a; jdk8 mysql IntelliJ IDEA maven项目功能&#xff1a; 本项目是用springbootlayuishiro写的医院管理系统&#xff0c;系统的业务比较复杂&#x…

11 - PXC集群|MySQL存储引擎

PXC集群&#xff5c;MySQL存储引擎 数据库系列文章PXC集群配置集群测试集群 MySQL存储引擎存储引擎介绍mysql服务体系结构mysql服务的工作过程处理查询访问的工作过程处理存储insert访问的工作过程 什么是搜索引擎 存储引擎管理查看存储引擎修改存储引擎 存储引擎特点myisam存储…