大数据学习05-Kafka分布式集群部署

系统环境:centos7
软件版本:jdk1.8、zookeeper3.4.8、hadoop2.8.5
本次实验使用版本 kafka_2.12-3.0.0

一、安装

Kafka官网
在这里插入图片描述

将安装包上传至linux服务器上
在这里插入图片描述
解压

tar -zxvf kafka_2.12-3.0.0.tgz -C /home/local/

移动目录至kafka

mv kafka_2.12-3.0.0/ kafka

二、部署

配置Kafka环境

vi /etc/profile

添加如下配置

#kafka
export KAFKA_HOME=/home/local/kafka
export PATH=$PATH:${KAFKA_HOME}/bin

修改server.properties文件

vim /home/local/kafka/config/server.properties

修改参数如下:

broker.id=0
listeners=PLAINTEXT://192.168.245.200:9092
log.dirs=/tmp/kafka-logs
zookeeper.connect=192.168.245.200:2181,192.168.245.201:2181,192.168.245.202:2181

参数说明:
broker.id : 集群内全局唯一标识,每个节点上需要设置不同的值
listeners:这个IP地址也是与本机相关的,每个节点上设置为自己的IP地址
log.dirs :存放kafka消息的
zookeeper.connect : 配置的是zookeeper集群地址

分发kafka安装目录

for i in {1..2};do scp -r /home/local/kafka root@slave${i}:/home/local/;done

三、启动

进入kafka安装目录下

./bin/kafka-server-start.sh ./config/server.properties &

kafka相关命令


创建topic
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test显示所有topic
kafka-topics.sh --list --bootstrap-server localhost:9092产生消息
kafka-console-producer.sh --broker-list localhost:9092 --topic test消费消息
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning删除topic
kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic test

四、flink与kafka结合示例

首先 ,构建maven工程,加入flink与kafka的一些依赖:

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>bigdata-kafka_2.12-3.0.0</artifactId><version>1.0-SNAPSHOT</version><name>bigdata-kafka_2.12-3.0.0</name><!-- FIXME change it to the project's website --><url>http://www.example.com</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><flink-version>1.14.0</flink-version><scala.binary.version>2.11.2</scala.binary.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink-version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>${flink-version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>${flink-version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>${flink-version}</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version><scope>test</scope></dependency></dependencies>
</project>

第一个,flink生产者示例代码:

package com.example;import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;import java.io.Serializable;
import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties props = new Properties();props.setProperty("bootstrap.servers", "192.168.245.200:9092");DataStream<String> stream = env.addSource(new SimpleStringGenerator());stream.addSink(new FlinkKafkaProducer<String>("test", new SimpleStringSchema(), props));env.execute();}
}class SimpleStringGenerator implements SourceFunction<String>, Serializable {private static final long serialVersionUID = 1L;private volatile boolean isRunning = true;@Overridepublic void run(SourceContext<String> ctx) throws Exception {while (isRunning) {String str = RandomStringUtils.randomAlphanumeric(5);ctx.collect(str);Thread.sleep(1000);}}@Overridepublic void cancel() {isRunning = false;}
}

因为flink是生产者,需要启动一个kafka的消费者终端,然后运行本示例:
启动kafka

bin/kafka-server-start.sh config/server.properties &

启动一个kafka的消费者终端

bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic test

终端内容
在这里插入图片描述
第二个,flink消费者示例代码:

package com.example;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Properties;public class KafkaConsumerApp {public static void main(String[] args) {try {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", "master:9092");properties.setProperty("group.id", "flink");DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<String>("test", new SimpleStringSchema(), properties));stream.map(new MapFunction<String, Object>() {@Overridepublic Object map(String value) throws Exception {return "flink: " + value;}}).print();env.execute("consumer");} catch (Exception e) {e.printStackTrace();}}
}

为了测试,我们先开启一个生产者,不断往kafka中发送消息。

 kafka-console-producer.sh --broker-list master:9092 --topic test

终端
在这里插入图片描述

控制台
在这里插入图片描述
打印结果符合预期,flink与kafka结合的示例就演示完成了,主要的还是熟悉flink编程。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/5881.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

护城河理论

护城河理论 护城河理论|来自股神巴菲特&#xff0c;是指投资的企业在某一方面的核心竞争力。 模型介绍 在2000年的伯克希尔哈撒韦的年会上&#xff0c;巴菲特说&#xff1a;让我们来把护城河作为一个伟大企业的首要标准&#xff0c;保持它的宽度&#xff0c;保持它不被跨越。我…

听GPT 讲K8s源代码--pkg(五)

在 Kubernetes 中&#xff0c;kubelet 是运行在每个节点上的主要组件之一&#xff0c;它负责管理节点上的容器&#xff0c;并与 Kubernetes 控制平面交互以确保容器在集群中按照期望的方式运行。kubelet 的代码位于 Kubernetes 代码库的 pkg/kubelet 目录下。 pkg/kubelet 目录…

数学建模-分类模型 Fisher线性判别分析

论文中1. 判别分析系数 2. 分类结果 多分类问题 勾选内容和上面一样

【C++】入门 --- 命名空间

文章目录 &#x1f36a;一、前言&#x1f369;1、C简介&#x1f369;2、C关键字 &#x1f36a;二、命名冲突&#x1f36a;三、命名空间&#x1f369;1、命名空间定义&#x1f369;2、命名空间的使用 &#x1f36a;四、C输入&输出 &#x1f36a;一、前言 本篇文章是《C 初阶…

Linux笔记——管道相关命令以及shell编程

文章目录 管道相关命令 目标 准备工作 1 cut 1.1 目标 1.2 路径 1.3 实现 2 sort 2.1 目标 2.2 路径 2.3 实现 第一步: 对字符串排序 第二步&#xff1a;去重排序 第三步: 对数值排序 默认按照字符串排序 升序 -n 倒序 -r 第四步: 对成绩排序【按照列排序】 …

ffmpeg中filter_query_formats函数解析

ffmpeg中filter_query_formats主要起一个pix fmt引用指定的功能。 下下结论&#xff1a; 先看几个结构体定义&#xff1a; //删除了一些与本次分析不必要的成员 struct AVFilterLink {AVFilterContext *src; ///< source filterAVFilterPad *srcpad; ///<…

PhpStudy靶场首页管理

PhpStudy靶场首页管理 一、源码一二、源码二三、源码三四、源码四 一、源码一 index.html <!DOCTYPE html> <html><head><meta charset"UTF-8"><title>靶场访问首页</title><style>body {background-color: #f2f2f2;colo…

JavaDemo——使用jks的https

java使用https主要就是设置下sslContext&#xff0c;sslContext初始化需要密钥管理器和信任管理器&#xff0c;密钥管理器用于管理本地证书和私钥&#xff0c;信任管理器用于验证远程服务器的证书&#xff0c;这两种管理器都需要KeyStore初始化&#xff0c;两种管理器可以按需只…

Ubuntu 网络配置指导手册

一、前言 从Ubuntu 17.10 Artful开始&#xff0c;Netplan取代ifupdown成为默认的配置实用程序&#xff0c;网络管理改成 netplan 方式处理&#xff0c;不在再采用从/etc/network/interfaces 里固定 IP 的配置 &#xff0c;配置写在 /etc/netplan/01-network-manager-all.yaml 或…

【事业单位-语言理解1】中心理解02

【事业单位-语言理解1】中心理解02 1.中心理解1.1 并列关系1.2 主题词1.3程度词&#xff0c;表示强调 二、标题填入题&#xff08;优先考虑主题词&#xff09;三、词句理解题 1.中心理解 解题思路 1.1 并列关系 涉及时间顺序 注意选项不要逻辑不当 并列关系的时候&…

行云创新 CloudOS 助力上汽乘用车企业云原生IT架构变革

近日&#xff0c;在2023架构可持续未来峰会成都制造业分会场上&#xff0c;上海汽车集团股份有限公司乘用车公司基础架构部主管茹洋带来了议题为《云原生时代上汽乘用车企业IT架构变革和实践》的精彩演讲。他从云原生对于企业IT架构的意义、企业IT架构变革的必要性入手&#xf…

C程序环境及预处理

​​​​​文章目录 一、程序的翻译环境和执行环境 1.程序编译过程 2.编译内部原理 3.执行环境 二、程序运行前的预处理 1.预定义符号归纳 2.define定义标识符 3.define定义宏 4.define替换规则 5.宏和函数的对比 三、头文件被包含的方式 四、练习&#xff1a;写一…

Vue3状态管理库Pinia——核心概念(Store、State、Getter、Action)

个人简介 &#x1f440;个人主页&#xff1a; 前端杂货铺 &#x1f64b;‍♂️学习方向&#xff1a; 主攻前端方向&#xff0c;正逐渐往全干发展 &#x1f4c3;个人状态&#xff1a; 研发工程师&#xff0c;现效力于中国工业软件事业 &#x1f680;人生格言&#xff1a; 积跬步…

Java将数据集合转换为PDF

这里写自定义目录标题 将数据集合转换为pdf引入包工具类测试代码导出效果 将数据集合转换为pdf 依赖itext7包将数据集合转换导出为PDF文件 引入包 <properties><itext.version>7.1.11</itext.version> </properties><dependency><groupId&…

什么是HTTP 500错误,怎么解决

目录 什么是HTTP 500 HTTP 500错误的常见原因&#xff1a; 如何修复HTTP 500 总结 什么是HTTP 500 错误 HTTP 500内部服务器错误是指在客户端发出请求后&#xff0c;服务器在处理请求过程中发生了未知的问题&#xff0c;导致服务器无法完成请求。HTTP 500错误是一个通用的服…

Spring-缓存初步认识

Spring-缓存 简单介绍 缓存是一种介于数据永久存储介质和数据应用之间的数据临时存储介质缓存有效提高读取速度&#xff0c;加速查询效率 spring使用缓存方式 添加依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring…

海康摄像头开发笔记(一):连接防爆摄像头、配置摄像头网段、设置rtsp码流、播放rtsp流、获取rtsp流、调优rtsp流播放延迟以及录像存储

文为原创文章&#xff0c;转载请注明原文出处 本文章博客地址&#xff1a;https://hpzwl.blog.csdn.net/article/details/131679108 红胖子(红模仿)的博文大全&#xff1a;开发技术集合&#xff08;包含Qt实用技术、树莓派、三维、OpenCV、OpenGL、ffmpeg、OSG、单片机、软硬结…

每日一题——反转链表

题目 给定一个单链表的头结点pHead(该头节点是有值的&#xff0c;比如在下图&#xff0c;它的val是1)&#xff0c;长度为n&#xff0c;反转该链表后&#xff0c;返回新链表的表头。 数据范围&#xff1a;0≤n≤1000 要求&#xff1a;空间复杂度 O(1) &#xff0c;时间复杂度 O…

Python实战项目——旅游数据分析(四)

由于有之前的项目&#xff0c;所以今天我们直接开始&#xff0c;不做需求分析&#xff0c;还不会需求分析的可以看我之前的文章。Python实战项目——用户消费行为数据分析&#xff08;三&#xff09; 导入库 import numpy as np import pandas as pd import matplotlib.pyplo…

PHP后台登录功能单账号登录限制

PHP后台登录功能单账号登录限制 单账号登陆是什么第一步创建数据表第二步创建登录页面test2.html第三步创建登录提交test2.php第四步访问后台首页第五步演示 单账号登陆是什么 一个用户只能登录一个账号通常被称为单账号登录限制或单用户单账号限制。这意味着每个用户只能使用…