构建高效实时数据流水线:Flink、Kafka 和 CnosDB 的完美组合

当今的数据技术生态系统中,实时数据处理已经成为许多企业不可或缺的一部分。为了满足这种需求,Apache Flink、Apache Kafka和CnosDB等开源工具的结合应运而生,使得实时数据流的收集、处理和存储变得更加高效和可靠。本篇文章将介绍如何使用 Flink、Kafka 和 CnosDB 来构建一个强大的实时数据处理流水线。

什么是 Flink、Kafka、CnosDB

  • Flink:是一个强大的流式处理引擎,它支持事件驱动、分布式、并且容错。Flink能够处理高吞吐量和低延迟的实时数据流,适用于多种应用场景,如数据分析、实时报表和推荐系统等。
  • Kafka:是一个高吞吐量的分布式流数据平台,用于收集、存储和传输实时数据流。Kafka具有良好的持久性、可扩展性和容错性,适用于构建实时数据流的可靠管道。
  • CnosDB:是一个专为时序数据设计的开源时序数据库。它具有高性能、高可用性和易用性的特性,非常适合存储实时生成的时间序列数据,如传感器数据、日志和监控数据等。

场景描述

用例中假设有一个物联网设备网络,每个设备都定期生成传感器数据,包括温度、湿度和压力等。我们希望能够实时地收集、处理和存储这些数据,以便进行实时监控和分析。

数据流向架构图如下:

  1. 首先,我们需要设置一个数据收集器来获取传感器数据,并将数据发送到 Kafka 主题。这可以通过编写一个生产者应用程序来实现,该应用程序将生成的传感器数据发送到 Kafka。
  2. 使用 Flink来实时处理传感器数据。首先,需要编写一个Flink应用程序,该应用程序订阅 Kafka 主题中的数据流,并对数据进行实时处理和转换。例如,您可以计算温度的平均值、湿度的最大值等。
  3. 将处理后的数据存储到 CnosDB 中以供后续查询。为了实现这一步,需要配置一个CnosDB Sink,使得Flink应用程序可以将处理后的数据写入 CnosDB 中。

构建流水线

1.数据采集与传输

编写一个生产者应用程序,读取传感器数据并将其发送到 Kafka 主题。

public class SensorDataProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "kafka-broker1:9092,kafka-broker2:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(props);while (true) {SensorData data = generateSensorData(); // 生成传感器数据producer.send(new ProducerRecord<>("sensor-data-topic", data));Thread.sleep(1000); // 每秒发送一次数据}}
}

2.实时处理与转换

编写一个 Flink 应用程序,订阅 Kafka 主题中的数据流,实时处理并转换数据。

// Flink 应用程序示例
public class SensorDataProcessingJob {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties props = new Properties();props.setProperty("bootstrap.servers", "kafka-broker1:9092,kafka-broker2:9092");props.setProperty("group.id", "sensor-data-consumer-group");DataStream<String> sensorData = env.addSource(new FlinkKafkaConsumer<>("sensor-data-topic", new SimpleStringSchema(), props));DataStream<ProcessedData> processedData = sensorData.map(json -> parseJson(json)) // 解析JSON数据.keyBy(ProcessedData::getDeviceId).window(TumblingEventTimeWindows.of(Time.seconds(10))) // 10秒滚动窗口.apply(new SensorDataProcessor()); // 自定义处理逻辑processedData.print(); // 打印处理后的数据,可以替换为写入 CnosDB 操作env.execute("SensorDataProcessingJob");}
}

3.数据写入与存储

配置CnosDB Sink,将 processedData.print() 替换为写入 CnosDB 的程序在 CnosDB 创建一个存储数据时长为 30 天的数据库:

| CnosDB 建库语法说明请查看:创建数据库[https://docs.cnosdb.com/zh/latest/reference/sql.html#创建数据库]

CREATE DATABASE IF NOT EXISTS "db_flink_test" WITH TTL '30d' SHARD 2 VNODE_DURATION '1d' REPLICA 2;

在 Maven [https://maven.apache.org/]中引入 CnosBD Sink [https://docs.cnosdb.com/zh/latest/reference/connector/flink-connector-cnosdb.html]包:

<dependency><groupId>com.cnosdb</groupId><artifactId>flink-connector-cnosdb</artifactId><version>1.0</version>
</dependency>

编写程序:

public class WriteToCnosDBJob {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties props = new Properties();props.setProperty("bootstrap.servers", "kafka-broker1:9092,kafka-broker2:9092");props.setProperty("group.id", "sensor-data-consumer-group");DataStream<String> sensorData = env.addSource(new FlinkKafkaConsumer<>("sensor-data-topic", new SimpleStringSchema(), props));DataStream<ProcessedData> processedData = sensorData.map((MapFunction<String, ProcessedData>) json -> parseJson(json)) // 解析JSON数据.keyBy(ProcessedData::getDeviceId).window(TumblingEventTimeWindows.of(Time.seconds(10))) // 10秒滚动窗口.apply(new SensorDataProcessor()); // 自定义处理逻辑DataStream<CnosDBPoint> cnosDBDataStream = processedData.map(new RichMapFunction<ProcessedData, CnosDBPoint>() {@Overridepublic CnosDBPoint map(String s) throws Exception {return new CnosDBPoint("sensor_metric").time(value.getTimestamp().toEpochMilli(), TimeUnit.MILLISECONDS).tag("device_id", value.getDeviceId()).field("average_temperature", value.getAverageTemperature()).field("max_humidity", value.getMaxHumidity());}});CnosDBConfig cnosDBConfig = CnosDBConfig.builder().url("http://localhost:8902").database("db_flink_test").username("root").password("").build();cnosDBDataStream.addSink(new CnosDBSink(cnosDBConfig));env.execute("WriteToCnosDBJob");}
}

运行后查看结果:

db_flink_test ❯ select * from sensor_metric limit 10;
+---------------------+---------------+---------------------+--------------+
| time                | device_id     | average_temperature | max_humidity |
+---------------------+---------------+---------------------+--------------+
| 2023-01-14T17:00:00 | OceanSensor1  | 23.5                | 79.0         |
| 2023-01-14T17:05:00 | OceanSensor2  | 21.8                | 68.0         |
| 2023-01-14T17:10:00 | OceanSensor1  | 25.2                | 75.0         |
| 2023-01-14T17:15:00 | OceanSensor3  | 24.1                | 82.0         |
| 2023-01-14T17:20:00 | OceanSensor2  | 22.7                | 71.0         |
| 2023-01-14T17:25:00 | OceanSensor1  | 24.8                | 78.0         |
| 2023-01-14T17:30:00 | OceanSensor3  | 23.6                | 80.0         |
| 2023-01-14T17:35:00 | OceanSensor4  | 22.3                | 67.0         |
| 2023-01-14T17:40:00 | OceanSensor2  | 25.9                | 76.0         |
| 2023-01-14T17:45:00 | OceanSensor4  | 23.4                | 70.0         |
+---------------------+---------------+---------------------+--------------+

总结

通过结合Flink、Kafka 和 CnosDB,您可以构建一个强大的实时数据处理流水线,从数据采集到实时处理再到数据存储和可视化。每个步骤都涉及具体的配置和代码实现,确保您熟悉每个工具的特性和操作。这种架构适用于各种实时数据应用,如物联网监控、实时报表和仪表板等。根据您的需求和情境,调整配置和代码,以构建适合您业务的实时数据处理解决方案。

CnosDB简介

CnosDB是一款高性能、高易用性的开源分布式时序数据库,现已正式发布及全部开源。

欢迎关注我们的社区网站:https://cn.cnosdb.com

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/75121.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java的动态代理如何实现

一. JdkProxy jdkproxy动态代理必须基于接口(interface)实现 接口UserInterface.java public interface UserService {String getUserName(String userCde); }原始实现类&#xff1a;UseServiceImpl.java public class UserServiceImpl implements UserSerice {Overridepub…

举例说明PyTorch函数torch.cat与torch.stack的区别

一、torch.cat与torch.stack的区别 torch.cat用于在给定的维度上连接多个张量&#xff0c;它将这些张量沿着指定维度堆叠在一起。 torch.stack用于在新的维度上堆叠多个张量&#xff0c;它会创建一个新的维度&#xff0c;并将这些张量沿着这个新维度堆叠在一起。 二、torch.…

原生js之dom表单改变和鼠标常用事件

那么好,本次我们聊聊表单改变时如何利用onchange方法来触发input改变事件以及鼠标常用的滑入滑出,点击down和点击up事件. 关于onchange方法 onchange方法在鼠标输入完后点击任何非输入框位置时触发.触发时即可改变原有输入框的值. out 、leave、over、down、up鼠标方法 当用…

React refers to UMD global, but the current file is a module vite初始化react项目

vite搭建react项目 初始化项目 npm create vite 在执行完上面的命令后&#xff0c;npm 首先会自动下载create-vite这个第三方包&#xff0c;然后执行这个包中的项目初始化逻辑。输入项目名称之后按下回车&#xff0c;此时需要选择构建的前端框架&#xff1a; ✔ Project na…

932. 漂亮数组

932. 漂亮数组 原题链接&#xff1a;完成情况&#xff1a;解题思路&#xff1a;参考代码&#xff1a; 原题链接&#xff1a; 932. 漂亮数组 https://leetcode.cn/problems/beautiful-array/description/ 完成情况&#xff1a; 解题思路&#xff1a; nums 是由范围 [1, n] 的…

jmeter如何压测和存储

一、存储过程准备&#xff1a; 1、建立一个空表&#xff1a; 1 CREATE TABLE test_data ( id NUMBER, name VARCHAR2(50), age NUMBER ); 2、建立一个存储过程&#xff1a; CREATE OR REPLACE PROCEDURE insert_test_data(n IN NUMBER) ASBEGIN--EXECUTE IMMEDIATE trunca…

4个维度讲透ChatGPT技术原理,揭开ChatGPT神秘技术黑盒!(文末送书)

&#x1f935;‍♂️ 个人主页&#xff1a;艾派森的个人主页 ✍&#x1f3fb;作者简介&#xff1a;Python学习者 &#x1f40b; 希望大家多多支持&#xff0c;我们一起进步&#xff01;&#x1f604; 如果文章对你有帮助的话&#xff0c; 欢迎评论 &#x1f4ac;点赞&#x1f4…

浏览器进程,性能指标,性能优化

目录 浏览器进程&#xff1a;多进程 主进程&#xff1a;显示、交互&#xff0c;增删进程 UI进程&#xff1a;控制地址栏、书签、前进后退 存储进程&#xff1a;cookie&#xff0c;webstorage&#xff0c;indexDB 渲染进程&#xff1a;每个标签页或窗口都有一个独立的渲染进…

【深度学习】分类损失函数解析

【深度学习】分类相关的损失解析 文章目录 【深度学习】分类相关的损失解析1. 介绍2. 解析3. 代码示例 1. 介绍 在分类任务中&#xff0c;我们通常使用各种损失函数来衡量模型输出与真实标签之间的差异。有时候搞不清楚用什么&#xff0c;下面是几种常见的分类相关损失函数及其…

Boost搜索引擎

项目背景 先说一下什么是搜索引擎,很简单,就是我们平常使用的百度,我们把自己想要所有的内容输入进去,百度给我们返回相关的内容.百度一般给我们返回哪些内容呢?这里很简单,我们先来看一下. 搜索引擎基本原理 这里我们简单的说一下我们的搜索引擎的基本原理. 我们给服务器发…

Fast RCNN

【简介】 Fast RCNN[6]网络是RCNN和SPPNet的改进版&#xff0c;该网路使得我们可以在相同的网络配置下同时训练一个检测器和边框回归器。该网络首先输入图像&#xff0c;图像被传递到CNN中提取特征&#xff0c;并返回感兴趣的区域ROI&#xff0c;之后再ROI上运用ROI池化层以保证…

如何用Jmeter提取和引用Token

1.执行获取token接口 在结果树这里&#xff0c;使用$符号提取token值。 $根节点&#xff0c;$.data.token表示提取根节点下的data节点下的token节点的值。 2.使用json提取器&#xff0c;提取token 变量路径就是把在结果树提取的路径写上。 3.使用BeanShell取样器或者BeanShell后…

浅谈泛在电力物联网、能源互联网与虚拟电厂

导读&#xff1a;从能源互联网推进受阻&#xff0c;到泛在电力物联网名噪一时&#xff0c;到虚拟电厂再次走向火爆&#xff0c;能源领域亟需更进一步的数智化发展。如今&#xff0c;随着新型电力系统建设推进&#xff0c;虚拟电厂有望迎来快速发展。除了国网和南网公司下属的电…

常见排序算法

排序简介常见排序算法插入排序直接插入排序希尔排序 选择排序选择排序堆排序 交换排序冒泡排序快速排序hoare版挖坑法前后指针法非递归实现快排优化 归并排序非递归实现归并排序海量数据排序问题 基数排序&#xff08;不用比较就能够排序&#xff09;桶排序计数排序&#xff08…

常用消息中间件有哪些

RocketMQ 阿里开源&#xff0c;阿里参照kafka设计的&#xff0c;Java实现 能够保证严格的消息顺序 提供针对消息的过滤功能 提供丰富的消息拉取模式 高效的订阅者水平扩展能力 实时的消息订阅机制 亿级消息堆积能力 RabbitMQ Erlang实现&#xff0c;非常重量级&#xff0c;更适…

一百七十三、Flume——Flume写入HDFS后的诸多小文件问题

一、目的 在用Flume采集Kafka中的数据写入HDFS后&#xff0c;发现写入HDFS的不是每天一个文件&#xff0c;而是一个文件夹&#xff0c;里面有很多小文件&#xff0c;浪费namenode的宝贵资源 二、Flume的配置文件优化&#xff08;参考了其他博文&#xff09; &#xff08;一&a…

OpenCV(二十四):可分离滤波

目录 1.可分离滤波的原理 2.可分离滤波函数sepFilter2D() 3.示例代码 1.可分离滤波的原理 可分离滤波的原理基于滤波器的可分离性。对于一个二维滤波器&#xff0c;如果它可以表示为水平方向和垂直方向两个一维滤波器的卷积&#xff0c;那么它就是可分离的。也就是说&#x…

无涯教程-JavaScript - DDB函数

描述 DDB函数使用双倍余额递减法或您指定的某些其他方法返回指定期间内资产的折旧。 语法 DDB (cost, salvage, life, period, [factor])争论 Argument描述Required/OptionalCostThe initial cost of the asset.RequiredSalvage 折旧结束时的价值(有时称为资产的残值)。 该…

Druid LogFilter输出可执行的SQL

配置 测试代码&#xff1a; DruidDataSource dataSource new DruidDataSource(); dataSource.setUrl("xxx"); dataSource.setUsername("xxx"); dataSource.setPassword("xxx"); dataSource.setFilters("slf4j"); dataSource.setVal…

RTSP/Onvif安防视频云平台EasyNVR视频监控汇聚平台显示视频流却无法播放,是什么原因?

EasyNVR是基于RTSP/Onvif协议的视频平台&#xff0c;拥有视频监控直播、录像、云存储、检索与回看、国标级联等视频能力&#xff0c;可支持将接入的视频流进行全平台、全终端的分发&#xff0c;分发的视频流包括RTSP、RTMP、HTTP-FLV、WS-FLV、HLS、WebRTC等格式。 有用户反馈…