RocketMq——Consume相关源码

摘要

RocketMQ只要有CommitLog文件就可以正常运行了,那为何还要维护ConsumeQueue文件呢?

ConsumeQueue是消费队列,引入它的目的是为了提高消费者的消费速度。毕竟RocketMQ是基于Topic主题订阅模式的,消费者往往只关心自己订阅的消息,如果每次消费都从CommitLog文件中检索数据,无疑性能是非常差的。有了ConsumeQueue,消费者就可以根据消息在CommitLog文件中的偏移量快速定位到消息进行消费了。

Broker会将客户端发送的消息写入CommitLog文件,持久化存储。但是整个流程并没有涉及到ConsumeQueue文件的操作,那么ConsumeQueue文件是如何被构建的呢?

一、CommitLog文件构建分析

ReputMessageService是消息重放服务,请允许我这么命名。Broker在启动的时候,会开启一个线程每毫秒执行一次doReput()方法。

它的目的就是对写入CommitLog文件里的消息进行「重放」,它有一个属性reputFromOffset,记录的是消息重放的偏移量,MessageStore启动的时候会对其进行赋值。

它的工作原理是,根据重放偏移量reputFromOffset去读取CommitLog里的待重放的消息,并构建DispatchRequest对象,然后将DispatchRequest对象分发出去,交给各个CommitLogDispatcher处理。

MessageStore维护了CommitLogDispatcher对象集合,目前只有三个处理器:

  1. CommitLogDispatcherBuildConsumeQueue:构建ConsumeQueue索引。
  2. CommitLogDispatcherBuildIndex:构建Index索引。
  3. CommitLogDispatcherCalcBitMap:构建布隆过滤器,加速SQL92过滤效率。

doReput()方法1毫秒执行一次,它的方法体是一个for循环,只要reputFromOffset没有到达CommitLog文件的最大偏移量,就会一直继续重放消息。

private boolean isCommitLogAvailable() {return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
}

它首先会根据reputFromOffset去CommitLog文件中截取一段ByteBuffer,这个缓冲区里就是待重放的消息数据。

public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {// CommitLog单个文件的大小int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();// 根据索引构建进度找到等待构建的文件,文件名就是起始Offset,遍历文件即可找到MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);if (mappedFile != null) {// 计算Offset在当前文件的读指针位置int pos = (int) (offset % mappedFileSize);/*** 基于MappedFile的MappedByteBuffer派生出一个ByteBuffer对象* 共享同一块内存,但是拥有自己的指针*/SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);return result;}return null;
}

SelectMappedBufferResult类属性如下:

// 起始偏移量
private final long startOffset;
// 缓冲区
private final ByteBuffer byteBuffer;
// 长度
private int size;
// 关联的MappedFile对象
private MappedFile mappedFile;

有了SelectMappedBufferResult,就可以读取消息数据了。由于消息重放并不需要知道消息主体内容,因此不会读取消息Body,只是读取相关属性,并构建DispatchRequest对象。读取的属性如下:

// 消息所属Topic
private final String topic;
// 消息所属队列ID
private final int queueId;
// 消息在CommitLog文件中的偏移量
private final long commitLogOffset;
// 消息大小
private int msgSize;
// 消息Tag哈希码
private final long tagsCode;
// 消息存盘时间
private final long storeTimestamp;
// 逻辑消费队列位点
private final long consumeQueueOffset;
private final String keys;
private final boolean success;
// 消息唯一键
private final String uniqKey;
// 消息系统标记
private final int sysFlag;
// 事务消息偏移量
private final long preparedTransactionOffset;
// 属性
private final Map<String, String> propertiesMap;

有了DispatchRequest对象,接下来就是调用doDispatch方法将请求分发出去了。此时CommitLogDispatcherBuildConsumeQueue将被触发,它会将请求转交给DefaultMessageStore执行。

DefaultMessageStore.this.putMessagePositionInfo(request);

MessageStore先根据消息Topic和QueueID定位到ConsumeQueue文件,然后将索引追加到文件中。

public void putMessagePositionInfo(DispatchRequest dispatchRequest) {// 根据Topic和QueueID定位到ConsumeQueue文件ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());// 追加索引到文件cq.putMessagePositionInfoWrapper(dispatchRequest);
}

写索引之前,会先确保消息仓库是可写状态:

boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();

然后,初始化一个ByteBuffer,容量为20字节,依次往里面写入:消息Offset、size、tagsCode。

// 每个索引的长度是20字节,byteBufferIndex是循环使用的
this.byteBufferIndex.flip();
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
/**
* 索引结构:Offset+size+tagsCode
* 8字节 4字节 8字节
*/
this.byteBufferIndex.putLong(offset);
this.byteBufferIndex.putInt(size);
this.byteBufferIndex.putLong(tagsCode);

根据消费队列位点和单个索引的长度计算索引应该写入的文件位置,因为是顺序写的嘛,所以获取最新的ConsumeQueue文件,如果文件写满会创建新的继续写。 

final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);

写之前,校验预期的偏移量和逻辑偏移量是否相等,正常情况下两者应该相等,如果不等说明数据构建错乱了,需要重新构建了。、

if (cqOffset != 0) {// 偏移量:当前文件的写指针位置+文件起始偏移量(文件名)long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();// 正常情况下,expectLogicOffset和currentLogicOffset应该相等if (expectLogicOffset < currentLogicOffset) {log.warn("Build  consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);return true;}if (expectLogicOffset != currentLogicOffset) {LOG_ERROR.warn("[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",expectLogicOffset,currentLogicOffset,this.topic,this.queueId,expectLogicOffset - currentLogicOffset);}
}

检验通过后,就可以正常写了。先更新当前ConsumeQueue记录消息的最大偏移量maxPhysicOffset,再将20个字节的索引数据写入到文件。 

至此,就完成了CommitLog中的消息到ConsumeQueue文件里的索引同步。

ConsumeQueue是RocketMQ用来加速消费者消费效率的索引文件,它是一个逻辑消费队列,并不保存消息本身,只是一个消息索引。索引长度为20个字节,记录了消息在CommitLog文件里的偏移量,消息长度,和消息Tag的哈希值。Consumer消费消息时可以根据Tag哈希值快速过滤消息,然后根据偏移量快速定位到消息,再根据消息长度读取出一条完整的消息。

Broker将消息写入CommitLog后并不会马上写ConsumeQueue,而是由一个异步线程ReputMessageService将消息进行重放,重放的过程中由CommitLogDispatcherBuildConsumeQueue将消息构建到ConsumeQueue文件,构建的频率为1毫秒一次,几乎是近实时的,不用担心消费会延迟。

二、Consumer消息拉取和消费分析

MQConsumer是RocketMQ提供的消费者接口,从接口定义上可以看到,它主要的功能是订阅感兴趣的Topic、注册消息监听器、启动生产者开始消费消息。

消费者获取消息的模式有两种:推模式和拉模式,对应的类分别是DefaultMQPushConsumer和DefaultMQPullConsumer,需要注意的是,在4.9.0版本,DefaultMQPullConsumer已经被废弃了。

Push模式下,由Broker接收到消息后主动推送给消费者,实时性较高,但是会增加Broker的压力。Pull模式下,由消费者主动从Broker拉取消息,主动权在消费者,这种方式更灵活,消费者可以根据自己的消费能力拉取适量的消息。

实际上,Push模式也是通过Pull的方式实现的,消息统一由消费者主动拉取,那如何保证消息的实时性呢?

Consumer和Broker会建立长连接,一旦分配到MessageQueue,就会立马构建PullRequest去拉取消息,在不触发流控的情况下,不管有没有拉取到新的消息,Consumer都会立即再次拉取,这样就保证了消息消费的实时性。

如果Broker长时间没有新的消息,Consumer一直拉取,岂不是空转CPU浪费资源?

Consumer在拉取消息时,会携带参数suspendTimeoutMillis,它表示Broker在没有新的消息时,阻塞等待的时间,默认是15秒。如果没有消息,Broker等待15秒再返回结果,避免客户端频繁拉取。如果15秒内有新的消息了,立马返回,保证消息消费的时效性。

2.1 Consumer相关组件

DefaultMQPushConsumer

RocketMQ暴露给开发者使用的基于Push模式的默认生产者类,和DefaultMQProducer一样,它也仅仅是一个外观类,基本没有业务逻辑,几乎所有操作都转交给生产者实现类DefaultMQPushConsumerImpl完成。这么做的好处是RocketMQ屏蔽了内部实现,方便在后续的版本中随时更换实现类,而用户无感知。

DefaultMQPushConsumerImpl

默认的基于Push模式的消费者实现类,拥有消费者的所有功能,例如:拉取消息、执行钩子函数、消费者重平衡等等。

PullAPIWrapper

调用拉取消息API的包装类,它是Consumer拉取消息的核心类,它有一个方法特别重要pullKernelImpl,是拉取消息的核心方法。它会根据拉取的MessageQueue去查找对应的Broker,然后构建拉取消息请求头PullMessageRequestHeader发送到Broker,然后执行拉取回调,在回调里会通知消费者消费拉取到的消息。

OffsetStore

OffsetStore是RocketMQ提供的,用来帮助Consumer管理消费位点(消费进度)的接口,它有两个实现类:LocalFileOffsetStore和RemoteBrokerOffsetStore,从名字就可以看出来,一个是将消费进度存储在本地,一个是将消费进度存储在Broker上。
LocalFileOffsetStore会将消费进度持久化到本地磁盘,Consumer启动后会从指定目录读取文件,恢复消费进度。
RemoteBrokerOffsetStore将消费进度交给Broker管理,Consumer不会存储到文件,没有意义,但是消费消息时会暂存消费进度在内存,然后在拉取消息时上报消费进度,由Broker负责存储。

什么场景下需要将消费进度存储在本地呢?这和RocketMQ消息消费模式有关,RocketMQ支持两种消息消费模式:集群消费和广播消费。一个ConsumerGroup下可以有多个消费者实例,集群模式下,消息只会投递给其中一个Consumer实例消费,而广播模式下,消息会投递给每个Consumer实例。

综上所述,集群模式下,消费进度由Broker管理,使用RemoteBrokerOffsetStore。广播模式下,因为消息需要被每个Consumer实例消费,每个实例消费的进度是不一样的,因此由实例自己存储消费进度,使用LocalFileOffsetStore。

ConsumeMessageService

消费消息的服务,客户端拉取到消息后,是需要有线程去消费的,因此它是一个线程池,线程数由consumeThreadMinconsumeThreadMax设置,默认线程数为20。

它是一个接口,比较重要的两个方法如下:

// 当前线程直接消费消息
ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName);// 提交消费请求,由线程池去调度
void submitConsumeRequest(final List<MessageExt> msgs,final ProcessQueue processQueue,final MessageQueue messageQueue,final boolean dispathToConsume);

一个是由当前线程直接消费消息,另一个是提交消费请求ConsumeRequest由线程池去负责调度,一般情况下使用的还是后者。

RocketMQ提供了两个实现类,分别是ConsumeMessageConcurrentlyService和ConsumeMessageOrderlyService,前者用来并发消费消息,后者用来消费有序消息。

PullMessageService

消息拉取服务,负责从Broker拉取消息,然后提交给ConsumeMessageService消费。它也是一个线程,它的run方法是一个死循环,通过监听阻塞队列来判断是否需要拉取消息。阻塞队列里存放的就是PullRequest对象,当Consumer实例上线后,会做一次负载均衡,从众多MessageQueue中给自己完成分配,当有新的MessageQueue被分配给自己,就会创建PullRequest对象提交到阻塞队列,然后PullMessageService就会开始拉取消息,在拉取完成的回调函数中,不管有没有拉取到新的消息,在不触发流控的情况下,都会一直拉取。

2.2 Consumer相关源码分析

整个Consumer的运行可以大致分为四个过程:

  1. 消费者启动
  2. 消费者组负载均衡
  3. 消息的拉取
  4. 消息的消费

Consumer的启动流程和Producer有部分重合之处。

首先自然是创建DefaultMQPushConsumer,在它的构造函数中,会创建消费者实现类DefaultMQPushConsumerImpl。

public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,AllocateMessageQueueStrategy allocateMessageQueueStrategy) {// 消费组名this.consumerGroup = consumerGroup;// 命名空间this.namespace = namespace;// 消息队列分配策略算法this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;// 消费者实现类defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}

前面说过,DefaultMQPushConsumer只是一个外观类,它更多的职责只是保存Consumer的配置,它的属性可以重点关注一下:

// 消费者实现
protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
// 消费者组名
private String consumerGroup;
// 消费类型:集群消费/广播消费
private MessageModel messageModel = MessageModel.CLUSTERING;
// 新加入的ConsumerGroup,从哪里开始消费消息?
// 只针对Broker没有消费位点的新ConsumerGroup,已经存在的消费组设置无意义。
private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
// 当ConsumeFromWhere设为CONSUME_FROM_TIMESTAMP时,
// 从哪个时间点开始消费?默认半小时前。
private String consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30));
// 消费者分配消息的策略算法
private AllocateMessageQueueStrategy allocateMessageQueueStrategy;
// Topic订阅关系
private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>();
// 消息监听器
private MessageListener messageListener;
/*** 消费进度管理* 1.集群消费:RemoteBrokerOffsetStore* 2.广播消费:LocalBrokerOffsetStore*/
private OffsetStore offsetStore;
// 最小消费线程数
private int consumeThreadMin = 20;
// 最大消费线程数
private int consumeThreadMax = 20;
// 动态调整线程池,代码被删除,暂不支持,忽略。
private long adjustThreadPoolNumsThreshold = 100000;
// 并发消息的最大位点差,超过该值说明客户端消息积压较多,降低拉取速度
private int consumeConcurrentlyMaxSpan = 2000;
// 单个Queue缓存的消息阈值,达到阈值流控处理
private int pullThresholdForQueue = 1000;
// 单个Queue缓存的消息字节数阈值,单位MB
private int pullThresholdSizeForQueue = 100;
// 单个Topic缓存的消息数阈值
private int pullThresholdForTopic = -1;
// 单个Topic缓存的消息字节数阈值
private int pullThresholdSizeForTopic = -1;
// 消息拉取间隔,单位ms
private long pullInterval = 0;
// 消费者批量消费的消息数
private int consumeMessageBatchMaxSize = 1;
// 批量拉取的消息数
private int pullBatchSize = 32;
// 每次拉取消息是否更新订阅关系?
private boolean postSubscriptionWhenPull = false;
private boolean unitMode = false;
// 最大消费重试次数,默认16
private int maxReconsumeTimes = -1;
// 需要降低拉取速度时,暂停拉取的时间
private long suspendCurrentQueueTimeMillis = 1000;
// 消费超时时间,单位:分钟
private long consumeTimeout = 15;
// 关闭消费者时,等待消息消费的时间,默认不等待。
private long awaitTerminationMillisWhenShutdown = 0;
// 消息轨迹跟踪
private TraceDispatcher traceDispatcher = null;

SubscriptionData代表了消费者的订阅关系,属性如下:

// 启用Broker类过滤模式
private boolean classFilterMode = false;
// 订阅的Topic
private String topic;
// 子表达式 Tag/SQL92语法
private String subString;
// Tag集合
private Set<String> tagsSet = new HashSet<String>();
// Tag哈希集合,Broker根据Tag哈希快速过滤消息
private Set<Integer> codeSet = new HashSet<Integer>();
private long subVersion = System.currentTimeMillis();
// 表达式类型,默认是Tag,也可以用SQL92语法
private String expressionType = ExpressionType.TAG;
// 使用FilterClass过滤的源码
private String filterClassSource;

如果使用TAG的方式,会计算出Tag哈希值,Broker在ConsumeQueue索引中记录了Tag哈希,这样就可以根据Tag哈希快速过滤消息了。

订阅完Topic,就是注册消息监听MessageListener,就是一个赋值操作。

以上操作执行完,Consumer就可以启动了,接下来才是重头戏。

外观类启动的时候,会启动消费者实现类DefaultMQPushConsumerImpl,我们直接看它就好。启动主要做了以下事情:

  1. 校验消费者配置
  2. 拷贝订阅关系
  3. 创建MQClientInstance
  4. 设置RebalanceImpl
  5. 创建PullAPIWrapper,消息拉取核心类
  6. 加载消费进度(Local)
  7. 启动ConsumeMessageService
  8. 启动MQClientInstance
  9. 拉取订阅的Topic路由信息
  10. SQL表达式上传到Broker编译
  11. 给Broker发心跳,通知其他Consumer重平衡
  12. 自己重平衡,拉取消息

checkConfig方法,会在Consumer启动前做一系列的校验,确保服务满足启动条件,校验的事项有:

  1. 校验GroupName
  2. 校验消费模式:集群/广播
  3. 校验ConsumeFromWhere
  4. 校验开始消费的指定时间
  5. 校验AllocateMessageQueueStrategy
  6. 校验订阅关系
  7. 校验是否注册消息监听
  8. 校验消费线程数
  9. 校验单次拉取的最大消息数
  10. 校验单次消费的最大消息数
  11. 启动前校验通过,说明配置没有问题,具备启动的基本条件。

copySubscription方法会拷贝订阅关系到RebalanceImpl,Consumer在重平衡时需要用到,除了拷贝给定的Topic订阅关系,Consumer还会自动订阅ConsumerGroup的重试队列。

// 集群消费模式下,自动订阅重试Topic
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);

创建客户端实例MQClientInstance,消息拉取核心对象PullAPIWrapper。

this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

根据消息消费模式,创建对应的OffsetStore

switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING:this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;case CLUSTERING:this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;default:break;
}
// 从磁盘恢复消费进度(Local)
this.offsetStore.load();

创建ConsumeMessageService并启动,如果是有序消息,创建ConsumeMessageOrderlyService,并发消费创建ConsumeMessageConcurrentlyService。

if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {this.consumeOrderly = true;this.consumeMessageService =new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {this.consumeOrderly = false;this.consumeMessageService =new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
this.consumeMessageService.start();

ConsumeMessageService是一个线程池,消息拉取服务拉取到消息后,会构建ConsumeRequest对象交给线程池调度执行。

前置操作完成后,就可以启动客户端实例了。MQClientInstance启动主要做了以下事情:

  1. 发请求获取NameServerAddr
  2. 启动Netty客户端
  3. 启动各种定时任务
  4. 启动消息拉取服务
  5. 启动重均衡服务

客户端如果没有指定NameServer地址,RocketMQ会读取环境变量rocketmq.namesrv.domain,它的期望值是一个URL链接,每隔2分钟发一个请求更新NameServer地址。在集群环境下,NameServer机器数和IP都是不固定的,通过配置中心下发比硬编码更灵活。

if (null == this.clientConfig.getNamesrvAddr()) {// 读取环境变量,发请求更新NameServer地址this.mQClientAPIImpl.fetchNameServerAddr();
}

RocketMQ基于Netty来完成网络通信,Consumer作为客户端是要和Broker通信的,因此还需要启动Netty客户端。

启动各种定时任务,这些任务包括:获取NameServer地址,从NameServer拉取Topic路由信息、清理下线的Broker、给Broker发心跳、持久化消费进度。

启动消息拉取服务PullMessageService,它是一个单独的线程,run方法会监听阻塞队列pullRequestQueue,只要队列中有拉取请求,它就会去Broker拉取消息。

public void run() {while (!this.isStopped()) {PullRequest pullRequest = this.pullRequestQueue.take();this.pullMessage(pullRequest);}
}

启动重平衡服务RebalanceService,也是一个单独的线程,默认会每隔20秒重新做一次负载均衡,给Consumer重新分配MessageQueue。例如,TopicA下有4个MessageQueue,此时只有一个消费者实例订阅了,那么这4个MessageQueue都会分配给它消费。过了一会儿,新的消费者实例上线,此时会做一次重平衡,重新分配,因为有两个消费者实例了,因此每个实例会分配2个MessageQueue。

@Override
public void run() {while (!this.isStopped()) {// 默认20秒做一次重新负载均衡this.waitForRunning(waitInterval);this.mqClientFactory.doRebalance();}
}

相关服务启动完成后,Consumer会自动向NameServer拉取订阅的Topic路由信息。

private void updateTopicSubscribeInfoWhenSubscriptionChanged() {Map<String, SubscriptionData> subTable = this.getSubscriptionInner();if (subTable != null) {for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {final String topic = entry.getKey();this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);}}
}

博文参考

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/715945.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

定制开发一款家政小程序,应知应会

引言 在这个快节奏的现代生活中&#xff0c;人们对高效、便捷的家政服务的需求日益增加。随着社会结构的变化和职业生活的繁忙&#xff0c;许多家庭面临着时间不足、精力不济的挑战。在这种情况下&#xff0c;家政服务成为解决问题的有效途径。然而&#xff0c;传统的家政服务…

Python——桌面摄像头软件(附源码+打包)

目录 一、前言 二、桌面摄像头软件 2.1、下载项目 2.2、功能介绍 三、打包工具&#xff08;nuitka&#xff09; 四、项目文件复制&#xff08;我全部合到一个文件里面了&#xff09; 五、结语 一、前言 看见b站的向军大叔用electron制作了一个桌面摄像头软件 但是&#x…

如何在jupyter notebook 中下载第三方库

在anconda 中找到&#xff1a; Anaconda Prompt 进入页面后的样式&#xff1a; 在黑色框中输入&#xff1a; 下载第三方库的命令 第三方库&#xff1a; 三种输入方式 标准保证正确 pip instsall 包名 -i 镜像源地址 pip install pip 是 Python 包管理工具&#xff0c;…

新项目,Linux上一键安装MySQL,Redis,Nacos,Minio

大家好&#xff0c;我是 jonssonyan 分享一个我的一个开源项目&#xff0c;这是一个在 Linux 平台上一键安装各种软件的脚本项目&#xff0c;脚本使用 Shell 语言编写&#xff0c;后续还会增加更多软件的一键安装&#xff0c;代码在 GitHub 上全部开源的&#xff0c;开源地址如…

【Python】进阶学习:pandas--如何根据指定条件筛选数据

【Python】进阶学习&#xff1a;pandas–如何根据指定条件筛选数据 &#x1f308; 个人主页&#xff1a;高斯小哥 &#x1f525; 高质量专栏&#xff1a;Matplotlib之旅&#xff1a;零基础精通数据可视化、Python基础【高质量合集】、PyTorch零基础入门教程&#x1f448; 希望…

2024第二次培训:win11系统下使用nginx、JDK、mysql搭建基于vue2、java前后端分离的web应用运行环境

一.背景 公司安排了带徒弟的任务&#xff0c;给培训写点材料。前面分开介绍了mysql、jdk、nginx的安装&#xff0c;都只是零星的介绍&#xff0c;只能算零散的学习。学习了有什么用呢&#xff1f;能解决什么问题&#xff1f;能完成什么工作&#xff1f; 今天我们要用之前的几篇…

为什么要在业务系统中引入大宽表?

在高度系统化驱动的业务中&#xff0c;查看业务报表已经是一个很常见的需求了。在分工非常明确的大型企业里&#xff0c;往往有专门的数据分析团队 BI 或者数据开发团队&#xff0c;他们能够胜任此类需求&#xff08;但也未必是轻松的&#xff0c;或者说高效的&#xff09;。 …

Stable Diffusion 模型分享:AAM XL (Anime Mix)(动漫截屏风格 XL)

本文收录于《AI绘画从入门到精通》专栏&#xff0c;专栏总目录&#xff1a;点这里。 文章目录 模型介绍生成案例案例一案例二案例三案例四案例五案例六案例七案例八 下载地址 模型介绍 AAM XL (Anime Mix) 是一个动漫截屏风格的模型&#xff0c;是 AAM - AnyLoRA Anime Mix 模…

【yolov8部署实战】VS2019环境下使用C++和OpenCV环境部署yolo项目|含详细注释源码

一、前言 之前一阵子一直在做的就是怎么把yolo项目部署成c项目&#xff0c;因为项目需要嵌套进yolo模型跑算法。因为自己也是本科生小白一枚&#xff0c;基本上对这方面没有涉猎过&#xff0c;自己一个人从网上到处搜寻资料&#xff0c;写代码&#xff0c;调试&#xff0c;期间…

黑马JavaWeb开发跟学(三)Web前端开发Vue-Element

黑马JavaWeb开发跟学三.Web前端开发Vue-Element 1 Ajax1.1 Ajax介绍1.1.1 Ajax概述1.1.2 Ajax作用1.1.3 同步异步 1.2 原生Ajax1.3 Axios1.3.1 Axios的基本使用1.3.2 Axios快速入门1.3.3 请求方法的别名1.3.4 案例 2 前后台分离开发2.1 前后台分离开发介绍2.2 YAPI2.2.1 YAPI介…

【GPU驱动开发】-mesa简介

前言 不必害怕未知&#xff0c;无需恐惧犯错&#xff0c;做一个Creator&#xff01; 一、mesa介绍 Mesa 是一个开源的3D图形库&#xff0c;它实现了多种图形API&#xff0c;包括 OpenGL、Vulkan 和 OpenCL。Mesa 的目标是提供一个开源、跨平台的图形库&#xff0c;使得开发者…

ABAP - SALV教程08 列设置热点及绑定点击事件

实现思路&#xff1a;将列设置成热点&#xff0c;热点列是可点击的&#xff0c;再给SALV实例对象注册点击事件即可&#xff0c;一般作用于点击单号跳转到前台等功能 "设置热点方法METHODS:set_hotspot CHANGING co_alv TYPE REF TO cl_salv_table...."事件处理方法M…

合宙esp32-c3 进入深度睡眠无法唤醒解决一例

手贱&#xff0c;昨天收到了嘉立创最新的esp32 s3,想测试一下电流功耗&#xff0c;于是顺便测试了一下以前的合宙esp32 c3 无串口芯片的版本 打算对比一下c3和s3的功耗相差多少&#xff0c;结果把自己玩死了&#xff1a; void setup() {esp_deep_sleep_start();// esp_light_s…

oppo手机备忘录记录怎么转移到华为手机?

oppo手机备忘录记录怎么转移到华为手机?使用oppo手机已经有三四年了&#xff0c;因为平时习惯&#xff0c;在手机系统的备忘录中记录了很多重要的笔记&#xff0c;比如工作会议的要点、读书笔记、购物清单、朋友的生日提醒等。这些记录对我来说非常重要&#xff0c;我可以通过…

2000-2021年300+地级市进出口总额数据

2000-2021年300地级市进出口总额数据 1、时间&#xff1a;2000-2021年 2、指标&#xff1a;进出口总额 3、单位&#xff1a;万美元 4、来源&#xff1a;城市年鉴、各省年鉴、城市公报、2021年为城市统计年鉴中进口额出口额加总之后换算成万美元&#xff0c;已尽最大可能进行…

1.亿级积分数据分库分表:总体方案设计

项目背景 以一个积分系统为例&#xff0c;积分系统最核心的有积分账户表和积分明细表&#xff1a; 积分账户表&#xff1a;每个用户在一个品牌下有一个积分账户记录&#xff0c;记录了用户的积分余额&#xff0c;数据量在千万级积分明细表&#xff1a;用户每次积分发放、积分扣…

数据结构——Top-k问题

Top-k问题 方法一&#xff1a;堆排序&#xff08;升序&#xff09;&#xff08;时间复杂度O(N*logN)&#xff09;向上调整建堆&#xff08;时间复杂度&#xff1a;O(N * logN) &#xff09;向下调整建堆&#xff08;时间复杂度&#xff1a;O(N) &#xff09;堆排序代码 方法二&…

LeetCode---386周赛

题目列表 3046. 分割数组 3047. 求交集区域内的最大正方形面积 3048. 标记所有下标的最早秒数 I 3049. 标记所有下标的最早秒数 II 一、分割数组 这题简单的思维题&#xff0c;要想将数组分为两个数组&#xff0c;且分出的两个数组中数字不会重复&#xff0c;很显然一个数…

人工智能指数报告2023

人工智能指数报告2023 主要要点第 1 章 研究与开发第 2 章 技术性能第 3 章 人工智能技术伦理第 4 章 经济第 5 章 教育第 6 章 政策与治理第 7 章 多样性第 8 章 舆论 人工智能指数是斯坦福大学以人为本的人工智能研究所&#xff08;HAI&#xff09;的一项独立倡议&#xff0c…

Java 石头剪刀布小游戏

一、任务 编写一个剪刀石头布游戏的程序。程序启动后会随机生成1~3的随机数&#xff0c;分别代表剪刀、石头和布&#xff0c;玩家通过键盘输入剪刀、石头和布与电脑进行5轮的游戏&#xff0c;赢的次数多的一方为赢家。若五局皆为平局&#xff0c;则最终结果判为平局。 二、实…