Kafka+PostgreSql,构建一个总线服务

之前开发的系统,用到了RabbitMQ和SQL Server作为总线服务的传输层和存储层,最近一直在看Kafka和PostgreSql相关的知识,想着是不是可以把服务总线的技术栈切换到这个上面。今天花了点时间试了试,过程还是比较顺利的,后续就是搭建基础服务的事情了。这里简单分享一下。

环境安装

安装Kafka

官方文档:Apache Kafka,可以直接参考,我这里简单介绍下我在本地搭建开发环境的过程,还是遇到了一个小坑。

我这里是在本地WSL 2环境下进行的安装,安装过程就参考官方文档的推荐流程即可

下载安装包

注意,这里要下载编译后的包,不嫌麻烦的话,可以下载源代码,编译后再使用。

wget -c https://downloads.apache.org/kafka/3.8.0/kafka_2.12-3.8.0.tgz

安装

tar -xzf kafka_2.13-3.8.0.tgz
cd kafka_2.13-3.8.0

这里安装完成后的路径是这样子的

重点关注的就是bin,config和logs这3个目录。

启动服务

官方提供了2中启动策略,一个是KRaft,一个是Zookeeper,我这里用的zookeeper

先启动zookeeper服务

bin/zookeeper-server-start.sh config/zookeeper.properties

在启动kafka服务

bin/kafka-server-start.sh config/server.properties

后面的zookeeper.properties和server.properties是配置文件,后续有配置需求的时候可以修改,比如监听地址,brokerid等等,长这样👇

启动后控制台的输出是这样

这样,一个kafka的服务节点就启动了。

对了,kafka是依赖java环境的,安装之前本地要安装jdk,我这里使用的是openjdk,也是ok的。

*端口转发(仅WSL2环境)

在WSL2环境下,需要配置下端口转发,不然宿主机连接不到broker,

netsh interface portproxy add v4tov4 listenport=9092 listenaddress=0.0.0.0 connectport=9092 connectaddress=172.28.240.79

后面那个ip地址就写宿主机给WSL环境下发的地址

此外,宿主机和wsl环境都放开9092(或者你设置的)端口

链接测试

这里有很多客户端的ui工具或者插件可以连接Kafka,官方本身也提供了测试命令,比如官方文档里给的测试案例就是用这几个命令

本地开发的话,我这里用的vs code的tools for apache kafka@ 这个插件,在插件市场用关键字搜索完成,安装即可

至此,一个本地的Kafka节点就基本配置完成了

安装PostgreSql

这个我老早就装好了,一些安装过程没有截图,就忽略吧,大家有需求的可以问一下各种GPT

也可以用docker,快速部署一个节点做本地的测试。

docker run --name postgres -p 5432:5432 -e POSTGRES_PASSWORD=mysecretpassword -d postgres

开发测试

新建项目

这里因为我是用的IDE做开发,所以直接创建个web项目就好,也可以用命令行来创建。

总之创建完成后,我的项目长这样

安装依赖

我这里是用的是dotnet.cap这个系列组件,然后为了测试方便,数据库的orm适用的是dapper,主要是图快,大家实际项目中可以用习惯的orm就好。

这里我的项目文件长这样

<Project Sdk="Microsoft.NET.Sdk.Web"><PropertyGroup><TargetFramework>net8.0</TargetFramework><Nullable>enable</Nullable><ImplicitUsings>enable</ImplicitUsings></PropertyGroup><ItemGroup><PackageReference Include="Dapper" Version="2.1.35" /><PackageReference Include="DotNetCore.CAP" Version="8.2.0" /><PackageReference Include="DotNetCore.CAP.Dashboard" Version="8.2.0" /><PackageReference Include="DotNetCore.CAP.Kafka" Version="8.2.0" /><PackageReference Include="DotNetCore.CAP.PostgreSql" Version="8.2.0" /></ItemGroup></Project>

注入服务

这里主要注入pg和Kafka

builder.Services.AddCap(x =>
{x.UsePostgreSql("User ID={pg用户名};Password={pg密码};Host={pg地址};Port=5432;Database=maigcTestDb;");x.UseKafka("localhost:9092");x.UseDashboard();
});

测试的业务代码

在常规的controller中注入服务

public class ValuesController(ICapPublisher producer) : Controller, ICapSubscribe
{/*业务代码*/
}
//上面这是最新的写法,以前那种构造函数的写法也是ok的
public class Values2Controller : Controller
{private ICapPublisher _capPublisher;public Values2Controller(ICapPublisher capPublisher){_capPublisher = capPublisher;}
}

写一个生产者接口

public async Task<IActionResult> Producer()
{Console.WriteLine("生产者发布消息: " + DateTime.Now);await producer.PublishAsync("sample.kafka.postgrsql", DateTime.Now);return Ok();
}

再写一个延时发送消息的生产者接口

public async Task<IActionResult> ProducerDelay()
{Console.WriteLine("生产者发布延时消息: " + DateTime.Now);await producer.PublishDelayAsync(TimeSpan.FromSeconds(delaySeconds), "sample.kafka.postgrsql", DateTime.Now);return Ok();
}

创建消费者

[CapSubscribe("sample.kafka.postgrsql")]
public void Test2(DateTime value)
{Console.WriteLine("订阅到消息: " + value);
}

我们访问下接口看下控制台的打印效果

可以看到,订阅到的时间和生产者发送的实际是一致的。

再试下延时发送

我们延时了10秒发布,这里生产者执行生产消息后,过了10秒,被消费者订阅到。

我们延时了10秒发布,这里生产者执行生产消息后,过了10秒,被消费者订阅到。

再看下PostG里保存的消息记录

这是生产记录

这是消费记录

注意,在CAP的机制里,这些持久化的消息记录是可以设置过期时间的,也就是如果我们每天的并发量很高,产生的消息非常多,可以设置一个过期时间,比如7天,一个月,到期后,这些持久化的数据就会自动清除掉。

CAP的官方文档里,还有更多案例,大家感兴趣也可以去试试,当然除了CAP还有MediatR,MassTransit这类组件,也可以轻松实现消息总线的机制。

好了,到此我们的测试就结束了,从安装Kafka,到创建这个新项目并跑通这个测试服务,也就2个小时,所以,这个迁移成本应该还是非常高效的。

小总结

实际上,我们的生产环境中,正正常运行的一套总线服务,依赖的是RabbitMQ和SQL Server,RabbitMQ还好,SQL Server在以后应该不会是做项目的首选数据库了,尤其是做一些高并发的项目,不是说它性能不够,而是成本太高,社区版的限制有太多,还是要早做规划,提前准备更加适合未来发展的方案,而PostgreSql是目前最受全球开发者欢迎的关系数据库,社区活跃度非常高,开源协议对企业也十分友好,即便是面对国内高标准的信创要求,也完全没问题,是绝佳的首选。

至于Kafka,这是目前世界上最为流行的消息队列,性能,可用性,可扩展性等各方面都比其他消息队列要好上一点。阿里后来推出的RocketMQ,也是基于Kafka的设计原理做了简化和更加适应国内环境的一些调整,根骨还是来自Kafka。而且就生态环境而言,无论国内还是国外,Kafka都是遥遥领先,对dotnet框架的支持,Kafka也远比RocketMQ更好(RocketMQ更多的还是用在java环境里),所以我们再选型的时候,优先考虑的还是Kafka。

更多关于这些内容的知识,大家感兴趣可以去搜一下或者找个AI问一下。

好了,就这些吧。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/54123.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

华为CNA VRM搭建(使用vmware worfstartion搭建)

创建虚拟机&#xff1a; 自定义→高级 选择硬件兼容性&#xff1a;默认安装版本&#xff0c;如果未来想要将此虚拟机安装到其他电脑&#xff0c;其他电脑版本过低&#xff0c;此时可以向下兼容&#xff0c;这里我们默认版本 稍后安装操作系统&#xff1a; CNA采用Euler OS系统…

MySQL练手题--体育馆的人流量(困难)

一、准备工作 Create table If Not Exists Stadium (id int, visit_date DATE NULL, people int); Truncate table Stadium; insert into Stadium (id, visit_date, people) values (1, 2017-01-01, 10); insert into Stadium (id, visit_date, people) values (2, 2017-01-02…

springboot luttuc redis 集成protobuf,手动序列化反序列化

前置需知&#xff1a; 1.本文章和网上大部分博客配置不太一样&#xff0c;各位看官要分析一下自己的需求。集成protobuf 本文章主要是手动调用protobuf的序列化方法&#xff0c;而不是交由springboot 去做&#xff0c;会偏向原生java 使用方式 2.由于为了和公司其他的项目达成…

如何修改BP神经网络的传递函数

BP神经网络每种传递函数都有自己的特点,输入输出值不同,线性和非线性不同,对于有些模型,需要做出有针对性的调整,需要自定义传递函数,这是修改的原因之一,有些模型,数据有一定的物理意义,或者其他特殊要求,有些因子需要单独处理,这个时候也需要自定义传递函数,这是…

HTML + CSS - 网页布局之一般布局浮动布局

1. 一般布局 1.1 一般布局相关参数 元素内容常常可以想像为放在一个盒子里&#xff0c;然后在周边加上内边距&#xff0c;边框和外边距&#xff0c;是盒子模型 默认一个块级区域会填充父类所有的行向空间&#xff0c;并且沿着块伸长容纳其内容&#xff0c;可以为块状体设置某…

实习项目|苍穹外卖|day10

Spring Task cron 表达式 入门案例 订单状态定时处理 通知用户支付&#xff01;通知商家完成订单&#xff01; Scheduled(cron "0 0/1 * * * ? ")public void processTimeoutOrder(){log.info("定时处理超时订单: {}", LocalDateTime.now());//答案是…

Stable Diffusion AI算法,实现一键式后期处理与图像修复魔法

在当今数字影像时代&#xff0c;后期处理技术已成为将原始图像转化为视觉上令人惊叹艺术作品的点睛之笔。随着人工智能技术的飞速发展&#xff0c;尤其是Stable Diffusion技术在图像处理领域的应用&#xff0c;图片后期处理已达到前所未有的高度&#xff0c;为摄影师、设计师及…

如何从github中克隆指定文件夹

一般来说&#xff0c;我们使用git clone <url> 是会克隆整个仓库下来的&#xff0c;但是某些时候我们可能只需要仓库中的某个/某几个文件夹即可&#xff0c;从而避免下载很多冗余的内容 Git 2.25.0 (Jan 2020)提供了sparse-checkout来实现此内容&#xff0c;一下给出具体…

【STM32 HAL库】IIC通信与CubeMX配置

【STM32 HAL库】IIC通信与CubeMX配置 前言理论IIC总线时序图IIC写数据IIC读数据 轮询模式CubeMX配置应用示例AHT20初始化初始化函数读取说明读取函数 中断模式CubeMX配置状态机图fsm.caht20.c DMA模式CubeMX配置代码 前言 本文为笔者学习 IIC 通信的总结&#xff0c;基于keysk…

证券api接口,一个开源Python量化交易平台项目需要考虑哪些方面

炒股自动化&#xff1a;申请官方API接口&#xff0c;散户也可以 python炒股自动化&#xff08;0&#xff09;&#xff0c;申请券商API接口 python炒股自动化&#xff08;1&#xff09;&#xff0c;量化交易接口区别 Python炒股自动化&#xff08;2&#xff09;&#xff1a;获取…

如何进行数字化基础设施的构建呢?

数字化基础设施的构建是一个复杂而系统的过程&#xff0c;它涉及多个方面和层次的建设。以下是一个详细的构建步骤和关键点&#xff1a; 一、明确建设目标和需求 战略规划&#xff1a;结合企业的长期发展目标&#xff0c;明确数字化基础设施建设的总体方向和具体目标。需求分析…

24年9月通信基础知识补充2

看文献过程中不断发现有太多不懂的基础知识,故长期更新这类blog不断补充在这过程中学到的知识。由于这些内容与我的研究方向并不一定强相关,故记录不会很深入请见谅。 【通信基础知识补充2】9月通信基础知识补充2 一、卫星通信中的 AoI 和 UoI 技术详解1.1. AoI(Age of Info…

LabVIEW中AVI帧转图像数据

在LabVIEW中&#xff0c;有时需要将AVI视频文件的帧转换为图像数据进行进一步处理。下面详细讲解了如何从AVI视频提取单帧并将其转换为图像数据集群&#xff0c;以便与其他图像处理VI兼容。 问题背景&#xff1a; 用户已经拥有能够处理JPEG图像数据集群的VI&#xff0c;现在希…

房产销售系统:SpringBoot技术应用案例

第二章关键技术的研究 2.1相关技术 房产销售系统是在Java MySQL开发环境的基础上开发的。Java是一种服务器端脚本语言&#xff0c;易于学习&#xff0c;实用且面向用户。全球超过35&#xff05;的Java驱动的互联网站点使用Java。MySQL是一个数据库管理系统&#xff0c;因为它的…

学习大数据DAY58 增量抽取数据表

作业 1 SQL 优化的常见写法有哪些 - 面试经常被问 使用索引&#xff1a;合理创建和使用索引是提高查询效率的关键。索引可以加速数据的检 索速度&#xff0c;但是索引也会占用额外的存储空间&#xff0c;并且在插入、删除和更新操作时会 有额外的开销。 避免全表扫描&…

线段树介绍及线段树的使用场景

1.线段树用来解决什么问题 假如说你有一个数组&#xff0c;数组下标为 0-1000&#xff0c;然后对外提供一些方法&#xff0c; 1.1比如说你对外提供add方法&#xff0c;add方法&#xff08;1,200,6&#xff09;&#xff0c;请你把从1 到 200 位置所有的值 加上6 1.2更新&…

华为OD机试真题-水仙花数-2024年OD统一考试(E卷)

最新华为OD机试考点合集&#xff1a;华为OD机试2024年真题题库&#xff08;E卷D卷C卷&#xff09;_华为od机试题库-CSDN博客 题目描述 所谓 水仙花数&#xff0c;是指一个n位的正整数&#xff0c;其各位数字的n次方和等于该数本身。 例如153是水仙花数&#xff0c;153是一个3…

C#+EmguCV合并视频文件

EmguCV是opencv的C#库&#xff0c;该库可以用来处理图像&#xff0c;还可以处理视频。以下是视频合并的方法&#xff0c;不过效率比较低。 /// <summary> /// 合并多个视频为新的视频() /// </summary> /// <param name"videoFiles"></p…

Qt与MQTT交互通信

MQTT全称是&#xff08;Message Queuing Telemetry Transport&#xff09;&#xff0c;即消息队列遥测传输协议 是一种基于发布/订阅&#xff08;Publish/Subscribe&#xff09;模式的轻量级通讯协议&#xff0c;并且该协议构建于TCP/IP协议之上&#xff0c;常用于互联网中&am…

leetcode hot100刷题【持续更新】

leetcode hot100刷题【持续更新】 一、哈希 1.两数之和 ​ 给定一个整数数组 nums 和一个整数目标值 target&#xff0c;请你在该数组中找出 和为目标值 target 的那 两个 整数&#xff0c;并返回它们的数组下标。 你可以假设每种输入只会对应一个答案&#xff0c;并且你不…