【RocketMQ】快速入门

文章目录

  • 消费模式
  • 同步消息
  • 异步消息
  • 单向消息
  • 延迟消息
  • 批量消息
  • 顺序消息
  • 事务消息
  • Tag标签和Key键
    • Tag的使用
    • Key的使用

首先引入rocketmq的依赖

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.2</version>
</dependency>

然后我们编写一个简单的生产者和消费者

@SpringBootTest
public class RocketMQTest {/*** 对于生产者 同一组的生产者可以向不同的topic队列发送消息*/@Testpublic void produce() throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");producer.setNamesrvAddr(MQConstant.NAMESRV);producer.start();Message message = new Message("testTopic","一个简单的消息".getBytes());SendResult sendResult = producer.send(message);System.out.println(sendResult.getSendStatus());producer.shutdown();}/*** 对于消费者 同一组的消费者只能接收同一个topic的消息* 并且如果存在多个消费者组,他们都监听同一个topic的消息* 那么就可以选择使用  负载均衡策略 或者 广播策略*/@Testpublic void consume() throws MQClientException, IOException {//创建一个消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-producer-group");consumer.setNamesrvAddr(MQConstant.NAMESRV);// * 标识订阅这个主题中的所有消息  后期会有消息过滤consumer.subscribe("testTopic", "*");//设置一个监听器 (他会一直监听,然后是一个异步回调的机制)//那么我们就不能让他start之后这个方法就返回结束 需要挂起当前的JVM(test模式得这样子)//正常运行项目的时候项目的JVM会正常运行的 不需要挂起consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {//这个就是对应的消费方法  业务处理//消息如果消费失败 那么就要重新放入到消费队列System.out.println("我是消费者");System.out.println(list.get(0).toString());System.out.println("消息上下文"+context);//返回值如果为null/报错/RECONSUMER_LATER 代表消费失败//消息会重新回到队列 然后过一会在投递给当前消费者或者其他消费者return ConsumeConcurrentlyStatus.RECONSUME_LATER;}});//启动consumer.start();//挂起当前的JVMSystem.in.read();}
}

这里需要注意的是,对于Rocketmq,如果在你的监听器中,也就是这个MessageListenerConcurrently中,你的返回值为null,或者ConsumeConcurrentlyStatus.RECONSUME_LATER,亦或者抛出了一个异常,那么这条消息都会重新的被放回到我们的队列中,等待其他消费者或者当前消费者再一次消费。

消费模式

MQ的消费模式可以大致分为两种,一种是推Push,一种是拉Pull。
Push是服务端【MQ】主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。
Pull是客户端需要主动到服务端取数据,优点是客户端可以依据自己的消费能力进行消费,但拉取的频率也需要用户自己控制,拉取频繁容易造成服务端和客户端的压力,拉取间隔长又容易造成消费不及时。
Push模式也是基于pull模式的,只能客户端内部封装了api,一般场景下,上游消息生产量小或者均速的时候,选择push模式。在特殊场景下,例如电商大促,抢优惠券等场景可以选择pull模式

同步消息

上面的快速入门就是发送同步消息,发送过后会有一个返回值,也就是mq服务器接收到消息后返回的一个确认,这种方式非常安全,但是性能上并没有这么高,而且在mq集群中,也是要等到所有的从机都复制了消息以后才会返回,所以针对重要的消息可以选择这种方式

异步消息

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。发送完以后会有一个异步消息通知。

@Test
public void testAsyncProducer() throws Exception {// 创建默认的生产者DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");// 设置nameServer地址producer.setNamesrvAddr(MQConstant.NAMESRV);// 启动实例producer.start();Message msg = new Message("testTopic", ("异步消息").getBytes());producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("发送成功");}@Overridepublic void onException(Throwable e) {System.out.println("发送失败");}});System.out.println("看看谁先执行");// 挂起jvm 因为回调是异步的不然测试不出来System.in.read();// 关闭实例producer.shutdown();
}

单向消息

这种方式主要用在不关心发送结果的场景,这种方式吞吐量很大,但是存在消息丢失的风险,例如日志信息的发送。

@Test
public void testOnewayProducer() throws Exception {// 创建默认的生产者DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");// 设置nameServer地址producer.setNamesrvAddr(MQConstant.NAMESRV);// 启动实例producer.start();Message msg = new Message("testTopic", ("单向消息").getBytes());// 发送单向消息producer.sendOneway(msg);// 关闭实例producer.shutdown();
}

延迟消息

消息放入mq后,过一段时间,才会被监听到,然后消费
比如下订单业务,提交了一个订单就可以发送一个延时消息,30min后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
这里注意的是RocketMQ不支持任意时间的延时
只支持以下几个固定的延时等级,等级1就对应1s,以此类推,最高支持2h延迟
private String messageDelayLevel = “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;

@Test
public void testDelayProducer() throws Exception {// 创建默认的生产者DefaultMQProducer producer = new DefaultMQProducer("test-group");// 设置nameServer地址producer.setNamesrvAddr("localhost:9876");// 启动实例producer.start();Message msg = new Message("TopicTest", ("延迟消息").getBytes());// 给这个消息设定一个延迟等级// messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2hmsg.setDelayTimeLevel(3);// 发送单向消息producer.send(msg);// 打印时间System.out.println(new Date());// 关闭实例producer.shutdown();
}

批量消息

批量消息就是一次性发送一个消息集合出去。

@Test
public void testBatchProducer() throws Exception {// 创建默认的生产者DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");// 设置nameServer地址producer.setNamesrvAddr(MQConstant.NAMESRV);// 启动实例producer.start();List<Message> messages = Arrays.asList(new Message("testTopic", "批量消息1".getBytes()),new Message("testTopic", "批量消息2".getBytes()),new Message("testTopic", "批量消息3".getBytes()));producer.send(messages);System.out.println("批量执行任务");// 挂起jvm 因为回调是异步的不然测试不出来System.in.read();// 关闭实例producer.shutdown();
}

顺序消息

我们知道一个topic中可以有多个队列,那么如果我们的消息发送到多个队列中去,那么很明显我们的消息消费就是并行消费的,也就是没有了顺序性。
因此如果我们需要发送顺序消息,也就是希望MQ那边的消费者顺序的消费一些消息,我们就得按照如下方式发送顺序消息。
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为:分区有序或者全局有序。
可能大家会有疑问,mq不就是FIFO吗?
rocketMq的broker的机制,导致了rocketMq会有这个问题
因为一个broker中对应了四个queue。

不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
下面用订单进行分区有序的示例。一个订单的顺序流程是:下订单、发短信通知、物流、签收。订单顺序号相同的消息会被先后发送到同一个队列中,消费时,同一个顺序获取到的肯定是同一个队列。

package zhang.blossom.seckillbyrocketmq;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import zhang.blossom.seckillbyrocketmq.constant.MQConstant;
import zhang.blossom.seckillbyrocketmq.entity.MsgModel;import java.io.IOException;
import java.util.Arrays;
import java.util.List;/*** @author: 张锦标* @date: 2023/8/17 9:58* OrderedRocketMQTest类*/@SpringBootTest
public class OrderedRocketMQTest {private List<MsgModel> msgModels = Arrays.asList(new MsgModel("qwer", 1L, "下单"),new MsgModel("qwer", 1L, "短信"),new MsgModel("qwer", 1L, "物流"),new MsgModel("zxcv", 2L, "下单"),new MsgModel("zxcv", 2L, "短信"),new MsgModel("zxcv", 2L, "物流"));//发送顺序消息@Testpublic void orderedProducer() throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");producer.setNamesrvAddr(MQConstant.NAMESRV);producer.start();//发送顺序消息 发送时要确保有序  并且要发送到同一个队列下面去msgModels.forEach(msgModel -> {Message message = new Message("testTopic",msgModel.toString().getBytes());try {//发送  相同的订单号应该去相同的队列producer.send(message, new MessageQueueSelector() {//这里的send方法的第三个参数arg 就是这个队列选择器的第三个参数 会传递过来@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object arg) {//这个方法的返回值就是要选择的队列//这里可以用hash的方式就可以选择到同样的队列了int hash = arg.toString().hashCode();int index = hash % list.size();return list.get(index);}}, msgModel.getOrderSn());} catch (MQClientException e) {throw new RuntimeException(e);} catch (RemotingException e) {throw new RuntimeException(e);} catch (MQBrokerException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);}});producer.shutdown();System.out.println("发送完毕");}@Testpublic void orderedConsumer() throws MQClientException, IOException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-producer-group");consumer.setNamesrvAddr(MQConstant.NAMESRV);consumer.subscribe("testTopic", "*");//MessageListenerConcurrently 并发模式  多线程的  失败后最多重试16次 然后放入死信队列//MessageListenerOrderly 顺序模式 单线程的  失败后无限次重试 Integer.MAX_VALUEconsumer.registerMessageListener(new MessageListenerOrderly() {//顺序模式只有一个线程来执行消费@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list,ConsumeOrderlyContext consumeOrderlyContext) {//这里的一个线程是一个队列一个线程System.out.println(new String(list.get(0).getBody()));return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.in.read();}
}

事务消息

一般我们不使用RocketMQ的事务消息,所以有兴趣的可以看看其他的实现。

Tag标签和Key键

Rocketmq提供消息过滤功能,通过tag或者key进行区分。
我们往一个主题里面发送消息的时候,根据业务逻辑,可能需要区分,比如带有tagA标签的被A消费,带有tagB标签的被B消费,还有在事务监听的类里面,只要是事务消息都要走同一个监听,我们也需要通过过滤才区别对待。

Tag的使用

@Test
public void tagProducer() throws Exception {DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");producer.setNamesrvAddr(MQConstant.NAMESRV);producer.start();Message message1 = new Message("testTopic", "test1","test1的消息".getBytes() );Message message2 = new Message("testTopic", "test2","test2的消息".getBytes() );producer.send(message1);producer.send(message2);producer.shutdown();System.out.println("消息发送成功");
}@Test
public void test1Consumer() throws MQClientException, IOException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-producer-group");consumer.setNamesrvAddr(MQConstant.NAMESRV);consumer.subscribe("testTopic", "test1");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,ConsumeConcurrentlyContext consumeConcurrentlyContext) {System.out.println("消费test1的消息"+new String(list.get(0).getBody()));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}
@Test
public void test2Consumer() throws MQClientException, IOException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-producer-group");consumer.setNamesrvAddr(MQConstant.NAMESRV);consumer.subscribe("testTopic", "test1 || test2");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,ConsumeConcurrentlyContext consumeConcurrentlyContext) {System.out.println("消费test1/test2的消息"+new String(list.get(0).getBody()));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}

什么时候该用Topic,什么时候该用 Tag?
总结:不同的业务应该使用不同的Topic如果是相同的业务里面有不同表的表现形式,那么我们要使用tag进行区分
可以从以下几个方面进行判断:
1.消息类型是否一致:如普通消息、事务消息、定时(延时)消息、顺序消息,不同的消息类型使用不同的 Topic,无法通过 Tag 进行区分。
2.业务是否相关联:没有直接关联的消息,如淘宝交易消息,京东物流消息使用不同的 Topic 进行区分;而同样是天猫交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用 Tag 进行区分。
3.消息优先级是否一致:如同样是物流消息,盒马必须小时内送达,天猫超市 24 小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的 Topic 进行区分。
4.消息量级是否相当:有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级的消息使用同一个 Topic,则有可能会因为过长的等待时间而“饿死”,此时需要将不同量级的消息进行拆分,使用不同的 Topic。
总的来说,针对消息分类,您可以选择创建多个Topic,或者在同一个 Topic 下创建多个 Tag。但通常情况下,不同的 Topic 之间的消息没有必然的联系,而 Tag 则用来区分同一个 Topic 下相互关联的消息,例如全集和子集的关系、流程先后的关系。

Key的使用

在rocketmq中的消息,默认会有一个messageId当做消息的唯一标识,我们也可以给消息携带一个key,用作唯一标识或者业务标识,包括在控制面板查询的时候也可以使用messageId或者key来进行查询。

@Test
public void testKeyProducer() throws Exception {// 创建默认的生产者DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");// 设置nameServer地址producer.setNamesrvAddr(MQConstant.NAMESRV);// 启动实例producer.start();Message msg = new Message("testTopic","test1","key", "我是一个带标记和key的消息".getBytes());SendResult send = producer.send(msg);System.out.println(send);// 关闭实例producer.shutdown();
}@Test
public void testKeyConsumer() throws Exception {// 创建默认消费者组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-producer-group");// 设置nameServer地址consumer.setNamesrvAddr(MQConstant.NAMESRV);// 订阅一个主题来消费   表达式,默认是*,支持"tagA || tagB || tagC" 这样或者的写法 只要是符合任何一个标签都可以消费consumer.subscribe("testTopic", "test1 || test2 || test3");// 注册一个消费监听 MessageListenerConcurrently是并发消费// 默认是20个线程一起消费,可以参看 consumer.setConsumeThreadMax()consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {// 这里执行消费的代码 默认是多线程消费System.out.println(Thread.currentThread().getName() + "----" + new String(msgs.get(0).getBody()));System.out.println(msgs.get(0).getTags());System.out.println(msgs.get(0).getKeys());return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/43243.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

HackNos 3靶场

配置 进入控制面板配置网卡 第一步&#xff1a;启动靶机时按下 shift 键&#xff0c; 进入以下界面 第二步&#xff1a;选择第二个选项&#xff0c;然后按下 e 键&#xff0c;进入编辑界面 将这里的ro修改为rw single init/bin/bash&#xff0c;然后按ctrlx&#xff0c;进入…

数据结构的图存储结构

目录 数据结构的图存储结构 图存储结构基本常识 弧头和弧尾 入度和出度 (V1,V2) 和 的区别,v2> 集合 VR 的含义 路径和回路 权和网的含义 图存储结构的分类 什么是连通图&#xff0c;&#xff08;强&#xff09;连通图详解 强连通图 什么是生成树&#xff0c;生…

springboot集成ES

1.引入pom依赖2.application 配置3.JavaBean配置以及ES相关注解 3.1 Student实体类3.2 Teacher实体类3.3 Headmaster 实体类4. 启动类配置5.elasticsearchRestTemplate 新增 5.1 createIndex && putMapping 创建索引及映射 5.1.1 Controller层5.1.2 service层5.1.3 ser…

leetcode做题笔记85最大矩形

给定一个仅包含 0 和 1 、大小为 rows x cols 的二维二进制矩阵&#xff0c;找出只包含 1 的最大矩形&#xff0c;并返回其面积。 示例 1&#xff1a; 思路一&#xff1a;单调栈 int maximalRectangle(char** matrix, int matrixSize, int* matrixColSize){int dp[matrixSize…

使用MAT分析OOM问题

OOM和内存泄漏在我们的工作中&#xff0c;算是相对比较容易出现的问题&#xff0c;一旦出现了这个问题&#xff0c;我们就需要对堆进行分析。 一般情况下&#xff0c;我们生产应用都会设置这样的JVM参数&#xff0c;以便在出现OOM时&#xff0c;可以dump出堆内存文件&#xff…

基于libevent的tcp服务器

libevent使用教程_evutil_make_socket_nonblocking_易方达蓝筹的博客-CSDN博客 一、准备 centos7下安装libevent库 yum install libevent yum install -y libevent-devel 二、代码 server.cpp /** You need libevent2 to compile this piece of code Please see: http://li…

专访 BlockPI:共建账户抽象未来的新一代 RPC 基础设施

在传统 RPC 服务板块上&#xff0c;开发者一直饱受故障风险、运行环境混乱等难题的折磨。实现 RPC 服务的去中心化&#xff0c;且保持成本优势和可扩展性&#xff0c;始终是区块链基础设施建设的重要命题之一。从 2018 年观察中心化 RPC 供应商服务现状开始&#xff0c;BlockPI…

内存管理(1)

内存管理&#xff08;1&#xff09; 1、各类型数据在内存中的存储空间2、C内存管理方式2.1 针对于内置类型分析2.2 针对于自定义类型分析2.3 C语言与C在申请动态内存失败时的区别 3、operator new 和 operator delete函数&#xff08;重点&#xff09;3.1 底层知识解析3.2 实现…

linux-shell脚本收集

创建同步脚本xsync mkdir -p /home/hadoop/bin && cd /home/hadoop/bin vim xsync#!/bin/bash#1. 判断参数个数 if [ $# -lt 1 ] thenecho Not Arguementexit; fi#2. 遍历集群所有机器 for host in node1 node2 node3 doecho $host #3. 遍历所有目录&#xff0c;挨…

web3:使用Docker-compose方式部署blockscout

最近做的项目,需要blockscout来部署一个区块链浏览器,至于blockscout是什么,咱们稍后出一篇文章专门介绍下,本次就先介绍一下如何使用Docker-compose方式部署blockscout,以及过程中遇到的种种坑 目录 先决条件我的环境准备工作Docker-compose1.安装方式一:下载 Docker Co…

财务数据分析之现金流量表模板分享

现金流量表是我们常说的财务数据分析三表之一。它可以呈现一个企业的现金流情况&#xff0c;揭示企业经营管理健康状态&#xff0c;但在实际使用中却有总给人一种用不上、用不好的矛盾感。怎么才能把现金流量表做好&#xff1f;不如借鉴下大神的现金流量表模板。 下面介绍的是…

RabbitMQ-消息中间件学习记录(what-how-why)

什么是消息中间件 简单的来说就是消息队列中间件&#xff0c;生产者发送消息到中间件&#xff0c;消息中间件用于 保存消息并发送消息到消费者。 消息中间件RabbitMQ的基本组件 1&#xff09;producer -生产者 2&#xff09;customer -消费者 3&#xff09;broker (经纪人)- M…

【Java 动态数据统计图】动态数据统计思路案例(动态,排序,数组)四(116)

需求&#xff1a;&#xff1a;前端根据后端的返回数据&#xff1a;画统计图&#xff1b; 1.动态获取地域数据以及数据中的平均值&#xff0c;按照平均值降序排序&#xff1b; 说明&#xff1a; X轴是动态的&#xff0c;有对应区域数据则展示&#xff1b; X轴 区域数据降序排序…

LabVIEW调用DLL传递结构体参数

LabVIEW 中调用动态库接口时&#xff0c;如果是值传递的结构体&#xff0c;可以根据字段拆解为多个参数&#xff1b;如果参数为结构体指针&#xff0c;可用簇&#xff08;Cluster&#xff09;来匹配&#xff0c;其内存连续相当于单字节对齐。 1.值传递 接口定义&#xff1a; …

【FAQ】调用视频汇聚平台EasyCVR的iframe地址,视频无法播放的原因排查

有用户反馈&#xff0c;在调用iframe地址后嵌入用户自己的前端页面&#xff0c;视频无法播放并且要求登录。 安防监控视频汇聚平台EasyCVR基于云边端一体化架构&#xff0c;具有强大的数据接入、处理及分发能力&#xff0c;可提供视频监控直播、云端录像、视频云存储、视频集中…

视频集中存储EasyCVR视频汇聚平台定制项目增加AI智能算法

安防视频集中存储EasyCVR视频汇聚平台&#xff0c;可支持海量视频的轻量化接入与汇聚管理。平台能提供视频存储磁盘阵列、视频监控直播、视频轮播、视频录像、云存储、回放与检索、智能告警、服务器集群、语音对讲、云台控制、电子地图、平台级联、H.265自动转码等功能。为了便…

【Unity每日一记】Physics.Raycast 相关_Unity中的“X光射线”

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 秩沅 原创 &#x1f468;‍&#x1f4bb; 收录于专栏&#xff1a;uni…

05_bitmaphyperloglogGEO

Bitmap&hyperloglog&GEO 面试问 记录对集合中的数据进行统计在移动应用中&#xff0c;需要统计每天的新增用户数和第2天的留存用户数&#xff1b;在电商网站的商品评论中&#xff0c;需要统计评论列表中的最新评论&#xff1a;在签到打卡中&#xff0c;需要统计一个月内…

Python “贪吃蛇”游戏,在不断改进中学习pygame编程

目录 前言 改进过程一 增加提示信息 原版帮助摘要 pygame.draw pygame.font class Rect class Surface 改进过程二 增加显示得分 改进过程三 增加背景景乐 增加提示音效 音乐切换 静音切换 mixer.music.play 注意事项 原版帮助摘要 pygame.mixer pygame.mix…

kvm和vmware有什么区别?如何选择?

一、kvm和vmware的区别 VMware vSphere 平台 VMware 可以提供 ESXi 虚拟机监控程序和 vSphere 虚拟化平台。VMware ESXi 是一个能够直接安装到物理服务器上的裸机虚拟机监控程序&#xff0c;可以帮你整合硬件。你可以用 VMware 的虚拟化技术来创建和部署虚拟机&#xff08;VM…