【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(4)- redis -异步读取

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列
    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列
    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列
    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列
    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列
    本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引


文章目录

  • Flink 系列文章
  • 一、maven依赖
  • 二、redis环境及相关内容说明
  • 三、redis作为Flink的source异步数据交互示例
    • 1、maven依赖
    • 2、redis异步交互数据实现
      • 1)、读取redis数据时以string进行输出
      • 2)、读取redis数据时以pojo进行输出
    • 3、使用示例
    • 4、验证
      • 1)、准备redis环境数据
      • 2)、启动应用程序,并观察控制台输出


本文主要介绍Flink 的redis作为数据源的异步读取使用示例。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,没有其他依赖。

本文依赖redis环境好用。

本专题分为以下几篇文章:
【flink番外篇】3、fflink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(1) - File、Socket、Collection
【flink番外篇】3、fflink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(2)- 自定义、mysql
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(3)- kafka
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(4)- redis -异步读取
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(5)- clickhouse
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例 - 完整版

一、maven依赖

本文依赖见【flink番外篇】3、flink的source介绍及示例(1)- File、Socket、Collection,不再赘述。

如果有新增的maven依赖,则会在示例时加以说明,避免篇幅的过大。

二、redis环境及相关内容说明

1、本示例是需要redis环境的,至于redis是集群或单击和本示例关系不大。
关于redis的更多信息请参考其官网。

2、本示例是以redis作为外部数据进行异步交互的例子,也是实际中应用中常见的例子。关于异步数据交互参考文章:55、Flink之用于外部数据访问的异步 I/O

三、redis作为Flink的source异步数据交互示例

本示例是模拟根据外部数据用户姓名查询redis中用户的个人信息。
本示例外部数据就以flink的集合作为示例,redis数据中存储的为hash表,下面验证中会有具体展示。

1、maven依赖

<dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.12</artifactId><version>1.1.0</version><exclusions><exclusion><artifactId>flink-streaming-java_2.12</artifactId><groupId>org.apache.flink</groupId></exclusion><exclusion><artifactId>flink-runtime_2.12</artifactId><groupId>org.apache.flink</groupId></exclusion><exclusion><artifactId>flink-core</artifactId><groupId>org.apache.flink</groupId></exclusion><exclusion><artifactId>flink-java</artifactId><groupId>org.apache.flink</groupId></exclusion></exclusions>
</dependency>

2、redis异步交互数据实现

1)、读取redis数据时以string进行输出

package org.datastreamapi.source.custom.redis;import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;import com.sun.jdi.IntegerValue;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;/*** @author alanchan**/
public class CustomRedisSource extends RichAsyncFunction<String, String> {private JedisPoolConfig config = null;private static String ADDR = "192.168.10.41";private static int PORT = 6379;// 等待可用连接的最大时间,单位是毫秒,默认是-1,表示永不超时private static int TIMEOUT = 10000;private JedisPool jedisPool = null;private Jedis jedis = null;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);config = new JedisPoolConfig();jedisPool = new JedisPool(config, ADDR, PORT, TIMEOUT);jedis = jedisPool.getResource();}@Overridepublic void asyncInvoke(String input, ResultFuture<String> resultFuture) throws Exception {// 文件中读取的内容System.out.println("输入参数input----:" + input);// 发起一个异步请求,返回结果CompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {String[] arrayData = input.split(",");String name = arrayData[1];String value = jedis.hget("AsyncReadUser_Redis", name);System.out.println("查询结果output----:" + value);return value;}}).thenAccept((String dbResult) -> {// 设置请求完成时的回调,将结果返回resultFuture.complete(Collections.singleton(dbResult));});}// 连接超时的时候调用的方法@Overridepublic void timeout(String input, ResultFuture<String> resultFuture) throws Exception {System.out.println("redis connect timeout!");}@Overridepublic void close() throws Exception {super.close();if (jedis.isConnected()) {jedis.close();}}@Data@AllArgsConstructor@NoArgsConstructorstatic class User {private int id;private String name;private int age;private double balance;User(String value) {String[] str = value.split(",");this.setId(Integer.valueOf(str[0]));this.setName(str[1]);this.setAge(Integer.valueOf(str[2]));this.setBalance(Double.valueOf(str[0]));}}}

2)、读取redis数据时以pojo进行输出

package org.datastreamapi.source.custom.redis;import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.datastreamapi.source.custom.redis.CustomRedisSource.User;import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;/*** @author alanchan**/
public class CustomRedisSource2 extends RichAsyncFunction<String, User> {private JedisPoolConfig config = null;private static String ADDR = "192.168.10.41";private static int PORT = 6379;// 等待可用连接的最大时间,单位是毫秒,默认是-1,表示永不超时private static int TIMEOUT = 10000;private JedisPool jedisPool = null;private Jedis jedis = null;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);config = new JedisPoolConfig();jedisPool = new JedisPool(config, ADDR, PORT, TIMEOUT);jedis = jedisPool.getResource();}@Overridepublic void asyncInvoke(String input, ResultFuture<User> resultFuture) throws Exception {System.out.println("输入查询条件:" + input);CompletableFuture.supplyAsync(new Supplier<User>() {@Overridepublic User get() {String[] arrayData = input.split(",");String name = arrayData[1];String value = jedis.hget("AsyncReadUser_Redis", name);System.out.println("查询redis结果:" + value);return new User(value);}}).thenAccept((User dbResult) -> {// 设置请求完成时的回调,将结果返回resultFuture.complete(Collections.singleton(dbResult));});}// 连接超时的时候调用的方法@Overridepublic void timeout(String input, ResultFuture<User> resultFuture) throws Exception {System.out.println("redis connect timeout!");}@Overridepublic void close() throws Exception {super.close();if (jedis.isConnected()) {jedis.close();}}}

3、使用示例

package org.datastreamapi.source.custom.redis;import java.util.concurrent.TimeUnit;import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.datastreamapi.source.custom.redis.CustomRedisSource.User;/*** @author alanchan**/
public class TestCustomRedisSourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// id,nameDataStreamSource<String> lines = env.fromElements("1,alan", "2,alanchan", "3,alanchanchn", "4,alan_chan", "5,alan_chan_chn");SingleOutputStreamOperator<String> result = AsyncDataStream.orderedWait(lines, new CustomRedisSource(), 10, TimeUnit.SECONDS, 1);SingleOutputStreamOperator<User> result2 = AsyncDataStream.orderedWait(lines, new CustomRedisSource2(), 10, TimeUnit.SECONDS, 1);result.print("result-->").setParallelism(1);result2.print("result2-->").setParallelism(1);env.execute();}
}

4、验证

1)、准备redis环境数据

hset AsyncReadUser_Redis alan '1,alan,18,20,alan.chan.chn@163.com'
hset AsyncReadUser_Redis alanchan '2,alanchan,19,25,alan.chan.chn@163.com'
hset AsyncReadUser_Redis alanchanchn '3,alanchanchn,20,30,alan.chan.chn@163.com'
hset AsyncReadUser_Redis alan_chan '4,alan_chan,27,20,alan.chan.chn@163.com'
hset AsyncReadUser_Redis alan_chan_chn '5,alan_chan_chn,36,10,alan.chan.chn@163.com'127.0.0.1:6379> hset AsyncReadUser_Redis alan '1,alan,18,20,alan.chan.chn@163.com'
(integer) 1
127.0.0.1:6379> hset AsyncReadUser_Redis alanchan '2,alanchan,19,25,alan.chan.chn@163.com'
(integer) 1
127.0.0.1:6379> hset AsyncReadUser_Redis alanchanchn '3,alanchanchn,20,30,alan.chan.chn@163.com'
(integer) 1
127.0.0.1:6379> hset AsyncReadUser_Redis alan_chan '4,alan_chan,27,20,alan.chan.chn@163.com'
(integer) 1
127.0.0.1:6379> hset AsyncReadUser_Redis alan_chan_chn '5,alan_chan_chn,36,10,alan.chan.chn@163.com'
(integer) 1
127.0.0.1:6379> hgetall AsyncReadUser_Redis1) "alan"2) "1,alan,18,20,alan.chan.chn@163.com"3) "alanchan"4) "2,alanchan,19,25,alan.chan.chn@163.com"5) "alanchanchn"6) "3,alanchanchn,20,30,alan.chan.chn@163.com"7) "alan_chan"8) "4,alan_chan,27,20,alan.chan.chn@163.com"9) "alan_chan_chn"
10) "5,alan_chan_chn,36,10,alan.chan.chn@163.com"

2)、启动应用程序,并观察控制台输出

输入查询条件:5,alan_chan_chn
输入参数input----:2,alanchan
输入参数input----:5,alan_chan_chn
输入查询条件:3,alanchanchn
输入查询条件:1,alan
输入参数input----:1,alan
输入查询条件:2,alanchan
输入查询条件:4,alan_chan
输入参数input----:4,alan_chan
输入参数input----:3,alanchanchn
查询结果output----:3,alanchanchn,20,30,alan.chan.chn@163.com
查询redis结果:1,alan,18,20,alan.chan.chn@163.com
查询结果output----:1,alan,18,20,alan.chan.chn@163.com
查询redis结果:4,alan_chan,27,20,alan.chan.chn@163.com
查询redis结果:2,alanchan,19,25,alan.chan.chn@163.com
查询结果output----:2,alanchan,19,25,alan.chan.chn@163.com
查询redis结果:3,alanchanchn,20,30,alan.chan.chn@163.com
查询结果output----:4,alan_chan,27,20,alan.chan.chn@163.com
查询结果output----:5,alan_chan_chn,36,10,alan.chan.chn@163.com
查询redis结果:5,alan_chan_chn,36,10,alan.chan.chn@163.com
result-->> 4,alan_chan,27,20,alan.chan.chn@163.com
result-->> 5,alan_chan_chn,36,10,alan.chan.chn@163.com
result-->> 3,alanchanchn,20,30,alan.chan.chn@163.com
result-->> 2,alanchan,19,25,alan.chan.chn@163.com
result-->> 1,alan,18,20,alan.chan.chn@163.com
result2-->> CustomRedisSource.User(id=4, name=alan_chan, age=27, balance=4.0)
result2-->> CustomRedisSource.User(id=1, name=alan, age=18, balance=1.0)
result2-->> CustomRedisSource.User(id=3, name=alanchanchn, age=20, balance=3.0)
result2-->> CustomRedisSource.User(id=5, name=alan_chan_chn, age=36, balance=5.0)
result2-->> CustomRedisSource.User(id=2, name=alanchan, age=19, balance=2.0)

以上,本文主要介绍Flink 的redis作为数据源的异步读取使用示例。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本专题分为以下几篇文章:
【flink番外篇】3、fflink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(1) - File、Socket、Collection
【flink番外篇】3、fflink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(2)- 自定义、mysql
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(3)- kafka
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(4)- redis -异步读取
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(5)- clickhouse
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例 - 完整版

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/217793.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

四、mapbox搭载vue3测试demo(31-40)

demo地址https://bidding-m.gitee.io/mapbox-test/#/ 31、[添加] 热力图图层 32、[添加] 样式聚类 33、[添加] HTML聚类 34、[添加] 点动画效果 35、[添加] marker动

探索SSL证书的应用场景,远不止网站,还有小程序、App Store等

说到SSL证书&#xff0c;我们都知道其是用于实现HTTPS加密保障数据安全的重要工具&#xff0c;在建设网站的时候经常会部署SSL证书。但实际上&#xff0c;SSL证书的应用场景远不止网站&#xff0c;它还被广泛地应用到小程序、App Store、抖音广告、邮件服务器以及各种物联网设备…

用户管理第2节课 -- idea 2023.2 创建表

一、懂得 1.1编码格式是防止乱码的&#xff0c;utf-8是完全够的&#xff0c;那几个基本没差别 网址&#xff1a; 【IDEA——连接MySQL数据库&#xff0c;创建库和表】_idea中数据库-CSDN博客 这些是MySQL数据库中的一些术语&#xff0c;可以简单解释如下&#xff1a; 1、col…

【K8S 系列】认识k8s、k8s架构

一、什么是k8s? Kubernetes 简称 k8s&#xff0c;是支持云原生部署的一个平台&#xff0c;k8s 本质上就是用来简化微服务的开发和部署的&#xff0c;用于自动化部署、扩展和管理容器化应用的开源容器编排技术。对于传统的docker其实也提供了容器编排的技术docker-compose&…

SystemServer 进程启动过程

首语 SystemServer进程主要用于启动系统服务&#xff0c;诸如AMS、WMS、PMS都是由它来创建的。在系统的名称为"system_server"&#xff0c;Android核心服务都是它启动&#xff0c;它是非常重要。 Zygote处理SystemServer进程 在 Zygote启动过程 文章中分析我们知道…

LeetCode算法练习:双指针计算三数之和和四数之和

通过双指针将时间复杂度降一个级别。 public class TOP {//15. 三数之和public List<List<Integer>> threeSum(int[] nums) {List<List<Integer>> res new ArrayList<>();int n nums.length;if (n < 3) {return res;}Arrays.sort(nums);//…

2023/12/12作业

思维导图 作业&#xff1a; 成果图 代码 #include "widget.h" #include "ui_widget.h" Widget::Widget(QWidget *parent) : QWidget(parent) , ui(new Ui::Widget) { speechernew QTextToSpeech(this); ui->setupUi(this); //一直获取当前时间 idst…

SQL server创建联合索引

CREATE INDEX idx_dim_product_dt_ord ON [dim_product] (dt, ord); 在SQL Server中计算月同比和季度同步的SQL查询可能看起来像这样&#xff1a; ### 月度同比 月度同比是与前一年同一月份的数据进行比较。以下是一个基本的例子&#xff0c;假设我们有一个名为sales的表&…

1843_emacs中两个插件use-package以及org-bullets的使用

Grey 1843_emacs中两个插件use-package以及org-bullets的使用 全部学习汇总&#xff1a; GitHub - GreyZhang/editors_skills: Summary for some common editor skills I used. 我个人的emacs的配置以及两个插件的使用由来 我自己现在也开始维护一个我自己的emacs配置&…

剑指offer(C++)-JZ49:丑数(算法-其他)

作者&#xff1a;翟天保Steven 版权声明&#xff1a;著作权归作者所有&#xff0c;商业转载请联系作者获得授权&#xff0c;非商业转载请注明出处 题目描述&#xff1a; 把只包含质因子2、3和5的数称作丑数&#xff08;Ugly Number&#xff09;。例如6、8都是丑数&#xff0c;…

深入理解Golang中的goroutine 池

并发性是 Golang 的一项强大功能,它允许开发人员同时有效地管理多个任务。工作线程池的实现是并发的最常见用例之一。在本文中,我们将了解 Golang 中工作线程池的概念,讨论它们的好处,并引导您完成在下一个 Go 项目中实现工作线程池的过程。 什么是工作线程池? 工作线程…

【数据库】基于有效性确认的并发访问控制原理及调度流程,乐观无锁模式,冲突较少下的最优模型

使用有效性确认的并发控制 ​专栏内容&#xff1a; 手写数据库toadb 本专栏主要介绍如何从零开发&#xff0c;开发的步骤&#xff0c;以及开发过程中的涉及的原理&#xff0c;遇到的问题等&#xff0c;让大家能跟上并且可以一起开发&#xff0c;让每个需要的人成为参与者。 本专…

物流供应链数字化转型:国内领先服务商技术综合解析

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文…

TCP/IP详解——IP协议,IP选路

文章目录 1. IP 编址1.1 IP 报文头部1.2 进制之间的转换1.3 网络通信1.4 有类 IP 编制的缺陷1.5 变长子网掩码1.6 网关1.7 IP 包分片1.7.1 IP 包分片实例1.7.2 IP 分片注意事项1.7.3 Wireshark 抓取 IP 包分片1.7.4 OmniPeek 抓取 IP 包分片1.7.5 ICMP 不可达差错&#xff08;需…

【词云图】从excel和从txt文件,绘制以句子、词为单位的词云图

从excel和从txt文件&#xff0c;绘制以句子、词为单位的词云图 写在最前面数据说明&结论 从txt文件&#xff0c;绘制以句子、词为单位的词云图自我介绍 从excel&#xff0c;绘制以句子、词为单位的词云图读取excel绘制以句子、词为单位的词云图文章标题 写在最前面 经常绘…

GEE机器学习——利用梯度决策树Gradient Tree Boost 方法(GBDT/GBRT)进行土地分类和精度测试

Gradient Tree Boost 方法的具体介绍 梯度提升树(Gradient Tree Boost)是一种集成学习方法,通过串行训练多个决策树来解决回归和分类问题。它通过迭代的方式不断优化模型预测结果,使得每一棵树能够纠正前一棵树的预测误差。 Gradient Tree Boost方法的具体步骤如下: 1. …

小程序时代的机遇:开发成功的知识付费平台

知识付费平台不仅为知识创作者提供了广阔的变现渠道&#xff0c;同时也为用户提供了更为个性化、精准的学习体验。本篇文章&#xff0c;小编将为大家讲解知识付费小程序开发相关的知识。 一、小程序时代的背景 知识付费作为小程序领域中的“大热门”&#xff0c;有着非常高的…

WT2605C-32N语音芯片:跑步机音乐新搭档,畅享健康奔跑旋律

在现代健身领域&#xff0c;唯创知音的WT2605C-32N蓝牙音频MP3音乐解码语音芯片IC作为音乐解码的新搭档&#xff0c;为跑步机带来了更为智能、富有音乐节奏的健康奔跑体验&#xff0c;引领跑步健身的新时代。 1. 蓝牙连接&#xff0c;自由音乐选择 跑步机启动时&#xff0c;W…

【java】Optional操作

参考&#xff1a; 【Java8】 Optional 详解java Optional操作 1、简介 Optional类是Java8为了解决null值判断问题&#xff0c;借鉴google guava类库的Optional类而引入的一个同名Optional类&#xff0c;使用Optional类可以避免显式的null值判断&#xff08;null的防御性检查…

一个最小的物联网系统设计方案及源码(一)——系统组成

关于物联网 物联网&#xff08;Internet of Things&#xff0c;缩写IOT&#xff09;是一个基于互联网、传统电信网等信息承载体&#xff0c;让所有能够被独立寻址的普通物理对象实现互联互通的网络。 物联网一般为无线网&#xff0c;由于每个人周围的设备可以达到一千至五千个&…