关于flinkCDC监控mysql binlog时,datetime类型自动转换成时间戳类型问题

flinkCDC监控mysql binlog时,datetime类型自动转换成时间戳类型

  • 问题
  • 解决
    • 1.自定义转换器类
    • 2.代码引用
  • 结果

问题

flink版本:1.18.1,mysql版本:8.0.40
使用FlinkCDC的MySqlSource 连接mysql,对于datetime 类型字段,Flink CDC 会自动将 datetime 类型的字段转换为时间戳(BIGINT 类型)。如:2020-10-21 18:49:12 变成 1603306152000
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

解决

1.自定义转换器类

创建 MyDateToStringConverter 类 实现 CustomConverter 接口,并重写


import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.sql.Timestamp;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
import java.util.function.Consumer;/*** 日期时间类型转换成字符串*/
public class MyDateToStringConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {private static final Logger log = LoggerFactory.getLogger(MyDateToStringConverter.class);private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME;private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;private ZoneId timestampZoneId = ZoneId.systemDefault();public static final String CONVERTERS = "converters";public static final String DATE = "date";public static final String DATE_TYPE = "date.type";public static final String DATE_FORMAT_DATE = "date.format.date";public static final String DATE_FORMAT_DATETIME = "date.format.datetime";public static final String DATE_FORMAT_TIMESTAMP = "date.format.timestamp";public static final String DATE_FORMAT_TIMESTAMP_ZONE = "date.format.timestamp.zone";public static final String YEAR_MONTH_DAY_FORMAT = "yyyy-MM-dd";public static final String DATE_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss";public static final String DATETIME_MICRO_FORMAT = "yyyy-MM-dd HH:mm:ss.SSSSSS";public static final String TIME_ZONE_SHANGHAI = "Asia/Shanghai";public static final String TIME_ZONE_UTC_8 = "UTC+8";public static final String FORMAT_DATE = "format.date";public static final String FORMAT_TIME = "format.time";public static final String FORMAT_DATETIME = "format.datetime";public static final String FORMAT_TIMESTAMP = "format.timestamp";public static final String FORMAT_TIMESTAMP_ZONE = "format.timestamp.zone";public static final String UPPERCASE_DATE = "DATE";public static final String TIME = "TIME";public static final String DATETIME = "DATETIME";public static final String TIMESTAMP = "TIMESTAMP";public static final String SMALLDATETIME = "SMALLDATETIME";public static final String DATETIME2 = "DATETIME2";public static final Properties DEFAULT_PROPS = new Properties();static {DEFAULT_PROPS.setProperty(CONVERTERS, DATE);DEFAULT_PROPS.setProperty(DATE_TYPE, "com.flink.test.MyDateToStringConverter"); // 需要设置本类的全类名引用,具体根据自己的类设置DEFAULT_PROPS.setProperty(DATE_FORMAT_DATE, YEAR_MONTH_DAY_FORMAT);DEFAULT_PROPS.setProperty(DATE_FORMAT_DATETIME, DATE_TIME_FORMAT);DEFAULT_PROPS.setProperty(DATE_FORMAT_TIMESTAMP, DATE_TIME_FORMAT);DEFAULT_PROPS.setProperty(DATE_FORMAT_TIMESTAMP_ZONE, TIME_ZONE_UTC_8);}@Overridepublic void configure(Properties props) {readProps(props, FORMAT_DATE, p -> dateFormatter = DateTimeFormatter.ofPattern(p));readProps(props, FORMAT_TIME, p -> timeFormatter = DateTimeFormatter.ofPattern(p));readProps(props, FORMAT_DATETIME, p -> datetimeFormatter = DateTimeFormatter.ofPattern(p));readProps(props, FORMAT_TIMESTAMP, p -> timestampFormatter = DateTimeFormatter.ofPattern(p));readProps(props, FORMAT_TIMESTAMP_ZONE, z -> timestampZoneId = ZoneId.of(z));}private void readProps(Properties properties, String settingKey, Consumer<String> callback) {String settingValue = (String) properties.get(settingKey);if (settingValue == null || settingValue.length() == 0) {return;}try {callback.accept(settingValue.trim());} catch (IllegalArgumentException | DateTimeException e) {log.error("setting {} is illegal:{}", settingKey, settingValue);throw e;}}@Overridepublic void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {String sqlType = column.typeName().toUpperCase();SchemaBuilder schemaBuilder = null;Converter converter = null;if (UPPERCASE_DATE.equals(sqlType)) {schemaBuilder = SchemaBuilder.string().optional();converter = this::convertDate;}if (TIME.equals(sqlType)) {schemaBuilder = SchemaBuilder.string().optional();converter = this::convertTime;}if (DATETIME.equals(sqlType)) {schemaBuilder = SchemaBuilder.string().optional();converter = this::convertDateTime;}if (TIMESTAMP.equals(sqlType)) {schemaBuilder = SchemaBuilder.string().optional();converter = this::convertTimestamp;}if (schemaBuilder != null) {registration.register(schemaBuilder, converter);}}private String convertDate(Object input) {if (input instanceof LocalDate) {return dateFormatter.format((LocalDate) input);} else if (input instanceof Integer) {LocalDate date = LocalDate.ofEpochDay((Integer) input);return dateFormatter.format(date);}return null;}private String convertTime(Object input) {if (input instanceof Duration) {Duration duration = (Duration) input;long seconds = duration.getSeconds();int nano = duration.getNano();LocalTime time = LocalTime.ofSecondOfDay(seconds).withNano(nano);return timeFormatter.format(time);}return null;}private String convertDateTime(Object input) {if (input instanceof LocalDateTime) {return datetimeFormatter.format((LocalDateTime) input);} else if (input instanceof Timestamp) {return datetimeFormatter.format(((Timestamp) input).toLocalDateTime());}return null;}private String convertTimestamp(Object input) {if (input instanceof ZonedDateTime) {// MySQL中的TIMESTAMP数据类型会被转换为UTC时间进行存储,//而在程序中处理这个zonedDatetime时,它表示的是UTC时间ZonedDateTime zonedDateTime = (ZonedDateTime) input;LocalDateTime localDateTime =zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime();return timestampFormatter.format(localDateTime);} else if (input instanceof Timestamp) {return timestampFormatter.format(((Timestamp) input).toInstant().atZone(timestampZoneId).toLocalDateTime());}return null;}
}

2.代码引用

//获取Flink执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 连接mysqlMySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("junbo-bigdata01").port(3306).username("root").password("123456").databaseList("gmall").tableList("gmall.activity_info").startupOptions(StartupOptions.initial()).deserializer(new JsonDebeziumDeserializationSchema()).serverTimeZone("Asia/Shanghai").includeSchemaChanges(true)// 日期时间类型转换成字符串 的设置引用.debeziumProperties(MyDateToStringConverter.DEFAULT_PROPS) .build();//读取数据DataStreamSource<String> mysqlDS = env.fromSource(mySqlSource,WatermarkStrategy.noWatermarks(), "mysql-source");dataStreamSource.print();env.execute();

结果

datetime 类型 成功转换成 字符串 类型
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/65847.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SwiftUI 撸码常见错误 2 例漫谈

概述 在 SwiftUI 日常撸码过程中&#xff0c;头发尚且还算茂盛的小码农们经常会犯这样那样的错误。虽然犯这些错的原因都很简单&#xff0c;但有时想要快速准确的定位它们却并不容易。 况且这些错误还可能在模拟器和 Xcode 预览&#xff08;Preview&#xff09;表现的行为不甚…

【Unity】 HTFramework框架(五十八)【进阶篇】资源及代码热更新实战演示(Deployment + HybridCLR)

更新日期&#xff1a;2025年1月2日。 Github源码&#xff1a;[点我获取源码] 索引 资源及代码热更新实战演示运行演示Demo1.克隆项目工程2.更新子模块3.打开项目4.打开入口场景5.设置远端资源服务器地址6.导入HybridCLR7.初始化HybridCLR8.发布项目9.部署资源版本10.运行Exe11.…

SpringCloud源码分析-nacos与eureka

一、高版本为什么优先用nacos 如果用alibaba springcloud&#xff0c;那么就是阿里的技术体系。nacos属于阿里的原生技术栈&#xff0c;所以阿里更偏向于用nacos作为服务发现注册。 二、对比分析 Spring Cloud Alibaba 推荐使用 Nacos 作为服务发现和配置管理的首选组件&…

(五)人工智能进阶:基础概念解释

前面我们介绍了人工智能是如何成为一个强大函数。接下来&#xff0c;搞清损失函数、优化方法和正则化等核心概念&#xff0c;才能真正驾驭它&#xff01; 1. 什么是网络模型&#xff1f; 网络模型就像是一个精密的流水线工厂&#xff0c;由多个车间&#xff08;层&#xff0…

级联配准learning

1.定义 级联配准&#xff08;Cascade Registration&#xff09;是一种在图像处理、计算机视觉等领域广泛应用的技术。它主要用于将不同视角、不同模态或者不同时间获取的图像进行精确的对齐&#xff0c;并且是通过多个阶段&#xff08;级联&#xff09;的处理来逐步优化配准的精…

初学STM32 --- 外部SRAM

SRAM简介 静态随机存取存储器&#xff08;Static Random-Access Memory&#xff0c;SRAM&#xff09; 1M字节容量的SRAM芯片XM8A51216为例介绍。 SRAM特性: 高速&#xff1a;具有最高访问速度15ns 低功耗&#xff1a;80MHz时55mA&#xff0c;待机电流 20mA TTL电平兼容 …

leetcode hot 100 前k个高平元素

347. 前 K 个高频元素 已解答 中等 相关标签 相关企业 给你一个整数数组 nums 和一个整数 k &#xff0c;请你返回其中出现频率前 k 高的元素。你可以按 任意顺序 返回答案。 class Solution(object):def topKFrequent(self, nums, k):""":type nums: Lis…

Zabbix:自动发现功能讲解,包括网络发现、自动注册、低级别自动发现以及案例分享。

ZBX&#xff1a;自动发现功能讲解 视频讲解&#xff1a;Zabbix 自动发现网络发现概述操作方法 自动注册概述操作方法 低级别自动发现概述工作原理及工作流程案例1&#xff0c;base进程监控要求&#xff1a;步骤&#xff1a; 案例2&#xff0c;磁盘IO监控要求&#xff1a;步骤&a…

Windows上安装Go并配置环境变量(图文步骤)

前言 1. 本文主要讲解的是在windows上安装Go语言的环境和配置环境变量&#xff1b; Go语言版本&#xff1a;1.23.2 Windows版本&#xff1a;win11&#xff08;win10通用&#xff09; 下载Go环境 下载go环境&#xff1a;Go下载官网链接(https://golang.google.cn/dl/) 等待…

python3.x支持,但python2.7不支持的语法

python3.x支持,但python2.7不支持的语法 1.属性访问的问题&#xff1a; 你可能是在 params 中直接使用了某个对象的属性&#xff0c;而不是该属性的值。在你提供的信息中&#xff0c;<slot wrapper next of instance objects> 指的是一个对象的槽位&#xff08;slot wra…

开源GTKSystem.Windows.Forms框架:C# Winform跨平台运行深度解析

开源GTKSystem.Windows.Forms框架&#xff1a;C# Winform跨平台运行深度解析 一、跨平台框架的崛起 1.1 跨平台技术的现状与需求 在当今快速发展的科技时代&#xff0c;软件开发的需求日益多样化。随着移动设备和操作系统的不断涌现&#xff0c;开发者面临着前所未有的挑战&…

#端云一体化开发# #HarmonyOS Next#《说书人》鸿蒙原生基于角色的对话式文本编辑开发方案

1、写在前面 过去的一百年里&#xff0c;在“编程”的这个行业诞生之初&#xff0c;人们采用面向过程的方式进行开发&#xff0c;但是&#xff0c;伴随着程序规模的日益增大&#xff0c;程序的复杂度也随之增加&#xff0c;使用结构化编程方法来管理复杂的程序逻辑变得越来越困…

xadmin后台首页增加一个导入数据按钮

xadmin后台首页增加一个导入数据按钮 效果 流程 1、在添加小组件中添加一个html页面 2、写入html代码 3、在urls.py添加导入数据路由 4、在views.py中添加响应函数html代码 <!DOCTYPE html> <html lang

【AimRT】现代机器人通信中间件 AimRT

目录 一、什么是AimRT二、AimRT与ROS22.1 定位与设计2.2 组成与通信方式对比 三、AimRT基本概念3.1 Node、Pkg 和 Module3.2 Protocol、Channel、Rpc 和 Filter3.3 App模式 和 Pkg模式3.4 Executor3.5 Plugin 一、什么是AimRT AimRT 是智元机器人公司自主研发的一款机器人通信…

mysql系列7—Innodb的redolog

背景 本文涉及的内容较为底层&#xff0c;做了解即可&#xff0c;是以前学习《高性能Mysql》和《mysql是怎样运行的》的笔记整理所得。 redolog(后续使用redo日志表示)的核心作用是保证数据库的持久性。 在mysql系列5—Innodb的缓存中介绍过&#xff1a;数据和索引保存在磁盘上…

Swift Combine 学习(二):发布者 Publisher

Swift Combine 学习&#xff08;一&#xff09;&#xff1a;Combine 初印象Swift Combine 学习&#xff08;二&#xff09;&#xff1a;发布者 PublisherSwift Combine 学习&#xff08;三&#xff09;&#xff1a;Subscription和 SubscriberSwift Combine 学习&#xff08;四&…

Win11清除安全中心保护历史记录全攻略

Win11清除安全中心保护历史记录全攻略 在Windows 11操作系统中,安全中心作为守护系统安全的重要防线,扮演着举足轻重的角色。它不仅实时监控系统的安全状态,还详细记录各类安全事件,为用户提供全面的安全保障。然而,随着系统的长期使用,这些安全记录可能会逐渐累积,占用…

C++【内存管理】

C/C中程序的内存划分&#xff1a; 栈&#xff1a;又称堆栈&#xff0c;存放非静态的局部变量、函数参数、返回值等等&#xff0c;栈是向下增长的。内存映射段&#xff1a;是高效的&#xff29;&#xff0f;&#xff2f;映射方式&#xff0c;用于装载一个共享的动态内存库。用户…

手机租赁平台开发助力智能设备租赁新模式

内容概要 手机租赁平台开发&#xff0c;简单说就是让你用得起高大上的智能设备&#xff0c;不管是最新款的手机、平板&#xff0c;还是那些炫酷的智能耳机&#xff0c;这个平台应有尽有。想要体验但又不希望花大钱&#xff1f;那你就找对地方了&#xff01;通过灵活的租赁方案…

Spring Boot 3 文件上传、多文件上传、大文件分片上传、文件流处理以及批量操作

在 Spring Boot 3 中&#xff0c;可以通过内置的文件处理机制结合 Java 的 IO 流与多线程技术&#xff0c;实现文件上传、多文件上传、大文件分片上传、文件流处理以及批量操作的需求。以下是详细实现步骤&#xff1a; 1. 单文件上传 控制器代码 import org.springframework…