flink学习(6)——自定义source和kafka

概述

SourceFunction:非并行数据源(并行度只能=1) --接口

RichSourceFunction:多功能非并行数据源(并行度只能=1) --类

ParallelSourceFunction:并行数据源(并行度能够>=1) --接口

RichParallelSourceFunction:多功能并行数据源(并行度能够>=1) --类 【建议使用的】

——Rich 字样代表富有,在编程中,富有代表可以调用的方法很多,功能很全的意思。

 基础案例

package com.bigdata.day02;//1、SourceFunction
// public class ZidingyiSource implements SourceFunction<Student> {
//2、RichSourceFunction
// public class ZidingyiSource extends RichSourceFunction<Student> {
//3、ParallelSourceFunction
//public class ZidingyiSource implements ParallelSourceFunction<Student> {
//4、RichParallelSourceFunction
//public class ZidingyiSource extends RichParallelSourceFunction<Student> {
// 推荐的
public class ZidingyiSource extends RichParallelSourceFunction<Student> {// ctrl + oprivate final Random random = new Random();private boolean flag = true;// 现在不用@Overridepublic void open(Configuration parameters) throws Exception {System.out.println("实现一些资源的开启");}// 现在不用@Overridepublic void close() throws Exception {System.out.println("实现一些资源的关闭");}@Overridepublic void run(SourceContext<Student> sourceContext) throws Exception {while (flag){String stu_id = UUID.randomUUID().toString();String stu_name = "Student_"+stu_id;int stu_age = random.nextInt(8)+10;long stu_timestamp = System.currentTimeMillis();Student student = new Student(stu_id,stu_name,stu_age,stu_timestamp);sourceContext.collect(student);Thread.sleep(1000);}}// 具体什么时候 会调用还不知道@Overridepublic void cancel() {flag = false;System.out.println("停止运行");}
}//调用
public class ZiDingYi {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// add + new DataStreamSource<Student> studentDataStreamSource = env.addSource(new ZidingyiSource());int parallelism = studentDataStreamSource.getParallelism();System.out.println(parallelism);// print之前与之后的并行度是不同的studentDataStreamSource.print().setParallelism(1);env.execute();}
}

cancel+open+close的调用时机

package com.bigdata.day02;import java.util.Objects;/*
* 1、这几个方法都会按照并行度调用多次 调度的次数 按照studentDataStreamSource的并行度
*
*/public class ZiDingYi {public static void main(String[] args) throws Exception {// 在上面案例的基础上实现StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Student> studentDataStreamSource = env.addSource(new ZidingyiSource());// 此时就只会调用一次了studentDataStreamSource.setParallelism(1);// 此时打印也会有多个并行度(8个cpu)studentDataStreamSource.print();// 异步调用 此时会调用open方法JobExecutionResult execute = env.execute();JobClient flink_job = env.executeAsync("Flink Job");Thread.sleep(3000);// 此时会调用 cancel 和 close flink_job.cancel();}
}

 kafkaSource

package com.bigdata.day02;import java.util.Properties;public class KafkaSource {public static void main(String[] args) throws Exception{//envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// properties Properties properties = new Properties();properties.setProperty("bootstrap.servers", "bigdata01:9092");properties.setProperty("group.id", "g1");// consumerFlinkKafkaConsumer<String> consumer= new FlinkKafkaConsumer<String>("yhedu",new SimpleStringSchema(),properties);// sourceDataStreamSource<String> dataStreamSource = env.addSource(consumer);dataStreamSource.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String s) throws Exception {return s.contains("success");}}).print();env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/62249.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ubuntu安装chrome无法打开问题

如果在ubuntu安装chrome后&#xff0c;点击chrome打开没反应&#xff0c;可以先试着在terminal上用命令打开 google-chrome 如果运行命令显示 Chrome has locked the profile so that it doesnt get corrupted. If you are sure no other processes are using this profile…

Ansible--自动化运维工具

Ansible自动化运维工具介绍 1.Ansible介绍 Ansible是一款自动化运维工具&#xff0c;基于Python开发&#xff0c;集合了众多运维工具&#xff08;puppet、cfengine、chef、func、fabric&#xff09;的优点&#xff0c;实现了批量系统配置、批量程序部署、批量运行命令等功能。…

软件工程相关-用PD画类图-设置方法的参数

前提&#xff1a;pd16安装完成&#xff0c;已经画好了类 1、为类添加方法。 2、双击方法&#xff0c;如下图所示 3、此时会有弹窗&#xff0c;选择参数栏&#xff0c;按需求进行设置。 &#xff08;总是忘记&#xff0c;故记录下自用

IT人日常健康工作生活方案

1. 早餐(7:00-8:00) 早餐是一天中最重要的一餐,提供充足的能量来启动新的一天。根据亚洲饮食的特点,我们加入了米饭、豆腐、蔬菜等传统食材,同时保持高蛋白、低糖的原则。 糙米粥或小米粥(1碗):低GI碳水化合物,有助于稳定血糖,提供持久能量。可加入少量的红枣、枸杞…

Qt配置Opencv环境

下载opencv后配置环境变量(官网下载&#xff09; 然后ok了

MongoDB相关问题

视频教程 【GeekHour】20分钟掌握MongoDB Complete MongoDB Tutorial by Net Ninja MongoDB开机后调用缓慢的原因及解决方法 问题分析&#xff1a; MongoDB开机后调用缓慢&#xff0c;通常是由于以下原因导致&#xff1a; 索引重建&#xff1a; MongoDB在启动时会重建索引…

目标检测之学习路线(本科版)

以下是为一名计算机科学与技术本科大四学生整理的“目标检测”学习路线&#xff0c;结合了从基础到高级的内容&#xff0c;适合初学者逐步深入。每个阶段都有明确的学习要求、学习建议和资源推荐。 阶段一&#xff1a;基础知识学习 学习要求&#xff1a; 掌握编程语言 Pytho…

网络安全在现代企业中的重要作用

网络安全是这个数字时代最令人担忧的事情之一。对技术的依赖性越来越强&#xff0c;使其同时面临多种网络威胁。其声誉和法律后果的大幅下降可能归因于一次妥协。 这使得良好的网络安全成为所有企业的选择和必需品。本文介绍了网络安全的重要性、企业中常见的网络威胁以及公司…

“harmony”整合不同平台的单细胞数据之旅

其实在Seurat v3官方网站的Vignettes中就曾见过该算法&#xff0c;但并没有太多关注&#xff0c;直到看了北大张泽民团队在2019年10月31日发表于Cell的《Landscap and Dynamics of Single Immune Cells in Hepatocellular Carcinoma》&#xff0c;为了同时整合两类数据&#xf…

《进程隔离机制:C++多进程编程安全的坚固堡垒》

在当今数字化时代&#xff0c;软件系统的安全性愈发成为人们关注的焦点。尤其是在 C多进程编程领域&#xff0c;如何确保进程间的安全交互与数据保护&#xff0c;是每一位开发者都必须面对的重要课题。而进程隔离机制&#xff0c;犹如一座坚固的堡垒&#xff0c;为 C多进程编程…

C++11 http服务端和客户端库cpp-httplib

C11 http服务端和客户端库cpp-httplib 环境&#xff1a; http: yhirose/cpp-httplib v0.18.1 json: nlohmann/json v3.11.31. 简介 cpp-httplib 是一个轻量级且易于使用的 C11 HTTP 库&#xff0c;由 yhirose 开发和维护&#xff0c;开源协议为MIT。它支持 HTTP/HTTPS 协议&…

怎样使用sys.dm_os_wait_stats

文章目录 sys.dm_os_wait_stats 支持诊断 SQL Server 性能问题的基本指标。如果在 SQL Server 引擎中遇到一些问题&#xff08;CPU、内存、I/O、锁、闩锁等&#xff09;&#xff0c;sys.dm_os_wait_stats 数据揭示一些问题。SQL Server Management Studio 中的活动监视器&#…

Hbase2.2.7集群部署

环境说明 准备三台服务器&#xff0c;分别为&#xff1a;bigdata141&#xff08;作为Hbase主节点&#xff09;、bigdata142、bigdata143确保hadoop和zookeeper集群都先启动好我这边的hadoop版本为3.2.0&#xff0c;zookeeper版本为3.5.8 下载安装包 下载链接&#xff1a;In…

JVM 性能调优 -- CMS 垃圾回收器 GC 日志分析【Full GC】

前言&#xff1a; 上一篇我们分析了 Minor GC 的发生过程&#xff0c;因为 GC 日志没有按我们预估的思路进行打印&#xff0c;其中打印了 CMS 垃圾回收器的部分日志&#xff0c;本篇我们就来分析一下 CMS 垃圾收集日志。 JVM 系列文章传送门 初识 JVM&#xff08;Java 虚拟机…

微信小程序中的WXSS与CSS的关系及使用技巧

微信小程序中的WXSS与CSS的关系及使用技巧 引言 在微信小程序的开发中,样式的设计与实现是构建用户友好界面的关键。微信小程序使用WXSS(WeiXin Style Sheets)作为其样式表语言,WXSS在语法上与CSS非常相似,但也有一些独特的特性。本文将深入探讨WXSS与CSS的关系,介绍WX…

自研芯片逾十年,亚马逊云科技Graviton系列芯片全面成熟

在云厂商自研芯片的浪潮中&#xff0c;亚马逊云科技无疑是最早践行这一趋势的先驱。自其迈出自研芯片的第一步起&#xff0c;便如同一颗石子投入平静的湖面&#xff0c;激起了层层涟漪&#xff0c;引领着云服务和云上算力向着更高性能、更低成本的方向演进。 早在2012年&#x…

掌上单片机实验室 — RT - Thread+ROS2 浅尝(26)

前面化解了Micro_ROS通讯问题&#xff0c;并在 RT-Thread Studio 环境下&#xff0c;使用Micro_ROS软件包中的例程&#xff0c;实现了STM32F411CE核心板和ROS2主机的通讯。之后还尝试修改例程 micro_ros_sub_twist.c &#xff0c;实现了接收 turtle_teleop_key 所发出的 turtle…

【Leetcode 每日一题】25. K 个一组翻转链表

25. K 个一组翻转链表 给你链表的头节点 head &#xff0c;每 k 个节点一组进行翻转&#xff0c;请你返回修改后的链表。 k 是一个正整数&#xff0c;它的值小于或等于链表的长度。如果节点总数不是 k 的整数倍&#xff0c;那么请将最后剩余的节点保持原有顺序。 你不能只是单…

Android 图形系统之一:概览

Android 图形系统是一套完整的架构&#xff0c;用于管理从应用绘制到显示屏幕的整个流程。它涉及多个层次和组件&#xff0c;从应用程序到硬件&#xff0c;确保每一帧都能准确、高效地呈现到用户的设备屏幕上。 1. Android 图形系统的架构 Android 图形系统的架构可以分为以下…

【C语言】指针与数组的例题详解:深入分析与高级用法

博客主页&#xff1a; [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: C语言 文章目录 &#x1f4af;前言&#x1f4af;题目一详细分析与解答代码逐步解析 &#x1f4af;进一步优化和拓展1. 指针与数组的关系2. 指针运算的注意事项3. 常见的错误和陷阱4. 拓展&#xff1a;指针操作的应用场…