(二十八)大数据实战——Flume数据采集之kafka数据生产与消费集成案例

前言

本节内容我们主要介绍一下flume数据采集和kafka消息中间键的整合。通过flume监听nc端口的数据,将数据发送到kafka消息的first主题中,然后在通过flume消费kafka中的主题消息,将消费到的消息打印到控制台上。集成使用flume作为kafka的生产者和消费者。关于nc工具、flume以及kafka的安装部署,这里不在赘述,请读者查看作者往期博客内容。整体架构如下:

正文

  • 启动Kafka集群,创建first主题

- 启动Kafka集群

- 创建first主题

kafka-topics.sh --bootstrap-server hadoop101:9092 --create --topic first --partitions 3 --replication-factor 3

- 查看first主题详情

kafka-topics.sh --bootstrap-server hadoop101:9092 --describe --topic first

  • 在hadoop101服务器flume安装目录/opt/module/apache-flume-1.9.0/job下创建nc监听服务

 - 创建nc监听的flume任务:job-netcat-flume-kafka.conf

# 1 组件定义
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 2 配置 source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop101
a1.sources.r1.port = 1111
# 3 配置 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 4 配置 sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# 5 拼接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

  • 在hadoop102服务器flume安装目录/opt/module/apache-flume-1.9.0/job下创建kafka监听r任务

-  创建kafka监听的flume任务:job-kafka-flume-console.conf

# 1 组件定义
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 2 配置 source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 50
a1.sources.r1.batchDurationMillis = 200
a1.sources.r1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sources.r1.kafka.topics = first
a1.sources.r1.kafka.consumer.group.id = custom.g.id
# 3 配置 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 4 配置 sink
a1.sinks.k1.type = logger
# 5 拼接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

  • 在hadoop102服务器启动kafka监听任务job-kafka-flume-console.conf

- 启动job-kafka-flume-console.conf任务

bin/flume-ng agent -c conf/ -n a1 -f job/job-kafka-flume-console.conf -Dflume.root.logger=INFO,console

  •  在hadoop101服务器启动nc监听任务job-netcat-flume-kafka.conf

 - 启动job-netcat-flume-kafka.conf任务

bin/flume-ng agent -c conf/ -n a1 -f job/job-netcat-flume-kafka.conf -Dflume.root.logger=INFO,console

  •  使用netcat工具发送数据到nc服务1111端口

- 发送nc消息

  • 查看结果 

- 控制台结果

结语

该案例证明了flume1成功采集到了nc监听端口的数据,并将数据发送到了kafka主题first中,flume2成功从kafka主题中消费到了数据并打印到了控制台。关于Flume数据采集之kafka数据生产与消费的集成案例到这里就结束了,我们下期见。。。。。。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/78131.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

从零开发短视频电商 使用Spring WebClient发起远程Http调用

文章目录 依赖使用创建WebClient实例创建带有超时的WebClient实例示例 请求准备获取响应 高级过滤器自定义过滤器 自定义线程池自定义WebClient连接池开启日志错误处理最佳实践 示例异步请求同步请求上传文件重试过滤错误错误处理 参考 Spring WebClient 是 Spring WebFlux 项目…

基于Protege的知识建模实战

一.Protege简介、用途和特点 1.Protege简介 Protege是斯坦福大学医学院生物信息研究中心基于Java开发的本体编辑和本体开发工具,也是基于知识的编辑器,属于开放源代码软件。这个软件主要用于语义网中本体的构建,是语义网中本体构建的核心开发…

高阶导数的概念与公式

目录 高阶导数的概念 常用的高阶导数的公式 隐函数补充 反函数补充 高阶导数的概念 高阶导数是指一阶或二阶及以上的导数。这些导数可以通过连续进行一阶导数的计算来得到。然而,实际计算高阶导数时,存在一些问题,例如对抽象函数高阶导数…

遇见问题:使用mybaties向数据库中插入数据,idea显示插入成功,但是数据库中并没有数据变化?

遇见问题:使用mybaties向数据库中插入数据,idea显示插入成功,但是数据库中并没有数据变化? 可能的原因有几种: 没有提交事务:在使用 MyBatis 进行数据库操作时,需要手动提交事务。你可以在插入数据完成后…

北京映急物流有限公司 面试.net软件工程师岗位

请实现以下算法,语言不限,也可以是伪代码。 1.有一个数组 a[1000]存放了1000整数,这 1000 个数都大于等于 1,小于等于999,并且只有两个数是相同的,剩下的 998 个数均不相同。请写一个最优搜索算法,找出相同…

西门子S7-1200F或1500F系列安全PLC的组态步骤和基础编程(一)

西门子S7-1200F或1500F系列安全PLC的组态步骤和基础编程(一) 第一部分:组态配置 具体步骤可参考以下内容: 如下图所示,新建一个项目后,添加一个安全型PLC,这里以1516F-3 PN/DP为例进行说明, 如下图所示,添加CPU完成后,可以看到左侧的项目树中比普通的PLC多了几个选项…

Markdown和PlantUML的基本使用

首先需要在VS Code中安装Markdown extention和plantUML插件 测试标题 这是测试标题,使用一个#号配合标题 测试1级标题 这是1级测试标题,使用2个#号配合标题 测试2级标题 这是2级测试标题,使用3个#号配合标题 这里是多级列表 Part A S…

leetcode 232 用栈实现队列

请你仅使用两个栈实现先入先出队列。队列应当支持一般队列支持的所有操作(push、pop、peek、empty): 实现 MyQueue 类: void push(int x) 将元素 x 推到队列的末尾int pop() 从队列的开头移除并返回元素int peek() 返回队列开头…

软件测试基础

什么是软件?程序,文档,服务,数据什么是软件测试?尽快尽早的发现中软件存在错误,贯穿整个软件生命周期的确定和验证的过程。项目流程:需求分析 概要设计 详细设计 编码 测试 验收项目的开发模型 瀑布模型 优点:规范了项目的流程 缺点:测试介入的太晚,…

【Docker】Docker简介

Docker简介 📋导航 1. Docker简介1.1 什么是Docker?1.2 什么是容器?1.3 容器的优势?1.4 Docker的优势?1.5 虚拟技术与容器技术Docker的区别?1.6 为什么学习Docker? 2. 安装Docker3. Docker架构4. Docker命…

Vue3样式绑定

文章目录 Vue3样式绑定1. class 属性绑定1.1 v-bind:class 设置一个对象,从而动态的切换 class1.2 在对象中传入更多属性用来动态切换多个 class1.3 直接绑定数据里的一个对象1.4 绑定一个返回对象的计算属性。这是一个常用且强大的模式1. 5 数据语法1.6 errorClass…

算法通关村第十五关——位运算在查找重复元素中的妙用

前言 大部分算法默认给定的数据量都比较小,只有几个或者几十个元素,但是如果将数据量提高到百万甚至几十亿,那么处理逻辑就会发生很大差异。在海量数据中,普通数据结构都无效了,因为内存空间放不下,常规的…

JAsper:专注于营销领域的AIGC

【产品介绍】 Jasper 由 Dave Rogenmoser(CEO)、Chris Hull(COO)和 John Phillip Morgan(CTO)在2021 年成立,是一款领先的 AI 营销工具以及写作助手。整个jasper官网都会强调自己对营销领域的理…

了解冒泡排序

package com.mypackage.array;import java.util.Arrays;public class Demo07 {public static void main(String[] args) {int[] a {3,2,6,7,4,5,6,34,56,7};int[] sort1 sort1(a); //调用我们自己写的排序方法后,返回一个排序后的数组System.out.println(Array…

Spring Boot 下载文件(word/excel等)文件名中文乱码问题|构建打包不存在模版文件(templates等)

Spring Boot 下载文件(word/excel等)文件名中文乱码问题|构建打包不存在模版文件(templates等) 准备文件,这里我放在resource下的templates路径 在pom中配置构建打包的资源,更新maven 如果使用了assembly打包插件这样配置可能仍不生效&#…

台积电的战略布局:“曲线”抢单 | 百能云芯

郭明錤最新的分析引发了广泛关注,他指出台积电采取了一系列重大战略投资举措,旨在争夺未来的半导体订单,尤其是来自苹果和英伟达的12纳米订单。这些战略举措包括认购英特尔手中的IMS Nanofabrication Global股权以及投资安谋(Arm&…

【C++学习笔记】野指针的定义与避免

1.野指针的定义 指向非法的内存地址指针叫作野指针(Wild Pointer),也叫悬挂指针(Dangling Pointer),意为无法正常使用的指针。 2.出现野指针的常见情形 2.1使用未初始化的指针 出现野指针最典型的情形就…

单个vue echarts页面

<template> <div ref"history" class"echarts"></div> </template> <script> export default{ data () { return {}; }, methods: { history(){ let myChart this.$echarts.init(this.$refs.history); // 绘制图表 myCha…

Linux:基础开发工具之yum,vim,gcc的使用

文章目录 yumvimgcc 本篇主要总结的是Linux下开发工具 yumvimgcc/g yum 什么是yum&#xff1f; 不管是在手机移动端还是pc端&#xff0c;不管是什么操作系统&#xff0c;当用户想要下载一些内容或者工具的时候&#xff0c;都需要到一个特定的位置进行下载&#xff0c;例如在…

点云从入门到精通技术详解100篇-从全局到局部的三维点云细节差异分析

目录 前言 国内外研究现状 细节差异分析相关研究 三维点云的相似性相关研究 存在的问题 三维点云对比的相关技术 2.1 三维点云的采集设备 2.2三维点云的存储格式 2.3三维点云的空间变换 2.4三维点云相似度分析 2.4.1点云特征的提取 2.4.2特征相似度计算 本文篇幅较长&#xff0…