大数据学习05-Kafka分布式集群部署

系统环境:centos7
软件版本:jdk1.8、zookeeper3.4.8、hadoop2.8.5
本次实验使用版本 kafka_2.12-3.0.0

一、安装

Kafka官网
在这里插入图片描述

将安装包上传至linux服务器上
在这里插入图片描述
解压

tar -zxvf kafka_2.12-3.0.0.tgz -C /home/local/

移动目录至kafka

mv kafka_2.12-3.0.0/ kafka

二、部署

配置Kafka环境

vi /etc/profile

添加如下配置

#kafka
export KAFKA_HOME=/home/local/kafka
export PATH=$PATH:${KAFKA_HOME}/bin

修改server.properties文件

vim /home/local/kafka/config/server.properties

修改参数如下:

broker.id=0
listeners=PLAINTEXT://192.168.245.200:9092
log.dirs=/tmp/kafka-logs
zookeeper.connect=192.168.245.200:2181,192.168.245.201:2181,192.168.245.202:2181

参数说明:
broker.id : 集群内全局唯一标识,每个节点上需要设置不同的值
listeners:这个IP地址也是与本机相关的,每个节点上设置为自己的IP地址
log.dirs :存放kafka消息的
zookeeper.connect : 配置的是zookeeper集群地址

分发kafka安装目录

for i in {1..2};do scp -r /home/local/kafka root@slave${i}:/home/local/;done

三、启动

进入kafka安装目录下

./bin/kafka-server-start.sh ./config/server.properties &

kafka相关命令


创建topic
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test显示所有topic
kafka-topics.sh --list --bootstrap-server localhost:9092产生消息
kafka-console-producer.sh --broker-list localhost:9092 --topic test消费消息
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning删除topic
kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic test

四、flink与kafka结合示例

首先 ,构建maven工程,加入flink与kafka的一些依赖:

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>bigdata-kafka_2.12-3.0.0</artifactId><version>1.0-SNAPSHOT</version><name>bigdata-kafka_2.12-3.0.0</name><!-- FIXME change it to the project's website --><url>http://www.example.com</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><flink-version>1.14.0</flink-version><scala.binary.version>2.11.2</scala.binary.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink-version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>${flink-version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>${flink-version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>${flink-version}</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version><scope>test</scope></dependency></dependencies>
</project>

第一个,flink生产者示例代码:

package com.example;import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;import java.io.Serializable;
import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties props = new Properties();props.setProperty("bootstrap.servers", "192.168.245.200:9092");DataStream<String> stream = env.addSource(new SimpleStringGenerator());stream.addSink(new FlinkKafkaProducer<String>("test", new SimpleStringSchema(), props));env.execute();}
}class SimpleStringGenerator implements SourceFunction<String>, Serializable {private static final long serialVersionUID = 1L;private volatile boolean isRunning = true;@Overridepublic void run(SourceContext<String> ctx) throws Exception {while (isRunning) {String str = RandomStringUtils.randomAlphanumeric(5);ctx.collect(str);Thread.sleep(1000);}}@Overridepublic void cancel() {isRunning = false;}
}

因为flink是生产者,需要启动一个kafka的消费者终端,然后运行本示例:
启动kafka

bin/kafka-server-start.sh config/server.properties &

启动一个kafka的消费者终端

bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic test

终端内容
在这里插入图片描述
第二个,flink消费者示例代码:

package com.example;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Properties;public class KafkaConsumerApp {public static void main(String[] args) {try {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", "master:9092");properties.setProperty("group.id", "flink");DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<String>("test", new SimpleStringSchema(), properties));stream.map(new MapFunction<String, Object>() {@Overridepublic Object map(String value) throws Exception {return "flink: " + value;}}).print();env.execute("consumer");} catch (Exception e) {e.printStackTrace();}}
}

为了测试,我们先开启一个生产者,不断往kafka中发送消息。

 kafka-console-producer.sh --broker-list master:9092 --topic test

终端
在这里插入图片描述

控制台
在这里插入图片描述
打印结果符合预期,flink与kafka结合的示例就演示完成了,主要的还是熟悉flink编程。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/5881.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

护城河理论

护城河理论 护城河理论|来自股神巴菲特&#xff0c;是指投资的企业在某一方面的核心竞争力。 模型介绍 在2000年的伯克希尔哈撒韦的年会上&#xff0c;巴菲特说&#xff1a;让我们来把护城河作为一个伟大企业的首要标准&#xff0c;保持它的宽度&#xff0c;保持它不被跨越。我…

听GPT 讲K8s源代码--pkg(五)

在 Kubernetes 中&#xff0c;kubelet 是运行在每个节点上的主要组件之一&#xff0c;它负责管理节点上的容器&#xff0c;并与 Kubernetes 控制平面交互以确保容器在集群中按照期望的方式运行。kubelet 的代码位于 Kubernetes 代码库的 pkg/kubelet 目录下。 pkg/kubelet 目录…

算法-快速排序-java

下面是使用Java实现快速排序的一种方法&#xff1a; public class QuickSort {public static void quickSort(int[] arr, int low, int high) {if (low < high) {// 将数组划分为两个子数组&#xff0c;并获取划分点的索引int pivotIndex partition(arr, low, high);// 递…

数学建模-分类模型 Fisher线性判别分析

论文中1. 判别分析系数 2. 分类结果 多分类问题 勾选内容和上面一样

【C++】入门 --- 命名空间

文章目录 &#x1f36a;一、前言&#x1f369;1、C简介&#x1f369;2、C关键字 &#x1f36a;二、命名冲突&#x1f36a;三、命名空间&#x1f369;1、命名空间定义&#x1f369;2、命名空间的使用 &#x1f36a;四、C输入&输出 &#x1f36a;一、前言 本篇文章是《C 初阶…

机器人中常见的定位技术

系列文章目录 提示:这里可以添加系列文章的所有文章的目录,目录需要自己手动添加 TODO:写完再整理 文章目录 系列文章目录前言机器人中常见的定位方案惯性导航卫星导航组合导航UWB定位2D激光SLAM3D激光SLAM二维码定位视觉SLAM前言 认知有限,望大家多多包涵,有什么问题也希…

var、let和const的区别

先简单了解一下 var声明的变量会挂载在window上&#xff0c;而let和const声明的变量不会&#xff1a; var a 100; console.log(a,window.a); // 100 100let b 10; console.log(b,window.b); // 10 undefinedconst c 1; console.log(c,window.c); // 1 undefined v…

Linux笔记——管道相关命令以及shell编程

文章目录 管道相关命令 目标 准备工作 1 cut 1.1 目标 1.2 路径 1.3 实现 2 sort 2.1 目标 2.2 路径 2.3 实现 第一步: 对字符串排序 第二步&#xff1a;去重排序 第三步: 对数值排序 默认按照字符串排序 升序 -n 倒序 -r 第四步: 对成绩排序【按照列排序】 …

ffmpeg中filter_query_formats函数解析

ffmpeg中filter_query_formats主要起一个pix fmt引用指定的功能。 下下结论&#xff1a; 先看几个结构体定义&#xff1a; //删除了一些与本次分析不必要的成员 struct AVFilterLink {AVFilterContext *src; ///< source filterAVFilterPad *srcpad; ///<…

ES6-day03

ES6-类-Symbol 1.类 在javascript语言中&#xff0c;生成实例对象使用构造函数&#xff1b;ES6提供了类Class这个概念&#xff0c;作为对象的模板。定义一个类通过class关键字&#xff0c;ES6的类可以看成是构造函数的另一种写法。 ES5 如何继承 实例使用属性和方法1.从实例对象…

PhpStudy靶场首页管理

PhpStudy靶场首页管理 一、源码一二、源码二三、源码三四、源码四 一、源码一 index.html <!DOCTYPE html> <html><head><meta charset"UTF-8"><title>靶场访问首页</title><style>body {background-color: #f2f2f2;colo…

《数据结构》栈,队列,双向链表

目录 栈 栈概念 顺序栈 链式栈&#xff08;链表实现&#xff09; 顺序栈和链式栈的区别是什么&#xff1f; 队列 队列概念 顺序队列 链式队列 双向链表 栈 栈概念 什么是栈&#xff1f; 只能在一端进行插入和删除数据的线性表(称为栈)&#xff0c;把能进行插入和删…

JavaDemo——使用jks的https

java使用https主要就是设置下sslContext&#xff0c;sslContext初始化需要密钥管理器和信任管理器&#xff0c;密钥管理器用于管理本地证书和私钥&#xff0c;信任管理器用于验证远程服务器的证书&#xff0c;这两种管理器都需要KeyStore初始化&#xff0c;两种管理器可以按需只…

Ubuntu 网络配置指导手册

一、前言 从Ubuntu 17.10 Artful开始&#xff0c;Netplan取代ifupdown成为默认的配置实用程序&#xff0c;网络管理改成 netplan 方式处理&#xff0c;不在再采用从/etc/network/interfaces 里固定 IP 的配置 &#xff0c;配置写在 /etc/netplan/01-network-manager-all.yaml 或…

【事业单位-语言理解1】中心理解02

【事业单位-语言理解1】中心理解02 1.中心理解1.1 并列关系1.2 主题词1.3程度词&#xff0c;表示强调 二、标题填入题&#xff08;优先考虑主题词&#xff09;三、词句理解题 1.中心理解 解题思路 1.1 并列关系 涉及时间顺序 注意选项不要逻辑不当 并列关系的时候&…

行云创新 CloudOS 助力上汽乘用车企业云原生IT架构变革

近日&#xff0c;在2023架构可持续未来峰会成都制造业分会场上&#xff0c;上海汽车集团股份有限公司乘用车公司基础架构部主管茹洋带来了议题为《云原生时代上汽乘用车企业IT架构变革和实践》的精彩演讲。他从云原生对于企业IT架构的意义、企业IT架构变革的必要性入手&#xf…

C程序环境及预处理

​​​​​文章目录 一、程序的翻译环境和执行环境 1.程序编译过程 2.编译内部原理 3.执行环境 二、程序运行前的预处理 1.预定义符号归纳 2.define定义标识符 3.define定义宏 4.define替换规则 5.宏和函数的对比 三、头文件被包含的方式 四、练习&#xff1a;写一…

Vue3状态管理库Pinia——核心概念(Store、State、Getter、Action)

个人简介 &#x1f440;个人主页&#xff1a; 前端杂货铺 &#x1f64b;‍♂️学习方向&#xff1a; 主攻前端方向&#xff0c;正逐渐往全干发展 &#x1f4c3;个人状态&#xff1a; 研发工程师&#xff0c;现效力于中国工业软件事业 &#x1f680;人生格言&#xff1a; 积跬步…

98、简述Kafka的rebalance机制

简述Kafka的rebalance机制 consumer group中的消费者与topic下的partion重新匹配的过程 何时会产生rebalance: consumer group中的成员个数发生变化consumer 消费超时group订阅的topic个数发生变化group订阅的topic的分区数发生变化 coordinator: 通常是partition的leader节…

408专业课

1.快速排序 8.3_2_快速排序_哔哩哔哩_bilibili 优化后&#xff1a;时间复杂度O(nlogn) &#xff0c;空间复杂度O(logn) //优化后的快排 void Quick_sort(int a[], int l, int r) {if (l > r) return;把a数组中随机一个元素和a[l]交换 //快排优化int p a[l], i …