深圳手机医疗网站建设/品牌推广计划书怎么写

深圳手机医疗网站建设,品牌推广计划书怎么写,网站左侧导航栏设计,西宁市营销网站建设公司假设你有两个Kafka主题&#xff1a;user_activities_topic 和 product_views_topic&#xff0c;并且你希望将user_activities_topic中的数据写入到user_activities表&#xff0c;而将product_views_topic中的数据写入到product_views表。 maven <dependencies><!-- …

假设你有两个Kafka主题:user_activities_topicproduct_views_topic,并且你希望将user_activities_topic中的数据写入到user_activities表,而将product_views_topic中的数据写入到product_views表。

maven

<dependencies><!-- Apache Flink dependencies --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.12</artifactId><version>1.14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.12</artifactId><version>1.14.0</version></dependency><!-- MySQL JDBC Driver --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.26</version></dependency>
</dependencies>

Flink Job 示例代码

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;import java.util.Properties;public class MultipleKafkaToFlinkToMysql {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 配置Kafka消费者属性Properties kafkaProperties = new Properties();kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");kafkaProperties.setProperty("group.id", "test");// 创建第一个Kafka消费者 (User Activities)FlinkKafkaConsumer<String> userActivitiesConsumer = new FlinkKafkaConsumer<>("user_activities_topic",new SimpleStringSchema(),kafkaProperties);// 创建第二个Kafka消费者 (Product Views)FlinkKafkaConsumer<String> productViewsConsumer = new FlinkKafkaConsumer<>("product_views_topic",new SimpleStringSchema(),kafkaProperties);// 从Kafka获取用户活动数据流env.addSource(userActivitiesConsumer).map(value -> {String[] parts = value.split(",");return new UserActivity(parts[0], parts[1]);}).addSink(JdbcSink.sink("INSERT INTO user_activities (user_id, activity) VALUES (?, ?)",(statement, userActivity) -> {statement.setString(1, userActivity.userId);statement.setString(2, userActivity.activity);},new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://localhost:3306/your_database").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("your_username").withPassword("your_password").build()));// 从Kafka获取产品浏览数据流env.addSource(productViewsConsumer).map(value -> {String[] parts = value.split(",");return new ProductView(parts[0], Integer.parseInt(parts[1]));}).addSink(JdbcSink.sink("INSERT INTO product_views (user_id, product_id) VALUES (?, ?)",(statement, productView) -> {statement.setString(1, productView.userId);statement.setInt(2, productView.productId);},new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://localhost:3306/your_database").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("your_username").withPassword("your_password").build()));env.execute("Multiple Kafka to Multiple MySQL Tables with Flink");}// 用户活动类public static class UserActivity {public String userId;public String activity;public UserActivity(String userId, String activity) {this.userId = userId;this.activity = activity;}}// 产品浏览类public static class ProductView {public String userId;public int productId;public ProductView(String userId, int productId) {this.userId = userId;this.productId = productId;}}
}

当处理多个消费者和表时,直接为每个消费者编写独立的代码会导致代码冗长且难以维护。为了提高代码的可维护性和扩展性,可以采用一些设计模式和抽象方法来简化代码结构。以下是一些改进策略:

### 1. 使用工厂模式和配置文件

通过使用工厂模式和配置文件,可以将不同Kafka主题和MySQL表的映射关系抽象出来,从而减少重复代码。

### 2. 示例代码重构

下面是一个示例,展示了如何通过配置文件和工厂模式来管理多个Kafka消费者和相应的MySQL输出。

#### 2.1 配置文件 (`application.yaml`)

首先,定义一个配置文件来描述每个消费者的配置信息,包括Kafka主题、目标MySQL表名以及字段映射等。

consumers:- name: user_activities_consumerkafka_topic: user_activities_topicmysql_table: user_activitiesfields:- { index: 0, column: user_id }- { index: 1, column: activity }- name: product_views_consumerkafka_topic: product_views_topicmysql_table: product_viewsfields:- { index: 0, column: user_id }- { index: 1, column: product_id }

#### 2.2 工厂类 (`ConsumerFactory.java`)

创建一个工厂类,根据配置文件中的信息动态生成消费者并设置其数据处理逻辑。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.api.common.serialization.SimpleStringSchema;import java.util.Properties;
import java.util.List;
import java.util.Map;public class ConsumerFactory {public static void createAndRegisterConsumers(StreamExecutionEnvironment env, List<Map<String, Object>> consumers) {Properties kafkaProperties = new Properties();kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");kafkaProperties.setProperty("group.id", "test");for (Map<String, Object> consumerConfig : consumers) {String kafkaTopic = (String) consumerConfig.get("kafka_topic");String mysqlTable = (String) consumerConfig.get("mysql_table");List<Map<String, Object>> fields = (List<Map<String, Object>>) consumerConfig.get("fields");FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(kafkaTopic,new SimpleStringSchema(),kafkaProperties);env.addSource(kafkaConsumer).map(value -> parseMessage(value, fields)).addSink(JdbcSink.sink(generateInsertSQL(mysqlTable, fields),(statement, record) -> populateStatement(statement, record, fields),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://localhost:3306/your_database").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("your_username").withPassword("your_password").build()));}}private static Map<String, Object> parseMessage(String value, List<Map<String, Object>> fields) {String[] parts = value.split(",");return fields.stream().collect(Collectors.toMap(field -> (String) field.get("column"),field -> parts[(Integer) field.get("index")]));}private static String generateInsertSQL(String table, List<Map<String, Object>> fields) {StringBuilder columns = new StringBuilder();StringBuilder placeholders = new StringBuilder();for (int i = 0; i < fields.size(); i++) {if (i > 0) {columns.append(", ");placeholders.append(", ");}columns.append(fields.get(i).get("column"));placeholders.append("?");}return "INSERT INTO " + table + " (" + columns.toString() + ") VALUES (" + placeholders.toString() + ")";}private static void populateStatement(java.sql.PreparedStatement statement, Map<String, Object> record, List<Map<String, Object>> fields) throws Exception {for (int i = 0; i < fields.size(); i++) {String column = (String) fields.get(i).get("column");Object value = record.get(column);if (value instanceof Integer) {statement.setInt(i + 1, (Integer) value);} else if (value instanceof String) {statement.setString(i + 1, (String) value);}// 其他类型可以根据需要添加}}
}

#### 2.3 主程序 (`Main.java`)

在主程序中加载配置文件,并调用工厂类来注册所有消费者。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.yaml.snakeyaml.Yaml;import java.io.InputStream;
import java.util.List;
import java.util.Map;public class Main {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Yaml yaml = new Yaml();InputStream inputStream = Main.class.getClassLoader().getResourceAsStream("application.yaml");Map<String, Object> config = yaml.load(inputStream);List<Map<String, Object>> consumers = (List<Map<String, Object>>) config.get("consumers");ConsumerFactory.createAndRegisterConsumers(env, consumers);env.execute("Multiple Kafka to Multiple MySQL Tables with Flink");}
}

### 关键点解释

1. **配置文件**:通过配置文件定义每个消费者的信息,使得添加新的消费者变得简单,只需修改配置文件即可。
   
2. **工厂模式**:使用工厂类 `ConsumerFactory` 根据配置动态创建消费者,并为其设置数据处理逻辑和输出目标。

3. **通用的数据处理逻辑**:`parseMessage` 方法根据配置文件中的字段映射解析消息,`generateInsertSQL` 和 `populateStatement` 方法则用于生成插入SQL语句和填充PreparedStatement。

4. **扩展性**:这种设计方式非常灵活,易于扩展。如果需要增加新的消费者或修改现有消费者的配置,只需更新配置文件而无需更改代码逻辑。

这种方法不仅减少了代码量,还提高了代码的可维护性和扩展性,使得系统更容易管理和维护。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/72893.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

远程登录客户端软件 CTerm 发布了 v4.0.0

有时候我们需要远程登录到 Linux/Unix 服务器&#xff0c;这方面使用最广泛的客户端软件是 PuTTY&#xff0c;不过它是全英文的&#xff0c;而且是单窗口的&#xff0c;有时候显得不那么方便。 CTerm (Clever Terminal) 是一个 Windows 平台下支持 Telnet 和 SSH 协议进行远程…

AFL++安装

学习fuzzing也几天了&#xff0c;今天记录AFL的安装及使用 一、实验环境 虚拟机&#xff1a;ubuntu20.04 当然也可以uname -a去看自己的版本号 二、AFL安装 1.先更新一下工具 sudo apt update2.安装AFL必要的一些依赖&#xff0c;例如编译工具&#xff08;如 build-essen…

【STM32】ADC功能-单通道多通道(学习笔记)

本章结合上一节内容复习更好理解【江协科技STM32】ADC数模转换器-学习笔记-CSDN博客 一、ADC单通道 接线图 ADC初始化 ①RCC开启时钟&#xff0c;包括ADC和GPIO的时钟&#xff0c;另外ADCCLK的分频器也要配置 ②配置GPIO,&#xff0c;把需要用的GPIO配置成模拟输入模式&am…

基于YOLO11深度学习的运动品牌LOGO检测与识别系统【python源码+Pyqt5界面+数据集+训练代码】

《------往期经典推荐------》 一、AI应用软件开发实战专栏【链接】 项目名称项目名称1.【人脸识别与管理系统开发】2.【车牌识别与自动收费管理系统开发】3.【手势识别系统开发】4.【人脸面部活体检测系统开发】5.【图片风格快速迁移软件开发】6.【人脸表表情识别系统】7.【…

当前主流的大模型训练与推理框架的全面汇总

以下是当前主流的大模型训练与推理框架的全面汇总 以下是更新后包含 SGLang 的大模型训练与推理框架列表&#xff0c;并对分类和示例进行了优化&#xff1a; 一、通用深度学习推理框架 TensorRT-LLM 特点&#xff1a;NVIDIA推出的针对Transformer类模型的优化框架&#xff0c;支…

[数据分享第七弹]全球洪水相关数据集

洪水是一种常见的自然灾害&#xff0c;在全球范围内造成了极为严重的威胁。近年来&#xff0c;针对洪水事件的检测分析&#xff0c;以及对于洪水灾害和灾后恢复能力的研究日渐增多&#xff0c;也产生了众多洪水数据集。今天&#xff0c;我们一起来收集整理一下相关数据集。&…

Hive-数据倾斜优化

数据倾斜的原因 1&#xff09;key分布不均匀&#xff0c;本质上就是业务数据有可能会存在倾斜 2&#xff09;某些SQL语句本身就有数据倾斜 关键词 情形 后果 Join A、其中一个表较小&#xff0c;但是key集中; B、两张表都是大表&#xff0c;key不均 分发到…

RuleOS:区块链开发的“新引擎”,点燃Web3创新之火

RuleOS&#xff1a;区块链开发的“新引擎”&#xff0c;点燃Web3创新之火 在区块链技术的浪潮中&#xff0c;RuleOS宛如一台强劲的“新引擎”&#xff0c;为个人和企业开发去中心化应用&#xff08;DApp&#xff09;注入了前所未有的动力。它以独特的设计理念和强大的功能特性&…

Windows编译环境搭建(MSYS2\MinGW\cmake)

我的音视频/流媒体开源项目(github) 一、基础环境搭建 1.1 MSYS2\MinGW 参考&#xff1a;1. 基于MSYS2的Mingw-w64 GCC搭建Windows下C开发环境_msys2使用mingw64编译 在Widndows系统上&#xff0c;使用gcc工具链&#xff08;g&#xff09;进行C程序开发&#xff1f;可以的&a…

TikTok美国战略升级:聚焦美食旅行,本地化服务如何重塑市场格局

平台深耕本土内容生态&#xff0c;餐饮旅游创作者迎流量红利&#xff0c;算法推荐机制激发地域经济新活力 过去一年&#xff0c;TikTok在美国市场的动作频频引发行业关注。从早期以娱乐、舞蹈为主的全球化内容&#xff0c;到如今将资源向美食、旅行两大垂类倾斜&#xff0c;这…

Unity Dots环境配置

文章目录 前言环境配置1.新建Unity 工程2.安装Entities包2.安装EntitiesGraphics包3.安装URP渲染管线 Dots窗口 前言 DOTS&#xff08;Data-Oriented Technology Stack&#xff09;是Unity推出的一种用于开发高性能游戏和应用的数据导向技术栈&#xff0c;包含三大核心组件&am…

manus对比ChatGPT-Deep reaserch进行研究类学术相关数据分析!谁更胜一筹?

没有账号&#xff0c;只能挑选一个案例 一夜之间被这个用全英文介绍全华班出品的新爆款国产AI产品的小胖刷频。白天还没有切换语言的选项&#xff0c;晚上就加上了。简单看了看团队够成&#xff0c;使用很长实践的Monica创始人也在其中。逐渐可以理解&#xff0c;重心放在海外产…

Kubernetes中的 iptables 规则介绍

#作者&#xff1a;邓伟 文章目录 一、Kubernetes 网络模型概述二、iptables 基础知识三、Kubernetes 中的 iptables 应用四、查看和调试 iptables 规则五、总结 在 Kubernetes 集群中&#xff0c;iptables 是一个核心组件&#xff0c; 用于实现服务发现和网络策略。iptables 通…

视频图像刷新到HTTP的原理

上一篇博客已经说了cgi拿到了共享内存的程序还需要处理的才能够真正刷新到网页里面去 HTTP协议介绍 HTTP中文名称是超文本传输协议&#xff0c;它是一个简单的请求.响应协议&#xff0c;HTTP协议它运行在TCP上面&#xff0c;它是互联网数据通信的基础。 几乎所有的网页请求和互…

2024四川大学计算机考研复试上机真题

2024四川大学计算机考研复试上机真题 2024四川大学计算机考研复试机试真题 历年四川大学计算机考研复试机试真题 在线评测&#xff1a;https://app2098.acapp.acwing.com.cn/ 分数求和 题目描述 有一分数序列&#xff1a; 2/1 3/2 5/3 8/5 13/8 21/13… 求出这个数列的前 …

Qt 实现绘图板(支持橡皮擦与 Ctrl+Z 撤销功能)[特殊字符]

作业&#xff1a; 1&#xff1a;实现绘图的时候&#xff0c;颜色的随时调整 2&#xff1a;追加橡皮擦功能 3&#xff1a;配合键盘事件&#xff0c;实现功能 当键盘按 ctrlz的时候&#xff0c;撤销最后一次绘图 头文件.h #ifndef WIDGET_H #define WIDGET_H#include <QWidge…

计算机网络(1) 网络通信基础,协议介绍,通信框架

网络结构模式 C/S-----客户端和服务器 B/S -----浏览器服务器 MAC地址 每一个网卡都拥有独一无二的48位串行号&#xff0c;也即MAC地址&#xff0c;也叫做物理地址、硬件地址或者是局域网地址 MAC地址表示为12个16进制数 如00-16-EA-AE-3C-40 &#xff08;每一个数可以用四个…

【无人机三维路径规划】基于CPO冠豪猪优化算法的无人机三维路径规划Maltab

代码获取基于CPO冠豪猪优化算法的无人机三维路径规划Maltab 基于CPO冠豪猪优化算法的无人机三维路径规划 一、CPO算法的基本原理与核心优势 冠豪猪优化算法&#xff08;Crested Porcupine Optimizer, CPO&#xff09;是一种新型元启发式算法&#xff0c;其灵感来源于冠豪猪的…

简洁实用的3个免费wordpress主题

高端大气动态炫酷的免费企业官网wordpress主题 非常简洁的免费wordpress主题&#xff0c;安装简单、设置简单&#xff0c;几分钟就可以搭建好一个wordpress网站。 经典风格的免费wordpress主题 免费下载 https://www.fuyefa.com/wordpress

RabbitMQ 高级特性解析:RabbitMQ 消息可靠性保障 (上)

RabbitMQ 核心功能 RabbitMQ 高级特性解析&#xff1a;RabbitMQ 消息可靠性保障 &#xff08;上&#xff09;-CSDN博客 RabbitMQ 高级特性&#xff1a;从 TTL 到消息分发的全面解析 &#xff08;下&#xff09;-CSDN博客 前言 最近再看 RabbitMQ&#xff0c;看了看自己之前写…