golang—kafka架构原理快速入门以及自测环境搭建(docker单节点部署)

kafka

Apache Kafka 是一个分布式的流处理平台。它具有以下特点:

  • 支持消息的发布和订阅,类似于 RabbtMQ、ActiveMQ 等消息队列
  • 支持数据实时处理
  • 能保证消息的可靠性投递
  • 支持消息的持久化存储,并通过多副本分布式的存储方案来保证消息的容错
  • 高吞吐率,单 Broker 可以轻松处理数千个分区以及每秒百万级的消息量

架构简介

在这里插入图片描述

Messages and Batches

kafka基本数据单元为消息,为了提高网络使用效率,采用批写入方式

Topics and Partitions

topic为kafka消费主题,每个主题下有若干分区(partitions),Kafka 通过分区来实现数据的冗余和伸缩性,分区可以分布在不同的服务器上。由于多个partition的特性,kafka无法保证topic范围内的消息顺序,但是可以保证单个分区内消息的顺序

broker

broker 对应着一个 kafka 的进程;一个 kafka 集群会包含多个 broker;同时需要在这些 broker中选举出一个controller,选举是通过 zk 来实现;controller 负责协调管理集群状态,同时也负责 partition 的 leader 选举;

Producers And Consumers
  • 消息的生产者,负责将消息发送到不同的 partition 中;消息的生产需要考虑幂等性、正确性以及安全性;kafka 引入了 ack 机制;ack 为 0,则不需要 kafka 回复,此时可能造成数据丢失;ack为 1, 则需要等待 leader 回复,此时其他 replica 可能还没同步 leader 挂掉,数据安全性没法得到保证;ack 为 -1,则需要等待其他 replica 同步完成后,才回复,此时数据最健壮,但是效率最低;
  • 消息的消费者,负责消费消息;一个 partition 对应一个consumer, 而一个 consumer 可以对应多个 partition;消费同一类消息的高吞吐量,可以设置 consumer group;
副本同步策略

每个分区里有多个副本,这些副本有一个leader。只有副本全部同步完成才发送ack。这里指同步策略,是全量同步,而不是半数以上同步了就认为该数据已经commit。不过也可以设置最少同步副本数提高性能(min.insync.replicas)

ISR

Leader 维护了一个动态的 in-sync replica set (ISR),意为和 leader 保持同步的 follower 集合。当 ISR 中的 follower 完成数据的同步之后,leader 就会给 producer 发送 ack。如果 follower 长时间未向 leader 同步数据,则该 follower 将被踢出 ISR,该时间阈值由 replica.lag.time.max.ms 参数设定。Leader 发生故障之后,就会从 ISR 中选举新的 leader。

数据可见性

需要注意的是,并不是所有保存在分区首领上的数据都可以被客户端读取到,为了保证数据一致性,只有被所有同步副本 (ISR 中所有副本) 都保存了的数据才能被客户端读取到。
在这里插入图片描述

kafka读写机制

producer写流程

producer写入消息流程如下:

  • 连接ZK集群,从ZK中拿到对应topic的partition信息和partition的Leader的相关信息

  • 连接到对应Leader对应的broker

  • 将消息按批次发送到partition的Leader上

  • 其他Follower从Leader上复制数据

  • 依次返回ACK

  • 直到所有ISR中的数据写完成,才完成提交,整个写过程结束
    在这里插入图片描述

consumer 读流程
  • 连接ZK集群,从ZK中拿到对应topic的partition信息和partition的Leader的相关信息

  • 连接到对应Leader对应的broker

  • consumer将自己保存的offset发送给Leader

  • Leader根据offset等信息定位到segment(索引文件和日志文件)

  • 根据索引文件中的内容,定位到日志文件中该偏移量对应的开始位置读取相应长度的数据并返回给consumer

kafka集群选举

副本leader选举

只有完全追上Leader数据的follower才能进行选举,Leader发生故障之后,会从ISR中选出一个新的Leader

controller选举

这部分由ZK完成,不过高本版kafka引入kratf,就可以完成去ZK化了。 ratf是一种简单易理解并且严格复合数学归纳的共识算法。

自测环境搭建

zk

docker pull wurstmeister/zookeeper
docker run -itd --name zookeeper -p 2181:2181 wurstmeister/zookeeper

kafka

 docker pull wurstmeister/kafkadocker run -itd --name kafka -p 9092:9092 -e HOST_IP=10.74.18.61 -e KAFKA_ADVERTISED_PORT=9092 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_HOST_NAME=10.74.18.61 --link zookeeper:zookeeper wurstmeister/kafka

go链接kafka生产消费

go版本:1.21
生产者

package mainimport ("fmt""github.com/IBM/sarama"
)func main() {config := sarama.NewConfig()// 等待服务器所有副本都保存成功后的响应,对应ack=-1config.Producer.RequiredAcks = sarama.WaitForAll// 随机的分区类型:返回一个分区器,该分区器每次选择一个随机分区config.Producer.Partitioner = sarama.NewRandomPartitioner// 是否等待成功和失败后的响应config.Producer.Return.Successes = true// 使用给定代理地址和配置创建一个同步生产者producer, err := sarama.NewSyncProducer([]string{"10.74.18.61:9092"}, config)if err != nil {panic(err)}defer producer.Close()//构建发送的消息,msg := &sarama.ProducerMessage{//Topic: "test",//包含了消息的主题Partition: int32(10),                   //Key:       sarama.StringEncoder("key"), //}var value stringvar msgType stringfor {_, err := fmt.Scanf("%s", &value)if err != nil {break}fmt.Scanf("%s", &msgType)fmt.Println("msgType = ", msgType, ",value = ", value)msg.Topic = msgType//将字符串转换为字节数组msg.Value = sarama.ByteEncoder(value)//fmt.Println(value)//SendMessage:该方法是生产者生产给定的消息//生产成功的时候返回该消息的分区和所在的偏移量//生产失败的时候返回errorpartition, offset, err := producer.SendMessage(msg)if err != nil {fmt.Println("Send message Fail", err)}fmt.Printf("Partition = %d, offset=%d\n", partition, offset)}
}

消费者

package mainimport ("fmt""sync""github.com/IBM/sarama"
)var (wg sync.WaitGroup
)func main() {// 根据给定的代理地址和配置创建一个消费者consumer, err := sarama.NewConsumer([]string{"10.74.18.61:9092"}, nil)if err != nil {panic(err)}//Partitions(topic):该方法返回了该topic的所有分区idpartitionList, err := consumer.Partitions("test")if err != nil {panic(err)}for partition := range partitionList {//ConsumePartition方法根据主题,分区和给定的偏移量创建创建了相应的分区消费者//如果该分区消费者已经消费了该信息将会返回error//sarama.OffsetNewest:表明了为最新消息pc, err := consumer.ConsumePartition("test", int32(partition), sarama.OffsetNewest)if err != nil {panic(err)}defer pc.AsyncClose()wg.Add(1)go func(sarama.PartitionConsumer) {defer wg.Done()//Messages()该方法返回一个消费消息类型的只读通道,由代理产生for msg := range pc.Messages() {fmt.Printf("%s---Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))}}(pc)}wg.Wait()consumer.Close()
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/183699.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【华为交换】交换机MSTP+VRRP配置

功能简介 企业用户访问外网的流量,可能会同时包含二层流量和三层流量(例如,企业内二层VPN用户和三层VPN用户访问MPLS公网)。企业用户希望接入网络既能包含多条接入链路(不同链路之间互为备份)以保障接入的…

【brpc学习实践十一】session-local与thread-local应用与brpc抽象工厂模式实践

什么是session-local与thread-local 百度内的检索程序大量地使用了thread-local storage (缩写TLS),有些是为了缓存频繁访问的对象以避免反复创建,有些则是为了在全局函数间隐式地传递状态。你应当尽量避免后者,这样的函数难以测试,不设置thread-local变量甚至无法运行。s…

哪些因素会影响香港服务器的下载速度_Maizyun

哪些因素会影响香港服务器的下载速度? 随着互联网的普及和快速发展,越来越多的企业和个人选择使用香港服务器来托管其网站、应用程序和其他在线服务。 然而,很多因素可能会影响香港服务器的下载速度。 本文将探讨影响香港服务器下载速度的几…

Java中xml映射文件是干什么的

Java中的XML映射文件主要用于将Java对象与XML文档之间进行转换。它通常用于处理数据交换和存储,例如将Java对象转换为XML格式以便在网络上传输或保存到文件中,或者将XML文档解析为Java对象以进行处理。这种转换可以通过Java的JAXB(Java Archi…

springMVC实验(二)—调式工具APIFOX的使用

【知识要点】 后端开发调试工具 前后端分离已经成为互联网类软件开发主流模式,没有前端操作的支持,如何调试后端程序的就是开发人员必须解决的问题。如:get类请求可以直接使用浏览器就能模拟测试,但是post、put等类型的请求&…

layui 日期选择框弹出后消失

原因是窗口太小,日期窗碰撞边缘后会消失,解决方法是增加 trigger: click 属性。 laydate.render({ elem: #kp_date , type: date , trigger: click });

thinkphp5.1 验证器

thinkPHP5——验证器的使用总结-CSDN博客

YoloV8改进策略:AKConv即插即用,轻松涨点

文章目录 摘要1、引言2、相关工作3、方法3.1、定义初始采样位置3.2、可变卷积操作3.3、扩展AKConv4、实验4.1、在COCO2017上的目标检测实验4.2、在VOC 7+12上的目标检测实验4.3、在VisDrone-DET2021上的目标检测实验4.4、比较实验4.5、探索初始采样形状5、分析讨论6、结论Yolov…

数据结构与算法-D1数据结构引入

1、结构体 2、内存(malloc) 意义: 1、提高编程能力 2、可复用性、可维护性、可读性、效率更高 数据结构:研究数据之间关系,包括逻辑结构、存储结构、数据操作 逻辑结构: 按每个元素可能具有的直接前趋数和直接后趋数将逻辑结构…

Ubuntu 环境下 NFS 服务安装及配置使用

需求:公司内部有多台物理服务器,需要A服务器上的文件让B服务器访问,也就是两台服务器共享文件,当然也可以对A服务器上的文件做权限管理,让B服务器只读或者可读可写 1、NFS 介绍 NFS 是 Network FileSystem 的缩写&…

前端纯js导入导出json配置文件

在做后台系统需求的时候,有个需求是需要把当前表单配置导出,在另一个配置项下,导入这些配置,相当于做了一下配置拷贝。通常我们导出下载一个文件,是先向后端发起请求,由后端处理数据后,再返回文…

交调与互调

交调与互调 概念参考: 《高频电子线路》张肃文 《射频技术》于宝明、丁宁 交调(Cross-modulation) 如果接收机的前端电路选择性不够好,是有用信号与干扰信号同时加到接收机的输入端,而且这两种信号都是受音频调制…

jvm 调优参数

-XX:AlwaysPreTouch 指定JVM启动时即刻分配整个堆内存空间;应用启动会变慢,但是运行时变快。 -XX:MaxRAMPercentage60.0 指定JVM最大堆内存使用比例为60%;适用于容器部署 -XX:MinRAMPercentage60.0 指定JVM最小堆内存使用比例为60%&#xff1…

可以在电脑桌面展示工作计划表的软件

很多上班族都表示自己在工作时,会面临大量且复杂的工作任务,这时候就会拖延工作,或者感觉时间不够用,所以需要有明确的工作计划来指导自己如何分类时间和精力,确保每项工作任务都能够按时完成。如果需要制定每天的工作…

力扣labuladong一刷day22天二分搜索共2题

力扣labuladong一刷day22天二分搜索共2题 一、704. 二分查找 题目链接&#xff1a;https://leetcode.cn/problems/binary-search/ 思路&#xff1a;典型的二分查找&#xff0c;如果是左闭右闭那么说明left < right 。如果左闭右开那么说明 left < right class Solutio…

C++学习之路(十二)C++ 用Qt5实现一个工具箱(增加一个XML文本格式化功能)- 示例代码拆分讲解

上篇文章&#xff0c;我们用 Qt5 实现了在小工具箱中添加了《进制转换器功能》功能。为了继续丰富我们的工具箱&#xff0c;今天我们就再增加一个平时经常用到的功能吧&#xff0c;就是「 XML文本格式化 」功能。下面我们就来看看如何来规划开发一个这样的小功能并且添加到我们…

模拟算法【1】

文章目录 &#x1f600;1576. 替换所有的问号&#x1f606;题目&#x1f929;算法原理&#x1f642;代码实现 &#x1f60a;495.提莫攻击&#x1fae0;题目&#x1f609;算法原理&#x1f917;代码实现 模拟算法 通俗的来说&#xff0c;模拟算法就是依葫芦画瓢&#xff0c;将题…

React 签字手写签名组件 react-signature

安装依赖包 npm install uiw/react-signature示例代码 import React, { useRef } from "react"; import Signature from uiw/react-signature;export default function App() {const $svg useRef(null);const handle (evn) > $svg.current?.clear();return (…

Rust语言入门教程(十四) - 闭包Closure

什么是闭包 闭包在 Rust 中是非常强大的功能&#xff0c;允许你编写更灵活和表达性的代码。闭包的语法和功能在某些方面类似于其他语言&#xff08;如 JavaScript 或 Python&#xff09;中的匿名函数或 lambda 表达式。 在Rust中&#xff0c;当我们想要生成一个新的线程&…

[ISCTF2023] Crypto/PWN/Reverse

最近新生赛还挺多&#xff0c;不过这个开始后注册页面就被删了&#xff0c;没注册上。拿别人的附件作了下。 Crypto 七七的欧拉 这题只给了n,e,c这种情况一般正常没法解&#xff0c;猜n不正常 import gmpy2 import libnum from crypto.Util.number import *flagbISCTF{****…