1)自媒体文章上下架
需求分析
2)kafka概述
消息中间件对比
特 性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
开 发 语 言 | java | erlang | java | scala |
单 机 吞 吐 量 | 万级 | 万级 | 10万级 | 100万级 |
时 效 性 | ms | us | ms | ms级以内 |
可 用 性 | 高(主从) | 高(主从) | 非常高(分布 式) | 非常高(分布 式) |
功 能 特 性 | 成熟的产品、较全的 文档、各种协议支持 好 | 并发能力强、 性能好、延迟 低 | MQ功能比较 完善,扩展性 佳 | 只支持主要的MQ功能, 主要应用于大数据领域 |
消息中间件对比-选择建议
消息中间 件 | 建议 |
Kafka | 追求高吞吐量,适合产生大量数据的互联网服务的数据收集业务 |
RocketMQ | 靠性要求很高的金融互联网领域,稳定性高,经历了多次阿里双11考验 |
RabbitMQ | 性能较好,社区活跃度高,数据量没有那么大,优先选择功能比较完备的 RabbitMQ |
kafka介绍
Kafka 是一个分布式流媒体平台,类似于消息队列或企业消息传递系统。kafka官网:http://kafka.apach e.org/
kafka介绍-名词解释
- producer:发布消息的对象称之为主题生产者(Kafka topic producer)
- topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)
- consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)
- broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。
3)kafka安装配置
Kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所以安装Kafka之前必须先安装zookeeper
https://blog.csdn.net/m0_70325779/article/details/137248462
4)kafka入门
生产者发送消息,多个消费者只能有一个消费者接收到消息生产者发送消息,多个消费者都可以接收到消息
(1)创建kafka-demo项目,导入依赖
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId>
</dependency>
(2)生产者发送消息
/*** 生产者*/
public class ProducerQuickStart {public static void main(String[] args) throws ExecutionException,
InterruptedException {//1.kafka链接配置信息Properties prop = new Properties();//kafka链接地址prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//key和value的序列化prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.se
rialization.StringSerializer");prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.
serialization.StringSerializer");//2.创建kafka生产者对象KafkaProducer<String,String> producer = new KafkaProducer<String,String>
(prop);//3.发送消息/*** 第一个参数 :topic* 第二个参数:消息的key* 第三个参数:消息的value*/ProducerRecord<String,String> kvProducerRecord = new
ProducerRecord<String,String>("topic-first","key-001","hello kafka");//同步发送消息RecordMetadata recordMetadata = producer.send(kvProducerRecord).get();System.out.println(recordMetadata.offset());//4.关闭消息通道 必须要关闭,否则消息发送不成功producer.close();}
}
(3)消费者接收消息
/*** 消费者*/
public class ConsumerQuickStart {public static void main(String[] args) {//1.kafka的配置信息Properties prop = new Properties();//链接地址prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");//key和value的反序列化器prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");//设置消费者组prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");//2.创建消费者对象KafkaConsumer<String, String> consumer = new KafkaConsumer<String,
String>(prop);//3.订阅主题consumer.subscribe(Collections.singletonList("topic-first"));//4.拉取消息while (true) {ConsumerRecords<String, String> consumerRecords =
consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> consumerRecord :
consumerRecords) {System.out.println(consumerRecord.key());System.out.println(consumerRecord.value());}}}
}
使用情景:
- 生产者发送消息,多个消费者订阅同一个主题(多个消费者都是一个组)只能有一个消费者收到消息(一对一)
- 生产者发送消息,多个消费者订阅同一个主题(多个消费者不是一个组)所有消费者都能收到消息(一对多)
springboot集成kafka入门
1.导入spring-kafka依赖信息
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- kafkfa --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId></dependency>
</dependencies>
2.在resources下创建文件application.yml
server:port: 9991
spring:application:name: kafka-demokafka:bootstrap-servers: 192.168.200.130:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: ${spring.application.name}-testkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer:
org.apache.kafka.common.serialization.StringDeserializer
3.消息生产者
4.消息消费者
传递消息为对象
目前springboot整合后的kafka,因为序列化器是StringSerializer,这个时候如果需要传递对象可以有两种方式
方式一:可以自定义序列化器,对象类型众多,这种方式通用性不强,本章节不介绍
方式二:可以把要传递的对象进行转json字符串,接收消息后再转为对象即可,本项目采用这种方式
- 发送消息
- 接收消息
自媒体文章上下架功能完成
需求分析
已发表且已下架的文章可以上架
流程说明
接口定义
说明 | |
接口路径 | /api/v1/news/down_or_up |
请求方式 | POST |
参数 | DTO |
响应结果 | ResponseResult |
DTO
@Data
public class WmNewsDto {private Integer id;/*** 是否上架 0 下架 1 上架*/private Short enable;}
ResponseResult
自媒体文章上下架-功能实现
接口定义
water-wemedia工程
在water-wemedia工程下的WmNewsController新增方法
@PostMapping("/down_or_up")
public ResponseResult downOrUp(@RequestBody WmNewsDto dto){return null;
}
在WmNewsDto中新增enable属性,完整的代码如下:
@Data
public class WmNewsDto {private Integer id;/*** 标题*/private String title;/*** 频道id*/private Integer channelId;/*** 标签*/private String labels;/*** 发布时间*/private Date publishTime;/*** 文章内容*/private String content;/*** 文章封面类型 0 无图 1 单图 3 多图 -1 自动*/private Short type;/*** 提交时间*/private Date submitedTime; /*** 状态 提交为1 草稿为0*/private Short status;/*** 封面图片列表 多张图以逗号隔开*/private List<String> images;/*** 上下架 0 下架 1 上架*/private Short enable;
}
业务层编写
在WmNewsService新增方法
/*** 文章的上下架* @param dto* @return*/
public ResponseResult downOrUp(WmNewsDto dto);
实现方法
/*** 文章的上下架* @param dto* @return*/
@Override
public ResponseResult downOrUp(WmNewsDto dto) {//1.检查参数if(dto.getId() == null){return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);}//2.查询文章WmNews wmNews = getById(dto.getId());if(wmNews == null){return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,"文章不
存在");}//3.判断文章是否已发布if(!wmNews.getStatus().equals(WmNews.Status.PUBLISHED.getCode())){return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,"当前文章
不是发布状态,不能上下架");}//4.修改文章enableif(dto.getEnable() != null && dto.getEnable() > -1 && dto.getEnable() < 2){update(Wrappers.
<WmNews>lambdaUpdate().set(WmNews::getEnable,dto.getEnable())
.eq(WmNews::getId,wmNews.getId()));}return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}
控制器
@PostMapping("/down_or_up")
public ResponseResult downOrUp(@RequestBody WmNewsDto dto){return wmNewsService.downOrUp(dto);
}
测试
消息通知article端文章上下架
在water-common模块下导入kafka依赖
<!-- kafkfa -->
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId>
</dependency>
在自媒体端的nacos配置中心配置kafka的生产者
spring:kafka:bootstrap-servers: localhost:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer
在自媒体端文章上下架后发送消息
//发送消息,通知article端修改文章配置
if(wmNews.getArticleId() != null){Map<String,Object> map = new HashMap<>();map.put("articleId",wmNews.getArticleId());map.put("enable",dto.getEnable());kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONS
tring(map));
}
常量类:
public class WmNewsMessageConstants {public static final String
WM_NEWS_UP_OR_DOWN_TOPIC="wm.news.up.or.down.topic";
}
在article端的nacos配置中心配置kafka的消费者
spring:kafka:bootstrap-servers: localhost:9092consumer:group-id: ${spring.application.name}key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer:
org.apache.kafka.common.serialization.StringDeserializer
在article端编写监听,接收数据
@Component
@Slf4j
public class ArtilceIsDownListener {@Autowiredprivate ApArticleConfigService apArticleConfigService;@KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)public void onMessage(String message){if(StringUtils.isNotBlank(message)){Map map = JSON.parseObject(message, Map.class);apArticleConfigService.updateByMap(map);log.info("article端文章配置修改,articleId={}",map.get("articleId"));}}
}
修改ap_article_config表的数据新建ApArticleConfigService
public interface ApArticleConfigService extends IService<ApArticleConfig> {/*** 修改文章配置* @param map*/public void updateByMap(Map map);
}
实现类:
@Service
@Slf4j
@Transactional
public class ApArticleConfigServiceImpl extends
ServiceImpl<ApArticleConfigMapper, ApArticleConfig> implements
ApArticleConfigService {/*** 修改文章配置* @param map*/@Overridepublic void updateByMap(Map map) {//0 下架 1 上架Object enable = map.get("enable");boolean isDown = true;if(enable.equals(1)){isDown = false;}//修改文章配置update(Wrappers.
<ApArticleConfig>lambdaUpdate().eq(ApArticleConfig::getArticleId,map.get("articl
eId")).set(ApArticleConfig::getIsDown,isDown));}
}