Preface
What I'm sharing today is really an interview machine-test solution: detect data changes in a DM (Dameng) database and sync them to ES, implemented with Flink. It's a standard routine, nothing special to say about it, except perhaps that the "national team" products often get left out of the ecosystem. Flink has Alibaba behind it, yet it still doesn't support DM out of the box. Maybe DM doesn't cooperate; then again, if a big internet company really did embrace it, government agencies probably wouldn't dare use it anymore, haha.
I. Connecting to the DM database
I use DBeaver to connect to the DM database, with a custom-built driver; I've shared how to set that up before, so I won't repeat it here.
For checking the results I also use DBeaver to connect to ES. ES normally requires certificates and so on, but here too I connect through a custom driver, same as with DM.
Connecting to DM and ES with a custom driver
Download: certificate-free ES connection driver
Configuring the driver for the DM database connection
II. Implementation plan
- Change detection happens inside the DM database itself, via a trigger that writes to a change-log table
- Flink watches that change-log table from step 1; Flink has no binlog-style mechanism for DM
- Flink polls the change-log table on a fixed interval
- Insert, update, and delete logic is applied to keep ES in sync
III. The code
1. Create the trigger
CREATE OR REPLACE TRIGGER YOUR_SCHEMA.RC_PASS_RECORD_CHANGE_LOG_TRIGGER
AFTER INSERT OR UPDATE OR DELETE ON YOUR_SCHEMA.RC_PASS_RECORD
FOR EACH ROW
BEGIN
    IF INSERTING THEN
        -- handle insert
        INSERT INTO RC_PASS_RECORD_CHANGE_LOG (CHANGE_TYPE, CHANGE_ID) VALUES ('INSERT', :new.ID);
    ELSIF UPDATING THEN
        -- handle update
        INSERT INTO RC_PASS_RECORD_CHANGE_LOG (CHANGE_TYPE, CHANGE_ID) VALUES ('UPDATE', :new.ID);
    ELSIF DELETING THEN
        -- handle delete
        INSERT INTO RC_PASS_RECORD_CHANGE_LOG (CHANGE_TYPE, CHANGE_ID) VALUES ('DELETE', :old.ID);
    END IF;
END;
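The change-log table itself is never shown. For reference, here is a sketch of a DDL that would fit the columns the trigger and the Flink source below rely on (ID, CHANGE_TYPE, CHANGE_ID, CREATE_TIME); the column types and the identity column are my assumptions, not the original schema:

-- Hypothetical DDL for the change-log table; the column names come from the
-- trigger and the Flink source below, the types are assumptions.
CREATE TABLE YOUR_SCHEMA.RC_PASS_RECORD_CHANGE_LOG (
    ID          BIGINT IDENTITY(1, 1) PRIMARY KEY,   -- monotonically increasing, used as the polling cursor
    CHANGE_TYPE VARCHAR(10) NOT NULL,                -- 'INSERT' / 'UPDATE' / 'DELETE'
    CHANGE_ID   VARCHAR(64) NOT NULL,                -- primary key of the changed RC_PASS_RECORD row
    CREATE_TIME TIMESTAMP DEFAULT CURRENT_TIMESTAMP  -- when the change was recorded
);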
2. Flink code
Flink is integrated with Spring Boot here, so first create a fresh Spring Boot project and add the Flink, DM JDBC, and ES dependencies.
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.4.3</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.zw</groupId>
    <artifactId>olap-zw</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>olap-zw</name>
    <description>olap-zw</description>

    <properties>
        <java.version>17</java.version>
        <skipTests>true</skipTests>
        <flink.version>1.20.1</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
        </dependency>
        <!--
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.5.6</version>
        </dependency>
        -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!--
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.2.18</version>
        </dependency>
        -->
        <!-- DM JDBC driver -->
        <dependency>
            <groupId>com.dameng</groupId>
            <artifactId>DmJdbcDriver18</artifactId>
            <version>8.1.3.62</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>co.elastic.clients</groupId>
            <artifactId>elasticsearch-java</artifactId>
        </dependency>
        <!-- also pulls in the ES 7 RestHighLevelClient used by the hand-written sink -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7</artifactId>
            <version>3.0.1-1.17</version>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.25</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <annotationProcessorPaths>
                        <path>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </path>
                    </annotationProcessorPaths>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
            <!-- Flink packaging option 1 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.zw.olapzw.OlapZwApplication</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Package structure and files
Configuration file (application.properties)
spring.application.name=olap-zw

spring.datasource.url=jdbc:dm://192.168.1.22:5236?schema=YOUR_SCHEMA&useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&serverTimezone=Asia/Shanghai&useSSL=false
spring.datasource.username=YOUR_USERNAME
spring.datasource.password=YOUR_PASSWORD
spring.datasource.driver-class-name=dm.jdbc.driver.DmDriver

# Optional Druid connection pool settings (disabled)
#spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
#spring.datasource.druid.initial-size=5
#spring.datasource.druid.max-active=10
#spring.datasource.druid.min-idle=5
#spring.datasource.druid.max-wait=60000
#spring.datasource.druid.validation-query=SELECT 1
#spring.datasource.druid.test-while-idle=true
#spring.datasource.druid.time-between-eviction-runs-millis=30000
#spring.datasource.druid.min-evictable-idle-time-millis=60000

es.host=192.168.1.21
es.port=9200

# Root logging is set to error; the job code below logs its progress with log.error
# so its output still shows through.
logging.level.root=error
logging.level.com.zw.olapzw.sink=error
logging.level.com.zw.olapzw.source=error
entity -> SourceDataEntity
package com.zw.olapzw.entity;

import lombok.Data;

import java.util.Date;

/**
 * @author zwmac
 */
@Data
public class SourceDataEntity {
    private String id;
    private Long deviceId;
    private Long gateId;
    private String authObjCode;

    /** Change type, taken from the marker the trigger wrote */
    private String changeType;

    /** Sync time */
    private Date storageTime;
}
sink -> ResultSinkDataEntitySink
package com.zw.olapzw.sink;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.zw.olapzw.entity.SourceDataEntity;
import com.zw.olapzw.util.ConnUtil;
import com.zw.olapzw.util.ESClientUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * @author zwmac
 */
@Slf4j
public class ResultSinkDataEntitySink extends RichSinkFunction<SourceDataEntity> {

    @Override
    public void invoke(SourceDataEntity record, Context context) throws Exception {
        super.invoke(record, context);
        log.info("Start sink data entity");
        log.info("Sink data: {}", record);

        // Set the sync time
        record.setStorageTime(DateUtil.date());

        JSONObject dataJson = JSONUtil.parseObj(record);
        log.info("-- dataJson to sync: {}", dataJson);

        // Connect to Elasticsearch. The sink is constructed with `new`, so it is not a
        // Spring bean; fetch ConnUtil from the context via SpringUtil instead of @Resource.
        ConnUtil connUtil = SpringUtil.getBean(ConnUtil.class);
        String esHost = connUtil.getEsHost();
        String esPort = connUtil.getEsPort();
        RestHighLevelClient restHighLevelClient = ESClientUtil.getClient(esHost, Integer.parseInt(esPort));

        // Dispatch on the changeType written by the trigger
        String changeType = record.getChangeType();
        switch (changeType) {
            case "INSERT":
                sinkInsert(record, dataJson, restHighLevelClient);
                break;
            case "UPDATE":
                sinkUpdate(record, dataJson, restHighLevelClient);
                break;
            case "DELETE":
                sinkDel(record, dataJson, restHighLevelClient);
                break;
        }

        // Close the connection
        restHighLevelClient.close();
        log.info("end sink data entity");
    }

    /**
     * Delete a document
     */
    private void sinkDel(SourceDataEntity record, JSONObject dataJson, RestHighLevelClient restHighLevelClient) throws IOException {
        DeleteRequest deleteRequest = new DeleteRequest("test_index", record.getId());
        DeleteResponse deleteResponse = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
        log.error("--- ES DeleteRequest response: {}", deleteResponse);
    }

    /**
     * Update a document
     */
    private void sinkUpdate(SourceDataEntity record, JSONObject dataJson, RestHighLevelClient restHighLevelClient) throws IOException {
        UpdateRequest updateRequest = new UpdateRequest("test_index", record.getId());
        Map<String, Object> updateFields = new HashMap<>();
        updateFields.put("deviceId", record.getDeviceId());
        updateFields.put("id", record.getId());
        updateFields.put("authObjCode", record.getAuthObjCode());
        updateFields.put("gateId", record.getGateId());
        if (MapUtil.isNotEmpty(updateFields)) {
            // The partial document must be the field map, not the request itself
            updateRequest.doc(updateFields);
            UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
            log.error("---- ES UpdateRequest response: {}", updateResponse);
        } else {
            log.error("------- updateFields is empty, skipping ES update");
        }
    }

    /**
     * Insert a document
     */
    private void sinkInsert(SourceDataEntity record, JSONObject dataJson, RestHighLevelClient restHighLevelClient) throws IOException {
        IndexRequest indexRequest = new IndexRequest("test_index");
        indexRequest.id(record.getId());
        Map<String, Object> insertFields = new HashMap<>();
        insertFields.put("deviceId", record.getDeviceId());
        insertFields.put("id", record.getId());
        insertFields.put("authObjCode", record.getAuthObjCode());
        insertFields.put("gateId", record.getGateId());
        indexRequest.source(insertFields);
        IndexResponse indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
        log.error("------ ES IndexRequest response: {}", indexResponse);
    }
}
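One caveat on the sink above: it opens and closes a RestHighLevelClient on every invoke(). Since the class already extends RichSinkFunction, the client could instead be created once per parallel task in open() and released in close(). A minimal sketch of that variant, reusing ConnUtil and ESClientUtil from this article (imports as in the original class, plus org.apache.flink.configuration.Configuration):

// Sketch: reuse one ES client per sink task via the RichSinkFunction lifecycle.
@Slf4j
public class ResultSinkDataEntitySink extends RichSinkFunction<SourceDataEntity> {

    private transient RestHighLevelClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Called once per parallel task, before the first invoke()
        ConnUtil connUtil = SpringUtil.getBean(ConnUtil.class);
        client = ESClientUtil.getClient(connUtil.getEsHost(), Integer.parseInt(connUtil.getEsPort()));
    }

    @Override
    public void invoke(SourceDataEntity record, Context context) throws Exception {
        // ... same INSERT/UPDATE/DELETE dispatch as above, using the shared client ...
    }

    @Override
    public void close() throws Exception {
        if (client != null) {
            client.close();
        }
    }
}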
source -> ResultSourceDataEntitySource
package com.zw.olapzw.source;

import cn.hutool.extra.spring.SpringUtil;
import com.zw.olapzw.entity.SourceDataEntity;
import com.zw.olapzw.util.ConnUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.springframework.stereotype.Component;

import java.sql.*;

/**
 * @author zwmac
 */
@Component
@Slf4j
public class ResultSourceDataEntitySource extends RichSourceFunction<SourceDataEntity> {

    String changeLogSql = "SELECT * FROM LAMP_ZF.RC_PASS_RECORD_CHANGE_LOG RPRCL";
    String recordSql = "SELECT * FROM LAMP_ZF.RC_PASS_RECORD RPR";

    /** Polling cursor: the largest change-log ID processed so far */
    Long startId = 0L;

    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<SourceDataEntity> sourceContext) throws Exception {
        // TODO: once properly wired into Spring Boot, an ORM framework and the rest of
        // the project's infrastructure could be used here
        log.info("Start source data entity");
        try {
            // Flink instantiates this source with `new`, so look the bean up via SpringUtil
            ConnUtil connUtil = SpringUtil.getBean(ConnUtil.class);
            Connection connection = DriverManager.getConnection(
                    connUtil.getJdbcUrl(), connUtil.getJdbcUsername(), connUtil.getJdbcPassword());
            Statement statement = connection.createStatement();

            while (isRunning) {
                // Poll the change-log table for rows beyond the cursor
                String querySql = changeLogSql + " WHERE RPRCL.ID > " + startId + " ORDER BY RPRCL.ID ASC";
                log.error("--- polling sql: " + querySql);
                ResultSet resultSet = statement.executeQuery(querySql);

                while (resultSet.next()) {
                    Long id = resultSet.getLong("ID");
                    startId = id; // advance the cursor
                    String changeType = resultSet.getString("CHANGE_TYPE");
                    String changeId = resultSet.getString("CHANGE_ID");
                    Date changeDate = resultSet.getDate("CREATE_TIME");

                    SourceDataEntity sourceDataEntity = new SourceDataEntity();
                    sourceDataEntity.setChangeType(changeType);

                    if ("DELETE".equals(changeType)) {
                        // For deletes the row is gone; the ID is all we need
                        sourceDataEntity.setId(changeId);
                    } else {
                        // For inserts/updates, load the current row
                        String queryRecordSql = recordSql + " WHERE RPR.ID = '" + changeId + "'";
                        log.error("-- record sql: " + queryRecordSql);
                        Statement recordSm = connection.createStatement();
                        ResultSet recordRs = recordSm.executeQuery(queryRecordSql);
                        while (recordRs.next()) {
                            sourceDataEntity.setId(recordRs.getString("ID"));
                            sourceDataEntity.setDeviceId(recordRs.getLong("DEVICE_ID"));
                            sourceDataEntity.setGateId(recordRs.getLong("GATE_ID"));
                            sourceDataEntity.setAuthObjCode(recordRs.getString("AUTH_OBJ_CODE"));
                        }
                        recordRs.close();
                        recordSm.close();
                    }

                    if (sourceDataEntity.getId() != null) {
                        // Guards against rows physically deleted before we could read them
                        sourceDataEntity.setStorageTime(changeDate);
                        sourceContext.collect(sourceDataEntity);
                    }
                }

                // Polling interval, chosen for debugging; tune or remove for production
                Thread.sleep(10000L);
            }
        } catch (SQLException e) {
            log.error(e.getMessage());
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
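A side note on the source: the record lookup splices changeId directly into the SQL string. Inside a controlled pipeline that works, but a PreparedStatement avoids quoting problems and SQL injection. A sketch of just that lookup, using the same table and columns as above:

// Sketch: the RC_PASS_RECORD lookup with a PreparedStatement instead of string splicing.
String queryRecordSql = "SELECT * FROM LAMP_ZF.RC_PASS_RECORD RPR WHERE RPR.ID = ?";
try (PreparedStatement ps = connection.prepareStatement(queryRecordSql)) {
    ps.setString(1, changeId);
    try (ResultSet recordRs = ps.executeQuery()) {
        while (recordRs.next()) {
            sourceDataEntity.setId(recordRs.getString("ID"));
            sourceDataEntity.setDeviceId(recordRs.getLong("DEVICE_ID"));
            sourceDataEntity.setGateId(recordRs.getLong("GATE_ID"));
            sourceDataEntity.setAuthObjCode(recordRs.getString("AUTH_OBJ_CODE"));
        }
    }
} // try-with-resources closes both the statement and the result set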
util -> ConnUtil
package com.zw.olapzw.util;

import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
@Data
public class ConnUtil {

    @Value("${spring.datasource.url}")
    private String jdbcUrl;

    @Value("${spring.datasource.username}")
    private String jdbcUsername;

    @Value("${spring.datasource.password}")
    private String jdbcPassword;

    @Value("${es.host}")
    private String esHost;

    @Value("${es.port}")
    private String esPort;
}
util -> ESClientUtil
package com.zw.olapzw.util;

import org.apache.http.HttpHost;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
import org.elasticsearch.client.RestHighLevelClient;

import java.io.IOException;

/**
 * @author zwmac
 */
public class ESClientUtil {

    private static RestHighLevelClient restHighLevelClient;

    public static RestHighLevelClient getClient(String host, int port) {
        RestClientBuilder builder = RestClient.builder(new HttpHost(host, port, "http"))
                .setHttpClientConfigCallback(new HttpClientConfigCallback() {
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                        return httpClientBuilder;
                    }
                });
        restHighLevelClient = new RestHighLevelClient(builder);
        return restHighLevelClient;
    }

    public static void closeClient() {
        try {
            if (restHighLevelClient != null) {
                restHighLevelClient.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
OlapZwApplication
package com.zw.olapzw;

import com.zw.olapzw.entity.SourceDataEntity;
import com.zw.olapzw.sink.ResultSinkDataEntitySink;
import com.zw.olapzw.source.ResultSourceDataEntitySource;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;

/**
 * @author zwmac
 */
@SpringBootApplication
public class OlapZwApplication {

    // An earlier variant built the same pipeline directly in main() without starting
    // Spring; it is omitted here since the CommandLineRunner below is identical.

    public static void main(String[] args) {
        SpringApplication.run(OlapZwApplication.class, args);
        System.out.println("OlapZwApplication started");
    }

    @Bean
    public CommandLineRunner commandLineRunner(ApplicationContext ctx) {
        return args -> {
            // Get the Flink execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Set parallelism
            env.setParallelism(1);
            // Enable checkpointing every 5 seconds
            env.enableCheckpointing(5000L);
            // Exactly-once checkpoint semantics
            CheckpointConfig checkpointConfig = env.getCheckpointConfig();
            checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
            // Retain the last checkpoint when the job is cancelled
            checkpointConfig.setExternalizedCheckpointCleanup(
                    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
            // Restart strategy: 3 attempts, 10 ms apart
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10));

            // Source
            SourceFunction<SourceDataEntity> source = new ResultSourceDataEntitySource();
            DataStreamSource<SourceDataEntity> streamSource =
                    env.addSource(source, "ResultSourceDataEntitySource");
            // Downstream processing
            streamSource.addSink(new ResultSinkDataEntitySink());

            try {
                // Note: execute() blocks until the job terminates
                env.execute("DM change data sync");
            } catch (Exception e) {
                System.out.println("DM change data sync failed: " + e.getMessage());
                throw new RuntimeException(e);
            }
            System.out.println("flink CDC started");
        };
    }
}
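Also worth noting: checkpointing is enabled above, but the source keeps its startId cursor only in a field, so a restarted job re-reads the change log from ID 0 and re-syncs everything. If that matters, the cursor can be put into Flink operator state. A minimal sketch, assuming the ResultSourceDataEntitySource shown earlier with run() and cancel() unchanged:

// Sketch: make the polling cursor survive restarts by implementing CheckpointedFunction.
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

import java.util.Collections;

public class ResultSourceDataEntitySource extends RichSourceFunction<SourceDataEntity>
        implements CheckpointedFunction {

    private Long startId = 0L;
    private transient ListState<Long> startIdState;

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Persist the current polling cursor with each checkpoint
        startIdState.update(Collections.singletonList(startId));
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        startIdState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("startId", Long.class));
        // On recovery, resume polling from the last checkpointed cursor
        for (Long restored : startIdState.get()) {
            startId = restored;
        }
    }

    // run() and cancel() stay exactly as in the version above
}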
Summary
- There honestly isn't much more to say; the full code will be shared later on CSDN's GitCode.