Flink学习4 - 富函数 + 数据重分区操作 + sink 操作(kafka、redis、jdbc)

1、富函数 - 函数类接口,可以获取运行环境的上下文,实现更复杂的功能

在这里插入图片描述
在这里插入图片描述

2、数据重分区操作

在这里插入图片描述
在这里插入图片描述

3、sink操作

sink - kafka

1、引入kafka的pom依赖

<dependency><groupId>org.apache.flink</groupId>
<!--<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>-->
<!--<version>${flink.version}</version>--><artifactId>flink-connector-kafka-0.11_2.12</artifactId><version>1.10.1</version>
</dependency>

2.启动 zookeeper

$ bin/zookeeper-server-start.sh config/zookeeper.properties

3.启动 kafka 服务

$ bin/kafka-server-start.sh config/server.properties

4.启动 kafka 生产者

$ bin/kafka-console-profucer.sh --broker-list localhost:9092 --topic sensor

5.运行 Flink 程序,在 kafka 生产者输入数据,查看 kafka 消费者的输出结果
![在这里插入图在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

sink - redis

1、添加 pom 依赖
在这里插入图片描述
2、 java代码
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
3、启动 redis

redis-server..exe redis.windows.conf

原来的不要关闭,启动另一个窗口

redis-cli.exe -h 127.0.0.1 -p 6379

4、运行程序,进行查询
在这里插入图片描述

sink-JDBC自定义sink-mysql

1.pom依赖
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

错误

1 启动 zookeeper,却无法启动 kafka

原因:kafka 日志被异常清理导致进程频繁挂掉

linux 会定时清理 /tmp 目录下的文件, kafka 日志文件目录正是放在了 /tmp/kafka-logs目录下,导致被定时给清理掉了,所以 kafka 在尝试读取或追加日志时就会出错。

修改:配置文件中的log.dirs

vi ./config/server.properties

重启kafka
在这里插入图片描述
2 另一个程序正在使用此文件,进程无法访问

原因:该问题是因为在关闭启动命令窗口时,直接点击右上角的×号关闭,下次启动就会出现该问题。
正确的关闭窗口方法:在启动窗口按ctrl+C

输入字母—Y则可成功关闭。需要注意的是该选择可能需要等待一会。这样子关闭窗口下次就可以正常启动了。

3由于 window 自带的 linux 子系统,由于是个 mini 的系统,没有其他功能,也无法下载软件 ,因此 windows 版本的 redis,在 window 中的 cmd 中测试

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/725363.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

在Linux/Ubuntu/Debian中加密磁盘(U盘)

sudo cryptsetup luksFormat /dev/sdX 此命令初始化指定块设备 (/dev/sdX) 上的 LUKS。它将提示你输入用于解锁加密设备的密码。 sudo cryptsetup open --type luks /dev/sdX crypto_disk 此命令打开 LUKS 加密设备 (/dev/sdX) 并将其映射到名为“encrypted_disk”的新设备。映…

Java基础知识点

Java基础知识点 1.方法重载和重写的区别 方法重载&#xff1a; 同一个类中的方法&#xff0c;方法名相同&#xff0c;返回值可以相同可以不同&#xff0c;参数列表必须不同发生在编译期&#xff0c;在编译期确定执行哪个方法 方法重写&#xff1a; 指的是子类重新定义父类…

ZooKeeper和Diamond有什么不同

本文主要是讨论下两个类似产品&#xff1a;ZooKeeper和Diamond在配置管理这个应用场景上的异同点。 Diamond&#xff0c;顾名思义&#xff0c;寄寓了开发人员对产品稳定性的厚望&#xff0c;希望它像钻石一样&#xff0c;提供稳定的配置访问。Diamond是淘宝网Java中间件团队的核…

零基础如何系统自学Python

零基础系统自学Python 学习前的准备 明确学习目标 Python 一共有两大版本&#xff0c;即 Python2 以及 Python3&#xff0c;Python2 已停止维护&#xff0c;强烈建议直接上手 Python3。Python 可以说是无所不能&#xff0c;主要有以下几大方向&#xff0c;建议选择自己感兴趣…

递归神经网络 (RNN) 及其变体 LSTM (长短期记忆) 和 GRU (门控循环单元)

递归神经网络&#xff08;RNN, Recurrent Neural Networks&#xff09;是一类用于处理序列数据的神经网络&#xff0c;特别适合于时间序列数据、语音、文本等连续数据的处理。RNN之所以独特&#xff0c;是因为它们在模型内部维持一个隐藏状态&#xff0c;该状态理论上可以捕获到…

【国产MCU】-CH32V307-SysTick中断与延时功能实现

SysTick中断与延时功能实现 文章目录 SysTick中断与延时功能实现1、SysTick介绍2、SysTick中断使用3、SysTick实现微秒和毫秒延时功能CH32V307的RISC-V内核控制器自带的一个64位可选递增或递减的计数器,用于产生SYSTICK异常(异常号:15),可专用于实时操作系统,为系统提供“…

LabVIEW高精度天线自动测试系统

LabVIEW高精度天线自动测试系统 系统是一个集成了LabVIEW软件的自动化天线测试平台&#xff0c;提高天线性能测试的精度与效率。系统通过远程控制测试仪表&#xff0c;实现了数据采集、方向图绘制、参数计算等功能&#xff0c;特别适用于对天线辐射特性的精确测量。 在天线的…

20 easy 70. 爬楼梯

//假设你正在爬楼梯。需要 n 阶你才能到达楼顶。 // // 每次你可以爬 1 或 2 个台阶。你有多少种不同的方法可以爬到楼顶呢&#xff1f; // // // // 示例 1&#xff1a; // // //输入&#xff1a;n 2 //输出&#xff1a;2 //解释&#xff1a;有两种方法可以爬到楼顶。 /…

uniapp H5 $el.querySelectorAll is not a function

在监听是否在可视区域遇到问题&#xff08;网页端&#xff09; 解决方案 <view class"container"> ...省略 业务代码... </view>参考 &#xff1a; https://blog.csdn.net/qq_18841969/article/details/134620559

ZJGSU 1737 链表

题目描述 请根据输入数据构造一个带头结点的单链表&#xff0c;链表结点的数据结构为struct node {int data; struct node *next;}&#xff0c;试设计算法&#xff1a;按递增次序输出单链表中各结点的数据元素&#xff0c;并释放结点所占用的存储空间。 要求&#xff1a;不允…

裸机编程的几种模式、架构、缺陷

目录 裸机编程模式/架构 1&#xff1a;初始化代码的编写 裸机编程模式/架构 2&#xff1a;轮询模式 裸机编程模式/架构 3&#xff1a;轮询加中断执行模式 裸机编程模式/架构 4&#xff1a;中断定时器主循环的前后台架构 裸机编程模式/架构 5&#xff1a;前后台 状态机架构…

常见的几种echarts类型

一&#xff1a;折线图 let option {tooltip: {},animation: false,grid: {top: "20%",bottom: "33%", //也可设置left和right设置距离来控制图表的大小left: 5%,right: 5%},xAxis: {boundaryGap:false,data: [1,2,3,4,5],axisLine: {show: true, //隐藏X轴…

Leetcode : 147. 对链表进行插入排序

给定单个链表的头 head &#xff0c;使用 插入排序 对链表进行排序&#xff0c;并返回 排序后链表的头 。 插入排序 算法的步骤: 插入排序是迭代的&#xff0c;每次只移动一个元素&#xff0c;直到所有元素可以形成一个有序的输出列表。 每次迭代中&#xff0c;插入排序只从输…

(科目三)简答题汇总

模块一 信息和计算机基础 一、简述信息的的特征 【记忆技巧】普传功夫真驾驶 普遍性&#xff1a;信息是无时不有的&#xff0c;无时不在的载体依附性:信息不能独立存在&#xff0c;必须有载体。价值性:同一则信息&#xff0c;对不同的人来说&#xff0c;价值不同。时效性:同…

【AIGC调研系列】在AIGC发展背景下数据标注领域的机会

数据标注领域拥抱AIGC的发展主要通过以下几个方面实现&#xff1a; 市场规模的快速增长&#xff1a;随着AIGC技术的应用和发展&#xff0c;特别是在数据标注领域&#xff0c;市场规模呈现出爆炸式增长。预计未来几年内&#xff0c;中国AIGC数据标注产业的市场规模将达到百亿量…

如何快速的搭建一个小程序

要快速搭建一个小程序&#xff0c;你可以按照以下步骤进行&#xff1a; 明确目标和需求&#xff1a;在开始搭建小程序之前&#xff0c;首先明确你的小程序的主要功能、目标用户以及希望实现的业务需求。这将帮助你更好地规划和设计小程序。选择小程序平台&#xff1a;根据你的…

【Leetcode】字符串 string 补充知识

有限状态机 请你来实现一个 myAtoi(string s) 函数&#xff0c;使其能将字符串转换成一个 32 位有符号整数&#xff08;类似 C/C 中的 atoi 函数&#xff09;。 函数 myAtoi(string s) 的算法如下&#xff1a; 读入字符串并丢弃无用的前导空格检查下一个字符&#xff08;假…

windows 两个服务器远程文件夹同步,支持文件新增文件同步、修改文件同步、删除文件同步,根据文件大小和时间戳判断文件是否修改 python脚本

在Python中实现Windows两个服务器之间的文件夹同步&#xff0c;包括文件新增、修改和删除的同步&#xff0c;可以使用paramiko库进行SSH连接以及SFTP传输&#xff0c;并结合文件大小和时间戳判断文件是否发生过变化。以下是包含删除文件同步逻辑的完整脚本示例&#xff1a; im…

二十五、剖析HashMap

剖析HashMap 本文为书籍《Java编程的逻辑》1和《剑指Java&#xff1a;核心原理与应用实践》2阅读笔记 1.1 Map 接口 Map是映射&#xff0c;有键和值的概念&#xff0c;映射表示键和值之间的对应关系&#xff0c;一个键映射到一个值&#xff0c;Map按照键存储和访问值&#x…

leetcode 3.5

普通数组 1.最大子数组和 最大子数组和 前缀和pre 动态规划 pre保留的是当前包含了当前遍历的最大的前缀和&#xff0c;如果之前的pre 对结果有增益效果&#xff0c;则 pre 保留并加上当前遍历, 如果pre 对结果无增益效果&#xff0c;需要舍弃&#xff0c;则 pre 直接更新为…