flinkjar开发 自定义函数

编写自定义加密函数,继承ScalarFunction类,实现eval方法,参数个数类型和返回值根据业务来自定义。

import org.apache.flink.table.functions.ScalarFunction;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.security.Key;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.Base64;public class AESUtil extends ScalarFunction {private static String DEFAULT_CIPHER_ALGORITHM = "SHA1PRNG";private static String KEY_ALGORITHM = "AES";private static String key = "AD42F6697B035B75";//必须有这个方法,在这个方法里实现业务逻辑public String eval(String str) {return encrypt(str);}/*** 加密** @param key* @param messBytes* @return*/private static byte[] encrypt(Key key, byte[] messBytes) throws Exception {if (key != null) {Cipher cipher = Cipher.getInstance(KEY_ALGORITHM);cipher.init(Cipher.ENCRYPT_MODE, key);return cipher.doFinal(messBytes);}return null;}/*** AES(256)解密** @param key* @param cipherBytes* @return*/private static byte[] decrypt(Key key, byte[] cipherBytes) throws Exception {if (key != null) {Cipher cipher = Cipher.getInstance(KEY_ALGORITHM);cipher.init(Cipher.DECRYPT_MODE, key);return cipher.doFinal(cipherBytes);}return null;}/*** 生成加密秘钥** @return* @throws NoSuchAlgorithmException*/private static KeyGenerator getKeyGenerator() {KeyGenerator keygen = null;try {keygen = KeyGenerator.getInstance(KEY_ALGORITHM);SecureRandom secureRandom = SecureRandom.getInstance(DEFAULT_CIPHER_ALGORITHM);secureRandom.setSeed(key.getBytes());keygen.init(128, secureRandom);} catch (NoSuchAlgorithmException e) {}return keygen;}public static String encrypt(String message) {try {KeyGenerator keygen = getKeyGenerator();SecretKey secretKey = new SecretKeySpec(keygen.generateKey().getEncoded(), KEY_ALGORITHM);return Base64.getEncoder().encodeToString(encrypt(secretKey, message.getBytes(StandardCharsets.UTF_8)));} catch (Exception e) {}return null;}public static String decrypt(String ciphertext) {try {KeyGenerator keygen = getKeyGenerator();SecretKey secretKey = new SecretKeySpec(keygen.generateKey().getEncoded(), KEY_ALGORITHM);return new String(decrypt(secretKey, Base64.getDecoder().decode(ciphertext)), StandardCharsets.UTF_8);} catch (Exception e) {}return null;}

FlinkCDC mysql到mysql 业务代码


import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.example.util.AESUtil;public class FlinkMysqlToMysql {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));env.enableCheckpointing(5000);env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 创建Table环境EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);// 注册源表和目标表tEnv.executeSql("create table sourceTable(id bigint,test VARCHAR, PRIMARY KEY (id) NOT ENFORCED ) WITH (\n" +
//源表连接器一定得是mysql-cdc"'connector' = 'mysql-cdc'," +"'hostname' = 'localhost',\n" +" 'port' = '3306',\n" +" 'database-name' = 'testdb',\n" +" 'table-name' = 'flinktest',\n" +" 'username' = 'root',\n" +" 'password' = 'admin'\n" +")");
//这里注册加密函数tEnv.createTemporarySystemFunction("encrypt", new AESUtil());
//sql里面使用自定义函数加密Table result = tEnv.sqlQuery("SELECT id,encrypt(test) FROM sourceTable");tEnv.registerTable("sourceTable", result);//创建skink表tEnv.executeSql("create table targetTable(id bigint,test VARCHAR ,PRIMARY KEY (id) NOT ENFORCED ) WITH (\n" +
//目标表连接器是jdbc"'connector' = 'jdbc'," +"'url' = 'jdbc:mysql://localhost:3306/testdb?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false',\n" +" 'table-name' = 'flinktest2',\n" +" 'username' = 'root',\n" +" 'driver' = 'com.mysql.cj.jdbc.Driver',\n" +" 'password' = 'admin'\n" +")");
// 执行CDC过程String query = "INSERT INTO targetTable SELECT * FROM sourceTable";tEnv.executeSql(query).print();}
}

运行结果,加密成功

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/663847.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ELK集群搭建(基础教程)

ELK集群搭建(基础教程) 目录: 机器准备 集群内各台机器安装Elasticsearch 安装部署Kafka(注:每个节点都配置,注意ip不同) 安装logstash工具 安装filebeat ELK收集Nginx的json日志 ELK收集Nginx正常日志和错误日…

鸿蒙(HarmonyOS)项目方舟框架(ArkUI)之TimePicker组件

鸿蒙(HarmonyOS)项目方舟框架(ArkUI)之TimePicker组件 一、操作环境 操作系统: Windows 10 专业版、IDE:DevEco Studio 3.1、SDK:HarmonyOS 3.1 编辑 二、TimePicker组件 TextClock组件通过文本将当前系统时间显示在设备上。…

华为配置使用SNMPv1与网管通信示例

配置使用SNMPv1与网管通信示例 组网图形 图1 配置使用SNMPv1与网管通信组网图 SNMP简介配置注意事项组网需求配置思路操作步骤配置文件 SNMP简介 简单网络管理协议SNMP(Simple Network Management Protocol)是广泛应用于TCP/IP网络的网络管理标准协议。S…

AJAX-接口文档

接口文档:由后端提供的描述接口的文章 接口:使用AJAX和服务器通讯时,使用的URL,请求方法,以及参数 1.请求参数的位置为query(查询)的时候,就说明要使用params写为查询参数 2.请求参…

神经网络 | 基于多种神经网络模型的轴承故障检测

Hi,大家好,我是半亩花海。本文主要源自《第二届全国技能大赛智能制造工程技术项目比赛试题(样题) 模块 E 工业大数据与人工智能应用》,基于给出的已知轴承状态的振动信号样本,对数据进行分析,建…

PDF中公式转word

效果:实现pdf中公式免编辑 step1: 截图CtrlAltA,复制 step2: SimpleTex - Snip & Get 网页或客户端均可,无次数限制,效果还不错。还支持手写、文字识别 单张图片:选 手写板 step3: 导出结果选择 注:…

pdmodel从动态模型转成静态onnx

1.下载项目 git clone https://github.com/jiangjiajun/PaddleUtils.git 2.新建两个新的文件夹 第一个文件夹放两个必要文件 第二个文件夹可以设置为空,用来存放转换后的模型 如图: 3.在终端运行 python paddle/paddle_infer_shape.py --model_dir …

谷粒商城【成神路】-【4】——分类维护

目录 1.删除功能的实现 2.新增功能的实现 3.修改功能的实现 4.拖拽功能 1.删除功能的实现 1.1逻辑删除 逻辑删除:不删除数据库中真实的数据,用指定字段,显示的表示是否删除 1.在application.yml中加入配置 mybatis-plus:global-config:…

【PostgreSQL内核学习(二十五) —— (DBMS存储空间管理)】

DBMS存储空间管理 概述块(或页面)PageHeaderData 结构体HeapTupleHeaderData 结构 表空间表空间的作用:表空间和数据库关系表空间执行案例 补充 —— 模式(Schema) 声明:本文的部分内容参考了他人的文章。在…

【红包封面发放+微信红包封面制作教程】小黑猫祝大家小年快乐~

今年终于成功获得了微信红包封面~是我们家的小黑猫,嘿嘿。 封面获取方式 一共还有600份,数量有限,大家想要的话请关注文末的公众号,访问红包封面相关的推文获取~ 平时公众号主要发布一些技术类工具知识,希望能帮到大…

Vue2+ElementUI 弹窗全局拖拽 支持放大缩小

拖拽组件 dialogDrag.vue <template><div></div> </template> <script>export default {name: dialogDrag,data() {return {originalWidth: null,originalHeight: null}},created() {this.$nextTick(()>{this.dialogDrag()})},mounted() {}…

cesium-场景出图场景截屏导出图片或pdf

cesium把当前的场景截图&#xff0c;下载图片或pdf 安装 npm install canvas2image --save npm i jspdf -S 如果安装的插件Canvas2Image不好用&#xff0c;可自建js Canvas2Image.js /*** covert canvas to image* and save the image file*/ const Canvas2Image (function…

无人机激光雷达标定板

机载激光雷达标定板是用于校准和验证机载激光雷达系统的设备。由于机载激光雷达系统在测量地形、建筑物和植被等方面具有广泛的应用&#xff0c;因此标定板的使用对于确保测量结果的准确性和可靠性至关重要。 标定板通常由高反射率的材料制成&#xff0c;如镀金的玻璃或陶瓷&am…

计算机网络实验五

目录 实验五 路由器基本配置 1、实验目的 2、实验设备 3、网络拓扑及IP地址分配 4、实验过程 &#xff08;1&#xff09;路由器设备名称的配置 &#xff08;2&#xff09;路由器每日提示信息配置 &#xff08;3&#xff09;路由器端口的IP地址配置 &#xff08;4&…

Docker 阿里云镜像仓库CR使用实践

一、使用容器镜像&#xff0c;查看镜像&#xff0c;创建&#xff0c;推送&#xff0c;拉取阿里云镜像 CR镜像管理&#xff08;阿里云容器镜像服务&#xff08;Container Registry&#xff09;&#xff09; 登录实例 未创建的镜像名称也可以push、docker的私有仓库需要提起创建…

微信小程序新手入门教程二:认识JSON配置文件

在上一篇我们介绍了微信小程序的注册和基本使用方式&#xff0c;并且写出了一个简单的页面&#xff0c;但是依然没有解释目录中的各种.json文件是做什么的。这篇我们就来认识一下各种JSON配置文件及其配置项。 一 认识JSON 首先先来认识一下JSON是什么。 JSON 指的是 JavaScri…

航道大数据应用专项研究报告(附下载)

总体目标 充分认识航道大数据对行业治理的重要性和必要性&#xff0c;航道大数据的开发和利用是建设智慧航道的基础。基于大数据的航道管理体系&#xff0c;实现了现有数据的梳理和汇聚&#xff0c;跨部门数据的交换和整合&#xff0c;建立了数据关联和深度学习的模型机制&…

网络异常案例六_IP冲突

问题现象 同一个局域网下&#xff0c;一个路由器带几十台终端设备&#xff0c;存在终端设备获取到了相同IP的场景。该路由器也是DHCP Server。 有两个设备终端&#xff0c;都显示获取到了192.168.11.177这个ip。 抓包分析 抓包过程中&#xff0c;看到的一些问题。 ps&#x…

​(三)hadoop之hive的搭建1

下载 访问官方网站https://hive.apache.org/ 点击downloads 点击Download a release now! 点击https://dlcdn.apache.org/hive/ 选择最新的稳定版 复制最新的url 在linux执行下载命令 wget https://dlcdn.apache.org/hive/hive-3.1.3/apache-hive-3.1.3-bin.tar.gz 2.解压…

第十一篇【传奇开心果系列】Python的OpenCV技术点案例示例:三维重建

传奇开心果短博文系列 系列短博文目录Python的OpenCV技术点案例示例系列 短博文目录一、前言二、OpenCV三维重建介绍三、基于区域的SGBM示例代码四、BM&#xff08;Block Matching&#xff09;算法介绍和示例代码五、基于能量最小化的GC&#xff08;Graph Cut&#xff09;算法介…