Flink消费Kafka实时写入Doris

本文模拟实际生产环境,通过FileBeat采集日志信息到Kafka,再通过Flink消费Kafka实时写入Doris。

文章目录

    • Filebeat采集日志到Kafka
    • Flink消费Kafka实时写入Doris
    • 总结

在这里插入图片描述

Filebeat采集日志到Kafka

常见的日志采集工具有以下几种:Flume、Logstash和Filebeat

  • Flume采用Java编写,它是一个分布式、高度可靠且高度可用的工具,旨在高效地搜集、汇总和转移大量日志数据,该工具拥有一个简洁且灵活的流数据流架构,它配备了可调节的可靠性机制、故障切换以及恢复功能,此外,Flume通过简单且可扩展的数据模型支持在线分析应用程序。
  • Logstash是一个开源的日志管理和分析工具,它能够从多个数据源收集数据,对数据进行转换和清洗,并将处理后的数据传输到目标系统。
  • Filebeat是一款go语言编写的日志文件收集工具,当在服务器上部署其客户端后,它会持续监听特定的日志目录或日志文件,实时跟踪并读取这些文件的更新内容,并将这些数据发送到指定的输出目标,例如Elasticsearch或Kafka等。

这里选择Filebeat进行日志采集的主要原因在于其资源消耗极低,相较于Flume和Logstash,Filebeat占用的内存最少,对CPU的负载也最小。它的运行进程十分稳定,很少出现崩溃或宕机的情况。

首先下载Filebeat

curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.12.0-linux-x86_64.tar.gz

在这里插入图片描述
解压缩文件

tar xzvf filebeat-8.12.0-linux-x86_64.tar.gz

进入目录

cd filebeat-8.12.0-linux-x86_64

编写配置文件接入Kafka

vim filebeat.yaml

filebeat.yaml的文件内容

filebeat.inputs:
- type: logpaths:- /doc/input/*.log  # 更换为你的日志文件路径
processors:- include_fields:fields: ["message"]
output.kafka:# 更换为你的Kafka地址和主题.hosts: ["192.168.235.130:9092"]topic: k2ggcodec:format:string: '%{[message]}'

运行Filebeat采集日志

./filebeat -e -c ./filebeat.yaml

在这里插入图片描述

这是log日志的信息,现要求保持原始格式发送到Kafka
在这里插入图片描述Filebeat采集日志信息发送到Kafka的主题,消费者收到的信息如下,Filebeat会添加一些自带的数据,比如时间戳和元数据等,但是一般情况下只需要采集message里面的信息,通过filebeat.yaml中的processors和codec即可实现。
在这里插入图片描述processors处理只保留 message 的字段信息,其他字段将被丢弃,codec用于定义数据的编码格式,将 message 字段的值作为字符串发送到 kafka,这样就可以保留日志信息的原始格式发送到Kafka。
消费者消费原始格式的日志消息
在这里插入图片描述

Flink消费Kafka实时写入Doris

在写入之前,建立doris的数据表用于接收消费的信息

CREATE TABLE transactions (timestamp datetime,user_id INT,transaction_type VARCHAR(50),amount DECIMAL(15, 2),currency CHAR(3),status VARCHAR(20),description TEXT
)
DISTRIBUTED BY HASH(user_id) BUCKETS 10
PROPERTIES("replication_num"="1");

引入依赖

   <dependency><groupId>org.apache.doris</groupId><artifactId>flink-doris-connector-1.16</artifactId><version>24.0.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>1.17.0</version></dependency>

主程序

package flink;import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;import java.util.Properties;public class DorisWrite {public static void main(String[] args) throws Exception {Properties props = new Properties();//Kafka broker的地址props.put("bootstrap.servers", "192.168.235.130:9092");props.put("group.id", "test");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("auto.offset.reset", "latest");StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(5000);env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//指定消费的主题FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>("k2gg",new SimpleStringSchema(),props);DorisSink.Builder<String> builder = DorisSink.builder();DorisOptions.Builder dorisBuilder = DorisOptions.builder();//Doris的地址以及账号密码等信息dorisBuilder.setFenodes("192.168.235.130:8030").setTableIdentifier("test.transactions").setUsername("root").setPassword("1445413748");Properties pro = new Properties();pro.setProperty("format", "json");pro.setProperty("read_json_by_line", "true");DorisExecutionOptions  executionOptions = DorisExecutionOptions.builder().setLabelPrefix("label-doris12"+System.currentTimeMillis()) //streamload label prefix,.setStreamLoadProp(pro).build();builder.setDorisReadOptions(DorisReadOptions.builder().build()).setDorisExecutionOptions(executionOptions).setSerializer(new SimpleStringSerializer()) //serialize according to string.setDorisOptions(dorisBuilder.build());DataStreamSource<String> dataStreamSource = env.addSource(flinkKafkaConsumer);// 将Kafka数据转换为JSON格式DataStream<String> jsonStream = dataStreamSource.map(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {System.out.println("value"+value);// 分割字符串String[] parts = value.split(",");// 创建JSON字符串StringBuilder jsonString = new StringBuilder();jsonString.append("{");jsonString.append("\"timestamp\":\"").append(parts[0]).append("\",");jsonString.append("\"user_id\":").append(parts[1]).append(",");jsonString.append("\"transaction_type\":\"").append(parts[2]).append("\",");jsonString.append("\"amount\":").append(parts[3]).append(",");jsonString.append("\"currency\":\"").append(parts[4]).append("\",");jsonString.append("\"status\":\"").append(parts[5]).append("\",");jsonString.append("\"description\":\"").append(parts[6].replace("\"", "")).append("\"");jsonString.append("}");return jsonString.toString();}});jsonStream.print();jsonStream.sinkTo(builder.build());env.execute("flink kafka to doris by datastream");}
}

运行主程序通过Flink消费Kafka的信息写入doris
在这里插入图片描述log日志的信息
在这里插入图片描述
登录Doris进行验证

mysql -h k8s-master -P 9030 -uroot -p

这是没运行主程序之前doris的数据,没有2024-10-15这一天的数据。

select * from transactions where date(timestamp) = "2024-10-15";

在这里插入图片描述
运行主程序之后,Flink将Kafka主题的信息实时写入Doris。
在这里插入图片描述

总结

1.Filebeat格式问题
Filebeat采集日志格式会添加一些自带的额外信息,一般情况下只需要message里面的字段信息,那么yaml文件配置processors和codec属性即可。processors处理只保留 message 的字段信息,其他字段将被丢弃,codec用于定义数据的编码格式,将 message 字段的值作为字符串发送到 kafka,这样就可以保留日志信息的原始格式发送到Kafka。
2.Flink消费Kafka失败
Flink在消费Kafka主题的过程中,不要往该主题发送其他格式的数据,否则会解析失败,尽量新建一个新主题来接收Filebeat采集过来的日志信息。如果还是执行失败,可以尝试在setLabelPrefix添加一个时间戳,这样保证每次生成的标签前缀都不一样,这是因为客户端会生成一个唯一的标签来标识这次导入Doris的操作,Doris服务器会根据这个标签来跟踪导入的进度和状态,如果导入过程中出现问题,Doris会保留失败的数据,客户端就可以通过标签重新导入这些数据。
3.实时写入Doris失败
Flink处理字段的数据类型要与Doris匹配,可以参考官方文档Doris 和 Flink 列类型映射关系。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/56853.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于SpringBoot+Vue+uniapp微信小程序的校园反诈骗微信小程序的详细设计和实现(源码+lw+部署文档+讲解等)

项目运行截图 技术框架 后端采用SpringBoot框架 Spring Boot 是一个用于快速开发基于 Spring 框架的应用程序的开源框架。它采用约定大于配置的理念&#xff0c;提供了一套默认的配置&#xff0c;让开发者可以更专注于业务逻辑而不是配置文件。Spring Boot 通过自动化配置和约…

Unity 从零开始搭建一套简单易用的UGUI小框架 功能撰写与优化篇

Unity 从零开始搭建一套简单易用的UGUI小框架 基础分析篇-CSDN博客 开始撰写 从基础分析篇我们得到了三个类&#xff0c;面板基类&#xff0c;管理类和面板子类 那就从面板基类开始&#xff0c;定义其基本行为 面板基类 基本方法都很简单&#xff0c;分别是首次加载并打开…

[含文档+PPT+源码等]精品基于springboot实现的原生微信小程序小区兼职系统

基于Spring Boot实现的原生微信小程序小区兼职系统背景&#xff0c;可以从以下几个方面进行阐述&#xff1a; 一、技术背景 移动互联网的普及&#xff1a;随着移动互联网的快速发展&#xff0c;微信小程序作为一种轻量级应用&#xff0c;因其无需下载安装、即用即走的特点&am…

SSD |(七)FTL详解(中)

文章目录 &#x1f4da;垃圾回收&#x1f407;垃圾回收原理&#x1f407;写放大&#x1f407;垃圾回收实现&#x1f407;垃圾回收时机 &#x1f4da;解除映射关系&#x1f4da;磨损均衡 &#x1f4da;垃圾回收 &#x1f407;垃圾回收原理 ✋设定一个迷你SSD空间&#xff1a; 假…

解决ImageIO无法读取部分JPEG格式图片问题

解决ImageIO无法读取部分JPEG格式图片问题 问题描述 我最近对在线聊天功能进行了一些内存优化&#xff0c;结果在回归测试时&#xff0c;突然发现有张图片总是发送失败。测试同事把问题转到我这儿来看&#xff0c;我仔细检查了一下&#xff0c;发现是上传文件的接口报错&#…

获取非加密邮件协议中的用户名和密码——安全风险演示

引言 在当今的数字时代,网络安全变得越来越重要。本文将演示如何通过抓包工具获取非加密邮件协议中的用户名和密码,以此说明使用非加密协议的潜在安全风险。通过这个演示,我们希望能提高读者的安全意识,促使大家采取更安全的通信方式。 注意: 本文仅用于教育目的,旨在提高安全…

Android开发蒙版引导操作功能

Android开发蒙版引导操作功能 复杂的功能&#xff0c;往往需要在上面加一层蒙版引导用户操作 一、思路&#xff1a; 堆积布局方式 二、效果图&#xff1a; 三、关键代码&#xff1a; <?xml version"1.0" encoding"utf-8"?> <FrameLayout x…

【大模型】AI视频课程制作工具开发

1. 需求信息 1.1 需求背景 讲师们在制作视频的过程中&#xff0c;发现录制课程比较麻烦&#xff0c;要保证环境安静&#xff0c;保证录制过程不出错&#xff0c;很容易反复重复录制&#xff0c;为了解决重复录制的工作量&#xff0c;想通过 ai 课程制作工具&#xff0c;来解决…

飞机大战告尾

参考 PPO算法逐行代码详解 链接 通过网盘分享的文件&#xff1a;PlaneWar 链接: https://pan.baidu.com/s/1cbLKTcBxL6Aem3WkyDtPzg?pwd1234 提取码: 1234 10.17关于博客发了又改这件事 悲催的事 今天训练了一早上ppo模型&#xff0c;满怀期待的检测成果时发现一点长进都…

【Linux】“echo $变量“ 命令打印变量值的底层原理

在 shell 中&#xff0c;echo $变量 命令的工作原理涉及几个关键步骤&#xff0c;主要是由 shell 解释器来处理变量的查找和替换。以下是详细的过程&#xff1a; 变量展开的过程顺序 变量引用&#xff1a; 在命令行中&#xff0c;变量通常以 $variable_name 或 ${variable_…

使用Shell脚本对Java应用等服务进行启停控制(支持批量)

通过shell脚本对Java服务启停进行控制。支持单个服务和多个服务的 start、stop、status、restart。支持自定义启动命令。(不限于Java服务,适用于各类通过命令行启动的服务) 脚本名称为 runjar.sh , 底部提供源码。通过三部分进行说明:操作说明、维护自定义服务列表、脚本源…

搭建`mongodb`副本集-开启权限认证 mongo:7.0.5

搭建mongodb副本集-开启权限认证 mongo:7.0.5 1.5.1、创建文件 创建配置文件保存目录和数据保存目录 mkdir -p /data/mongodb/{/conf,/data,/logs}生成和设置权限 这个文件一定要在一个服务里面生成然后复制到其它服务器&#xff0c;所有服务器的这个key一定是相同的。 op…

C语言 | Leetcode C语言题解之第480题滑动窗口中位数

题目&#xff1a; 题解&#xff1a; struct Heap {int* heap;int heapSize;int realSize;bool (*cmp)(int, int); };void init(struct Heap* obj, int n, bool (*cmp)(int, int)) {obj->heap malloc(sizeof(int) * (n 1));obj->heapSize 0;obj->cmp cmp; }bool c…

第二十二篇——菲欧几何:相对论的数学基础是什么?

目录 一、背景介绍二、思路&方案三、过程1.思维导图2.文章中经典的句子理解3.学习之后对于投资市场的理解4.通过这篇文章结合我知道的东西我能想到什么&#xff1f; 四、总结五、升华 一、背景介绍 对于几何的几个工具&#xff0c;让我再次感叹数学的伟大&#xff0c;逻辑…

国产大模型基础能力大比拼 - 计数:通义千文 vs 文心一言 vs 智谱 vs 讯飞-正经应用场景的 LLM 逻辑测试

在大语言模型&#xff08;LLM&#xff09;不断涌现的时代&#xff0c;如何评估这些国产大模型的逻辑推理能力&#xff0c;尤其是在处理基础计数问题上的表现&#xff0c;成为了一个备受关注的话题。随着越来越多的国产大模型进入市场&#xff0c;比较它们在不同任务中的表现尤为…

STM32笔记(1)GPIO之点亮LED

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 总结 第一步&#xff1a;先看原理图。PB0输出高电平是&#xff0c;LED1点亮。 初始化完成了两项工作&#xff1a; (1)从时钟上启动所用GPIO所在的总线&#xff1b…

Centos7 安装升级最新版Redis7.4.1

1. 前言 今天阿里云云盾检测出一个redis低版本的漏洞,需要升级到稳定高版本修复漏洞,升级过程遇到了一些坑,特记录分享给大家,原服务器默认yum源安装的gcc 是4.8.5 ,默认安装redis是 3.2.12(如下图): 2.升级GCC 升级新版redis需要更高级的gcc支持,这里我们就选择升级…

《计算机视觉》—— 换脸

效果如下&#xff1a; 完整代码&#xff1a; import cv2 import dlib import numpy as npJAW_POINTS list(range(0, 17)) RIGHT_BROW_POINTS list(range(17, 22)) LEFT_BROW_POINTS list(range(22, 27)) NOSE_POINTS list(range(27, 35)) RIGHT_EYE_POINTS list(range(36…

力扣 中等 82.删除排序链表中的重复元素 II

文章目录 题目介绍题解 题目介绍 题解 只需在83题基础上加一个while循环即可 class Solution {public ListNode deleteDuplicates(ListNode head) {ListNode dummy new ListNode(101, head);ListNode cur dummy;while (cur.next ! null && cur.next.next ! null) {…

【Linux】Anaconda下载安装配置Pytorch安装配置(保姆级)

目录 Anaconda下载 Anaconda安装 conda init conda --v Conda 配置 conda 环境创建 conda info --envs conda list Pytorch安装配置 检验安装情况 检验是否可以使用GPU Anaconda下载 可以通过两种途径完成Anaconda安装包的下载 途径一&#xff1a;本地windows下…