大数据之Kafka

Kafka概述

传统定义:一个分布式的基于发布/订阅模式的消息队列,主要应用于大数据实时处理领域。
最新定义:一个开源的分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。最主要的功能是做数据的缓冲,相较于flume的channel, 能力更强。

应用场景:

  1. 缓冲/消峰:解决生产消息和消费消息的处理速度不一致的情况。
  2. 解耦:只需知道如何连接kafka,作用类似于交换机。
  3. 异步通信:可以将事务给kafka,自己去处理其他事务。

消息队列的两种模式:

  • 点对点模式:消费者拉取数据后删除数据。优点是简单速度快,缺点是不方便实现多用户需要获取同一数据的情况。
  • 发布/订阅模式:可以有多个topic主题,消费者拉取数据后不删除数据。

Kafka的基础架构

  1. 为方便扩展,并提高吞吐量,一个topic分为多个partition
  2. 配合分区的设计,提出消费者组的概念,组内内个消费者并行消费,以线程为单位。
  3. 为提高可用性,为每个partiton增加若干副本,类似NameNode HA
  4. 借助zookeeper来实现leader和follower的选举机制,leader是原数据,follower是副本数据。leader主要用于发送和传输,follower主要作为副本保证安全性。

Kafka的安装部署

官网下载地址:http://kafka.apache.org/downloads.html

  1. 上传安装包
  2. 解压安装包
  3. 修改配置文件
  4. 配置环境变量
  5. 编写群启群关脚本kf.sh
#! /bin/bashcase $1 in
"start"){for i in hadoop102 hadoop103 hadoop104doecho " --------启动 $i Kafka-------"ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"done
};;
"stop"){for i in hadoop102 hadoop103 hadoop104doecho " --------停止 $i Kafka-------"ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "done
};;
esac

kafka主题相关操作

kafka-topics.sh脚本里面定义了对应相关操作。

  1. 增加主题,kafka-topics.sh --bootstrap-server hadoop102:9092 -- create --topic second --replication-facotr 1 --partitions 1
  2. 删除,是标记删除,预计在1分钟后完全删除。bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --delete --topic first
  3. 修改,只能增加分区数量。bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3
  4. 查看,bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --list

生产和消费

  1. 启动生产者:kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
  2. 启动消费者:kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
  3. 如果单独启动生产者,发送数据,之后再启动消费者,默认不发送之前发送的数据。在消费者启动命令后面添加--from-beginning关键字可以修改为从头拿取数据。

发送流程

  1. Kafka Producer生产者
    • main线程Producer的生产方法
    • interceptors拦截器
    • Serializer序列化器
    • Partitioner分区器,按照批次随机分区。每个批次默认是16K,默认的等待时间是0ms。
  2. RecordAccumulator里面创建双端队列,队列个数等于分区个数
  3. sender进程复制双端队列中的数据发送到Kafka集群,如果成功接收则返回ack应答,否则重新发送,最多重试21亿次。

异步发送和同步发送

sender进程发送请求默认是异步执行,即向kafka集群发送时不管是否收到回复,一直发送,由selector来接收ack和关闭对应的请求进程。在Producer的send方法中有一个Callback对象参数,该对象需要实现一个onCompletetion方法。可以在里面查看到对应方法参数中的元数据的值,里面有主题名称、分区号和偏移量。异步执行时可以发现同一批次分区号是一样的,同步时由于需要等待ack,同一批次的分区号是不同的。

生产者分区

  1. 分区策略
    • 默认分区器
      • 如果指定了分区号,到指定分区
      • 如果是key-value,使用key进行hash分区
      • 粘性分区,如果上一个有分区,跟上一个分区一样,直到数据达到分区容量上限或者等待时间上限进行随机更换分区。
    • UniformStickyPartition分区器:如果key值是固定的,可以使用该分区器
    • 轮询分区器:需要维护一个列表,效率更低。

生产者如何提高吞吐量

  1. 修改从双端队列拿取数据的等待时间,从0ms修改为5-100ms
  2. compression.type: 压缩snappy
  3. 修改批次大小:默认为16K,修改为32KB.

数据可靠性

ack应答级别:

  • 0:生产者发送过来的数据,不需要等数据落盘应答,也就是最多一次。
  • 1:生产者发送过来的数据,Leader收到后应答
  • -1(all): 生产者发送过来的数据,Leader和isr队列里面的所有节点收齐数据并落盘后应答。Leader维护了一个动态的in-sync replica set ISR, 如果有某个节点30s内没有回复,则认为该节点死亡。数据有可能重复

数据的去重

幂等性

指producer不论向Broker发送多少次重复数据,Broker都只会持久化一条,保证精准一次。重复数据的判断标准,根据sqlNumber来判断,重发的数据其seqNumber是一样的。
缺点:如果生产者中途宕机,然后重新建立会话时,不能保证不同会话时PID是一样,这时候重新发送重复数据时无法保证幂等性。
解决方案:在Kafka集群中将生产者的信息保存到集群中的某个主题中,如果生产者宕机后重启需要先去读取Kafka集群的状态信息,以保证多会话情况下的幂等性。

数据的有序性

  1. 因为不能保证多分区之间是有序的,只能指定单分区。
  2. 开启幂等性,且元数据request个数小于5个,如果发送失败导致顺序异常,Kafka会按照SeqNumber重新排序。

Flume和Kafka

为何Kafka全方面碾压flume,还会有人使用flume?
这是由于flume使用上只需配置一个文件即可使用,无需编写代码。并且可以使用flume将数据灌入到kafka中,既简单又利用到了kafka的性能,flume和kafka结合使用才是日常开发的常用操作。

Kafka Broker总体工作流程

  1. broker在zk中注册
  2. controller谁先注册,谁说了算
  3. 由选举出来的Controller监听brokers节点变化
  4. Controller决定Leader选举:在isr存活为前提,轮询选举
  5. Controller将节点信息上传到zk
  6. 其他controller从zk同步相关信息

Broker节点的服役和退役

  1. 启动新主机的zookeeper和kafka
  2. 创建一个要均衡的主题vim topics-to-move.json
{"topics": [{"topic": "first"}],"version": 1
}
  1. 生成一个负载均衡的计划bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate
  2. 保存生成的计划到文件中
  3. 执行负载均衡计划
  4. 在kafka/datas目录下查看是否正确。
kafka副本

为了提高数据可靠性,副本数量一般设置为两个。

Follower故障处理

LEO(log End Offset):每个副本的最后一个offset,LEO其实就是最新的offset + 1
HW (High Watermark): 所有副本中最小的LEO

高效读取

1. 多分区

2. 稀疏索引

3. 顺序写磁盘

4. 页缓存和零拷贝

页缓存:其实就是把尽可能多的空闲内存当做磁盘缓存来使用。
零拷贝:数据加工处理操作交给生产者和消费者

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/89343.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

一表总结前端axios传参与后端springboot接收

方法参数形式后端示例注意get,deleteurl中拼接(RestFul风格)PathVariable url: /test ‘/’ name ‘/’ age, GetMapping("/test/{name}/{age}") public Result find( PathVariable("name") String name, PathVari…

深耕能源+政务 | 云畅科技入选中国信通院2023低代码无代码产业图谱

由中国信息通信研究院(以下简称“中国信通院”)、中国通信标准化协会联合主办的2023数字化转型发展大会暨首届数字原生大会于9月13-14日在北京举办,“2023低代码无代码产业图谱”正式发布! 01 2023年,中共中央、国务院…

北斗卫星导航系统:引领现代林业发展的先锋

北斗卫星导航系统:引领现代林业发展的先锋 随着人类社会的发展,林业作为生态环境保护和经济发展的重要组成部分,也在不断向前发展。为了更好地管理和保护森林资源,我们必须寻求一种新的方式来提高林业管理的效率。而北斗技术的应用…

SpringBoot的excel模板导出

Word的模板导出(参考:https://easyexcel.opensource.alibaba.com/docs/current/quickstart/fill) 创建有两个sheet的excel文件模板 将模板文件放入resource\templates/doc下使用 public void exportUavInfoExcel(HttpServletResponse response, CaseExportRPO cas…

如何去开展软件测试工作

1. 软件测试 在一般的项目中,一开始均为手动测试,由于自动化测试前期投入较大,一般要软件项目达到一定的规模,更新频次和质量均有一定要求时才会上自动化测试或软件测试。 1.1. 项目中每个成员的测试职责 软件测试从来不是某一…

flink使用kryo支持自定义的序列化器

背景 这里所说的序列化器不是指实现TypeSerializer的状态序列化器,而是指flink在使用KryoSerializer序列化器时遇到kryo无法序列化的类型时,通过往kryo中注册某个序列化器类来让kryo可以序列化某个类的实例,所以这里严格意义上应该是说&…

TypeScript学习记录

一、TS开发环境的搭建 1、下载并安装node.js 2、使用npm全局安装typeScript 进入命令行输入:npm i -g typescript 3、创建一个ts文件 4、使用tsc对ts文件进行编译 进入命令行进入ts文件所在目录执行命令:tsc 文件名.ts 二、TS基本变量 1、类型声…

ChatGPT:使用FastJSON库关闭JSON引用检测的方法

ChatGPT&#xff1a;使用FastJSON库关闭JSON引用检测的方法 下面我将输入一个方法&#xff0c;请你记住&#xff1a; ChatGPT&#xff1a; 好的&#xff0c;请输入方法。 RequestMapping("/getLianZhengLabels") public String getLianZhengLabels() {HashMap<St…

Kubernetes(K8s):未来云原生应用的引擎

文章目录 Kubernetes的核心概念和架构为什么K8s是构建云原生应用的首选工具&#xff1f;云原生应用的好处和挑战容器编排的重要性&#xff1a;Docker和KubernetesKubernetes生态系统&#xff1a;核心组件和附加工具实际应用&#xff1a;企业如何在生产环境中使用K8s未来展望&am…

rust trait对象

在拥有继承的语言中&#xff0c;可以定义一个名为shape的基类&#xff0c;该类上有一个draw方法。其他的类比如Button、SelectBox继承shape。它们各自覆盖draw方法。调用这些子类的draw方法时&#xff0c;就可以把它们统一当作shape来使用。不过Rust并没有继承&#xff0c;如果…

做一个贪吃蛇小游戏happy一下

直接Vue上代码 <template><div><div>贪吃蛇</div><canvas id"canvas" width"400" height"400"></canvas></div> </template><script> export default {data() {return {ctx: null,inter…

Android 遍历界面所有的View

关于作者&#xff1a;CSDN内容合伙人、技术专家&#xff0c; 从零开始做日活千万级APP。 专注于分享各领域原创系列文章 &#xff0c;擅长java后端、移动开发、商业变现、人工智能等&#xff0c;希望大家多多支持。 目录 一、导读二、概览三、实践四、 推荐阅读 一、导读 我们…

结构型设计模式——组合模式

摘要 组合模式(composite pattern): 允许你将对象组合成树形结构来表现"整体/部分"层次结构. 组合能让客户以一致的方式处理个别对象以及对象组合。 一、组合模式的意图 将对象组合成树形结构来表示“整体/部分”层次关系&#xff0c;允许用户以相同的方式处理单独…

AVL Cruise 2020.1 安装教程

文章目录 安装包安装破解 安装包 链接&#xff1a;https://pan.baidu.com/s/1GxbeDj_SyvKFyPeTsstvTQ?pwd6666 提取码&#xff1a;6666 安装 安装文件&#xff1a; 双击setup.exe&#xff1a; 一直netx&#xff0c;中间要修改两次路径&#xff0c;第一次是安装位置&#xf…

Flume最简单使用

文章目录 一、简介1、定义2、基础架构 二、快速入门1、解压Flume2、案例一&#xff1a;监控端口号3、案例二&#xff1a;将空目录下文件 三、Flume进阶1、Flume事务2、Flume Agent内部原理3、案例一&#xff1a;监控日志4、案例二&#xff1a;多路复用和拦截器适应4.1 原理4.2 …

二进制十六机制CRC总和异或等工具类

package com.dc.util;import org.apache.http.util.TextUtils; import java.nio.ByteBuffer; import

LCP 06. 拿硬币/2582. 递枕头[java]

LCP 06. 拿硬币 - 力扣&#xff08;LeetCode&#xff09; 桌上有 n 堆力扣币&#xff0c;每堆的数量保存在数组 coins 中。我们每次可以选择任意一堆&#xff0c;拿走其中的一枚或者两枚&#xff0c;求拿完所有力扣币的最少次数。 示例 1&#xff1a; 输入&#xff1a;[4,2,1…

Linux部署elk日志监控系统

目录 一、简介 二、部署elasticsearch 2.1 安装jdk11&#xff08;jdk版本>11&#xff09; 2.2 下载安装包 2.3 授权elk用户 2.4 配置elasticsearch.yml 2.5 启动elasticsearch 三、部署logstash 3.1 启动测试 3.2 可能出现的报错 3.3 指定配置文件启动logstash 3.4 安装El…

Ubuntu 设置开机自动执行脚本

1. 建立service文件 sudo vim /etc/systemd/system/redis-server.service2. redis service文件 [Unit] DescriptionAdvanced key-value store Afternetwork.target Documentationhttp://redis.io/documentation, man:redis-server(1)[Service] Typenotify ExecStart/usr/bin/…

MySQL数据库管理

一、sql语句&#xff1a; SQL语句用于维护管理数据库&#xff0c;包括数据查询、数据更新、访问控制、对象管理等功能。 二、SQL语言分类&#xff1a; DDL&#xff1a;数据定义语言&#xff0c;用于创建数据库对象&#xff0c;如库、表、索引等 DML&#xff1a;数据操纵语言&a…