物流实时数仓:数仓搭建(DWS)一

系列文章目录

物流实时数仓:采集通道搭建
物流实时数仓:数仓搭建
物流实时数仓:数仓搭建(DIM)
物流实时数仓:数仓搭建(DWD)一
物流实时数仓:数仓搭建(DWD)二
物流实时数仓:数仓搭建(DWS)一


文章目录

  • 系列文章目录
  • 前言
  • 一、代码编写
    • 1.文件创建
      • 1.主程序
      • 2.实体类
      • 3.自定义触发器
      • 4.自定义聚合函数
      • 5.在HbaseUtil中添加查询方法
      • 6.JedisUtil工具类
      • 7.封装DimUtil工具类,使用旁路缓存优化查询维度
      • 8.修改实体类TmsConfigDimBean
      • 9.传递op
      • 10.获取线程池的工具类
      • 11.异步关联函数DimAsyncFunction
      • 12.ClickHouseUtil工具类
      • 13. DimSinkFunction
      • 14. TransientSink注解
    • 2.主程序
    • 3.开窗聚合
      • 1.MyTriggerFunction
      • 2.MyAggregationFunction
    • 4.关联维度信息
      • 1.DimAsyncFunction
      • 2.DimJoinFunction
      • 3.ThreadPoolUtil
      • 4.DimUtil
      • 5.JedisUtil
      • 6. HbaseUtil
      • 7. DwsBoundOrgSortDayBean
      • 8.补充维度字段
      • 9. MyBroadcastProcessFunction
      • 10. DimSinkFunction
    • 5.写入CK
      • 1. ClickHouseUtil
      • 2.TransientSink
  • 二、代码测试
    • 1.程序启动
    • 2.修改kafka分区
    • 3.ck建表
      • 1.建库
      • 2.建表
      • 3.物化视图
      • 4.查看结果
  • 总结


前言

这次博客,我们进行各机构分拣次数的统计。统计当日各机构的分拣次数,并补充城市、省份等维度信息,写入ClickHouse对应表。要求每十秒钟更新一次统计结果。

大体流程如图。
在这里插入图片描述


一、代码编写

1.文件创建

1.主程序

在这里插入图片描述

2.实体类

在这里插入图片描述

3.自定义触发器

在这里插入图片描述

4.自定义聚合函数

在这里插入图片描述

5.在HbaseUtil中添加查询方法

在这里插入图片描述

6.JedisUtil工具类

在这里插入图片描述

7.封装DimUtil工具类,使用旁路缓存优化查询维度

在这里插入图片描述

8.修改实体类TmsConfigDimBean

在这里插入图片描述

9.传递op

在这里插入图片描述

10.获取线程池的工具类

在这里插入图片描述

11.异步关联函数DimAsyncFunction

在这里插入图片描述

12.ClickHouseUtil工具类

在这里插入图片描述以上就是这次博客要更改或创建的java文件。

13. DimSinkFunction

当维度数据更新时,删除redis中的对应数据。
在这里插入图片描述

14. TransientSink注解

某些字段不需要写入ClickHouse,但对统计有帮助,我们可以通过添加自定义注解,在写出时获取字段的TransientSink注解,通过判断是否注解是否为空在写出时忽略指定字段。
在这里插入图片描述

2.主程序

DwsBoundOrgSortDay需要完成的任务如以下流程图。
在这里插入图片描述

package com.atguigu.tms.realtime.app.dws;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.app.func.DimAsyncFunction;
import com.atguigu.tms.realtime.app.func.MyAggregationFunction;
import com.atguigu.tms.realtime.app.func.MyTriggerFunction;
import com.atguigu.tms.realtime.beans.DwdBoundSortBean;
import com.atguigu.tms.realtime.beans.DwsBoundOrgSortDayBean;
import com.atguigu.tms.realtime.utils.*;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.util.concurrent.TimeUnit;public class DwsBoundOrgSortDay {public static void main(String[] args) throws Exception {// 环境准备StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);env.setParallelism(4);// kafka读取数据String topic = "tms_dwd_bound_sort";String groupId = "dws_tms_dwd_bound_sort";KafkaSource<String> kafkaSource = KafkaUtil.getKafkaSource(topic, groupId, args);SingleOutputStreamOperator<String> kafkaStrDS = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka_source").uid("kafka_source");// 对流中的数据进行类型转换 jsonStr-> 实体类SingleOutputStreamOperator<DwsBoundOrgSortDayBean> dwsBoundOrgSortDayBeanSingleOutputStreamOperator = kafkaStrDS.map(new MapFunction<String, DwsBoundOrgSortDayBean>() {@Overridepublic DwsBoundOrgSortDayBean map(String jsonStr) throws Exception {DwdBoundSortBean dwdBoundSortBean = JSON.parseObject(jsonStr, DwdBoundSortBean.class);return DwsBoundOrgSortDayBean.builder().orgId(dwdBoundSortBean.getOrgId()).sortCountBase(1L).ts(dwdBoundSortBean.getTs() + 8 * 60 * 60 * 1000L).build();}});// 指定Watermark以及提取事件事件字段SingleOutputStreamOperator<DwsBoundOrgSortDayBean> withWatermarkDS = dwsBoundOrgSortDayBeanSingleOutputStreamOperator.assignTimestampsAndWatermarks(WatermarkStrategy.<DwsBoundOrgSortDayBean>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<DwsBoundOrgSortDayBean>() {@Overridepublic long extractTimestamp(DwsBoundOrgSortDayBean boundOrgSortDayBean, long recordTimestamp) {return boundOrgSortDayBean.getTs();}}));//        withWatermarkDS.print("###");// 按照机构id进行分组KeyedStream<DwsBoundOrgSortDayBean, String> keyedDS = withWatermarkDS.keyBy(DwsBoundOrgSortDayBean::getOrgId);// 开窗WindowedStream<DwsBoundOrgSortDayBean, String, TimeWindow> windowDS = keyedDS.window(TumblingEventTimeWindows.of(Time.days(1L)));// 指定自定义触发器WindowedStream<DwsBoundOrgSortDayBean, String, TimeWindow> triggerDS = windowDS.trigger(new MyTriggerFunction<>());// 聚合SingleOutputStreamOperator<DwsBoundOrgSortDayBean> aggregateDS = triggerDS.aggregate(new MyAggregationFunction<DwsBoundOrgSortDayBean>() {@Overridepublic DwsBoundOrgSortDayBean add(DwsBoundOrgSortDayBean value, DwsBoundOrgSortDayBean accumulator) {if (accumulator == null) {return value;}accumulator.setSortCountBase(accumulator.getSortCountBase() + 1);return accumulator;}},new ProcessWindowFunction<DwsBoundOrgSortDayBean, DwsBoundOrgSortDayBean, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<DwsBoundOrgSortDayBean> elements, Collector<DwsBoundOrgSortDayBean> out) throws Exception {for (DwsBoundOrgSortDayBean element : elements) {// 获取窗口起始时间long stt = context.window().getStart();// 将窗口时间左移8小时 并转换格式element.setCurDate(DateFormatUtil.toDate(stt - 8 * 60 * 60 * 1000L));element.setTs(System.currentTimeMillis());out.collect(element);}}});// 关联维度(城市、省份)// 关联机构维度 获取机构名称// 异步I/OSingleOutputStreamOperator<DwsBoundOrgSortDayBean> withOrgNameDS = AsyncDataStream.unorderedWait(aggregateDS,new DimAsyncFunction<DwsBoundOrgSortDayBean>("dim_base_organ") {@Overridepublic void join(DwsBoundOrgSortDayBean sortDayBean, JSONObject dimInfoJsonObj) {sortDayBean.setOrgName(dimInfoJsonObj.getString("org_name"));String orgParentId = dimInfoJsonObj.getString("org_parent_id");sortDayBean.setJoinOrgId(orgParentId != null?orgParentId:sortDayBean.getOrgId());}@Overridepublic Tuple2<String, String> getCondition(DwsBoundOrgSortDayBean sortDayBean) {return Tuple2.of("id", sortDayBean.getOrgId());}},60,TimeUnit.SECONDS);// 补充城市IDSingleOutputStreamOperator<DwsBoundOrgSortDayBean> withCityIdDS = AsyncDataStream.unorderedWait(withOrgNameDS,new DimAsyncFunction<DwsBoundOrgSortDayBean>("dim_base_organ") {@Overridepublic void join(DwsBoundOrgSortDayBean sortDayBean, JSONObject dimInfoJsonObj) {sortDayBean.setCityId(dimInfoJsonObj.getString("region_id"));}@Overridepublic Tuple2<String, String> getCondition(DwsBoundOrgSortDayBean sortDayBean) {return Tuple2.of("id", sortDayBean.getJoinOrgId());}},60,TimeUnit.SECONDS);// 关联地区维度表 根据城市的id获取城市名称以及当前城市所属的省份idSingleOutputStreamOperator<DwsBoundOrgSortDayBean> withCityNameAndProvinceIdDS = AsyncDataStream.unorderedWait(withCityIdDS,new DimAsyncFunction<DwsBoundOrgSortDayBean>("dim_base_region_info") {@Overridepublic void join(DwsBoundOrgSortDayBean sortDayBean, JSONObject dimInfoJsonObj) {sortDayBean.setCityName(dimInfoJsonObj.getString("name"));sortDayBean.setProvinceId(dimInfoJsonObj.getString("parent_id"));}@Overridepublic Tuple2<String, String> getCondition(DwsBoundOrgSortDayBean sortDayBean) {return Tuple2.of("id", sortDayBean.getCityId());}},60, TimeUnit.SECONDS);// 关联地区维度表 根据省份的id获取省份的名称SingleOutputStreamOperator<DwsBoundOrgSortDayBean> withProvinceDS = AsyncDataStream.unorderedWait(withCityNameAndProvinceIdDS,new DimAsyncFunction<DwsBoundOrgSortDayBean>("dim_base_region_info") {@Overridepublic void join(DwsBoundOrgSortDayBean sortDayBean, JSONObject dimInfoJsonObj) {sortDayBean.setProvinceName(dimInfoJsonObj.getString("name"));}@Overridepublic Tuple2<String, String> getCondition(DwsBoundOrgSortDayBean sortDayBean) {return Tuple2.of("id", sortDayBean.getProvinceId());}},60, TimeUnit.SECONDS);withProvinceDS.print(">>>>");// 将关联的结果写入ck中withProvinceDS.addSink(ClickHouseUtil.getJdbcSink("insert into dws_bound_org_sort_day_base values(?,?,?,?,?,?,?,?,?)"));env.execute();}
}

现在我们就按照主程序的调用来完成其他文件的编写。

3.开窗聚合

开窗之前没有用到新的函数,所以不说了。

1.MyTriggerFunction

自定义触发器

package com.atguigu.tms.realtime.app.func;import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;//自定义触发器 每10s触发一次窗口计算
public class MyTriggerFunction<T>  extends Trigger<T, TimeWindow> {@Overridepublic TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {ValueStateDescriptor<Boolean> valueStateDescriptor= new ValueStateDescriptor<Boolean>("isFirstState",Boolean.class);ValueState<Boolean> isFirstState = ctx.getPartitionedState(valueStateDescriptor);Boolean isFirst = isFirstState.value();if(isFirst == null){//如果是窗口中的第一个元素//将状态中的值进行更新isFirstState.update(true);//注册定时器  当前事件时间向下取整后 + 10s后执行ctx.registerEventTimeTimer(timestamp -timestamp%10000L  + 2000L);}else if(isFirst){isFirstState.update(false);}return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {return TriggerResult.CONTINUE;}//time 表示事件时间触发器 触发时间@Overridepublic TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {long end = window.getEnd();if(time < end){if(time + 2000L < end){ctx.registerEventTimeTimer(time + 2000L);}return TriggerResult.FIRE;}return TriggerResult.CONTINUE;}@Overridepublic void clear(TimeWindow window, TriggerContext ctx) throws Exception {ctx.deleteEventTimeTimer(window.maxTimestamp());}
}

2.MyAggregationFunction

自定义聚合函数

package com.atguigu.tms.realtime.app.func;import org.apache.flink.api.common.functions.AggregateFunction;public abstract class MyAggregationFunction<T> implements AggregateFunction<T,T,T> {@Overridepublic T createAccumulator() {return null;}@Overridepublic T getResult(T accumulator) {return accumulator;}@Overridepublic T merge(T a, T b) {return null;}
}

4.关联维度信息

1.DimAsyncFunction

异步I/O

package com.atguigu.tms.realtime.app.func;import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.beans.DimJoinFunction;
import com.atguigu.tms.realtime.commom.TmsConfig;
import com.atguigu.tms.realtime.utils.DimUtil;
import com.atguigu.tms.realtime.utils.ThreadPoolUtil;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;import java.util.Collections;
import java.util.concurrent.ExecutorService;public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T> implements DimJoinFunction<T> {private String tableName;public DimAsyncFunction(String tableName) {this.tableName = tableName;}private ExecutorService executorService;@Overridepublic void open(Configuration parameters) throws Exception {executorService = ThreadPoolUtil.getInstance();}@Overridepublic void asyncInvoke(T obj, ResultFuture<T> resultFuture) throws Exception {// 从线程池中获取线程,发送异步请求executorService.submit(new Runnable() {@Overridepublic void run() {// 根据流中的对象获取要作为查询条件的主键或者外键Tuple2<String, String> keyNameAndValue = getCondition(obj);// 根据查询条件获取维度对象JSONObject dimInfoJsonObj = DimUtil.getDimInfo(TmsConfig.HBASE_NAMESPACE, tableName, keyNameAndValue);// 将维度对象的属性补充到流中的对象上if (dimInfoJsonObj != null) {join(obj, dimInfoJsonObj);}// 向下游传递数据resultFuture.complete(Collections.singleton(obj));}});}
}

2.DimJoinFunction

我们将需要DimAsyncFunction中一些需要子类实现的函数写入DimJoinFunction,当作接口接入。

package com.atguigu.tms.realtime.beans;import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.tuple.Tuple2;public interface DimJoinFunction<T> {void join(T obj, JSONObject dimInfoJsonObj);Tuple2<String, String> getCondition(T obj);
}

3.ThreadPoolUtil

异步I/O中用作创建线程池的工具类

package com.atguigu.tms.realtime.utils;import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class ThreadPoolUtil {private static volatile ThreadPoolExecutor poolExecutor;public static synchronized ThreadPoolExecutor getInstance() {if (poolExecutor == null) {synchronized (ThreadPoolUtil.class){if (poolExecutor == null) {poolExecutor = new ThreadPoolExecutor(4,20,300,TimeUnit.SECONDS,new LinkedBlockingDeque<Runnable>(Integer.MAX_VALUE));}}}return poolExecutor;}
}

4.DimUtil

在维度关联时,我们需要从hbase中获取维度信息,为了为了优化查询速度,我们引入了redis,流程如图所示
在这里插入图片描述

package com.atguigu.tms.realtime.utils;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import redis.clients.jedis.Jedis;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class DimUtil {public static JSONObject getDimInfo(String namespace, String tableName, Tuple2<String, String> nameAndValue) {// 获取的查询条件中的字段名以及字段值String keyName = nameAndValue.f0;String keyValue = nameAndValue.f1;// 拼接从Redis中查询数据的KeyString redisKey = "dim:" + tableName.toLowerCase() + ":" + keyName + "_" + keyValue;// 操作Redis的客户端Jedis jedis = null;// 用于存放从Redis查询的维度数据String dimJsonStr = null;// 用于封装返回结果JSONObject dimJsonObj = null;//  先从缓存中查询维度数据try {jedis = JedisUtil.getJedis();dimJsonStr = jedis.get(redisKey);if (StringUtils.isNotEmpty(dimJsonStr)) {// 如果在缓存中能够找到要查询的维度dimJsonObj = JSON.parseObject(dimJsonStr);} else {// 如果在缓存中,没有找到要查询的维度数据if ("id".equals(keyName)) {dimJsonObj = HbaseUtil.getRowByPrimaryKey(namespace, tableName, nameAndValue);} else {dimJsonObj = HbaseUtil.getRowByForeignKey(namespace, tableName, nameAndValue);}if (dimJsonObj != null && jedis != null) {jedis.setex(redisKey, 3600 * 24, dimJsonObj.toJSONString());}}} catch (Exception e) {log.error("从Redis中查询维度数据发生了一场", e);} finally {if (jedis != null) {System.out.println("关闭客户端");jedis.close();}}return dimJsonObj;}// 从Redis中删除缓存的维度数据public static void delCached(String tableName, Tuple2<String, String> keyNameAndValue) {String keyName = keyNameAndValue.f0;String keyValue = keyNameAndValue.f1;String redisKey = "dim:" + tableName.toLowerCase() + ":" + keyName + "_" + keyValue;Jedis jedis = null;try {jedis = JedisUtil.getJedis();jedis.decr(redisKey);}catch (Exception e){log.error("清除Redis中缓存的维度数据时发生了异常", e);}finally {if (jedis != null) {jedis.close();}}}
}

5.JedisUtil

用于连接reids的jedis客户端。
先在pom中引入依赖

<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>3.3.0</version>
</dependency>

在这里插入图片描述

package com.atguigu.tms.realtime.utils;import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;public class JedisUtil {private static JedisPool jedisPool;static {JedisPoolConfig poolConfig = new JedisPoolConfig();poolConfig.setMaxTotal(1000);poolConfig.setMaxIdle(5);poolConfig.setMinIdle(5);poolConfig.setBlockWhenExhausted(true);poolConfig.setMaxWaitMillis(2000L);poolConfig.setTestOnBorrow(true);jedisPool=new JedisPool(poolConfig,"hadoop102",6379,10000);}public static Jedis getJedis(){System.out.println("创建Jedis客户端");Jedis jedis = jedisPool.getResource();return jedis;}public static void main(String[] args) {Jedis jedis = getJedis();String pong = jedis.ping();System.out.println(pong);}
}

6. HbaseUtil

之前我们在HbaseUtil完成了创建表和插入操作,现在来完成查询操作。

package com.atguigu.tms.realtime.utils;import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.commom.TmsConfig;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;import java.io.IOException;public class HbaseUtil {private static Connection conn;static {try {Configuration conf = HBaseConfiguration.create();conf.set("hbase.zookeeper.quorum", TmsConfig.hbase_zookeeper_quorum);conn = ConnectionFactory.createConnection(conf);} catch (IOException e) {throw new RuntimeException(e);}}// 创建表public static void createTable(String nameSpace, String tableName, String... families) {Admin admin = null;try {if (families.length < 1) {System.out.println("至少需要一个列族");return;}admin = conn.getAdmin();// 判断表是否存在if (admin.tableExists(TableName.valueOf(nameSpace, tableName))) {System.out.println(nameSpace + ":" + tableName + "已存在");return;}TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TableName.valueOf(nameSpace, tableName));// 指定列族for (String family : families) {ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family)).build();builder.setColumnFamily(familyDescriptor);}admin.createTable(builder.build());} catch (IOException e) {throw new RuntimeException(e);} finally {if (admin != null) {try {admin.close();} catch (IOException e) {throw new RuntimeException(e);}}}}// 向hbase插入对象public static void putPow(String namespace, String tableName, Put put) {BufferedMutator mutator = null;try {BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(namespace, tableName));params.writeBufferSize(5 * 1024 * 1024);params.setWriteBufferPeriodicFlushTimeoutMs(3000L);mutator = conn.getBufferedMutator(params);mutator.mutate(put);} catch (IOException e) {throw new RuntimeException(e);} finally {if (mutator != null) {try {mutator.close();} catch (IOException e) {throw new RuntimeException(e);}}}}// 根据主键从Hbase表中查询一行数据public static JSONObject getRowByPrimaryKey(String namespace, String tableName, Tuple2<String, String> rowKeyNameAndKey) {Table table = null;JSONObject dimJsonObj = null;String rowKeyName = rowKeyNameAndKey.f0;String rowKeyValue = rowKeyNameAndKey.f1;try {table = conn.getTable(TableName.valueOf(namespace, tableName));Result result = table.get(new Get(Bytes.toBytes(rowKeyValue)));Cell[] cells = result.rawCells();if (cells.length > 0) {dimJsonObj = new JSONObject();dimJsonObj.put(rowKeyName, rowKeyValue);for (Cell cell : cells) {dimJsonObj.put(Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell)));}} else {System.out.println("从Hbase表中没有找到对应的维度数据");}} catch (IOException e) {throw new RuntimeException(e);} finally {if (table != null) {try {table.close();} catch (IOException e) {throw new RuntimeException(e);}}}return dimJsonObj;}//根据外键从hbase表中查询一行数据public static JSONObject getRowByForeignKey(String namespace, String tableName, Tuple2<String, String> foreignKeyNameAndKey) {Table table = null;JSONObject dimJsonObj = null;try {table = conn.getTable(TableName.valueOf(namespace, tableName));Scan scan = new Scan();String foreignKeyName = foreignKeyNameAndKey.f0;String foreignKeyValue = foreignKeyNameAndKey.f1;SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(Bytes.toBytes("info"),Bytes.toBytes(foreignKeyName), CompareOperator.EQUAL,Bytes.toBytes(foreignKeyValue));singleColumnValueFilter.setFilterIfMissing(true);scan.setFilter(singleColumnValueFilter);ResultScanner scanner = table.getScanner(scan);Result result = scanner.next();if (result!=null){Cell[] cells = result.rawCells();if (cells.length > 0) {dimJsonObj = new JSONObject();dimJsonObj.put("id", Bytes.toString(result.getRow()));for (Cell cell : cells) {dimJsonObj.put(Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell)));}} else {System.out.println("从Hbase表中没有找到对应的维度数据");}}} catch (IOException e) {throw new RuntimeException(e);} finally {if (table != null) {try {table.close();} catch (IOException e) {throw new RuntimeException(e);}}}return dimJsonObj;}
}

7. DwsBoundOrgSortDayBean

自定义的工具类,其中包含我们要写入ck的字段

package com.atguigu.tms.realtime.beans;import lombok.Builder;
import lombok.Data;@Data
@Builder
public class DwsBoundOrgSortDayBean {// 统计日期String curDate;// 机构 IDString orgId;// 机构名称String orgName;// 用于关联获取省份信息的机构 ID@TransientSinkString joinOrgId;// 城市 IDString cityId;// 城市名称String cityName;// 省份 IDString provinceId;// 省份名称String provinceName;// 分拣次数Long sortCountBase;// 时间戳Long ts;
}

8.补充维度字段

我代码编写我们需要维度表的外键字段,所以我们重新修改mysql维度表,添加外键字段。

DROP TABLE IF EXISTS `tms_config_dim`;
CREATE TABLE `tms_config_dim` (`source_table` varchar(200) NOT NULL COMMENT '数据源表',`sink_table` varchar(200) DEFAULT NULL COMMENT '目标表',`sink_family` varchar(200) DEFAULT NULL COMMENT '目标表列族',`sink_columns` varchar(200) DEFAULT NULL COMMENT '目标表列',`sink_pk` varchar(256) DEFAULT NULL COMMENT '主键字段',`foreign_keys` varchar(256) DEFAULT NULL COMMENT '外键查询字段',PRIMARY KEY (`source_table`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='物流实时配置表';

然后从新导入数据。
在这里插入图片描述
在这里插入图片描述

然后我们使用dimapp同步一下数据即可,具体方法可看Dim层搭建。

9. MyBroadcastProcessFunction

我们之前在DIM层的搭建中,在MyBroadcastProcessFunction的processElement函数中过滤掉了外键,但现在需要他,我们把它加上。
在传递前添加一段代码

// 清除Redis缓存的准备工作(传递操作类型、外键字段的k-v)
String op = jsonObj.getString("op");
if ("u".equals(op)) {afterJsonObj.put("op", op);// 从配置表中获取当前维度表关联的外键名String foreignKeys = tmsConfigDimBean.getForeignKeys();// 定义个json对象,用于存储当前维度表对应的外键名以及值JSONObject foreignjsonObj = new JSONObject();if (StringUtils.isNotEmpty(foreignKeys)) {String[] foreignNameArr = foreignKeys.split(",");for (String foreignName : foreignNameArr) {// 获取修改前的数据JSONObject before = jsonObj.getJSONObject("before");String foreignKeyBefore = before.getString(foreignName);String foreignKeyAfter = afterJsonObj.getString(foreignName);if (!foreignKeyBefore.equals(foreignKeyAfter)) {// 如果修改的是外键foreignjsonObj.put(foreignName, foreignKeyBefore);}else {foreignjsonObj.put(foreignName, foreignKeyBefore);}}}afterJsonObj.put("foreign_key", foreignjsonObj);
}

完成代码

package com.atguigu.tms.realtime.app.func;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.beans.TmsConfigDimBean;
import com.atguigu.tms.realtime.utils.DateFormatUtil;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;import java.sql.*;
import java.util.*;// 自定义类 完成主流和广播流的处理
public class MyBroadcastProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {private MapStateDescriptor<String, TmsConfigDimBean> mapStateDescriptor;private Map<String, TmsConfigDimBean> configMap = new HashMap<>();private String username;private String password;public MyBroadcastProcessFunction(MapStateDescriptor<String, TmsConfigDimBean> mapStateDescriptor, String[] args) {this.mapStateDescriptor = mapStateDescriptor;ParameterTool parameterTool = ParameterTool.fromArgs(args);this.username = parameterTool.get("mysql-username", "root");this.password = parameterTool.get("mysql-password", "000000");}@Overridepublic void open(Configuration parameters) throws Exception {// 将配置表中的数据进行预加载-JDBCClass.forName("com.mysql.cj.jdbc.Driver");String url = "jdbc:mysql://hadoop102:3306/tms_config?useSSL=false&useUnicode=true" +"&user=" + username + "&password=" + password +"&charset=utf8&TimeZone=Asia/Shanghai";Connection conn = DriverManager.getConnection(url);PreparedStatement ps = conn.prepareStatement("select * from tms_config.tms_config_dim");ResultSet rs = ps.executeQuery();ResultSetMetaData metaData = rs.getMetaData();while (rs.next()) {JSONObject jsonObj = new JSONObject();for (int i = 1; i <= metaData.getColumnCount(); i++) {String columnName = metaData.getColumnName(i);Object columValue = rs.getObject(i);jsonObj.put(columnName, columValue);}TmsConfigDimBean tmsConfigDimBean = jsonObj.toJavaObject(TmsConfigDimBean.class);configMap.put(tmsConfigDimBean.getSourceTable(), tmsConfigDimBean);}rs.close();ps.close();conn.close();super.open(parameters);}@Overridepublic void processElement(JSONObject jsonObj, BroadcastProcessFunction<JSONObject, String, JSONObject>.ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {// 获取操作的业务数据库的表名String table = jsonObj.getString("table");// 获取广播状态ReadOnlyBroadcastState<String, TmsConfigDimBean> broadcastState = ctx.getBroadcastState(mapStateDescriptor);// 根据操作的业务数据库的表名 到广播状态中获取对应的配置信息TmsConfigDimBean tmsConfigDimBean;if ((tmsConfigDimBean = broadcastState.get(table)) != null || (tmsConfigDimBean = configMap.get(table)) != null) {// 如果对应的配置信息不为空 是维度信息// 获取after对象,对应的是影响的业务数据表中的一条记录JSONObject afterJsonObj = jsonObj.getJSONObject("after");// 数据脱敏switch (table) {// 员工表信息脱敏case "employee_info":String empPassword = afterJsonObj.getString("password");String empRealName = afterJsonObj.getString("real_name");String idCard = afterJsonObj.getString("id_card");String phone = afterJsonObj.getString("phone");// 脱敏empPassword = DigestUtils.md5Hex(empPassword);empRealName = empRealName.charAt(0) +empRealName.substring(1).replaceAll(".", "\\*");//知道有这个操作  idCard是随机生成的,和标准的格式不一样 所以这里注释掉// idCard = idCard.matches("(^[1-9]\\d{5}(18|19|([23]\\d))\\d{2}((0[1-9])|(10|11|12))(([0-2][1-9])|10|20|30|31)\\d{3}[0-9Xx]$)|(^[1-9]\\d{5}\\d{2}((0[1-9])|(10|11|12))(([0-2][1-9])|10|20|30|31)\\d{2}$)")//     ? DigestUtils.md5Hex(idCard) : null;phone = phone.matches("^(13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-35-9])\\d{8}$")? DigestUtils.md5Hex(phone) : null;afterJsonObj.put("password", empPassword);afterJsonObj.put("real_name", empRealName);afterJsonObj.put("id_card", idCard);afterJsonObj.put("phone", phone);break;// 快递员信息脱敏case "express_courier":String workingPhone = afterJsonObj.getString("working_phone");workingPhone = workingPhone.matches("^(13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-35-9])\\d{8}$")? DigestUtils.md5Hex(workingPhone) : null;afterJsonObj.put("working_phone", workingPhone);break;// 卡车司机信息脱敏case "truck_driver":String licenseNo = afterJsonObj.getString("license_no");licenseNo = DigestUtils.md5Hex(licenseNo);afterJsonObj.put("license_no", licenseNo);break;// 卡车信息脱敏case "truck_info":String truckNo = afterJsonObj.getString("truck_no");String deviceGpsId = afterJsonObj.getString("device_gps_id");String engineNo = afterJsonObj.getString("engine_no");truckNo = DigestUtils.md5Hex(truckNo);deviceGpsId = DigestUtils.md5Hex(deviceGpsId);engineNo = DigestUtils.md5Hex(engineNo);afterJsonObj.put("truck_no", truckNo);afterJsonObj.put("device_gps_id", deviceGpsId);afterJsonObj.put("engine_no", engineNo);break;// 卡车型号信息脱敏case "truck_model":String modelNo = afterJsonObj.getString("model_no");modelNo = DigestUtils.md5Hex(modelNo);afterJsonObj.put("model_no", modelNo);break;// 用户地址信息脱敏case "user_address":String addressPhone = afterJsonObj.getString("phone");addressPhone = addressPhone.matches("^(13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-35-9])\\d{8}$")? DigestUtils.md5Hex(addressPhone) : null;afterJsonObj.put("phone", addressPhone);break;// 用户信息脱敏case "user_info":String passwd = afterJsonObj.getString("passwd");String realName = afterJsonObj.getString("real_name");String phoneNum = afterJsonObj.getString("phone_num");String email = afterJsonObj.getString("email");// 脱敏passwd = DigestUtils.md5Hex(passwd);if (StringUtils.isNotEmpty(realName)) {realName = DigestUtils.md5Hex(realName);afterJsonObj.put("real_name", realName);}phoneNum = phoneNum.matches("^(13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-35-9])\\d{8}$")? DigestUtils.md5Hex(phoneNum) : null;email = email.matches("^[a-zA-Z0-9_-]+@[a-zA-Z0-9_-]+(\\.[a-zA-Z0-9_-]+)+$")? DigestUtils.md5Hex(email) : null;afterJsonObj.put("birthday", DateFormatUtil.toDate(afterJsonObj.getInteger("birthday") * 24 * 60 * 60 * 1000L));afterJsonObj.put("passwd", passwd);afterJsonObj.put("phone_num", phoneNum);afterJsonObj.put("email", email);break;}// 过滤不需要的维度属性String sinkColumns = tmsConfigDimBean.getSinkColumns();filterColum(afterJsonObj, sinkColumns);// 补充输出目的的表名String sinkTable = tmsConfigDimBean.getSinkTable();afterJsonObj.put("sink_table", sinkTable);// 补充rowKeyString sinkPk = tmsConfigDimBean.getSinkPk();afterJsonObj.put("sink_pk", sinkPk);// 清除Redis缓存的准备工作(传递操作类型、外键字段的k-v)String op = jsonObj.getString("op");if ("u".equals(op)) {afterJsonObj.put("op", op);// 从配置表中获取当前维度表关联的外键名String foreignKeys = tmsConfigDimBean.getForeignKeys();// 定义个json对象,用于存储当前维度表对应的外键名以及值JSONObject foreignjsonObj = new JSONObject();if (StringUtils.isNotEmpty(foreignKeys)) {String[] foreignNameArr = foreignKeys.split(",");for (String foreignName : foreignNameArr) {// 获取修改前的数据JSONObject before = jsonObj.getJSONObject("before");String foreignKeyBefore = before.getString(foreignName);String foreignKeyAfter = afterJsonObj.getString(foreignName);if (!foreignKeyBefore.equals(foreignKeyAfter)) {// 如果修改的是外键foreignjsonObj.put(foreignName, foreignKeyBefore);}else {foreignjsonObj.put(foreignName, foreignKeyBefore);}}}afterJsonObj.put("foreign_key", foreignjsonObj);}// 将维度数据传递out.collect(afterJsonObj);}}private void filterColum(JSONObject afterJsonObj, String sinkColumns) {String[] fieldArr = sinkColumns.split(",");List<String> fieldList = Arrays.asList(fieldArr);Set<Map.Entry<String, Object>> entrySet = afterJsonObj.entrySet();entrySet.removeIf(entry -> !fieldList.contains(entry.getKey()));}@Overridepublic void processBroadcastElement(String jsonStr, BroadcastProcessFunction<JSONObject, String, JSONObject>.Context ctx, Collector<JSONObject> out) throws Exception {JSONObject jsonObj = JSON.parseObject(jsonStr);// 获取广播状态BroadcastState<String, TmsConfigDimBean> broadcastState = ctx.getBroadcastState(mapStateDescriptor);// 获取对配置表的操作类型String op = jsonObj.getString("op");if ("d".equals(op)) {String sourceTable = jsonObj.getJSONObject("before").getString("source_table");broadcastState.remove(sourceTable);configMap.remove(sourceTable);} else {TmsConfigDimBean configDimBean = jsonObj.getObject("after", TmsConfigDimBean.class);String sourceTable = configDimBean.getSourceTable();broadcastState.put(sourceTable, configDimBean);configMap.put(sourceTable, configDimBean);}}
}

10. DimSinkFunction

添加清除代码

// 如果维度数据发生了变化,将Redis中缓存的维度数据清空掉if ("u".equals(op)) {// 删除当前维度数据在Redis中对应主键的缓存DimUtil.delCached(sinkTable, Tuple2.of("id", jsonObj.getString("id")));// 删除当前维度数据在Redis中对应外键的缓存Set<Map.Entry<String, Object>> set = foreignKeyJsonObj.entrySet();for (Map.Entry<String, Object> entry : set) {DimUtil.delCached(sinkTable, Tuple2.of(entry.getKey(), entry.getValue().toString()));}}

完整代码

package com.atguigu.tms.realtime.app.func;import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.commom.TmsConfig;
import com.atguigu.tms.realtime.utils.DimUtil;
import com.atguigu.tms.realtime.utils.HbaseUtil;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.hadoop.hbase.client.Put;import java.util.Map;
import java.util.Set;public class DimSinkFunction implements SinkFunction<JSONObject> {public void invoke(JSONObject jsonObj, Context context) throws Exception {// 获取输出目的地表名和rowKeyString sinkTable = jsonObj.getString("sink_table");String sinkPk = jsonObj.getString("sink_pk");jsonObj.remove("sink_table");jsonObj.remove("sink_pk");String op = jsonObj.getString("op");jsonObj.remove("op");JSONObject foreignKeyJsonObj = jsonObj.getJSONObject("foreign_key");jsonObj.remove("foreign_key");// 获取json中的每一个键值对Set<Map.Entry<String, Object>> entrySet = jsonObj.entrySet();Put put = new Put(jsonObj.getString(sinkPk).getBytes());for (Map.Entry<String, Object> entry : entrySet) {if (!sinkPk.equals(entry.getKey())) {put.addColumn("info".getBytes(), entry.getKey().getBytes(), entry.getValue().toString().getBytes());}}System.out.println("向hbase表中插入数据");HbaseUtil.putPow(TmsConfig.HBASE_NAMESPACE, sinkTable, put);// 如果维度数据发生了变化,将Redis中缓存的维度数据清空掉if ("u".equals(op)) {// 删除当前维度数据在Redis中对应主键的缓存DimUtil.delCached(sinkTable, Tuple2.of("id", jsonObj.getString("id")));// 删除当前维度数据在Redis中对应外键的缓存Set<Map.Entry<String, Object>> set = foreignKeyJsonObj.entrySet();for (Map.Entry<String, Object> entry : set) {DimUtil.delCached(sinkTable, Tuple2.of(entry.getKey(), entry.getValue().toString()));}}}
}

5.写入CK

1. ClickHouseUtil

先导入需要的依赖。

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.0-1.17</version>
</dependency><dependency><groupId>ru.yandex.clickhouse</groupId><artifactId>clickhouse-jdbc</artifactId><version>0.3.2</version><exclusions><exclusion><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></exclusion><exclusion><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId></exclusion></exclusions>
</dependency>

在这里插入图片描述
ClickHouseUtil

package com.atguigu.tms.realtime.utils;import com.atguigu.tms.realtime.beans.TransientSink;
import com.atguigu.tms.realtime.commom.TmsConfig;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;import java.lang.reflect.Field;
import java.sql.PreparedStatement;
import java.sql.SQLException;public class ClickHouseUtil {// 获取SinkFunctionpublic static <T> SinkFunction<T> getJdbcSink(String sql) {SinkFunction<T> sinkFunction = JdbcSink.<T>sink(sql,new JdbcStatementBuilder<T>() {@Overridepublic void accept(PreparedStatement ps, T obj) throws SQLException {// 将流中对象的属性给问号占位符赋值// 获取单签流中对象岁数的类型 以及类中的属性Field[] fieldsArr = obj.getClass().getDeclaredFields();// 遍历所有属性int skipNum = 0;for (int i = 0; i < fieldsArr.length; i++) {Field field = fieldsArr[i];// 判断当前属性是否需要向流中保存TransientSink transientSink = field.getAnnotation(TransientSink.class);if (transientSink != null) {skipNum++;continue;}// 设置私有属性的访问权限field.setAccessible(true);try {Object fieldValue = field.get(obj);ps.setObject(i + 1 - skipNum, fieldValue);} catch (IllegalAccessException e) {throw new RuntimeException(e);}}}},new JdbcExecutionOptions.Builder().withBatchSize(5000).withBatchIntervalMs(3000L).build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName(TmsConfig.CLICKHOUSE_DRIVER).withUrl(TmsConfig.CLICKHOUSE_URL).build());return sinkFunction;}
}

2.TransientSink

package com.atguigu.tms.realtime.beans;// 自定义主键 用于标记不需要向ck中保存的属性import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface TransientSink {
}

二、代码测试

1.程序启动

根据代码逻辑,我们需要启动以下程序。
hdfs、zk、kafka、hbase、redise、ck、OdsApp、DwdBoundRelevantApp、DimApp和DwsBoundOrgSortDay。其中DimApp只需启动一次完成维度数据更新即可。

2.修改kafka分区

再从kafka读取数据时,应该保证kafka有4个分区,不然聚合无法成功。

kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic tms_dwd_bound_sort --partitions 4

3.ck建表

1.建库

DROP DATABASE IF EXISTS tms_realtime;
CREATE DATABASE IF NOT EXISTS tms_realtime;
USE tms_realtime;

2.建表

DROP TABLE IF EXISTS dws_bound_org_sort_day_base;
CREATE TABLE IF NOT EXISTS dws_bound_org_sort_day_base
(`cur_date` Date COMMENT '统计日期',`org_id` String COMMENT '机构ID',`org_name` String COMMENT '机构名称',`city_id` String COMMENT '城市ID',`city_name` String COMMENT '城市名称',`province_id` String COMMENT '省份ID',`province_name` String COMMENT '省份名称',`sort_count_base` UInt64 COMMENT '分拣次数',`ts` UInt64 COMMENT '时间戳'
)
ENGINE = MergeTree
ORDER BY (cur_date, org_id, org_name, city_id, city_name, province_id, province_name);

3.物化视图

DROP VIEW IF EXISTS dws_bound_org_sort_day;
CREATE MATERIALIZED VIEW IF NOT EXISTS dws_bound_org_sort_day
(`cur_date` Date, `org_id` String, `org_name` String, `city_id` String, `city_name` String, `province_id` String, `province_name` String, `sort_count` AggregateFunction(argMax, UInt64, UInt64)
)
ENGINE = AggregatingMergeTree()
ORDER BY (cur_date, org_id, org_name, city_id, city_name, province_id, province_name)
SETTINGS index_granularity = 8192 AS
SELECT cur_date, org_id, org_name, city_id, city_name, province_id, province_name, argMaxState(sort_count_base, ts) AS sort_count
FROM dws_bound_org_sort_day_base
GROUP BY cur_date, org_id, org_name, city_id, city_name, province_id, province_name;

4.查看结果

当运行程序后,开始生成数据,等待执行完成之后,可以在ck中使用如下代码查看。

clickhouse-client -m

-m 参数代表可以使用回车。

SELECTcur_date,org_id,org_name,city_id,city_name,province_id,province_name,argMaxMerge(sort_count) AS sort_count
FROM dws_bound_org_sort_day
GROUP BYcur_date,org_id,org_name,city_id,city_name,province_id,province_name
LIMIT 10;

在这里插入图片描述


总结

至此,Dws的部分搭建就结束了,为了方便进行文件管理,我把项目开源到了github上。
项目地址:https://github.com/lcc-666/tms-parent

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/596070.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

遥测终端机——连接智能世界的桥梁

在当今的智能化时代&#xff0c;数据的重要性日益凸显。各个行业都需要对数据进行实时监测、处理和分析&#xff0c;以提升生产效率、优化运营管理。遥测终端机作为连接智能世界的桥梁&#xff0c;正逐渐成为各行业的必备设备。 遥测终端机是一种集数据采集、存储、传输和管理于…

小红书12月内容趋势分析

为洞察小红书平台的内容创作趋势及品牌营销策略&#xff0c;新红推出12月月度榜单&#xff0c;从创作者、品牌、热搜词多方面入手&#xff0c;解析月榜数据&#xff0c;为从业者提供参考。 以下为12月部分榜单解析&#xff0c;想要查看更多行业榜单&#xff0c;创作优质内容&am…

vue3+Cesium 添加地面测控站台

效果 cesiumRadar.js import * as Cesium from cesium; export function addentities(viewer, res, index) {viewer.entities.add({id: index,position: Cesium.Cartesian3.fromDegrees(res[0], res[1]),wall: {positions: new Cesium.CallbackProperty(() > {return Cesiu…

kubernetes(K8s)的使用和常用命令

K8S kubernetes&#xff0c;由于k和s之间有8个字符&#xff0c;所以简称k8s&#xff0c;是一个全新的基于容器技术的分布式架构领先方案&#xff0c;是谷歌严格保密十几年的秘密武器----Borg系统的一个开源版本&#xff0c;于2015年7月发布第一个正式版本&#xff0c;它的本质…

【电商项目实战】实现订单超时支付取消

&#x1f389;&#x1f389;欢迎来到我的CSDN主页&#xff01;&#x1f389;&#x1f389; &#x1f3c5;我是Java方文山&#xff0c;一个在CSDN分享笔记的博主。&#x1f4da;&#x1f4da; &#x1f31f;推荐给大家我的专栏《电商项目实战》。&#x1f3af;&#x1f3af; &am…

大创项目推荐 深度学习卷积神经网络垃圾分类系统 - 深度学习 神经网络 图像识别 垃圾分类 算法 小程序

文章目录 0 简介1 背景意义2 数据集3 数据探索4 数据增广(数据集补充)5 垃圾图像分类5.1 迁移学习5.1.1 什么是迁移学习&#xff1f;5.1.2 为什么要迁移学习&#xff1f; 5.2 模型选择5.3 训练环境5.3.1 硬件配置5.3.2 软件配置 5.4 训练过程5.5 模型分类效果(PC端) 6 构建垃圾…

网络通信(9)-C#TCP服务端实例

本文使用Socket在C#语言环境下完成TCP服务端的实例。 实例完成的功能: 服务器能够连接多个客户端显示在列表中,实现实时刷新。 服务器接收客户端的字符串数据。 选中列表中的客户端发送字符串数据。 在VS中创建C# Winform项目,编辑界面,如下: UI文件 namespace MyTc…

多元线性回归案例--客户价值模型

文章目录 step 1&#xff1a;读取数据step 2&#xff1a;搭建模型step 3&#xff1a;构造回归方程step 4&#xff1a;评估模型 利用多元线性回归模型可以根据多个因素来预测客户价值&#xff0c;当模型搭建完成后&#xff0c;便可对不同价值的客户采用不同的业务策略。 这里以信…

RuoYi-Cloud-Plus使用minio进行文件上传图片后无法预览解决_修改minio配置minio桶权限---SpringCloud工作笔记198

在文件管理的位置,发现刚刚上传的图片文件,会显示 预览图片失败 后来经过多方查看,发现是minio的配置的问题 可以从这里: 可以看到首先登录RuoYi-Cloud-Plus系统然后,打开文件管理页面可以看到,当上传了图片文件以后 显示文件展示中,文件预览失败,那么这个时候,去修改minio的配…

python识别验证码+灰度图片base64转换图片

一、为后面识别验证码准备 1、base64转换为图片&#xff0c;保存本地、并且置灰 上文中的base64,后面的就是包含Base64编码的PNG图像的字符串复制下来 import base64 from PIL import Image import io# 这里是你的Base64编码的字符串 base64_data "iVBORw0KGgoAAAANSUhE…

鸿鹄电子招投标系统:源码级别解析电子招投标的精髓

招投标管理系统是一个集门户管理、立项管理、采购项目管理、采购公告管理、考核管理、报表管理、评审管理、企业管理、采购管理和系统管理于一体的综合性应用平台。它适用于招标代理、政府采购、企业采购和工程交易等业务的企业&#xff0c;旨在提高项目管理的效率和质量。该系…

大数据HCIE成神之路之特征工程——特征选择

特征选择 1.1 特征选择 - Filter方法1.1.1 实验任务1.1.1.1 实验背景1.1.1.2 实验目标1.1.1.3 实验数据解析1.1.1.4 实验思路 1.1.2 实验操作步骤 1.2 特征选择 - Wrapper方法1.2.1 实验任务1.2.1.1 实验背景1.2.1.2 实验目标1.2.1.3 实验数据解析1.2.1.4 实验思路 1.2.2 实验操…

【Spring】19 AOP介绍及实例详解

文章目录 1. 定义1&#xff09;什么意思呢&#xff1f;2&#xff09;如何解决呢&#xff1f; 2. 基本概念1&#xff09;切面&#xff08;Aspect&#xff09;2&#xff09;切点&#xff08;Pointcut&#xff09;3&#xff09;通知&#xff08;Advice&#xff09;4&#xff09;连…

iOS 组件开发教程——手把手轻松实现灵动岛

1、先在项目里创建一个Widget Target 2、一定要勾选 Include live Activity&#xff0c;然后输入名称&#xff0c;点击完成既可。 3、在 Info.plist 文件中声明开启&#xff0c;打开 Info.plist 文件添加 NSSupportsLiveActivities&#xff0c;并将其布尔值设置为 YES。 4、我…

MySQL之四大引擎、建库建表以及账号管理

目录 一. 数据库存储引擎 1.1 存储引擎查看 1.2 InnoDB 1.3 MyISAM 1.4 MEMORY 1.5 ARCHIVE 二. 数据库管理 2.1 元数据库简介 2.2 元数据库分类 2.3 数据库的增删改查及使用&#xff1a; 2.4 MySQL库的权限 三. 数据表管理 3.1 三大范式 3.2 基本数据类型 3.2.1 优化原则 3…

这个方法可以让你把图片无损放大

随着数字技术的不断发展&#xff0c;照片无损放大已经成为了摄影领域中的一项重要技术。照片无损放大能够让摄影师在不损失细节和画质的情况下&#xff0c;将照片放大到更大的尺寸&#xff0c;从而让观众能够更加清晰地欣赏到照片中的每一个细节。 今天推荐的这款软件主要是通…

Mysql隔离级别MVCC多版本并发控制机制

欢迎大家关注我的微信公众号&#xff1a; 传送门&#xff1a;Mysql事务原理与优化 目录 概述 undo日志版本链与read view机制详解 深入浅出分析MVCC可见性算法的操作示例 关于readview和可见性算法的原理解释 总结 概述 在之前的文章中讲过&#xff0c;Mysql在可重…

基于Segformer实现PCB缺陷检测(步骤 + 代码)

导 读 本文主要介绍基于Segformer实现PCB缺陷检测 &#xff0c;并给出步骤和代码。 背景介绍 PCB缺陷检测是电子制造的一个重要方面。利用Segformer等先进模型不仅可以提高准确性&#xff0c;还可以大大减少检测时间。传统方法涉及手动检查&#xff0c;无法扩展且容易出错…

魏副业而战:手机副业新风口,短剧内容创作实操,日赚500+的创业指南

我是魏哥&#xff0c;与其躺平&#xff0c;不如魏副业而战&#xff01; 今天魏哥给大家分享一个短剧推广的副业项目。 有人会有疑惑&#xff0c;短剧推广是去年爆火的副业项目&#xff0c;现在操作是不是有点晚了。 这个大家不要有太多的顾虑。 恰恰相反&#xff0c;短剧推广…

【springboot项目】之秒杀项目常见问题(Seckill)

秒杀问题分为两部分&#xff1a;用户查看商品详情页、用户下单 项目简介&#xff1a; 模拟了高并发场景的商城系统&#xff0c;它具备秒杀功能&#xff0c;为了解决秒杀场景下的高并发问题。引入了 redis 作为缓存中间件&#xff0c;1.主要作用是缓存预热、预减库存等等。2.针…