关于flinkCDC监控mysql binlog时,datetime类型自动转换成时间戳类型问题

flinkCDC监控mysql binlog时,datetime类型自动转换成时间戳类型

  • 问题
  • 解决
    • 1.自定义转换器类
    • 2.代码引用
  • 结果

问题

flink版本:1.18.1,mysql版本:8.0.40
使用FlinkCDC的MySqlSource 连接mysql,对于datetime 类型字段,Flink CDC 会自动将 datetime 类型的字段转换为时间戳(BIGINT 类型)。如:2020-10-21 18:49:12 变成 1603306152000
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

解决

1.自定义转换器类

创建 MyDateToStringConverter 类 实现 CustomConverter 接口,并重写


import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.sql.Timestamp;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
import java.util.function.Consumer;/*** 日期时间类型转换成字符串*/
public class MyDateToStringConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {private static final Logger log = LoggerFactory.getLogger(MyDateToStringConverter.class);private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME;private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;private ZoneId timestampZoneId = ZoneId.systemDefault();public static final String CONVERTERS = "converters";public static final String DATE = "date";public static final String DATE_TYPE = "date.type";public static final String DATE_FORMAT_DATE = "date.format.date";public static final String DATE_FORMAT_DATETIME = "date.format.datetime";public static final String DATE_FORMAT_TIMESTAMP = "date.format.timestamp";public static final String DATE_FORMAT_TIMESTAMP_ZONE = "date.format.timestamp.zone";public static final String YEAR_MONTH_DAY_FORMAT = "yyyy-MM-dd";public static final String DATE_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss";public static final String DATETIME_MICRO_FORMAT = "yyyy-MM-dd HH:mm:ss.SSSSSS";public static final String TIME_ZONE_SHANGHAI = "Asia/Shanghai";public static final String TIME_ZONE_UTC_8 = "UTC+8";public static final String FORMAT_DATE = "format.date";public static final String FORMAT_TIME = "format.time";public static final String FORMAT_DATETIME = "format.datetime";public static final String FORMAT_TIMESTAMP = "format.timestamp";public static final String FORMAT_TIMESTAMP_ZONE = "format.timestamp.zone";public static final String UPPERCASE_DATE = "DATE";public static final String TIME = "TIME";public static final String DATETIME = "DATETIME";public static final String TIMESTAMP = "TIMESTAMP";public static final String SMALLDATETIME = "SMALLDATETIME";public static final String DATETIME2 = "DATETIME2";public static final Properties DEFAULT_PROPS = new Properties();static {DEFAULT_PROPS.setProperty(CONVERTERS, DATE);DEFAULT_PROPS.setProperty(DATE_TYPE, "com.flink.test.MyDateToStringConverter"); // 需要设置本类的全类名引用,具体根据自己的类设置DEFAULT_PROPS.setProperty(DATE_FORMAT_DATE, YEAR_MONTH_DAY_FORMAT);DEFAULT_PROPS.setProperty(DATE_FORMAT_DATETIME, DATE_TIME_FORMAT);DEFAULT_PROPS.setProperty(DATE_FORMAT_TIMESTAMP, DATE_TIME_FORMAT);DEFAULT_PROPS.setProperty(DATE_FORMAT_TIMESTAMP_ZONE, TIME_ZONE_UTC_8);}@Overridepublic void configure(Properties props) {readProps(props, FORMAT_DATE, p -> dateFormatter = DateTimeFormatter.ofPattern(p));readProps(props, FORMAT_TIME, p -> timeFormatter = DateTimeFormatter.ofPattern(p));readProps(props, FORMAT_DATETIME, p -> datetimeFormatter = DateTimeFormatter.ofPattern(p));readProps(props, FORMAT_TIMESTAMP, p -> timestampFormatter = DateTimeFormatter.ofPattern(p));readProps(props, FORMAT_TIMESTAMP_ZONE, z -> timestampZoneId = ZoneId.of(z));}private void readProps(Properties properties, String settingKey, Consumer<String> callback) {String settingValue = (String) properties.get(settingKey);if (settingValue == null || settingValue.length() == 0) {return;}try {callback.accept(settingValue.trim());} catch (IllegalArgumentException | DateTimeException e) {log.error("setting {} is illegal:{}", settingKey, settingValue);throw e;}}@Overridepublic void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {String sqlType = column.typeName().toUpperCase();SchemaBuilder schemaBuilder = null;Converter converter = null;if (UPPERCASE_DATE.equals(sqlType)) {schemaBuilder = SchemaBuilder.string().optional();converter = this::convertDate;}if (TIME.equals(sqlType)) {schemaBuilder = SchemaBuilder.string().optional();converter = this::convertTime;}if (DATETIME.equals(sqlType)) {schemaBuilder = SchemaBuilder.string().optional();converter = this::convertDateTime;}if (TIMESTAMP.equals(sqlType)) {schemaBuilder = SchemaBuilder.string().optional();converter = this::convertTimestamp;}if (schemaBuilder != null) {registration.register(schemaBuilder, converter);}}private String convertDate(Object input) {if (input instanceof LocalDate) {return dateFormatter.format((LocalDate) input);} else if (input instanceof Integer) {LocalDate date = LocalDate.ofEpochDay((Integer) input);return dateFormatter.format(date);}return null;}private String convertTime(Object input) {if (input instanceof Duration) {Duration duration = (Duration) input;long seconds = duration.getSeconds();int nano = duration.getNano();LocalTime time = LocalTime.ofSecondOfDay(seconds).withNano(nano);return timeFormatter.format(time);}return null;}private String convertDateTime(Object input) {if (input instanceof LocalDateTime) {return datetimeFormatter.format((LocalDateTime) input);} else if (input instanceof Timestamp) {return datetimeFormatter.format(((Timestamp) input).toLocalDateTime());}return null;}private String convertTimestamp(Object input) {if (input instanceof ZonedDateTime) {// MySQL中的TIMESTAMP数据类型会被转换为UTC时间进行存储,//而在程序中处理这个zonedDatetime时,它表示的是UTC时间ZonedDateTime zonedDateTime = (ZonedDateTime) input;LocalDateTime localDateTime =zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime();return timestampFormatter.format(localDateTime);} else if (input instanceof Timestamp) {return timestampFormatter.format(((Timestamp) input).toInstant().atZone(timestampZoneId).toLocalDateTime());}return null;}
}

2.代码引用

//获取Flink执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 连接mysqlMySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("junbo-bigdata01").port(3306).username("root").password("123456").databaseList("gmall").tableList("gmall.activity_info").startupOptions(StartupOptions.initial()).deserializer(new JsonDebeziumDeserializationSchema()).serverTimeZone("Asia/Shanghai").includeSchemaChanges(true)// 日期时间类型转换成字符串 的设置引用.debeziumProperties(MyDateToStringConverter.DEFAULT_PROPS) .build();//读取数据DataStreamSource<String> mysqlDS = env.fromSource(mySqlSource,WatermarkStrategy.noWatermarks(), "mysql-source");dataStreamSource.print();env.execute();

结果

datetime 类型 成功转换成 字符串 类型
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/65847.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SwiftUI 撸码常见错误 2 例漫谈

概述 在 SwiftUI 日常撸码过程中&#xff0c;头发尚且还算茂盛的小码农们经常会犯这样那样的错误。虽然犯这些错的原因都很简单&#xff0c;但有时想要快速准确的定位它们却并不容易。 况且这些错误还可能在模拟器和 Xcode 预览&#xff08;Preview&#xff09;表现的行为不甚…

【Unity】 HTFramework框架(五十八)【进阶篇】资源及代码热更新实战演示(Deployment + HybridCLR)

更新日期&#xff1a;2025年1月2日。 Github源码&#xff1a;[点我获取源码] 索引 资源及代码热更新实战演示运行演示Demo1.克隆项目工程2.更新子模块3.打开项目4.打开入口场景5.设置远端资源服务器地址6.导入HybridCLR7.初始化HybridCLR8.发布项目9.部署资源版本10.运行Exe11.…

(五)人工智能进阶:基础概念解释

前面我们介绍了人工智能是如何成为一个强大函数。接下来&#xff0c;搞清损失函数、优化方法和正则化等核心概念&#xff0c;才能真正驾驭它&#xff01; 1. 什么是网络模型&#xff1f; 网络模型就像是一个精密的流水线工厂&#xff0c;由多个车间&#xff08;层&#xff0…

初学STM32 --- 外部SRAM

SRAM简介 静态随机存取存储器&#xff08;Static Random-Access Memory&#xff0c;SRAM&#xff09; 1M字节容量的SRAM芯片XM8A51216为例介绍。 SRAM特性: 高速&#xff1a;具有最高访问速度15ns 低功耗&#xff1a;80MHz时55mA&#xff0c;待机电流 20mA TTL电平兼容 …

Zabbix:自动发现功能讲解,包括网络发现、自动注册、低级别自动发现以及案例分享。

ZBX&#xff1a;自动发现功能讲解 视频讲解&#xff1a;Zabbix 自动发现网络发现概述操作方法 自动注册概述操作方法 低级别自动发现概述工作原理及工作流程案例1&#xff0c;base进程监控要求&#xff1a;步骤&#xff1a; 案例2&#xff0c;磁盘IO监控要求&#xff1a;步骤&a…

Windows上安装Go并配置环境变量(图文步骤)

前言 1. 本文主要讲解的是在windows上安装Go语言的环境和配置环境变量&#xff1b; Go语言版本&#xff1a;1.23.2 Windows版本&#xff1a;win11&#xff08;win10通用&#xff09; 下载Go环境 下载go环境&#xff1a;Go下载官网链接(https://golang.google.cn/dl/) 等待…

#端云一体化开发# #HarmonyOS Next#《说书人》鸿蒙原生基于角色的对话式文本编辑开发方案

1、写在前面 过去的一百年里&#xff0c;在“编程”的这个行业诞生之初&#xff0c;人们采用面向过程的方式进行开发&#xff0c;但是&#xff0c;伴随着程序规模的日益增大&#xff0c;程序的复杂度也随之增加&#xff0c;使用结构化编程方法来管理复杂的程序逻辑变得越来越困…

xadmin后台首页增加一个导入数据按钮

xadmin后台首页增加一个导入数据按钮 效果 流程 1、在添加小组件中添加一个html页面 2、写入html代码 3、在urls.py添加导入数据路由 4、在views.py中添加响应函数html代码 <!DOCTYPE html> <html lang

【AimRT】现代机器人通信中间件 AimRT

目录 一、什么是AimRT二、AimRT与ROS22.1 定位与设计2.2 组成与通信方式对比 三、AimRT基本概念3.1 Node、Pkg 和 Module3.2 Protocol、Channel、Rpc 和 Filter3.3 App模式 和 Pkg模式3.4 Executor3.5 Plugin 一、什么是AimRT AimRT 是智元机器人公司自主研发的一款机器人通信…

mysql系列7—Innodb的redolog

背景 本文涉及的内容较为底层&#xff0c;做了解即可&#xff0c;是以前学习《高性能Mysql》和《mysql是怎样运行的》的笔记整理所得。 redolog(后续使用redo日志表示)的核心作用是保证数据库的持久性。 在mysql系列5—Innodb的缓存中介绍过&#xff1a;数据和索引保存在磁盘上…

C++【内存管理】

C/C中程序的内存划分&#xff1a; 栈&#xff1a;又称堆栈&#xff0c;存放非静态的局部变量、函数参数、返回值等等&#xff0c;栈是向下增长的。内存映射段&#xff1a;是高效的&#xff29;&#xff0f;&#xff2f;映射方式&#xff0c;用于装载一个共享的动态内存库。用户…

手机租赁平台开发助力智能设备租赁新模式

内容概要 手机租赁平台开发&#xff0c;简单说就是让你用得起高大上的智能设备&#xff0c;不管是最新款的手机、平板&#xff0c;还是那些炫酷的智能耳机&#xff0c;这个平台应有尽有。想要体验但又不希望花大钱&#xff1f;那你就找对地方了&#xff01;通过灵活的租赁方案…

【开源免费】基于SpringBoot+Vue.JS校园社团信息管理系统(JAVA毕业设计)

本文项目编号 T 107 &#xff0c;文末自助获取源码 \color{red}{T107&#xff0c;文末自助获取源码} T107&#xff0c;文末自助获取源码 目录 一、系统介绍二、数据库设计三、配套教程3.1 启动教程3.2 讲解视频3.3 二次开发教程 四、功能截图五、文案资料5.1 选题背景5.2 国内…

【鸿蒙NEXT】鸿蒙里面类似iOS的Keychain——关键资产(@ohos.security.asset)实现设备唯一标识

前言 在iOS开发中Keychain 是一个非常安全的存储系统&#xff0c;用于保存敏感信息&#xff0c;如密码、证书、密钥等。与 NSUserDefaults 或文件系统不同&#xff0c;Keychain 提供了更高的安全性&#xff0c;因为它对数据进行了加密&#xff0c;并且只有经过授权的应用程序才…

使用npm包的工程如何引入mapboxgl-enhance/maplibre-gl-enhance扩展包

作者&#xff1a;刘大 前言 在使用iClient for MapboxGL/MapLibreGL项目开发中&#xff0c;往往会对接非EPSG:3857坐标系的地图&#xff0c;由于默认不支持&#xff0c;因此需引入mapboxgl-enhance/maplibre-gl-enhance扩展包。 在使用Vue等其他框架&#xff0c;通过npm包下载…

应急指挥系统总体架构方案

引言 应急指挥系统总体架构方案旨在构建一个高效、智能的应急管理体系&#xff0c;以应对自然灾害、事故灾难等突发事件&#xff0c;保障人民生命财产安全。 背景与挑战 近年来&#xff0c;安全生产形势严峻&#xff0c;自然灾害事故频发&#xff0c;对应急指挥系统的要求越…

如何用CSS3创建圆角矩形并居中显示?

在网页设计中&#xff0c;圆角矩形因其美观和现代感而被广泛使用&#xff0c;居中显示元素也是一个常见的需求。今天&#xff0c;我们将学习如何使用CSS3的border-radius属性来创建圆角矩形&#xff0c;并将其居中显示在页面上。 如果你正在学习CSS&#xff0c;那么这个实例将非…

UE5通过蓝图节点控制材质参数

通过蓝图节点控制材质的参数 蓝图节点 在材质上设置标量值 和 在材质上设置向量参数值 Set Scalar Parameter Value on Materials Set Vector Parameter Value on Materials 这两个蓝图节点都可以在蓝图中&#xff0c;控制材质的参数值和向量值

canvas+fabric实现时间刻度尺(二)

前言 我们前面实现了时间刻度尺&#xff0c;鼠标移动显示时间&#xff0c;接下来我们实现鼠标点击某个时间进行弹框。 效果 实现 1.监听鼠标按下事件 2.编写弹框页面 3.时间转换 <template><div><canvas id"rulerCanvas" width"1200"…

手机实时提取SIM卡打电话的信令声音-双卡手机来电如何获取哪一个卡的来电

手机实时提取SIM卡打电话的信令声音 --双卡手机来电如何获取哪一个卡的来电 一、前言 前面的篇章《手机实时提取SIM卡打电话的信令声音-智能拨号器的双SIM卡切换方案》中&#xff0c;我们论述了局域网SIP坐席通过手机外呼出去时&#xff0c;手机中主副卡的呼叫调度策略。 但…