Kafka篇:Kafka搭建、使用、及Flink整合Kafka文档

一、Kafka搭建

1、上传并解压改名

tar -xvf kafka_2.11-1.0.0.tgz

mv kafka_2.11-1.0.0 kafka-1.0.0

2、配置环境变量

      vim /etc/profile

export KAFKA_HOME=/usr/local/soft/kafka-1.0.0
export PATH=$PATH:$KAFKA_HOME/bin  

     source /etc/profile (使环境变量生效)

3、修改配置文件

      vim config/server.properties

#将从节点的broker.id修改为1,2

broker.id=0       

#指定数据存放的位置,包含了Kafka集群中所有topic的分区数据 
log.dirs=/usr/local/soft/kafka-1.0.0/data         

#指定Kafka如何连接到其依赖的ZooKeeper集群
zookeeper.connect=master:2181,node1:2181,node2:2181/kafka       

4、将kafka文件同步到node1,node2

4.1  同步kafka文件

scp -r kafka-1.0.0/ node1:`pwd`
scp -r kafka-1.0.0/ node2:`pwd`

4.2  因为kafka不是主从架构的分布式组件,而是去中心化的架构,没有主从节点之分,所以也要将master中的而环境变量同步到node1和node2中

scp /etc/profile node1:/etc/
scp /etc/profile node2:/etc/

4.3  在ndoe1和node2中执行source

source /etc/profile

4.4  修改node1和node2中的broker.id为1,2

# node1
broker.id=1
# node2
broker.id=2

5、启动kafka

前提:已经安装了zookeeper,没安装可以参考以前的写的一个zookeeper的搭建文档

链接:http://t.csdnimg.cn/qpuKV

启动步骤:

5.1 因为kafka使用zookeeper保存元数据需要,所以需要先在三个节点上启动zookeeper(也是去中心化架构)

zkServer.sh start

5.2 查看zookeeper启动的状态

zkServer.sh status       #一个节点Mode:leader,剩余节点Mode: follower说明启动成功

5.3  启动kafka,每个节点中都要启动(去中心化的架构)

# -daemon后台启动

kafka-server-start.sh -daemon /usr/local/soft/kafka-1.0.0/config/server.properties

5.4  测试是否成功

#生产者
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic 

二、Kafka的使用

1、创建topic

注意:在生产和消费数据时,如果topic不存在会自动创建一个分区为1,副本为1的topic

--replication-factor     #每一个分区的副本数量, 同一个分区的副本不能放在同一个节点,副本的数量不能大于kafak集群节点的数量

--partition          #分区数, 根据数据量设置

--zookeeper      #zk的地址,将topic的元数据保存在zookeeper中 ​

kafka-topics.sh --create --zookeeper master:2181,node1:2181,node2:2181/kafka --replication-factor 2 --partitions 3 --topic bigdata

删除topic
kafka-topics.sh --delete --topic  bigdata--zookeeper master:2181,node1:2181,node2:2181/kafka

 

2、查看topic描述信息

kafka-topics.sh --describe  --zookeeper master:2181,node1:2181,node2:2181/kafka --topic kfkuse

3、获取所有topic

kafka-topics.sh --list --zookeeper master:2181,node1:2181,node2:2181/kafka

__consumer_offsetsL是kafka自带的用于保存消费偏移量的topic

 

4、创建控制台生产者

kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic kfkuse

5、创建控制台消费者

--from-beginning   从头消费,, 如果不在执行消费的新的数据

kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic bigdata

6、kafka数据保存的方式及相关事项

# 1、保存的文件
/usr/local/soft/kafka_2.11-1.0.0/data

# 2、每一个分区每一个副本对应一个目录

# 3、每一个分区目录中可以有多个文件, 文件时滚动生成的
00000000000000000000.log
00000000000000000001.log
00000000000000000002.log

# 4、滚动生成文件的策略
log.segment.bytes=1073741824           #达到1GB左右滚动生成一个文件
log.retention.check.interval.ms=300000      #每隔5分钟检查一次文件数据是否被清理

# 5、文件删除的策略,默认为7天,以文件为单位删除
log.retention.hours=168

三、Flink整合kafka

1、IDEA中整合

  1.1 根据自己的flink版本添加依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.15.2</version>
</dependency>

  1.2 案例

    案例1:编写flink代码,使用flink从kafka中读数据

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** 使用flink从kafka中读数据*/
public class Demo1KafkaSource {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();/**内置的位点初始化器包括:*     // 从消费组提交的位点开始消费,不指定位点重置策略*     .setStartingOffsets(OffsetsInitializer.committedOffsets())*     // 从消费组提交的位点开始消费,如果提交位点不存在,使用最早位点*     .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))*     // 从时间戳大于等于指定时间戳(毫秒)的数据开始消费*     .setStartingOffsets(OffsetsInitializer.timestamp(1657256176000L))*     // 从最早位点开始消费*     .setStartingOffsets(OffsetsInitializer.earliest())*     // 从最末尾位点开始消费*     .setStartingOffsets(OffsetsInitializer.latest());*///创建kafka sourceKafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("master:9092,node1:9092,node2:9092")     //设置kafka集群列表.setTopics("hash_students")     //指定消费的topic.setStartingOffsets(OffsetsInitializer.earliest())     //从最早点开始消费.setValueOnlyDeserializer(new SimpleStringSchema())      //设置读取数据的编码格式utf-8.build();//使用kafka sourceDataStreamSource<String> studentsDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");studentsDS.print();env.execute();}
}

 

案例2:使用flink从kafka中读json数据,解析json格式的数据

使用阿里的fastjson工具,要先添加fastjson依赖

<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version>
</dependency>
import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Demo3Json {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//创建kafka sourceKafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("master:9092,node1:9092,node2:9092")     //设置kafka集群列表.setTopics("cars")     //指定消费的topic.setGroupId("flink_group1")     //指定消费者组.setStartingOffsets(OffsetsInitializer.earliest())     //从最早尾位点开始消费.setValueOnlyDeserializer(new SimpleStringSchema())      //设置读取数据的编码格式utf-8.build();//使用kafka sourceDataStreamSource<String> kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");//解析json格式的数据DataStream<car> cars = kafkaSource.map(line -> JSON.parseObject(line, car.class));cars.print();env.execute();}
}@Data
@AllArgsConstructor
@NoArgsConstructor
class car{private String car;private String city_code;private String county_code;private String card;private String camera_id;private String orientation;private Long road_id;private Long time;private Double speed;
}

 

案例3:将本地文件的数据生产(写入)到kafka中

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Demo2FIleToKafkaSink {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> carsDS = env.readTextFile("flink/data/cars_sample.json");//创建kafka sinkKafkaSink<String> sink = KafkaSink.<String>builder().setBootstrapServers("master:9092,node1:9092,node2:9092")//kafka集群列表.setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic("cars")//指定topic.setValueSerializationSchema(new SimpleStringSchema())//指定数据格式.build())//指定数据处理的语义.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();//使用kafka sinkcarsDS.sinkTo(sink);env.execute();}
}

 

2、集群中整合

将flink-sql-connector-kafka-1.15.2.jar包上传到Flink的lib目录下就行

注意:一些外部的依赖,比如说上述案例中使用的fastjson依赖也要上传,否则会报错或者会运行不成功。

至于在什么集群中运行,以及集群中运行的详细步骤,我在Flink系列三中以及详细写过,在此不再赘述了。

文章链接:http://t.csdnimg.cn/W1yZL

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/20552.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

机器学习学习

机器学习类型(按学习方式分):监督学习、半监督学习、无监督学习、强化学习; 通过已知标签训练集训练模型,使用模型及逆行预测、测试; 向量表示法,其中每一维对应一个特征(feature)或者称为属性,记为[x1,x2,...,xn] 特征值、特征、标签,共同完成训练集的数据填充,最…

Linux C/C++ gdb调试正在运行的程序

启动程序&#xff0c;在新窗口进行如下操作 进程编号 ps -ef | grep 程序名[rootlocalhost 04demo]# ps -ef | grep core_demo root 2467 1657 0 23:56 pts/0 00:00:00 ./core_demo root 2703 2540 0 23:57 pts/1 00:00:00 grep --colorauto core_dem…

MySQL8.0免安装及phpmyadmin配置

安装包解压&#xff0c;运行mysqld文件后&#xff0c;启动net start&#xff0c;提示成功&#xff0c;但进入phpmyadmin登录页面后&#xff0c;输入用户名&#xff0c;提示不支持空密码&#xff0c;config.default.php设置密码后&#xff0c;提示 mysqli::real_connect(): (HY…

FreeRTOS基础(六):中断管理

在嵌入式系统开发中&#xff0c;中断管理是一个至关重要的概念。它允许我们的系统能够及时响应外部事件&#xff0c;而不需要通过轮询的方式来浪费宝贵的处理器资源。FreeRTOS作为一款广泛应用的实时操作系统&#xff0c;它提供了灵活且高效的中断管理机制&#xff0c;可以帮助…

搭建基于Django的博客系统增加广告轮播图(三)

上一篇&#xff1a;ChatGPT搭建博客Django的web网页添加用户系统&#xff08;二&#xff09; 下一篇&#xff1a;搭建基于Django的博客系统数据库迁移从Sqlite3到MySQL&#xff08;四&#xff09; 功能概述 增加轮播图显示广告信息。 需求详细描述 1. 增加轮播图显示广告信…

STM32(九):USART串口通信 (标准库函数)

前言 上一篇文章已经介绍了如何用STM32单片机中独立看门狗来实现检测按键点灯的程序。这篇文章我们来介绍一下如何用STM32单片机中USART通信协议来串口通信&#xff0c;并向XCOM发送信息。 一、实验原理 1.通信的介绍 首先&#xff0c;我们先介绍一下通信&#xff0c;何为通…

嵌入式linux系统中图片处理详解

大家好,今天给大家分享一下,嵌入式中如何进行图像处理,常见的处理方式有哪几种?这次将详细分析一下 第一:BMP图形处理方式 图形的基本特点,所有的图像文件,都是一种二进制格式文件,每一个图像文件,都可以通过解析文件中的每一组二进制数的含义来获得文件中的各种信息…

DataCube 漏洞小结

在这里分享一下通过拖取 DataCube 代码审计后发现的一些漏洞&#xff0c;包括前台的文件上传&#xff0c;信息泄露出账号密码&#xff0c;后台的文件上传。当然还有部分 SQL 注入漏洞&#xff0c;因为 DataCube 采用的是 SQLite 的数据库&#xff0c;所以SQL 注入相对来说显得就…

海外高清短视频:四川京之华锦信息技术公司

海外高清短视频&#xff1a;探索世界的新窗口 在数字化时代的浪潮下&#xff0c;海外高清短视频成为了人们探索世界、了解异国风情的新窗口。四川京之华锦信息技术公司这些短视频以其独特的视角、丰富的内容和高清的画质&#xff0c;吸引了无数观众的目光&#xff0c;让人们足…

关于前端代码移动端的适配方案

为什么需要适配&#xff1f; 由于PC端和移动端的分辨率不同&#xff0c;前端展示的页面在两端设备如果原模原样的搬运则会导致PC端或移动端看到的画面相对于其设备的分辨率及其的不合理。 最为常见的是PC端正常浏览的网页没有做移动端适配&#xff0c;由于移动端分辨率普遍低于…

计算机基础之:平均负载与CPU使用率的关系

想象一下&#xff0c;你的厨房是一个操作系统&#xff0c;厨师是CPU&#xff0c;而菜谱上的任务就是进程。厨房的忙碌程度可以用“平均负载”来衡量&#xff0c;它反映了等待被处理的任务总数加上正在被厨师处理的任务数。而“CPU使用率”则相当于厨师实际在切菜、炒菜的时间比…

ChaosBlade混沌测试实践

ChaosBlade: 一个简单易用且功能强大的混沌实验实施工具 官方仓库&#xff1a;https://github.com/chaosblade-io/chaosblade 1. 项目介绍 ChaosBlade 是阿里巴巴开源的一款遵循混沌工程原理和混沌实验模型的实验注入工具&#xff0c;帮助企业提升分布式系统的容错能力&…

【CVE-2021-3156】——漏洞复现、原理分析以及漏洞修复

文章目录 前言1、漏洞概述2、漏洞复现2.1、漏洞复现测试环境2.2、漏洞复现具体步骤 3、漏洞原理3.1、前置知识3.1.1、sudo3.1.2、sudoedit3.1.3、转义字符 3.2、漏洞分析 4、漏洞修复5、参考文献总结 前言 2021年01月27日&#xff0c;RedHat官方发布了Sudo缓冲区/栈溢出漏洞的风…

基于SSM前后端分离版本的论坛系统-自动化测试

目录 前言 一、测试环境 二、环境部署 三、测试用例 四、执行测试 4.1、公共类设计 创建浏览器驱动对象 测试套件 释放驱动类 4.2、功能测试 注册页面 登录页面 版块 帖子 用户个人中心页 站内信 4.3、界面测试 注册页面 登录页面 版块 帖子 用户个人中心页…

浮点数二分查找的实现

这是C算法基础-基础算法专栏的第六篇文章&#xff0c;专栏详情请见此处。 引入 上次我们学习了整数二分查找的实现&#xff0c;这次我们要学习浮点数二分查找的实现。 定义 浮点数二分查找与整数二分查找的定义都是大致相同的&#xff0c;如果想了解具体内容&#xff0c;可以移…

【Qt秘籍】[005]-Qt的首次邂逅-创建

一、如何创建文件&#xff1f; 当我们打开Qt Creator&#xff0c;你会发现整个界面类目繁多。现在&#xff0c;让我们直接开始新建一个项目。 1.点击左上角的“文件”>点击“新建文件或项目” 2.如图&#xff0c;选择“Application”>“Qt Wifgets application”> “…

机器学习笔记——SVM丝滑推导及代码实现,从硬间隔到软间隔再到核函数

前言 开始搓延期了一百年的机器学习实验&#xff0c;把SVM推导过程从定义到求解都刷了一遍&#xff0c;包括推导优化目标、提出对偶问题、利用KKT条件得到原问题的解以及SMO算法等。 注意&#xff1a;本文在某些地方比如KKT条件和SMO算法处不提供证明过程(太麻烦了喵)&#x…

奇偶校验位

描述 题目描述&#xff1a; 现在需要对输入的32位数据进行奇偶校验,根据sel输出校验结果&#xff08;1输出奇校验&#xff0c;0输出偶校验&#xff09; 信号示意图&#xff1a; 波形示意图&#xff1a; 输入描述&#xff1a; 输入信号 bus sel 类型 wi…

rust安装

目录 一、安装1.1 在Windows上安装1.2 在Linux下安装 二、包管理工具三、Hello World3.1 安装IDE3.2 输出Hello World 一、安装 1.1 在Windows上安装 点击页面 安装 Rust - Rust 程序设计语言 (rust-lang.org)&#xff0c;选择"下载RUSTUP-INIT.EXE(64位&#xff09;&qu…

2021JSP普及组第三题:插入排序

2021JSP普及组第三题 题目&#xff1a; 思路&#xff1a; 题目要求排序后根据操作进行对应操作。 操作一需要显示某位置数据排序后的位置&#xff0c;所以需要定义结构体数组储存原数据的位置和数据本身排序后所得数据要根据原位置输出排序后的位置&#xff0c;所以建立一个新…