flinkjar开发 自定义函数

编写自定义加密函数,继承ScalarFunction类,实现eval方法,参数个数类型和返回值根据业务来自定义。

import org.apache.flink.table.functions.ScalarFunction;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.security.Key;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.Base64;public class AESUtil extends ScalarFunction {private static String DEFAULT_CIPHER_ALGORITHM = "SHA1PRNG";private static String KEY_ALGORITHM = "AES";private static String key = "AD42F6697B035B75";//必须有这个方法,在这个方法里实现业务逻辑public String eval(String str) {return encrypt(str);}/*** 加密** @param key* @param messBytes* @return*/private static byte[] encrypt(Key key, byte[] messBytes) throws Exception {if (key != null) {Cipher cipher = Cipher.getInstance(KEY_ALGORITHM);cipher.init(Cipher.ENCRYPT_MODE, key);return cipher.doFinal(messBytes);}return null;}/*** AES(256)解密** @param key* @param cipherBytes* @return*/private static byte[] decrypt(Key key, byte[] cipherBytes) throws Exception {if (key != null) {Cipher cipher = Cipher.getInstance(KEY_ALGORITHM);cipher.init(Cipher.DECRYPT_MODE, key);return cipher.doFinal(cipherBytes);}return null;}/*** 生成加密秘钥** @return* @throws NoSuchAlgorithmException*/private static KeyGenerator getKeyGenerator() {KeyGenerator keygen = null;try {keygen = KeyGenerator.getInstance(KEY_ALGORITHM);SecureRandom secureRandom = SecureRandom.getInstance(DEFAULT_CIPHER_ALGORITHM);secureRandom.setSeed(key.getBytes());keygen.init(128, secureRandom);} catch (NoSuchAlgorithmException e) {}return keygen;}public static String encrypt(String message) {try {KeyGenerator keygen = getKeyGenerator();SecretKey secretKey = new SecretKeySpec(keygen.generateKey().getEncoded(), KEY_ALGORITHM);return Base64.getEncoder().encodeToString(encrypt(secretKey, message.getBytes(StandardCharsets.UTF_8)));} catch (Exception e) {}return null;}public static String decrypt(String ciphertext) {try {KeyGenerator keygen = getKeyGenerator();SecretKey secretKey = new SecretKeySpec(keygen.generateKey().getEncoded(), KEY_ALGORITHM);return new String(decrypt(secretKey, Base64.getDecoder().decode(ciphertext)), StandardCharsets.UTF_8);} catch (Exception e) {}return null;}

FlinkCDC mysql到mysql 业务代码


import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.example.util.AESUtil;public class FlinkMysqlToMysql {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));env.enableCheckpointing(5000);env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 创建Table环境EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);// 注册源表和目标表tEnv.executeSql("create table sourceTable(id bigint,test VARCHAR, PRIMARY KEY (id) NOT ENFORCED ) WITH (\n" +
//源表连接器一定得是mysql-cdc"'connector' = 'mysql-cdc'," +"'hostname' = 'localhost',\n" +" 'port' = '3306',\n" +" 'database-name' = 'testdb',\n" +" 'table-name' = 'flinktest',\n" +" 'username' = 'root',\n" +" 'password' = 'admin'\n" +")");
//这里注册加密函数tEnv.createTemporarySystemFunction("encrypt", new AESUtil());
//sql里面使用自定义函数加密Table result = tEnv.sqlQuery("SELECT id,encrypt(test) FROM sourceTable");tEnv.registerTable("sourceTable", result);//创建skink表tEnv.executeSql("create table targetTable(id bigint,test VARCHAR ,PRIMARY KEY (id) NOT ENFORCED ) WITH (\n" +
//目标表连接器是jdbc"'connector' = 'jdbc'," +"'url' = 'jdbc:mysql://localhost:3306/testdb?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false',\n" +" 'table-name' = 'flinktest2',\n" +" 'username' = 'root',\n" +" 'driver' = 'com.mysql.cj.jdbc.Driver',\n" +" 'password' = 'admin'\n" +")");
// 执行CDC过程String query = "INSERT INTO targetTable SELECT * FROM sourceTable";tEnv.executeSql(query).print();}
}

运行结果,加密成功

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/663847.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ELK集群搭建(基础教程)

ELK集群搭建(基础教程) 目录: 机器准备 集群内各台机器安装Elasticsearch 安装部署Kafka(注:每个节点都配置,注意ip不同) 安装logstash工具 安装filebeat ELK收集Nginx的json日志 ELK收集Nginx正常日志和错误日…

鸿蒙(HarmonyOS)项目方舟框架(ArkUI)之TimePicker组件

鸿蒙(HarmonyOS)项目方舟框架(ArkUI)之TimePicker组件 一、操作环境 操作系统: Windows 10 专业版、IDE:DevEco Studio 3.1、SDK:HarmonyOS 3.1 编辑 二、TimePicker组件 TextClock组件通过文本将当前系统时间显示在设备上。…

Java数组的静态初始化、动态初始化和默认初始化

以下是Java数组的静态初始化、动态初始化和默认初始化的示例: 静态初始化: 静态初始化是在声明数组时直接赋值,不需要使用new关键字。例如: int[] staticArray {10, 20, 30, 40};这里,staticArray是一个静态初始化…

华为配置使用SNMPv1与网管通信示例

配置使用SNMPv1与网管通信示例 组网图形 图1 配置使用SNMPv1与网管通信组网图 SNMP简介配置注意事项组网需求配置思路操作步骤配置文件 SNMP简介 简单网络管理协议SNMP(Simple Network Management Protocol)是广泛应用于TCP/IP网络的网络管理标准协议。S…

AJAX-接口文档

接口文档:由后端提供的描述接口的文章 接口:使用AJAX和服务器通讯时,使用的URL,请求方法,以及参数 1.请求参数的位置为query(查询)的时候,就说明要使用params写为查询参数 2.请求参…

神经网络 | 基于多种神经网络模型的轴承故障检测

Hi,大家好,我是半亩花海。本文主要源自《第二届全国技能大赛智能制造工程技术项目比赛试题(样题) 模块 E 工业大数据与人工智能应用》,基于给出的已知轴承状态的振动信号样本,对数据进行分析,建…

PDF中公式转word

效果:实现pdf中公式免编辑 step1: 截图CtrlAltA,复制 step2: SimpleTex - Snip & Get 网页或客户端均可,无次数限制,效果还不错。还支持手写、文字识别 单张图片:选 手写板 step3: 导出结果选择 注:…

PriorityBlockingQueue的tryGrow方法

前言: 最近看PriorityBlockingQueue这个类的过程中,对扩容方法产生了一些困惑,特此记录下自己思索的过程。 PriorityBlockingQueue: PriorityBlockingQueue 是带优先级的无界阻塞队列,每次出队都返回优先级最高或者最低的元素。…

pdmodel从动态模型转成静态onnx

1.下载项目 git clone https://github.com/jiangjiajun/PaddleUtils.git 2.新建两个新的文件夹 第一个文件夹放两个必要文件 第二个文件夹可以设置为空,用来存放转换后的模型 如图: 3.在终端运行 python paddle/paddle_infer_shape.py --model_dir …

GMSSL之ZUC256算法

GmSSL介绍 GmSSL是一个开源的密码工具箱,支持SM2/SM3/SM4/SM9/ZUC等国密(国家商用密码)算法。 从 GmSSL 官网处得到的下载链接为 GitHub - guanzhi/GmSSL: 支持国密SM2/SM3/SM4/SM9/SSL的密码工具箱 GmSSL的下载编译如下: # git clone https://g…

谷粒商城【成神路】-【4】——分类维护

目录 1.删除功能的实现 2.新增功能的实现 3.修改功能的实现 4.拖拽功能 1.删除功能的实现 1.1逻辑删除 逻辑删除:不删除数据库中真实的数据,用指定字段,显示的表示是否删除 1.在application.yml中加入配置 mybatis-plus:global-config:…

【PostgreSQL内核学习(二十五) —— (DBMS存储空间管理)】

DBMS存储空间管理 概述块(或页面)PageHeaderData 结构体HeapTupleHeaderData 结构 表空间表空间的作用:表空间和数据库关系表空间执行案例 补充 —— 模式(Schema) 声明:本文的部分内容参考了他人的文章。在…

【HarmonyOS】鸿蒙开发之自定义组件——第3.7章

自定义构建函数 (适合内部页面的封装,更加合适)(构建页面) 案例: 自定义组件文件 Index.ets //全局自定义构建函数写法 Builder function item1(){Row({space:10}){Text("我是自定义构建函数")} }Component export struct Index{build(){Column(){item…

【红包封面发放+微信红包封面制作教程】小黑猫祝大家小年快乐~

今年终于成功获得了微信红包封面~是我们家的小黑猫,嘿嘿。 封面获取方式 一共还有600份,数量有限,大家想要的话请关注文末的公众号,访问红包封面相关的推文获取~ 平时公众号主要发布一些技术类工具知识,希望能帮到大…

Vue2+ElementUI 弹窗全局拖拽 支持放大缩小

拖拽组件 dialogDrag.vue <template><div></div> </template> <script>export default {name: dialogDrag,data() {return {originalWidth: null,originalHeight: null}},created() {this.$nextTick(()>{this.dialogDrag()})},mounted() {}…

cesium-场景出图场景截屏导出图片或pdf

cesium把当前的场景截图&#xff0c;下载图片或pdf 安装 npm install canvas2image --save npm i jspdf -S 如果安装的插件Canvas2Image不好用&#xff0c;可自建js Canvas2Image.js /*** covert canvas to image* and save the image file*/ const Canvas2Image (function…

Linux下的线程操作

一、多线程的创建于退出 1. pthread_create(线程的创建) pthread_create 是 POSIX 线程库中的函数&#xff0c;用于创建一个新的线程。 函数原型如下&#xff1a; int pthread_create(pthread_t *thread, const pthread_attr_t *attr,void *(*start_routine) (void *), void…

无人机激光雷达标定板

机载激光雷达标定板是用于校准和验证机载激光雷达系统的设备。由于机载激光雷达系统在测量地形、建筑物和植被等方面具有广泛的应用&#xff0c;因此标定板的使用对于确保测量结果的准确性和可靠性至关重要。 标定板通常由高反射率的材料制成&#xff0c;如镀金的玻璃或陶瓷&am…

计算机网络实验五

目录 实验五 路由器基本配置 1、实验目的 2、实验设备 3、网络拓扑及IP地址分配 4、实验过程 &#xff08;1&#xff09;路由器设备名称的配置 &#xff08;2&#xff09;路由器每日提示信息配置 &#xff08;3&#xff09;路由器端口的IP地址配置 &#xff08;4&…

目标检测YOLO实战应用案例100讲-【目标检测】Halcon(工具应用篇)

目录 Image、Regiong、XLD相关知识 一 读取的3种方式: 二 图像变量Region 三 图型变量