尚硅谷大数据项目《在线教育之采集系统》笔记002

视频地址:尚硅谷大数据项目《在线教育之采集系统》_哔哩哔哩_bilibili

目录

P032

P033

P033

P034

P035

P036


P032

P033

# 1、定义组件,为各组件命名
a1.sources = r1
a1.channels = c1
a1.sinks - k1# 2、配置sources,描述source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/data_mocker/01-onlineEducation/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/flume-1.9.0/taildir_position.json
a1.sources.r1.batchSize = 100# 3、配置channels,描述channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = node001:9092,node002:9092,node003:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false# 4、组装,绑定source和channel以及sink和channel的关系
a1.sources.r1.channels = c1

P033

2023-07-26 11:13:42,136 (kafka-producer-network-thread | producer-1) [WARN - org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:671)] [Producer clientId=producer-1] Connection to node -1 could not be established. Broker may not be available.
2023-07-26 11:13:42,139 (kafka-producer-network-thread | producer-1) [WARN - org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:671)] [Producer clientId=producer-1] Connection to node -3 could not be established. Broker may not be available.
2023-07-26 11:13:42,241 (kafka-producer-network-thread | producer-1) [WARN - org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:671)] [Producer clientId=producer-1] Connection to node -2 could not be established. Broker may not be available.
2023-07-26 11:13:43,157 (kafka-producer-network-thread | producer-1) [WARN - org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:671)] [Producer clientId=producer-1] Connection to node -3 could not be established. Broker may not be available.
2023-07-26 11:13:43,164 (kafka-producer-network-thread | producer-1) [WARN - org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:671)] [Producer clientId=producer-1] Connection to node -2 could not be established. Broker may not be available.

[2023-07-26 11:03:06,989] INFO Opening socket connection to server node002/192.168.10.102:2181. (org.apache.zookeeper.ClientCnxn)
[2023-07-26 11:03:06,989] INFO SASL config status: Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2023-07-26 11:03:06,992] WARN Session 0x0 for sever node002/192.168.10.102:2181, Closing socket connection. Attempting reconnect except it is a SessionExpiredException. (org.apache.zookeeper.ClientCnxn)
java.net.ConnectException: 拒绝连接
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:344)
    at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1290)

flume生效!

node001
启动hadoop、zookeeper、kafka,再启动flume。[atguigu@node001 ~]$ cd /opt/module/flume/flume-1.9.0/
[atguigu@node001 flume-1.9.0]$ bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf
Info: Sourcing environment configuration script /opt/module/flume/flume-1.9.0/conf/flume-env.sh
Info: Including Hadoop libraries found via (/opt/module/hadoop/hadoop-3.1.3/bin/hadoop) for HDFS access
Info: Including Hive libraries found via () for Hive access
...
[atguigu@node001 ~]$ jpsall
================ node001 ================
6368 NodeManager
5793 NameNode
2819 QuorumPeerMain
6598 JobHistoryServer
5960 DataNode
6681 Application
4955 Kafka
7532 Jps
================ node002 ================
4067 NodeManager
2341 Kafka
3942 ResourceManager
4586 ConsoleConsumer
5131 Jps
1950 QuorumPeerMain
3742 DataNode
================ node003 ================
3472 NodeManager
3235 DataNode
1959 QuorumPeerMain
3355 SecondaryNameNode
2347 Kafka
3679 Jps
[atguigu@node001 ~]$ 
[atguigu@node002 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic topic_log
[atguigu@node001 ~]$ mock.sh
[atguigu@node001 ~]$ 

P034

# /opt/module/flume/flume-1.9.0/job# 1、定义组件,为各组件命名
a1.sources = r1
a1.channels = c1
a1.sinks - k1# 2、配置sources,描述source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/data_mocker/01-onlineEducation/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/flume-1.9.0/taildir_position.json
a1.sources.r1.batchSize = 100a1.sources.r1.interceptors =  i1
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.ETLInterceptor$Builder# 3、配置channels,描述channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = node001:9092,node002:9092,node003:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false# 4、组装,绑定source和channel以及sink和channel的关系
a1.sources.r1.channels = c1
package com.atguigu.flume.interceptor;import com.atguigu.flume.interceptor.utils.JSONUtil;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;public class ETLInterceptor implements Interceptor {@Overridepublic void initialize() {}/*** 过滤掉脏数据(不完整的json)** @param event* @return*/@Overridepublic Event intercept(Event event) {//1、获取body当中的数据byte[] body = event.getBody();String log = new String(body, StandardCharsets.UTF_8);//2、判断数据是否为完整的jsonif (JSONUtil.isJSONValidate(log)) {return event;}return null;}@Overridepublic List<Event> intercept(List<Event> list) {Iterator<Event> iterator = list.iterator();while (iterator.hasNext()) {Event event = iterator.next();if (intercept(event) == null) {iterator.remove();}}return list;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new ETLInterceptor();}@Overridepublic void configure(Context context) {}}
}
package com.atguigu.flume.interceptor.utils;import com.alibaba.fastjson.JSONObject;public class JSONUtil {public static boolean isJSONValidate(String log) {try {JSONObject.parseObject(log);return true;} catch (Exception e) {e.printStackTrace();return false;}}
}
[atguigu@node001 log]$ echo '{"id":1}' >> app.log
[atguigu@node001 log]$ echo '{"id": }' >> app.log
[atguigu@node001 log]$ echo '{"id":2}' >> app.log
[atguigu@node001 log]$ 

P035

#! /bin/bashcase $1 in
"start"){for i in hadoop102 hadoop103doecho " --------启动 $i 采集flume-------"ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/job/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log1.txt 2>&1  &"done
};;	
"stop"){for i in hadoop102 hadoop103doecho " --------停止 $i 采集flume-------"ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk  '{print \$2}' | xargs -n1 kill -9 "done};;
esac
#! /bin/bashcase $1 in
"start"){for i in node001 node002doecho " --------启动 $i 采集flume-------"ssh $i "nohup /opt/module/flume/flume-1.9.0/bin/flume-ng agent --conf-file /opt/module/flume/flume-1.9.0/job/file_to_kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/flume-1.9.0/log1.txt 2>&1 &"done
};;	
"stop"){for i in node001 node002doecho " --------停止 $i 采集flume-------"ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk '{print \$2}' | xargs -n1 kill -9 "done
};;
esac
#! /bin/bashcase $1 in
"start") {echo " --------采集flume启动-------"ssh node001 "nohup /opt/module/flume/flume-1.9.0/bin/flume-ng agent -n a1 -c /opt/module/flume/flume-1.9.0/conf/ -f /opt/module/flume/flume-1.9.0/job/file_to_kafka.conf >/dev /null 2>&1 &"
};;	
"stop") {echo " --------采集flume关闭-------"ssh node001 "ps -ef | grep file_to_kafka | grep -v grep | awk '{print \$2}' | xargs -n1 kill -9"
};;
esac

P036

## 1、定义组件
a1.sources = r1
a1.channels = c1
a1.sinks = k1## 2、配置sources
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = node001:9092,node002:9092,node003:9092
a1.sources.r1.kafka.consumer.group.id = topic_log
a1.sources.r1.kafka.topics = topic_log
a1.sources.r1.batchSize = 1000
a1.sources.r1.batchDurationMillis = 1000
a1.sources.r1.useFlumeEventFormat = false## 3、配置channels
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/flume-1.9.0/checkpoint/behavior1
a1.channels.c1.useDualCheckpoints = false
a1.channels.c1.dataDirs = /opt/module/flume/flume-1.9.0/data/behavior1/
a1.channels.c1.capacity = 1000000
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.keep-alive = 3## 4、配置sinks
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/edu/log/edu_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = false## 控制输出文件是原生文件。
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip#a1.sinks.k1.hdfs.rollInterval = 10
#a1.sinks.k1.hdfs.rollSize = 134217728
#a1.sinks.k1.hdfs.rollCount = 0## 5、组装 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
[atguigu@node001 ~]$ cd /opt/module/flume/flume-1.9.0/
[atguigu@node001 flume-1.9.0]$ bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_log.conf 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/14816.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

微服务划分的原则

微服务的划分 微服务的划分要保证的原则 单一职责原则 1、耦合性也称块间联系。指软件系统结构中各模块间相互联系紧密程度的一种度量。模块之间联系越紧密&#xff0c;其耦合性就越强&#xff0c;模块的独立性则越差。模块间耦合高低取决于模块间接口的复杂性、调用的方式及…

hive删除数据进行恢复

在实际开发或生产中&#xff0c;hive表如果被误删&#xff0c;如被truncate或是分区表的分区被误删了&#xff0c;只要在回收站的清空周期内&#xff0c;是可以恢复数据的&#xff0c;步骤如下&#xff1a; &#xff08;1&#xff09; 先找到被删除数据的存放目录&#xff0c;…

Neo4j数据库中导入CSV示例数据

本文简要介绍Neo4j数据库以及如何从CSV文件中导入示例数据&#xff0c;方便我们快速学习测试图数据库。首先介绍简单数据模型以及基本图查询概念&#xff0c;然后通过LOAD CSV命令导入数据&#xff0c;生成节点和关系。 环境准备 读者可以快速安装Neo4j Desktop&#xff0c;启…

30天python速成-第一天(python简介及下载安装)

初识Python Python简介 Python的历史 1989年圣诞节:Guido von Rossum开始写Python语言的编译器。1991年2月:第一个Python编译器(同时也是解释器)诞生,它是用C语言实现的(后面),可以调用C语言的库函数。在最早的版本中,Python已经提供了对“类”,“函数”,“异常处…

1.2 网络安全法律法规

数据参考&#xff1a;CISP官方 目录 国家立法体系网络安全法解析网络安全相关法律 一、国家立法体系 1、我国的立法体系 我国的立法体系在网络空间治理中扮演着基础工作的角色。为了应对快速发展的网络技术和威胁&#xff0c;我国采取了多级立法机制来完善网络空间的法律…

【EI/SCOPUS会议征稿】2023年第四届新能源与电气科技国际学术研讨会 (ISNEET 2023)

作为全球科技创新大趋势的引领者&#xff0c;中国一直在为科技创新创造越来越开放的环境&#xff0c;提高学术合作的深度和广度&#xff0c;构建惠及全民的创新共同体。这些努力为全球化和创建共享未来的共同体做出了新的贡献。 为交流近年来国内外在新能源和电气技术领域的最新…

打卡力扣题目六

#左耳听风 ARST 打卡活动重启# 目录 一、问题 二、解题方法 三、解题方法二 四、两个方法的区别 关于 ARTS 的释义 —— 每周完成一个 ARTS&#xff1a; ● Algorithm: 每周至少做一个 LeetCode 的算法题 ● Review: 阅读并点评至少一篇英文技术文章 ● Tips: 学习至少一个技…

Mac 上使用 Tesseract OCR 识别图片文本

Tesseract OCR 引擎&#xff1a;Tesseract是一个开源的OCR引擎&#xff0c;你需要先安装它。可以从Tesseract官方网站&#xff08;https://github.com/tesseract-ocr/tesseract&#xff09;下载适用于你的操作系统的安装程序或源代码&#xff0c;并按照官方文档进行安装。 Tes…

【Postman】Postman接口测试进阶用法详解:断言、全局与环境变量、关联、批量执行用例、读取外部文件实现参数化

文章目录 一、Postman断言1、断言位置2、Postman的常用断言3、操作实例 二、全局变量与环境变量1、二者区分2、设置全局变量3、设置环境变量 三、Postman接口关联1、概念2、操作步骤 四、批量执行测试用例1、操作步骤2、查看结果 五、读取外部文件实现参数化1、使用场景2、操作…

el-select多个选择框位置错位、偏移

el-select多个选择框位置错位、偏移 一、解决办法 在el-select组件中&#xff0c;可能会由于option选项过多而导致下拉框位置错乱、偏移的情况&#xff0c;我个人试验大概是在5-6个option以上时&#xff0c;该bug就会出现。 一、解决办法 这个时候需要手动为下拉框设置: popper…

调查需求合理呈现业务人员想要的数据维度视图

开始着手做一个单据,首先想到的是业务人员最终想看到的单据数据的呈现样式,这是信息化的出发点和数据分析的基础。业务人员工作有时很多是重复,以一个他喜欢的方式将需要的数据方便的提供给它,无疑改善方便了他的工作。我们发现不管你是一个家庭,还是一个店铺或是一个公司…

Vite+Typescript+Vue3学习笔记

ViteTypescriptVue3学习笔记 1、项目搭建 1.1、创建项目(yarn) D:\WebstromProject>yarn create vite yarn create v1.22.19 [1/4] Resolving packages... [2/4] Fetching packages... [3/4] Linking dependencies... [4/4] Building fresh packages...success Installed…

2023-将jar包上传至阿里云maven私有仓库(云效制品仓库)

一、背景介绍 如果要将平时积累的代码工具jar包&#xff0c;上传至云端&#xff0c;方便团队大家一起使用&#xff0c;一般的方式就是上传到Maven中心仓库&#xff08;但是这种方式步骤多&#xff0c;麻烦&#xff0c;而且上传之后审核时间比较长&#xff0c;还不太容易通过&a…

Python语法(二、内置函数)

数学计算库 Python ​math ​模块提供了许多对浮点数的数学运算函数。 Python ​cmath ​模块包含了一些用于复数运算的函数。 import math 内置函数 关键字 自定义函数 Python函数的定义。定义函数需要用def 关键字实现&#xff0c;具体的语法格式如下&#xff1a; def 函…

CAN学习笔记3:STM32 CAN控制器介绍

STM32 CAN控制器 1 概述 STM32 CAN控制器&#xff08;bxCAN&#xff09;&#xff0c;支持CAN 2.0A 和 CAN 2.0B Active版本协议。CAN 2.0A 只能处理标准数据帧且扩展帧的内容会识别错误&#xff0c;而CAN 2.0B Active 可以处理标准数据帧和扩展数据帧。 2 bxCAN 特性 波特率…

Springboot实现Rsa非对称加密

依赖 <dependency><groupId>cn.dev33</groupId><artifactId>sa-token-spring-boot-starter</artifactId><version>1.30.0</version> </dependency>生成公钥和私钥 // 生成一对公钥和私钥&#xff0c;其中Map对象 (private私…

部署问题集合(十八)Windows环境下使用两个Tomcat

下载Tomcat Tomcat镜像下载地址&#xff1a;https://mirrors.cnnic.cn/apache/tomcat/进入如下地址&#xff1a;zip的是压缩版&#xff0c;exe是安装版 修改第二个Tomcat配置文件 第一步&#xff1a;编辑conf/server.xml文件&#xff0c;修改三个端口&#xff0c;有些版本改…

3d激光slam建图与定位(1)_基于ndt算法定位

一.代码实现流程 二.ndt算法原理 一.该算法定位有三个进程文件 1.map_loader.cpp用于点云地图的读取&#xff0c;从文件中读取点云后对这个点云地图进行旋转平移后发布点云地图到ros #include "map_loader.h"MapLoader::MapLoader(ros::NodeHandle &nh){std::st…

Form1单例模式与互斥锁

一、使用mutex来解决。 如何让窗体Form1也是一个单例模式呢&#xff1f; 在窗体项目中找到Program.cs&#xff0c;双击。找到入口点&#xff0c;更改如下&#xff1a; [STAThread]private static void Main(){string mutexName "MyapplicatonMutexApp1121";usin…

【嵌入式Linux项目】基于Linux的全志H616开发板智能家居项目(语音控制、人脸识别、安卓APP和PC端QT客户端远程操控)有视频功能展示

目录 一、功能需求 二、开发环境 1、硬件&#xff1a; 2、软件&#xff1a; 3、引脚分配&#xff1a; 三、关键点 1、设计模式之工厂模式 2、wiringPi库下的相关硬件操作函数调用 3、语音模块的串口通信 4、线程 5、摄像头的实时监控和拍照功能 6、人脸识别 四、编…