Flink定制化功能开发,demo代码

前言:

       这是一个Flink自定义开发的基础教学。本文将通过flink的DataStream模块API,以kafka为数据源,构建一个基础测试环境;包含一个kafka生产者线程工具,一个自定义FilterFunction算子,一个自定义MapFunction算子,用一个flink任务的代码逻辑,将实时读kafka并多层处理串起来;让读者体会通过Flink构建自定义函数的技巧。

一、Flink的开发模块分析

Flink提供四个基础模块:核心SDK开发API分别是处理实时计算的DataStream和处理离线计算的DataSet;基于这两个SDK,在其上包装了TableAPI开发模块的SDK;在Table API之上,定义了高度抽象可用SQL开发任务的FlinkSQL。在核心开发API之下,还有基础API的接口,可用于对时间,状态,算子等最细粒度的特性对象做操作,如包装自定义算子的ProcessWindowFunction和ProcessFunction等基础函数以及内置的对象状态StateTtlConfig;

FLINK开发API关系结构如下:

二、定制化开发Demo演示

2.1 场景介绍

Flink实时任务的的通用技术架构是消息队列中间件+Flink任务:

将数据采集到Kafka或pulser这类队列中间件的Topic,然后使用Flink内置的kafkaSource,监控Topic的数据情况,做实时处理。

  1. 这里提供一个kafka的生产者线程,可以自定义构建需要的数据和上传时间,用于控制写入kafka的数据源;
  2. 重写两个DataStream的基础算子:FilterFunction和MapFunction,用于让读者体会,如何对FLINK函数的重新包装,后续更基础的函数原理一样;我这里用String数据对象做处理,减少对象转换的SDK引入,通常要基于业务做数据polo的加工,这个自己处理,将对象换成业务对象;
  3. 然后使用Flink将整个业务串起来,从kafka读数据,经过两层处理,最终输出需要的结果;

2.2 本地demo演示

2.2.1 pom文件

这里以flink1.14.6+scala1.12版本为例:

2.2.2 kafka生产者线程方法

package org.example.util;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.*;/*** 向kafka生产数据** @author i7杨* @date 2024/01/12 13:02:29*/public class KafkaProducerUtil extends Thread {private String topic;public KafkaProducerUtil(String topic) {super();this.topic = topic;}private static Producer<String, String> createProducer() {// 通过Properties类设置Producer的属性Properties properties = new Properties();
//        测试环境 kafka 配置properties.put("bootstrap.servers", "ip2:9092,ip:9092,ip3:9092");properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");return new KafkaProducer<String, String>(properties);}@Overridepublic void run() {Producer<String, String> producer = createProducer();Random random = new Random();Random random2 = new Random();while (true) {int nums = random.nextInt(10);int nums2 = random.nextInt(50);
//            double nums2 = random2.nextDouble();String time = new Date().getTime() / 1000 + 5 + "";String type = "pv";try {if (nums2 % 2 == 0) {type = "pv";} else {type = "uv";}
//                String info = "{\"user\":" + nums + ",\"item\":" + nums * 10 + ",\"category\":" + nums2 + ",\"pv\":" + nums2 * 5 + ",\"ts\":\"" + time + "\"}";String info = nums + "=" + nums2;System.out.println("message : " + info);producer.send(new ProducerRecord<String, String>(this.topic, info));} catch (Exception e) {e.printStackTrace();}System.out.println("=========数据已经写入==========");try {sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}public static void main(String[] args) {new KafkaProducerUtil("test01").run();}public static void sendMessage(String topic, String message) {Producer<String, String> producer = createProducer();producer.send(new ProducerRecord<String, String>(topic, message));}}
2.2.3 自定义基础函数

这里自定义了filter和map两个算子函数,测试逻辑按照数据结构变化:

自定义FilterFunction函数算子:阈值小于40的过滤掉

package org.example.funtion;import org.apache.flink.api.common.functions.FilterFunction;/*** FilterFunction重构** @author i7杨* @date 2024/01/12 13:02:29*/public class InfoFilterFunction implements FilterFunction<String> {private double threshold;public InfoFilterFunction(double threshold) {this.threshold = threshold;}@Overridepublic boolean filter(String value) throws Exception {if (value.split("=").length == 2)// 阈值过滤return Double.valueOf(value.split("=")[1]) > threshold;else return false;}
}

自定义MapFunction函数:后缀为2的,添加上特殊信息

package org.example.funtion;import org.apache.flink.api.common.functions.MapFunction;public class ActionMapFunction implements MapFunction<String, String> {@Overridepublic String map(String value) throws Exception {System.out.println("value:" + value);if (value.endsWith("2"))return value.concat(":Special processing information");else return value;}
}
2.2.4 flink任务代码

任务逻辑:使用kafka工具产生数据,然后监控kafka的topic,讲几个函数串起来,输出结果;

package org.example.service;import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.example.funtion.ActionMapFunction;
import org.example.funtion.InfoFilterFunction;import java.util.*;public class FlinkTestDemo {public static void main(String[] args) throws Exception {// 设置执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// Kafka 配置Properties kafkaProps = new Properties();kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip1:9092,ip2:9092,ip3:9092");kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-consumer-group");kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// 创建 Kafka 消费者FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("test01",// Kafka 主题名称new SimpleStringSchema(),kafkaProps);// 从 Kafka 中读取数据流DataStream<String> kafkaStream = env.addSource(kafkaConsumer);env.disableOperatorChaining();kafkaStream.filter(new InfoFilterFunction(40)).map(new ActionMapFunction()).print("阈值大于40以上的message=");// 执行任务env.execute("This is a testing task");}}

运行结果:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/617647.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

redis夯实之路-集群详解

Redis有单机模式和集群模式。 集群是 Redis 提供的分布式数据库方案&#xff0c;集群通过分片( sharding )来实现数据共享&#xff0c;并提供复制和故障转移。集群模式可以有多个 master 。使用集群模式可以进一步提升 Redis 性能&#xff0c;分布式部署实现高可用性&#xff…

Linux中断 -- 中断应答、嵌套、

接上文&#xff0c;本文继续介绍Linux软件部分逻辑。 参考内核版本&#xff1a;kernel-4.19 目录 1.中断信号在各级中断控制器中的应答 2.supports_deactivate_key意义 3.中断嵌套 1.中断信号在各级中断控制器中的应答 本章主要从内核软件层面来看各中断控制器对中断信号处…

Android开发基础(三)

Android开发基础&#xff08;三&#xff09; 本篇将介绍Android权限管理。 Android权限管理 Android权限管理主要是为了保护用户的隐私和设备的安全性&#xff1b; 在Android系统中&#xff0c;应用在请求权限时必须进行明确的申请&#xff0c;根据权限的保护级别&#xff0…

软件测试最新项目合集【商城、外卖、银行、金融等等.......】

​项目一&#xff1a;ShopNC商城 项目概况&#xff1a; ShopNC商城是一个电子商务B2C电商平台系统&#xff0c;功能强大&#xff0c;安全便捷。适合企业及个人快速构建个性化网上商城。 包含PCIOS客户端Adroid客户端微商城&#xff0c;系统PC后台是基于ThinkPHP MVC构架开发的…

NAND新一代接口Separate Command Address (SCA) 简介

通过NAND Flash总线传输的信号分为三种类型&#xff1a;命令&#xff08;Commands&#xff09;、地址&#xff08;Addresses&#xff09;和数据&#xff08;Data&#xff09;。这些信号利用DQ[7:0]时间分时复用技术&#xff0c;在不同的时间段分别进行传输。其中&#xff0c;数…

BikeDNA(五)参考数据的内在分析1

BikeDNA&#xff08;五&#xff09;参考数据的内在分析1 该笔记本分析用户提供的给定区域的参考自行车基础设施数据集的质量。 质量评估是“内在的”&#xff0c;即仅基于一个输入数据集&#xff0c;并且不使用数据集外部的信息。 对于将参考数据集与相应 OSM 数据进行比较的外…

redis — redis cluster集群模式下如何实现批量可重入锁?

一、redis cluster 集群版 在Redis 3.0版本以后,Redis发布了Redis Cluster。该集群主要支持搞并发和海量数据处理等优势,当 Redis 在集群模式下运行时,它处理数据存储的方式与作为单个实例运行时不同。这是因为它应该准备好跨多个节点分发数据,从而实现水平可扩展性。具体能…

Java内存结构

前文&#xff1a; 《Java8之类的加载》 《Java8之类加载机制class源码分析》 写在开头&#xff1a;本文为学习后的总结&#xff0c;可能有不到位的地方&#xff0c;错误的地方&#xff0c;欢迎各位指正。 JVM 在执行 Java 程序的过程中会把它所管理的内存划分为若干个不同的数…

Ubuntu server配置ssh远程登录

使用如下命令进行安装 apt-get install ssh 安装好后启动 service ssh start 然后查看运行状态 然后用本机ping虚拟机 关闭本机和虚拟机防火墙 ufw disable 然后打开Xshell进行连接

14、MySQL高频面试题

1、内连接和外连接的区别 内连接和外连接都是数据库进行多表联查时使用的连接方式&#xff0c;区别在于二者获取的数据集不同 内连接指的是使用左表中的每一条数据分别去连接右表中的每一条数据&#xff0c;仅仅显示出匹配成功的那部分 外连接有分为左外连接和右外连接 左外…

重磅!OpenAI正式发布,自定义ChatGPT商店!

1月11日凌晨&#xff0c;OpenAI在官网正式发布了&#xff0c;自定义GPT商店&#xff0c;可以帮助用户找到目前最好用、流行的自定义ChatGPT助手。 在2024年第一季度&#xff0c;OpenAI将启动GPT 开发者收入计划。首先&#xff0c;美国地区的开发者将根据用户对其 GPT 的使用情…

day-07 统计出现过一次的公共字符串

思路 用哈希表统计words1和words2中各个字符串的出现次数&#xff0c;次数皆为1的字符串符合题意 解题方法 //用于存储words1中各个字符串的出现次数 HashMap<String,Integer> hashMap1new HashMap<>(); //用于存储words2中各个字符串的出现次数 HashMap<Stri…

小程序系列-5.WXML 模板语法

一、数据绑定 1、在 data 中定义页面的数据 动态绑定内容&#xff1a; 动态绑定属性&#xff1a; 2. Mustache 语法的格式 3. Mustache 语法的应用场景 4. 三元运算 5.算数运算 二、 事件绑定 1. 什么是事件&#xff1f; 2. 小程序中常用的事件 3. 事件对象的属性列表 4.…

Linux基础工具的使用(yum,vim,gcc,g++,gdb,make/makefile)【详解】

目录 linux软件包管理器-yum什么是软件包&#xff1f;查找软件包如何安装软件卸载软件 linux编辑器 - vimvim的基本概念vim模式之间的切换vim命令模式各命令汇总vim底行模式各命令汇总 Linux编译器 - gcc/ggcc/g的作用gcc/g选项预处理编译汇编链接静态库与动态库 Linux调试器 -…

详细分析Java中的@Transactional注解

目录 前言1. 基本知识2. 常用属性3. Demo4. 总结 前言 Transactional 是 Spring 框架中用于管理事务的注解。 该注解来源于Spring&#xff0c;对于Spring的基础知识可看我之前的文章&#xff1a; Spring框架从入门到学精&#xff08;全&#xff09; 该注解也可用在xxl-job框架…

自动化测试框架pytest系列之8个常用的装饰器函数

自动化测试框架pytest系列之基础概念介绍(一)-CSDN博客 自动化测试框架pytest系列之21个命令行参数介绍(二)-CSDN博客 自动化测试框架pytest系列之强大的fixture功能&#xff0c;为什么fixture强大&#xff1f;一文拆解它的功能参数。(三)-CSDN博客 接上文 3.5 pytest的8…

Vant4在Vue3.3中如何按需导入组件和样式

前言 最近我在Vue 3.3的项目中对Vant4做按需导入时&#xff0c;尽管按照Vant4的官方指南进行操作&#xff0c;但样式仍然无法正确加载。经过深入研究和多篇文章的比较&#xff0c;我终于找到了在Vue3中如何正确的按需导入Vant 4组件和样式的方法。由于Vue3.3和Vant4相对较新&am…

多无人机编队避障(人工势场法)

matlab2020正常运行&#xff0c;预设编队类型&#xff0c;目标位置&#xff0c;障碍物 多无人机编队避障&#xff08;人工势场法&#xff09;资源-CSDN文库

浅析链表结构

一、单向链表 C语言中数组是常用的一种数据类型&#xff0c;但可惜数组长度是固定大小的&#xff0c;不能动态扩展&#xff0c;使用起来有时不是很方便。然后就有了自定义的动态数组结构&#xff0c;动态数组就比较好用了&#xff0c;长度可以任意扩展&#xff0c;但还有一个问…

easyexcel 3.0.x 版本实现指定列 锁定以及指定列隐藏

1&#xff1a;效果示例 2&#xff1a;代码示例&#xff1a; UnLockCell.java package com.example.juc.zhujie;/*** Author * Date Created in 2023/12/19 10:09* DESCRIPTION:* Version V1.0*/import java.lang.annotation.*;/*** 用于标记锁定哪些列不需要锁定* author 12…