Redis发布订阅机制学习|kafka相关经验

Redis 发布订阅机制
简介:
Redis 发布订阅(Pus/Sub)是一种消息通信模式:发送者通过 PUBLISH发布消息,订阅者通过 SUBSCRIBE 订阅接收消息或通过UNSUBSCRIBE 取消订阅。主要由「发布者」、「订阅者」、「Channel」三个部分组成。
发布者和订阅者属于客户端,Channel 是 Redis 服务端,发布者将消息发布到频道,订阅这个频道的订阅者则收到消息。

1 基于频道的发布订阅
//在redisServer中,有一个字典类型字段pubsub_channels 用来保存订阅信息,其中key为频道,value为订阅该频道的客户端
struct redisServer{
pid_t pid;
//将频道映射到已订阅客户端的列表
dict *pubsub_channels
}

2 基于模式的发布订阅
//在redisServer中有一个pubsub_patterns属性,该属性表示一个链表,链表中保存着所有和模式相关的信息
struct redisServer{
list *pubsub_patterns;
}
typedef struct pubsubPattern{
client *client; – 订阅模式客户端
robj *pattern; --被订阅的模式
} pubsubPattern;

需要注意的是,发布消息与监听消息要运行在不同的 JVM,如果使用同一个 redissonClient 发布的话,不会监听到自己的消息。

缺陷:
发布者不知道订阅者是否收到发布的消息
订阅者不知道自己是否收到了发布者发出的所有消息
发送者不能获知订阅者的执行情况
没人知道订阅者何时开始收到消息

实现
生产者代码

 * 发布消息到 Topic* @param message 消息* @return 接收消息的客户端数量
public long sendMessage(String message) {RTopic topic = redissonClient.getTopic(CHANNEL);long publish = topic.publish(message);log.info("生产者发送消息成功,msg = {}", message);return publish;
}

消费者代码

public void onMessage() {// in other thread or JVMRTopic topic = redissonClient.getTopic(CHANNEL);topic.addListener(String.class, (channel, msg) -> {log.info("channel: {} 收到消息 {}.",  channel, msg);});
}

Spring boot整合redis

消息监听配置
@Configuration
public class RedisSubConfig {public static final String SUB_KEY = "chat";//频道channel* redis消息监听器容器* 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器* 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理* @param connectionFactory* @param listenerAdapter* @return@BeanRedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);//订阅了一个频道container.addMessageListener(listenerAdapter, new PatternTopic(RedisSubConfig.SUB_KEY));return container;}* 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法@BeanMessageListenerAdapter listenerAdapter(RedisReceiver receiver) {return new MessageListenerAdapter(receiver, "receiveMessage");}/*** redis 读取内容的template* @param connectionFactory* @return*/@BeanStringRedisTemplate template(RedisConnectionFactory connectionFactory) {return new StringRedisTemplate(connectionFactory);}
}

接收消息

@Service
public class RedisReceiver {public void receiveMessage(String message) {System.out.println("接收消息:" + message);}
}

采用定时器发布消息

@EnableScheduling //开启定时器功能
@Component
public class MessageSender {@Autowiredprivate StringRedisTemplate stringRedisTemplate;@Scheduled(fixedRate = 5000) //间隔5s 通过StringRedisTemplate对象向redis消息队列chat频道发布消息public void sendMessage(){stringRedisTemplate.convertAndSend("chat", "hello "+ new Date());}
}

kafka相关
消息头格式
RecordHeaders(headers = [RecordHeader(key = messageType, value = [0, 0, 0, 1]), RecordHeader(key = operationCode, value = [0, 0, 0, 1]), RecordHeader(key = messageId, value = [52, 52, 52, 53, 53, 53])], isReadOnly = false)
使用java读取消息头

private MsgHeader parseMsgHeaders(Headers headers) {MsgHeader msgHeader = new MsgHeader();Header xxxHeader = headers.lastHeader("xxx");if (xxxHeader != null) {msgHeader.setXXX(new String(xxxHeader.value()));}return msgHeader;}

使用go发送消息头

headers := []sarma.RecordHeader{sarama.RecordHeader{Key: []byte("kkk"),Value: []byte("vvv"),
}}
msg := &sarama.ProducerMessage{Topic: "topic",Key: sarama.StringEncoder("  "),Value: sarama.StringEncode("  "),Headers: headers,
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/16437.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

linux -网络编程-多线程并发服务器

目录 1.三次握手和四次挥手 2 滑动窗口 3 函数封装思想 4 高并发服务器 学习目标: 掌握三次握手建立连接过程掌握四次握手关闭连接的过程掌握滑动窗口的概念掌握错误处理函数封装实现多进程并发服务器实现多线程并发服务器 1.三次握手和四次挥手 思考: 为什么…

[SQL挖掘机] - 多表连接: union

介绍: sql中的union是用于合并两个或多个select语句的结果集的操作符。它将多个查询的结果合并成一个结果集,并自动去除重复的行。请注意,union操作要求被合并的查询返回相同数量和类型的列。 用法: union的基本语法如下: select_stateme…

【javaSE】 面向对象程序三大特性之继承

目录 为什么需要继承 继承的概念 继承的语法 注意事项 父类成员访问 子类中访问父类的成员变量 子类和父类不存在同名成员变量 子类和父类成员变量同名 访问原则 子类中访问父类的成员方法 成员方法名字不同 总结: 成员方法名字相同 总结: …

深入理解JVM:Java使用new创建对象的流程

1、创建对象的几种方式 ①new 对象 ②反射 ③对象的复制 ④反序列化 2、创建对象流程 先看看常量池里面有没有,如果有,就用常量池的看这个类有没有被加载过,如果没有,就执行类加载以及类的初始化。(对象的大小&#…

【决策树-鸢尾花分类】

决策树算法简介 决策树是一种基于树状结构的分类与回归算法。它通过对数据集进行递归分割,将样本划分为多个类别或者回归值。决策树算法的核心思想是通过构建树来对数据进行划分,从而实现对未知样本的预测。 决策树的构建过程 决策树的构建过程包括以…

数据结构:单链表的实现(C语言)

个人主页 : 水月梦镜花 个人专栏 : 《C语言》 《数据结构》 文章目录 前言一、单链表实现思路和图解1.节点的定义(SListNode)2.申请一个节点(BuySListNode)3.单链表打印(SListPrint)4.单链表尾插(SListPushBack)5.单链表的头插(SListPushFront)6.单链表的…

【目标检测】基于yolov5的水下垃圾检测(附代码和数据集,7684张图片)

写在前面: 首先感谢兄弟们的订阅,让我有创作的动力,在创作过程我会尽最大能力,保证作品的质量,如果有问题,可以私信我,让我们携手共进,共创辉煌。 路虽远,行则将至;事虽难,做则必成。只要有愚公移山的志气、滴水穿石的毅力,脚踏实地,埋头苦干,积跬步以至千里,就…

Spring Boot概述:构建稳定、高效的分布式应用

目录 ✨正文 ✨特性 ✨与spring的关系 ✨Spring Cloud的关系 ✨总结 ✨正文 Spring Boot是一个备受欢迎的Java开发框架,旨在简化和加速Spring应用程序的开发和部署。作为Spring生态系统的一部分,Spring Boot通过提供合理的默认配置和开箱即用的功能…

行云管家荣获CFS第十二届财经峰会 “2023产品科技创新奖”

7月26日至27日,CFS第十二届财经峰会暨2023可持续商业大会在京盛大召开。峰会主题为“激活高质量发展澎湃活力”,超1000位政商领袖、专家学者、企业及媒体代表出席了本次盛会,共同分享新技术新产品新趋势、研判全球新挑战与新变局下企业的机遇…

【方法】PDF可以转换成Word文档吗?如何操作?

很多人喜欢在工作中使用PDF,因为PDF格式可以准确地保留文档的原始格式,比如字体、图像、布局和颜色等。 但如果编辑文档的话,PDF还是没有Word文档方便。那可以将PDF转换成Word格式,再来编辑吗?如何操作呢?…

接口自动化测试平台

下载了大神的EasyTest项目demo修改了下<https://testerhome.com/topics/12648 原地址>。也有看另一位大神的HttpRunnerManager<https://github.com/HttpRunner/HttpRunnerManager 原地址>&#xff0c;由于水平有限&#xff0c;感觉有点复杂~~~ 【整整200集】超超超…

Java:Java程序通过执行系统命令调用Python脚本

本文实现功能&#xff1a;Java程序调用Python脚本 Python脚本 import sysdef add(x, y):return x yif __name__ "__main__":print(add(int(sys.argv[1]), int(sys.argv[2])))直接执行 $ python math.py 1 2 3Java程序调用Python脚本 package io.github.mouday.…

《吐血整理》进阶系列教程-拿捏Fiddler抓包教程(13)-Fiddler请求和响应断点调试

1.简介 Fiddler有个强大的功能&#xff0c;可以修改发送到服务器的数据包&#xff0c;但是修改前需要拦截&#xff0c;即设置断点。设置断点后&#xff0c;开始拦截接下来所有网页&#xff0c;直到取消断点。这个功能可以在数据包发送之前&#xff0c;修改请求参数&#xff1b…

JVM复习(史上最全!!!)

一、JDK、JRE、JVM的区别 JDK: 全称Java Development Kit&#xff0c;是 Java 语言的软件开发工具包&#xff0c;主要用于移动设备、嵌入式设备上的Java应用程序。JDK是整个Java开发的核心。 JRE: JRE&#xff0c;全称Java Runtime Environment&#xff0c;是指Java的运行环境&…

c++通过自然语言处理技术分析语音信号音高

对于语音信号的音高分析&#xff0c;可以使用基频提取技术。基频是指一个声音周期的重复率&#xff0c;也就是一个声音波形中最长的周期。 通常情况下&#xff0c;人的声音基频范围是85Hz到255Hz。根据语音信号的基频可以推断出其音高。 C中可以使用数字信号处理库或语音处理库…

开放麒麟1.0发布一个月后,到底怎么样?另一款操作系统引发热议

具有里程碑意义 7月5日&#xff0c;国产首个开源桌面操作系统“开放麒麟1.0”正式发布。 标志着我国拥有了操作系统组件自主选型、操作系统独立构建的能力&#xff0c;填补了我国在这一领域的空白。 举国欢庆&#xff0c;算的上是里程碑意义了&#xff01; 发布后用着如何&a…

【业务功能篇57】Springboot + Spring Security 权限管理 【上篇】

4.权限管理模块开发 4.1 权限管理概述 4.1.1 权限管理的意义 后台管理系统中&#xff0c;通常需要控制不同的登录用户可以操作的内容。权限管理用于管理系统资源&#xff0c;分配用户菜单、资源权限&#xff0c;以及验证用户是否有访问资源权限。 4.1.2 RBAC权限设计模型 …

redis的并发安全问题:redis的事务VSLua脚本

redis为什么会发生并发安全问题&#xff1f; 在redis中&#xff0c;处理的数据都在内存中&#xff0c;数据操作效率极高&#xff0c;单线程的情况下&#xff0c;qps轻松破10w。反而在使用多线程时&#xff0c;为了保证线程安全&#xff0c;采用了一些同步机制&#xff0c;以及多…

windows中注册redis服务启动时报1067错误

注册完redis服务&#xff0c;打开计算机 服务时确实有redis服务存在&#xff0c;但是点击启动时却报1067错误&#xff0c;而命令行用redis-server.exe redis.windows.conf 命令却也可以启动 查看6379的端口也没有被占用&#xff08;netstat -ano | findstr :6379&#xff09; …

Mac 定时重启 TouchBar 脚本(缓解闪烁问题)

背景 Mac 笔记本 TouchBar 是真的脆啊&#xff0c;合盖使用一段时间就废了&#xff0c;右侧一直闪烁简直亮瞎眼 &#x1f602; 经过观察&#xff0c;总结出闪烁规律如下&#xff1a; 工作状态&#xff1a;不断操作电脑时&#xff0c;触控栏处于工作状态&#xff0c;几乎不闪…