大数据-玩转数据-Flink RedisSink

一、添加Redis Connector依赖

具体版本根据实际情况确定

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.1.5</version>
</dependency>

二、启动redis

参见大数据-玩转数据-Redis 安装与使用

三、编写代码

package com.lyh.flink06;import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;public class SinkRedis {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<Integer> dataStreamSource = env.fromElements(1, 2, 3, 4, 5, 6);KeyedStream<Integer, Integer> keyedStream = dataStreamSource.keyBy(new KeySelector<Integer, Integer>() {@Overridepublic Integer getKey(Integer key) throws Exception {return key.intValue();}});FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("hadoop100").setPort(6379).setMaxTotal(100).setMaxIdle(10).setMinIdle(2).setTimeout(10*1000).setDatabase(0).setPassword("redis").build();keyedStream.addSink(new RedisSink<>(conf, new RedisMapper<Integer>() {@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.SET);}@Overridepublic String getKeyFromData(Integer integer) {return integer.toString();}@Overridepublic String getValueFromData(Integer integer) {return integer.toString();}}));env.execute();}
}

可以根据要写入的redis的不同数据类型进行调整

四、查询结果

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/41652.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

分类预测 | MATLAB实现DBN-SVM深度置信网络结合支持向量机多输入分类预测

分类预测 | MATLAB实现DBN-SVM深度置信网络结合支持向量机多输入分类预测 目录 分类预测 | MATLAB实现DBN-SVM深度置信网络结合支持向量机多输入分类预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 1.分类预测 | MATLAB实现DBN-SVM深度置信网络结合支持向量机多输入分…

回归预测 | MATLAB实现基于SAE堆叠自编辑器多输入单输出回归预测

回归预测 | MATLAB实现基于SAE堆叠自编辑器多输入单输出回归预测 目录 回归预测 | MATLAB实现基于SAE堆叠自编辑器多输入单输出回归预测预测效果基本介绍模型描述程序设计参考资料 预测效果 基本介绍 1.MATLAB实现基于SAE堆叠自编辑器多输入单输出回归预测&#xff1b; 2.运行环…

Request+Response

文章目录 1. 介绍2. Request对象2.1 Request继承体系2.2 Request获取请求数据1.获取请求行2.获取请求头3.获取请求体4. 请求参数的通用方式5. 解决中文乱码问题 2.3 Request请求转发请求转发资源间共享数据: 3. Response对象3.0 Response 继承体系3.1 Response设置响应数据的功…

iOS手机无法安装Charles 的ssl证书

问题描述 iOS客户端安装证书时一直卡在下载这一步&#xff0c;无法抓包 1、打开Charles&#xff0c;选择help→SSL Proxying→Install Charles Root Certificate on a Mobile Device or Remote Browser 2、按照步骤1中的提示进行操作&#xff0c;手机连接电脑代理&#xff0c;…

Spring系列七:声明式事务

&#x1f418;声明式事务 和AOP有密切的联系, 是AOP的一个实际的应用. &#x1f432;事务分类简述 ●分类 1.编程式事务: 示意代码, 传统方式 Connection connection JdbcUtils.getConnection(); try { //1.先设置事务不要自动提交 connection.setAutoCommit(false…

ZooKeeper的应用场景(分布式锁、分布式队列)

7 分布式锁 分布式锁是控制分布式系统之间同步访问共享资源的一种方式。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源&#xff0c;那么访问这些资源的时候&#xff0c;往往需要通过一些互斥手段来防止彼此之间的干扰&#xff0c;以保证一致性&#xff0c;…

岛屿的最大面积(力扣)递归 JAVA

给你一个大小为 m x n 的二进制矩阵 grid 。 岛屿 是由一些相邻的 1 (代表土地) 构成的组合&#xff0c;这里的「相邻」要求两个 1 必须在 水平或者竖直的四个方向上 相邻。你可以假设 grid 的四个边缘都被 0&#xff08;代表水&#xff09;包围着。 岛屿的面积是岛上值为 1 的…

error_Network Error

此页面为订单列表&#xff0c;是混合开发(页面嵌入在客户端中) 此页面为订单列表&#xff0c;此需求在开发时后端先将代码发布在测试环境&#xff0c;我在本地调试时调用的后端接口进行联调没有任何问题。 此后我将代码发布在测试环境&#xff0c;在app中打开页面&#xff0c…

【C与C++的相互调用方法】

C与C的相互调用方法 C与C为什么相互调用的方式不同C中调用CC中调用C致谢 C与C为什么相互调用的方式不同 C 和 C 之间的相互调用方式存在区别&#xff0c;主要是由于 C 和 C 语言本身的设计和特性不同。 函数调用和参数传递方式不同&#xff1a;C 和 C 在函数调用和参数传递方面…

docker oracle linux命令执行sql

docker 安装参照 https://blog.csdn.net/arcsin_/article/details/123707618 docker container ls -a命令查看容器名 打开容器 docker exec -it orcl19c_03 /bin/bashsys 用户登录容器 sqlplus / as sysdbashow pdbs;什么是pdb数据库&#xff1f;什么是CDB&#xff1f; 参…

微信小程序 蓝牙设备连接,控制开关灯

1.前言 微信小程序中连接蓝牙设备&#xff0c;信息写入流程 1、检测当前使用设备&#xff08;如自己的手机&#xff09;是否支持蓝牙/蓝牙开启状态 wx:openBluetoothAdapter({}) 2、如蓝牙已开启状态&#xff0c;检查蓝牙适配器的状态 wx.getBluetoothAdapterState({}) 3、添加…

第十三章 SpringBoot项目(总)

1.创建SpringBoot项目 1.1.设置编码 1.4.导入已有的spring boot项目 2.快速搭建Restfull风格的项目 2.1.返回字符串 RestController public class IndexController {RequestMapping("/demo1")public Object demo1() {System.out.println("demo1 ran...."…

kafka的位移

文章目录 概要消费位移__consumer_offsets主题位移提交 概要 本文主要总结kafka的位移是如何管理的&#xff0c;在broker端如何通过命令行查看到位移信息&#xff0c;并从代码层面总结了位移的提交方式。 消费位移 对于 Kafka 中的分区而言&#xff0c;它的每条消息都有唯一…

0基础学习VR全景平台篇 第86篇:智慧眼-为什么要设置分组选择?

一、功能说明 分组选择&#xff0c;也就是给全景的每个分组去设置其所属的行政区划&#xff0c;设置后只有属于同行政区划的成员才可进入其场景进行相关操作&#xff0c;更便于实现城市的精细化管理。 二、后台编辑界面 分组名称&#xff1a;场景的分组名称。 对应分类&…

网络安全--linux下Nginx安装以及docker验证标签漏洞

目录 一、Nginx安装 二、docker验证标签漏洞 一、Nginx安装 1.首先创建Nginx的目录并进入&#xff1a; mkdir /soft && mkdir /soft/nginx/cd /soft/nginx/ 2.下载Nginx的安装包&#xff0c;可以通过FTP工具上传离线环境包&#xff0c;也可通过wget命令在线获取安装包…

【数据结构与算法】队列

文章目录 一&#xff1a;队列1.1 队列的概念1.2 队列的介绍1.3 队列示意图 二&#xff1a;数组模拟队列2.1 介绍2.2 思路2.3 代码实现2.3.1 定义队列基本信息2.3.2 初始化队列2.3.3 判断队列是否满&#xff0c;是否为空2.3.4 添加数据到队列2.3.5 获取队列数据&#xff0c;出队…

图数据库_Neo4j和SpringBoot整合使用_创建节点_删除节点_创建关系_使用CQL操作图谱---Neo4j图数据库工作笔记0009

首先需要引入依赖 springboot提供了一个spring data neo4j来操作 neo4j 可以看到它的架构 这个是下载下来的jar包来看看 有很多cypher对吧 可以看到就是通过封装的驱动来操作graph database 然后开始弄一下 首先添加依赖

【实用黑科技】如何 把b站的缓存视频弄到本地——数据恢复软件WinHex 和 音视频转码程序FFmpeg

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 秩沅 原创 &#x1f468;‍&#x1f4bb; 收录于专栏&#xff1a;效率…

Baumer工业相机堡盟工业相机如何通过BGAPISDK设置相机的固定帧率(C#)

Baumer工业相机堡盟工业相机如何通过BGAPI SDK设置相机的固定帧率&#xff08;C#&#xff09; Baumer工业相机Baumer工业相机的固定帧率功能的技术背景CameraExplorer如何查看相机固定帧率功能在BGAPI SDK里通过函数设置相机固定帧率 Baumer工业相机通过BGAPI SDK设置相机固定帧…

蓝牙资讯|中国智能家居前景广阔,蓝牙Mesh照明持续火爆

据俄罗斯卫星通讯社报道&#xff0c;中国已成为全球最大的智能家居消费国&#xff0c;占全球50%—60%的市场份额。未来&#xff0c;随着人工智能技术的发展以及智能家居生态的不断进步&#xff0c;智能家居在中国的渗透率将加速提升。德国斯塔蒂斯塔调查公司数据显示&#xff0…