RocketMQ消息过滤机制源码详解

#RocketMQ提供了2种消息过滤的方式:

  • TAG 过滤

  • SQL92 过滤

SQL过滤默认是没有打开的,如果想要支持,必须在broker的配置文件中设置:enablePropertyFilter = true

一. 示例代码

1.1 producer 代码

public class Producer {public static void main(String[] args) throws Exception {// 实例化消息生产者ProducerDefaultMQProducer producer = new DefaultMQProducer("tag_p_g");// 设置NameServer的地址producer.setNamesrvAddr("127.0.0.1:9876");producer.start();String[] tags = {"TAG_A", "TAG_B", "TAG_C"};for (int i = 0; i < 10 ; i++) {byte[] body = ("Hi filter message," + i).getBytes();String tag = tags[i % tags.length];//同一个topic下,会发送多种tag消息Message msg = new Message("MY_topic", tag, body);//设置一些属性,消费者SQL过滤时可以使用msg.putUserProperty("age", String.valueOf(i));msg.putUserProperty("name", "name" + (i + 1));msg.putUserProperty("isGender", String.valueOf(new Random().nextBoolean()));SendResult sendResult = producer.send(msg);System.out.println("sendResult = " + sendResult);}producer.shutdown();}
}

1.2 consumer 代码

1.2.1 TAG过滤
public class Consumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("c_tag_group");consumer.setNamesrvAddr("127.0.0.1:9876");/*** 订阅消息过滤* 只订阅 topic = MY_topic 下* tag = TAG_A 或者 tag = TAG_C 的消息,不要 tag = TAG_B 的消息* 订阅多个tag使用 || 分开*/consumer.subscribe("MY_topic", "TAG_A || TAG_C");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.println(msg);}//消费成功时返回return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("Filter Tag Consumer Started");}
}
1.2.2 SQL92过滤
public class Consumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");consumer.setNamesrvAddr(MQConstant.NAME_SERVER_ADDR);/*** 订阅消息过滤: 根据消息生产者指定的用户属性进行过滤* 支持的常量类型:*   数值:比如:123,3.1415*   字符:必须用单引号包裹起来,比如:'abc'*   布尔:TRUE 或 FALSE*   NULL:特殊的常量,表示空** 支持的运算符有:*   数值比较:>,>=,<,<=,BETWEEN,=*   字符比较:=,<>,IN*   逻辑运算 :AND,OR,NOT*   NULL判断:IS NULL 或者 IS NOT NULL**   // (age between 6 and 9) AND (name IS NOT NULL) AND (isGender = TRUE)*/consumer.subscribe(MQConstant.FILTER_SQL_TOPIC, MessageSelector.bySql("(age between 6 and 9) AND (name IS NOT NULL) AND (isGender = TRUE)"));consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.println(msg);}//消费成功时返回return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("Filter SQL Consumer Started");}
}

二. 说明

消费者去broker拉取消息时,先经过broker过滤一次,在经过消费者过滤一次

  1. 如果是 TAG 过滤。broker要先根据ConsumeQueue 中 Tag HashCode过滤一次,消费者在根据 Tag 值过滤一次。因为 ConsumeQueue 为了便于检索,文件中每一个条目都是定长20字节,所以条目在最后八个字节存储的是消息 Tag 的 HashCode,而不是TAG值。这样broker在拉取磁盘中的消息时,只需要对比 ConsumeQueue中 的Tag HashCode,而不需要解析 CommitLog 中的 Tag 值,如果发生Hash冲突,则交给消费者客户端过滤消息中的Tag值。
  2. 如果是 SQL92 过滤。则全部由 broker 过滤。因为 SQL 过滤的是消息中的属性值,所以必须反序列化 CommitLog 中的属性值,既然在broker已经进行了精确匹配,那么客户端自然可以省去这个步骤了。

三. 消费者启动注册订阅信息到broker

consumer订阅信息会保存到SubscriptionData中,当consumer启动后,会通过心跳先将订阅信息发送到broker。broker主要是构建2部分:

  1. 保存consumer发送的订阅信息SubscriptionData对象。
  2. 构建SQL过滤的ConsumerFilterData对象。

那么我们看下consumer构建订阅数据以及发送到broker的过程:

// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#subscribe(java.lang.String, org.apache.rocketmq.client.consumer.MessageSelector)
public void subscribe(final String topic, final MessageSelector messageSelector) throws MQClientException {try {if (messageSelector == null) {subscribe(topic, SubscriptionData.SUB_ALL);return;}//核心就是创建SubscriptionDataSubscriptionData subscriptionData = FilterAPI.build(topic,messageSelector.getExpression(), messageSelector.getExpressionType());this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);if (this.mQClientFactory != null) {this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();}} catch (Exception e) {throw new MQClientException("subscription exception", e);}
}

继续看FilterAPI.build(...)方法:

// org.apache.rocketmq.common.filter.FilterAPI#build
public static SubscriptionData build(final String topic, final String subString,final String type) throws Exception {// 如果是TAG过滤,则执行这里if (ExpressionType.TAG.equals(type) || type == null) {return buildSubscriptionData(topic, subString);}if (subString == null || subString.length() < 1) {throw new IllegalArgumentException("Expression can't be null! " + type);}// 如果是SQL过滤,则执行这里,相对简单,直接原样发送给brokerSubscriptionData subscriptionData = new SubscriptionData();subscriptionData.setTopic(topic);subscriptionData.setSubString(subString);subscriptionData.setExpressionType(type);return subscriptionData;}
}

如果是TAG过滤,consumer会做些额外的处理:

// org.apache.rocketmq.common.filter.FilterAPI#buildSubscriptionData 
public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,String subString) throws Exception {SubscriptionData subscriptionData = new SubscriptionData();subscriptionData.setTopic(topic);subscriptionData.setSubString(subString);if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {// 订阅所有消息subscriptionData.setSubString(SubscriptionData.SUB_ALL);} else {// 如果订阅的不是*,则通过 || 分割String[] tags = subString.split("\\|\\|");if (tags.length > 0) {for (String tag : tags) {if (tag.length() > 0) {String trimString = tag.trim();if (trimString.length() > 0) {// 保存分割后的TAG值subscriptionData.getTagsSet().add(trimString);// 保存分割后的TAG HashCodesubscriptionData.getCodeSet().add(trimString.hashCode());}}}} else {throw new Exception("subString split error");}}return subscriptionData;}

这样consumer的订阅信息就准备好了,然后consumer启动,发送心跳数据:

//org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#startpublic synchronized void start() throws MQClientException {//......代码省略.......// 发送心跳this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();//......代码省略.......
}

我们再看下broker是如何处理心跳数据的:

public class ClientManageProcessor implements NettyRequestProcessor {@Overridepublic RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {switch (request.getCode()) {// 接收客户端心跳指令,保存客户端信息case RequestCode.HEART_BEAT:return this.heartBeat(ctx, request);case RequestCode.UNREGISTER_CLIENT:return this.unregisterClient(ctx, request);case RequestCode.CHECK_CLIENT_CONFIG:return this.checkClientConfig(ctx, request);default:break;}return null;}
}

heartBeat方法:

// org.apache.rocketmq.broker.processor.ClientManageProcessor#heartBeat 
public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {// 处理消费者心跳for (ConsumerData data : heartbeatData.getConsumerDataSet()) {SubscriptionGroupConfig subscriptionGroupConfig =this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(data.getGroupName());//...// 注册消费者信息boolean changed = this.brokerController.getConsumerManager().registerConsumer(data.getGroupName(),clientChannelInfo,data.getConsumeType(),data.getMessageModel(),data.getConsumeFromWhere(),data.getSubscriptionDataSet(),isNotifyConsumerIdsChangedEnable);// ...}// ...return response;}

继续往下走:

// org.apache.rocketmq.broker.client.ConsumerManager#registerConsumer
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {//...// 更新topic下消费组信息boolean r2 = consumerGroupInfo.updateSubscription(subList);//...this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);//...
}

继续往里走:

// org.apache.rocketmq.broker.filter.ConsumerFilterManager#register(java.lang.String, java.lang.String, java.lang.String, java.lang.String, long)
public boolean register(final String topic, final String consumerGroup, final String expression,final String type, final long clientVersion) {// 如果是TAG 过滤,则退出if (ExpressionType.isTagType(type)) {return false;}// 如果是SQL过滤,但没有指定过滤规则,则退出if (expression == null || expression.length() == 0) {return false;}FilterDataMapByTopic filterDataMapByTopic = this.filterDataByTopic.get(topic);if (filterDataMapByTopic == null) {FilterDataMapByTopic temp = new FilterDataMapByTopic(topic);FilterDataMapByTopic prev = this.filterDataByTopic.putIfAbsent(topic, temp);filterDataMapByTopic = prev != null ? prev : temp;}BloomFilterData bloomFilterData = bloomFilter.generate(consumerGroup + "#" + topic);// 构建SQL过滤的ConsumerFilterDatareturn filterDataMapByTopic.register(consumerGroup, expression, type, bloomFilterData, clientVersion);
}

注册方法内部主要就是构建ConsumerFilterData对象:

// org.apache.rocketmq.broker.filter.ConsumerFilterManager#build
public static ConsumerFilterData build(final String topic, final String consumerGroup,final String expression, final String type,final long clientVersion) {if (ExpressionType.isTagType(type)) {return null;}ConsumerFilterData consumerFilterData = new ConsumerFilterData();consumerFilterData.setTopic(topic);consumerFilterData.setConsumerGroup(consumerGroup);consumerFilterData.setBornTime(System.currentTimeMillis());consumerFilterData.setDeadTime(0);consumerFilterData.setExpression(expression);consumerFilterData.setExpressionType(type);consumerFilterData.setClientVersion(clientVersion);try {consumerFilterData.setCompiledExpression(FilterFactory.INSTANCE.get(type).compile(expression));} catch (Throwable e) {log.error("parse error: expr={}, topic={}, group={}, error={}", expression, topic, consumerGroup, e.getMessage());return null;}return consumerFilterData;
}

最终工作的就是:

public class SqlFilter implements FilterSpi {@Overridepublic Expression compile(final String expr) throws MQFilterException {return SelectorParser.parse(expr);}@Overridepublic String ofType() {return ExpressionType.SQL92;}
}

好了,到这里就铺垫好了,接下来我们继续看消息过滤的过程,这个过程中,上面的2个对象将会工作。

四. 拉取消息

broker处理拉取请求的处理器:PullMessageProcessor 方法内容比较多,还是关注和过滤相关的部分

// org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.Channel, org.apache.rocketmq.remoting.protocol.RemotingCommand, boolean)
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)throws RemotingCommandException {RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();final PullMessageRequestHeader requestHeader =(PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);// .......省略诸多代码........SubscriptionData subscriptionData = null;ConsumerFilterData consumerFilterData = null;// 这里是false, consumer启动时已经将订阅信息发送到了broker,拿来即用即可if (hasSubscriptionFlag) {try {subscriptionData = FilterAPI.build(requestHeader.getTopic(), requestHeader.getSubscription(), requestHeader.getExpressionType());if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {consumerFilterData = ConsumerFilterManager.build(requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(),requestHeader.getExpressionType(), requestHeader.getSubVersion());assert consumerFilterData != null;}} catch (Exception e) {log.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(),requestHeader.getConsumerGroup());response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);response.setRemark("parse the consumer's subscription failed");return response;}} else {ConsumerGroupInfo consumerGroupInfo =this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());// ....省略判断.......// 获取订阅数据,这个就是consumer启动时发送给broker的subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());// .....省略判断.......// SQL过滤 if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {//TODO:前面分析consumer心跳时看到了它,SQL过滤时会创建consumerFilterData = this.brokerController.getConsumerFilterManager().get(requestHeader.getTopic(),requestHeader.getConsumerGroup());// ....省略判断......}}// .....省略判断.......MessageFilter messageFilter;if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,this.brokerController.getConsumerFilterManager());} else {// 创建MessageFiltermessageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,this.brokerController.getConsumerFilterManager());}// 从broker 拉取消息final GetMessageResult getMessageResult =this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);//....省略大量代码.....和过滤无关        
}      

接下来我们就看下从 CommitLog 读取消息并过滤的过程:

// org.apache.rocketmq.store.DefaultMessageStore#getMessage
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,final int maxMsgNums,final MessageFilter messageFilter) {// .....省略大篇幅代码.......// 在去commitlog读取消息之前,对ConsumeQueue条目进行 tag hashcode 过滤if (messageFilter != null&& !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {if (getResult.getBufferTotalSize() == 0) {status = GetMessageStatus.NO_MATCHED_MESSAGE;}continue;}// 从CommitLog 读取消息SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);if (null == selectResult) {if (getResult.getBufferTotalSize() == 0) {status = GetMessageStatus.MESSAGE_WAS_REMOVING;}nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);continue;}// 在从commitlog读取消息之后,进行 SQL 过滤if (messageFilter != null&& !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {if (getResult.getBufferTotalSize() == 0) {status = GetMessageStatus.NO_MATCHED_MESSAGE;}// release...selectResult.release();continue;}}

主要就是做3件事:

  1. 在去 CommitLog 读取消息之前,先根据 TAG hashcode 过滤一次 ConsumeQueue 中的条目,如果ConsumeQueue中保存Tag HashCode与消费组需要消费Tag HashCode不一致,则不会读取CommitLog中的消息了。

broker先完成tag hashcode 过滤,consumer进一步完成tag 值过滤。

  1. 去 CommitLog 读取消息
  2. 从 CommitLog 读取出消息之后,如果是SQL过滤,则在broker完成过滤。

4.1 Broker完成 TAG HashCode 过滤

TAG 过滤就是ExpressionMessageFilter#isMatchedByConsumeQueue(..)方法:

@Override
public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {if (null == subscriptionData) {return true;}if (subscriptionData.isClassFilterMode()) {return true;}// by tags code.if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {if (tagsCode == null) {return true;}if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {return true;}// 根据tag hashcode 过滤return subscriptionData.getCodeSet().contains(tagsCode.intValue());} else {// ....省略else.....}return true;
}

这个方法内部会完成TAG 的hashcode 过滤,不过这里只是TAG的初步过滤,因为两个不同TAG也可能有相同的hashcode,所以这里过滤并不完善,真正的TAG过滤是交给消费者来完成的。

4.2 Broker完成 SQL 过滤

SQL的过滤是在ExpressionMessageFilter#isMatchedByCommitLog(..)方法中:

@Override
public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {if (subscriptionData == null) {return true;}if (subscriptionData.isClassFilterMode()) {return true;}// 如果是TAG过滤,则直接退出if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {return true;}// SQL过滤的数据(sql表达式等等)ConsumerFilterData realFilterData = this.consumerFilterData;Map<String, String> tempProperties = properties;// .....校验code.....Object ret = null;try {MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);ret = realFilterData.getCompiledExpression().evaluate(context);} catch (Throwable e) {log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e);}log.debug("Pull eval result: {}, {}, {}", ret, realFilterData, tempProperties);if (ret == null || !(ret instanceof Boolean)) {return false;}return (Boolean) ret;
}

这里会根据SQL进行过滤,如果该条消息是消费者想要的,则将其放入容器中,返回给消费者,如果不是消费者想要的,则直接丢弃,继续查询下一条消息。

这里的丢弃只是不返回给消费者,在清除 CommitLog 文件之前,这条消息都是在的。

五. 消费消息

前面说了,如果是TAG 过滤,则Broker会率先完成一次TAG Hashcode过滤,但是这样过滤并不完全,因为不同TAG可能有相同Hashcode,所以消费者要根据TAG 值完成最后的过滤。

如果是SQL过滤,只能由Broker完成,消费者不做其他任何操作。

那么我们还是看消费者消费消息时的过滤逻辑:

// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
public void pullMessage(final PullRequest pullRequest) {//......PullCallback pullCallback = new PullCallback() {@Overridepublic void onSuccess(PullResult pullResult) {if (pullResult != null) {// 处理拉取结果,这里将会完成TAG的值过滤pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,subscriptionData);}//.......}//.......
}

那么我们继续看下它的内部实现:

// org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#processPullResult
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,final SubscriptionData subscriptionData) {PullResultExt pullResultExt = (PullResultExt) pullResult;this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());if (PullStatus.FOUND == pullResult.getPullStatus()) {ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);List<MessageExt> msgListFilterAgain = msgList;// 根据TAG 值过滤if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());for (MessageExt msg : msgList) {if (msg.getTags() != null) {if (subscriptionData.getTagsSet().contains(msg.getTags())) {msgListFilterAgain.add(msg);}}}}// 将过滤后的消息给消费者消费pullResultExt.setMsgFoundList(msgListFilterAgain);//........}return pullResult;
}

六. 总结

  1. RocketMQ支持两种方式的消息过滤:TAG/SQL
  2. 要想使用SQL过滤,必须要在broker中配置:enablePropertyFilter = true
  3. TAG 过滤分两个阶段完成:
  • 第一阶段:broker率先根据tag的hashcode完成过滤
  • 第二阶段:consumer根据tag值完成最后的过滤
  1. SQL过滤只能在Broker中完成

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/192650.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

实战系统玩转OpenGL和AI,助力实现各种高级酷炫视频特效几个技巧

随着计算机图形学和人工智能的发展&#xff0c;通过将OpenGL和AI相结合&#xff0c;我们可以实现各种令人印象深刻的高级酷炫视频特效。本文将介绍几个技巧&#xff0c;帮助您在实践中更好地应用这些技术&#xff0c;并附上相应的源码。 火焰效果: 利用OpenGL的纹理映射和着色器…

程序员学习方法

https://www.zhihu.com/question/24187324 https://www.zhihu.com/question/505750740 windows系统&#xff1a; 如何业余开展 Windows 系统的学习&#xff1f; - 知乎 wifi工作原理&#xff1a; WiFi的工作原理是什么&#xff1f; - 知乎 发

LLM:《第 3 部分》从数学角度评估封闭式LLM的泛化能力

一、说明 在 OpenAI 或 Anthropic 等封闭式大型语言模型 (LLM) 领域&#xff0c;对智能和多功能性的真正考验在于它们处理高特异性查询并在响应中表现出独特性的能力。在这篇博客中&#xff0c;我的目标是提供测试这些模型泛化能力的机制。 封闭式LLM意味着您不知道训练语料库的…

ORA-00257: archiver error. Connect internal only, until freed 处理方法

1、产生原因 通过PL/SQL登录用户账号提示此信息&#xff0c;导致无法正常登录&#xff0c;查询资料显示出现ORA-00257错误由于归档日志已满&#xff0c;占用了全部的硬盘剩余空间导致的&#xff0c;通过简单删除日志释放存储空间就能够解决。 2、解决办法 2.1 root用户登录服务…

【java智慧工地源码】智慧工地物联网云平台,实现现场各类工况数据采集、存储、分析与应用

“智慧工地整体方案”以智慧工地物联网云平台为核心&#xff0c;基于智慧工地物联网云平台与现场多个子系统的互联&#xff0c;实现现场各类工况数据采集、存储、分析与应用。通过接入智慧工地物联网云平台的多个子系统板块&#xff0c;根据现场管理实际需求灵活组合&#xff0…

设计模式--观察者模式(Observer Pattern)

Observer模式 观察者模式&#xff08;Observer Pattern&#xff09;是一种行为设计模式&#xff0c;它定义了对象之间的依赖关系&#xff0c;当一个对象的状态发生改变时&#xff0c;所有依赖于它的对象都会得到通知并被自动更新。 观察者模式主要包含以下几个角色&#xff1…

SSM SpringBoot vue社团事务管理系统

SSM SpringBoot vue社团事务管理系统 系统功能 登录 个人中心 人员信息管理 考勤信息管理 空闲时间管理 现金日记账管理 经费预算管理 物品租借管理 会议信息管理 活动信息管理 项目任务管理 公告通知管理 物资信息管理 开发环境和技术 开发语言&#xff1a;Java 使用框架:…

面试题:千万量级数据中查询 10W 量级的数据有什么方案?

文章目录 前言初版设计方案整体方案设计为&#xff1a;技术方案如下&#xff1a;CK 分页查询使用 ES Scroll Scan 优化深翻页耗时数据 ESHbase 组合查询方案ES 查询的两个阶段组合使用 Hbase RediSearchRedisJSON 优化方案RediSearch 性能数据RedisJSON 性能数据 总结 前言 在…

基于windows系统使用Python对于pc当前的所有窗口的相关操作接口

对于windows系统的电脑使用Python可以对其当前的窗口进行宏观的查询等操作 派生博客1:python对pc的窗口进行操作(windows) 派生博客2python获取当前pc的分辨率(windows) 派生博客3使用uiautomation模块来对基于windows系统的pc中的前端界面进行自动化测试(查找控件&#xff…

【msg_msg】corCTF2021-msgmsg 套题

前言 该套题共两题&#xff0c;一道简单模式 fire_of_salvation&#xff0c;一道困难模式 wall_of_perdition&#xff0c;都是关于 msg_msg 的利用的。这题跟之前的 TPCTF2023 core 的很像&#xff08;应该是 TPCTF2023 core 跟他很像&#xff0c;bushi&#xff09;。 其中 f…

C#编程题分享(5)

判断质数问题 输⼊⼀个正整数&#xff0c;判断该数是否是质数。如果为质数输出 yes&#xff0c;如果不是输出no 样例输⼊113 输出yes int n Convert.ToInt32(Console.ReadLine()); int count 0; for (int i 1; i < n 1; i) {if (n % i 0) // 判断该数能被整除{coun…

传媒行业CRM:打造高效客户管理,提升品牌影响力

传媒行业充满竞争和变化&#xff0c;传媒企业面临着客户管理不透明、业务流程混乱、销售数据分析不足&#xff0c;无法优化营销策略和运营管理等问题。CRM系统是企业实现数智化管理的神器&#xff0c;可以有效解决这些问题。下面说说&#xff0c;传媒行业CRM系统推荐。 1、建立…

docker (容器数据卷、创建、读写规则)-day03

一、容器数据卷概念 卷就是目录或文件&#xff0c;存在于一个或多个容器中&#xff0c;由docker挂载到容器&#xff0c;但不属于联合文件系统&#xff0c;因此能够绕过Union File System提供一些用于持续存储或共享数据的特性&#xff1a;数据卷的设计目的就是数据的持久化&am…

JAVA代码优化:异步任务管理器

异步任务管理器&#xff08;AsyncManager&#xff09;&#xff0c;用于执行异步任务并管理任务调度线程池。 实现了一个简单的异步任务管理器&#xff0c;通过调度线程池来执行异步任务&#xff0c;并提供了对任务调度线程池的关闭方法。这样可以方便地处理需要异步执行的任务…

python列出多重继承类的每个对象属性

1 python列出多重继承类的每个对象属性 1.1 列出实例属性 python通过__dict__列出实例属性。 &#xff08;1&#xff09; 实例.__class__&#xff1a;获取实例所属类。 &#xff08;2&#xff09; 类.__name__&#xff1a;获取类的名称。 &#xff08;3&#xff09; 实例.…

js进阶笔记之Promise

目录 为什么用Promise Promise的基本语法 Promise的then方法 then的链式调用 Promise的相关方法 为什么用Promise JS代码分为同步代码和异步代码 同步: 代码依次执行&#xff0c;上一个任务完成后&#xff0c;下一个任务才执行 异步 遇到耗时任务不必等待其完成&#xf…

Java项目调用C/C++ SDK的方案汇总

Java项目调用C/C SDK的方案汇总 背景调研JNIJNativeJNAJavaCPP 背景 Java项目中需要调用到一个C项目&#xff0c;于是对目前通用的解决方案做了一些调研&#xff0c;这里做一个汇总。 调研 JNI JNI&#xff1a;Java Native Interface&#xff0c;JNI是一套编程接口&#xf…

基于社区电商的Redis缓存架构-库存模块缓存架构(下)

基于缓存分片的下单库存扣减方案 将商品进行数据分片&#xff0c;并将分片分散存储在各个 Redis 节点中&#xff0c;那么如何计算每次操作商品的库存是去操作哪一个 Redis 节点呢&#xff1f; 我们对商品库存进行了分片存储&#xff0c;那么当扣减库存的时候&#xff0c;操作…

《代码整洁之道》摘抄

《代码整洁之道&#xff1a;程序员的职业素养》摘抄 1 专业主义&#xff08;开发人员应该怎么保证代码质量&#xff09; 专业人士&#xff0c;就是能对自己犯下的错误负责的人&#xff0c;哪怕那些错误实际上在所难免。所以&#xff0c;雄心勃勃的专业人士们&#xff0c;你们…

PyEcharts快速上手_Python数据分析与可视化

PyEcharts快速上手 导入图表类型添加数据设置图表样式输出图表链式调用 导入图表类型 和其他库的导入方法一样&#xff0c;在绘图之前首先要在文件开头导入所需图表类型。 from pyecharts.charts import BarBar 类型是柱状图/条形图在 pyEcharts 中的英文名。 pyEcharts 中有…