pyflink读取kafka数据写入mysql实例

依赖包下载

https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.17.1/

版本

flink:1.16.0

kafka:2.13-3.2.0

实例

import logging
import sysfrom pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.jdbc import JdbcSink, JdbcConnectionOptions
from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer
from pyflink.datastream.formats.json import JsonRowSerializationSchema, JsonRowDeserializationSchemadef write_to_kafka(env):type_info = Types.ROW([Types.INT(), Types.STRING()])ds = env.from_collection([(1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello'), (6, 'hello')],type_info=type_info)serialization_schema = JsonRowSerializationSchema.Builder() \.with_type_info(type_info) \.build()kafka_producer = FlinkKafkaProducer(topic='test_json_topic',serialization_schema=serialization_schema,producer_config={'security.protocol': 'SASL_PLAINTEXT', 'sasl.mechanism': 'PLAIN', 'bootstrap.servers': '192.168.1.110:9092', 'group.id': 'test-consumer-group', 'sasl.jaas.config': 'org.apache.kafka.common.security.scram.ScramLoginModule required username=\"aaaaaaaaa\" password=\"bbbbbbb\";'})# note that the output type of ds must be RowTypeInfods.add_sink(kafka_producer)env.execute()def read_from_kafka(env):deserialization_schema = JsonRowDeserializationSchema.Builder() \.type_info(Types.ROW([Types.INT(), Types.STRING()])) \.build()kafka_consumer = FlinkKafkaConsumer(topics='test_json_topic',deserialization_schema=deserialization_schema,properties={'security.protocol': 'SASL_PLAINTEXT', 'sasl.mechanism': 'PLAIN', 'bootstrap.servers': '192.168.1.110:9092', 'group.id': 'test-consumer-group', 'sasl.jaas.config': 'org.apache.kafka.common.security.scram.ScramLoginModule required username=\"aaaaa\" password=\"bbbbbb\";'})kafka_consumer.set_start_from_earliest()env.add_source(kafka_consumer).print()env.execute()def wirte_data_todb(env, data):type_info = Types.ROW([Types.INT(), Types.STRING()])env.from_collection([(101, "Stream Processing with Apache Flink"),(102, "Streaming Systems"),(103, "Designing Data-Intensive Applications"),(104, "Kafka: The Definitive Guide")], type_info=type_info) \.add_sink(JdbcSink.sink("insert into flink (id, title) values (?, ?)",type_info,JdbcConnectionOptions.JdbcConnectionOptionsBuilder().with_url('jdbc:mysql://192.168.1.110:23006/test').with_driver_name('com.mysql.jdbc.Driver').with_user_name('sino').with_password('Caib@sgcc-56').build()))env.execute()if __name__ == '__main__':logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")env = StreamExecutionEnvironment.get_execution_environment()#env.add_jars("file:///opt/flink/flink-sql-connector-kafka-1.15.0.jar")#env.add_jars("file:///opt/flink/kafka-clients-2.8.1.jar")#env.add_jars("file:///opt/flink/flink-connector-jdbc-1.16.0.jar")#env.add_jars("file:///opt/flink/mysql-connector-java-8.0.29.jar")print("start reading data from kafka")read_from_kafka(env)#wirte_data_todb(env, "")

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/99680.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

毛玻璃 has 选择器卡片悬停效果

效果展示 页面结构 从上述的效果展示可以看到&#xff0c;页面是由多个卡片组成&#xff0c;并且鼠标悬停在卡片上时&#xff0c;会旋转用户图片并且韩式对应的用户信息框。 CSS3 知识点 :has 属性的运用 实现页面整体结构 <div class"container"><div…

Java with RocketMQ

Java with RocketMQ 概念开始开发广播延时发送批量消息过滤消息事务 如何保证消息不丢失如何存储和保证检索速度 概念 MQ指代Message Queue消息队列&#xff0c;通过在两个服务之间加入这种独立的消息队列应用&#xff0c;从而解耦不同服务之间的代码&#xff0c;使之可以通过…

linux | linux扩大磁盘空间 | centos7.9 | 虚拟机

注意&#xff1a;可以完全参考下面这边博客&#xff08;我只是搬运工&#xff09; centos扩大磁盘空间 简单讲讲&#xff0c;为什么有点失落落的&#xff1f; 明明就是一个 很程序化的东西 可是网上一大推 天花乱坠 而且很多人都是半吊子水&#xff0c;甚至半吊子都没有 通过关…

Ubuntu16.04apt更新失败

先设置网络设置 换成nat、桥接&#xff0c;如果发现都不行&#xff0c;那么就继续下面操作 1.如果出现一开始就e&#xff0c;检查源&#xff0c;先换源 2.换完源成功之后&#xff0c;ping网络&#xff0c;如果ping不通就是网络问题 如果ping baidu.com ping不通但是ping 112…

[网鼎杯 2018]Comment git泄露 / 恢复 二次注入 bash_history文件查看

首先我们看到账号密码有提示了 我们bp爆破一下 我首先对数字爆破 因为全字符的话太多了 爆出来了哦 所以账号密码也出来了 zhangwei zhangwei666 没有什么用啊 扫一下吧 有git git泄露 那泄露看看 真有 <?php include "mysql.php"; session_start(); if(…

leetCode 53.最大子数组和 动态规划 + 优化空间复杂度

关于此题我的往期文章&#xff1a; leetCode 53.最大子数和 图解 贪心算法/动态规划优化_呵呵哒(&#xffe3;▽&#xffe3;)"的博客-CSDN博客https://heheda.blog.csdn.net/article/details/13349726853. 最大子数组和 - 力扣&#xff08;LeetCode&#xff09; >&…

NSA 和 CISA 揭示十大网络安全错误配置

美国国家安全局 (NSA) 和网络安全与基础设施安全局 (CISA) 在5日公布了其红蓝团队在大型组织网络中发现的十大最常见的网络安全错误配置。 通报还详细介绍了威胁行为者使用哪些策略、技术和程序 (TTP) 来成功利用这些错误配置来实现各种目标&#xff0c;包括获取访问权限、横向…

40V汽车级P沟道MOSFET SQ4401EY-T1_GE3 工作原理、特性参数、封装形式—节省PCB空间,更可靠

AEC-Q101车规认证是一种基于失效机制的分立半导体应用测试认证规范。它是为了确保在汽车领域使用的分立半导体器件能够在严苛的环境条件下正常运行和长期可靠性而制定的。AEC-Q101认证包括一系列的失效机制和应力测试&#xff0c;以验证器件在高温、湿度、振动等恶劣条件下的可…

设计模式 - 行为型模式:责任链模式(概述 | 案例实现 | 优缺点 | 使用场景)

目录 一、行为型模式 1.1、责任链模式 1.1.1、概述 1.1.2、案例实现 1.1.3、优缺点 1.1.4、使用场景 一、行为型模式 1.1、责任链模式 1.1.1、概述 为了避免请求发送者和多个请求处理者耦合在一起&#xff0c;就将所有请求处理者通过前一个对象记住下一个对象的引用的方…

uniapp apple 苹果登录 离线本地打包

官方文档 uni-app官网 文档写的不全&#xff0c;没有写离线打包流程 加lib 签名里带 sign in with apple hbuilder开关 代码 测试代码&#xff0c;获取app里所有的provider uni.getProvider({service: oauth,success: function (res) {console.log(res.provider)uni.showT…

【HTML5】语义化标签记录

前言 防止一个页面中全部都是div&#xff0c;或者ul li&#xff0c;在html5推出了很多语义化标签 提示&#xff1a;以下是本篇文章正文内容&#xff0c;下面案例可供参考 常用语义化案例 一般我用的多的是header&#xff0c;main&#xff0c;footer 这些标签不难理解&#x…

【学习笔记】数据结构算法文档(类C语言)

0、类C语言代码说明 // 函数结果状态代码 #define OK 1 #define ERROR 0 #define OVERFLOW -2// 函数返回值类型&#xff08;返回函数结果状态代码&#xff09; typedef int Status;// 用户自定义数据元素类型 ElemType typedef xxx ElemType;// C 引用&#xff08;示例&#…

2023年中国助消化药物行业现状分析:消化不良患者逐年上升,提升需求量[图]

助消化药物主要分为促胃动力药物、消化酶抑制剂、胃酸抑制药物和消食剂4种类型。促胃动力药物的作用机制是通过增强胃肠道平滑肌动力促进胃酸分泌&#xff0c;从而达到助消化的目的&#xff0c;临床常用药物包括多潘立酮、莫沙必利、西沙比利等。 助消化药物分类 资料来源&…

Observability:使用 OpenTelemetry 对 Node.js 应用程序进行自动检测

作者&#xff1a;Bahubali Shetti DevOps 和 SRE 团队正在改变软件开发的流程。 DevOps 工程师专注于高效的软件应用程序和服务交付&#xff0c;而 SRE 团队是确保可靠性、可扩展性和性能的关键。 这些团队必须依赖全栈可观察性解决方案&#xff0c;使他们能够管理和监控系统&a…

Django开发之进阶篇

Django进阶篇 一、Django学习之模板二、Django学习之中间件默认中间件自定义中间件 三、Django学习之ORM定义模型类生成数据库表操作数据库添加查询修改删除 一、Django学习之模板 在 Django 中&#xff0c;模板&#xff08;Template&#xff09;是用于生成动态 HTML&#xff…

【架构】研发高可用架构和系统设计经验

研发高可用架构和系统设计经验 从研发规范层面、应用服务层面、存储层面、产品层面、运维部署层面、异常应急层面这六大层面去剖析一个高可用的系统需要有哪些关键的设计和考虑。 一、高可用架构和系统设计思想 1.可用性和高可用概念 可用性是一个可以量化的指标,计算的公…

Java 8遍历Map的方式

1、使用entrySet()和stream()方法结合遍历Map Map<String, String> map new HashMap<>();map.put("A001", "zhangsan");map.put("A002", "lisi");map.entrySet().stream().forEach(entry -> {String key entry.getKe…

自动拟人对话机器人在客户服务方面起了什么作用?

在当今数字时代&#xff0c;企业不断寻求创新的方法来提升客户服务体验。随着科技的不断进步和消费者期望的提升&#xff0c;传统的客户服务方式逐渐无法满足现代消费者的需求。因此&#xff0c;许多企业正在积极探索利用新兴技术来改进客户服务&#xff0c;自动拟人对话机器人…

LuatOS-SOC接口文档(air780E)-- gmssl - 国密算法

sm.sm2encrypt(pkx,pky,data)# sm2算法加密 参数 传入值类型 解释 string 公钥x,必选 string 公钥y,必选 string 待计算的数据,必选,最长255字节 返回值 返回值类型 解释 string 加密后的字符串, 原样输出,未经HEX转换 例子 local originStr "encryptio…

新增一个timestamp.html 页面 --chatGPT

问&#xff1a;新增一个timestamp.html 页面&#xff0c;页面实现日期和时间戳 互转功能 gpt: 要创建一个 timestamp.html 页面&#xff0c;用于实现日期和时间戳的互转功能&#xff0c;可以按照以下步骤操作&#xff1a; 1. 创建一个名为 timestamp.html 的 HTML 文件&…