探究Kafka原理-4.API使用

  • 👏作者简介:大家好,我是爱吃芝士的土豆倪,24届校招生Java选手,很高兴认识大家
  • 📕系列专栏:Spring源码、JUC源码、Kafka原理
  • 🔥如果感觉博主的文章还不错的话,请👍三连支持👍一下博主哦
  • 🍂博主正在努力完成2023计划中:源码溯源,一探究竟
  • 📝联系方式:nhs19990716,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬👀

文章目录

  • 练习
    • 生产者
    • Ack 应答机制
    • 消费者重要参数
  • API 开发:topic 管理
    • 列出主题
    • 查看主题信息
    • 创建主题
    • 删除主题
    • 其他管理
  • 练习
    • 消费者
    • 消费者

练习

需求:

写一个生产者,不断地去生成“用户行为事件” 并写入kafka

{“guid”:1,“eventId”:“pageview”,“timestamp”:163786834789}

{“guid”:1,“eventId”:“pageview”,“timestamp”:163786834789}

{“guid”:1,“eventId”:“pageview”,“timestamp”:163786834789}

写一个消费者,不断地从kafka种取消费如上 “ 用户行为事件” 数据,并做统计计算,每五分钟,输出一次截至到当时的数据中出现过的用户总数。

生产者

public class KafkaTest{public static void main(String[] args){MyDataGen myDataGen = new MyDataGen();myDataGen.genData();}
}/*
业务数据生成器
*/
class MyDataGen{Producer<String, String> produce;public MyDataGen(){Properties props = new Properties();//设置 kafka 集群的地址props.put("bootstrap.servers", "doitedu01:9092,doitedu02:9092,doitedu03:9092");//序列化器props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");//ack 模式,取值有 0,1,-1(all) , all 是最慢但最安全的
props.put("acks", "all")producer = new KafkaProducer<>(props);}public void genData(){UserEvent userEvent = new UserEvent();while(true){// 造一条随机用户行为事件数据对象userEvent.setGuid(RandomUtils.nextInt(1,10000));userEvent.setEventId(RandomStringUtils.randomAlphabetic(5,8));userEvent.setTimeStamp(System.currentTimeMills());// 转成json串String json = JSON.toJSONString(userEvent);// 讲业务数据封装成ProducerRecord对象ProducerRecord<String,String> record = new ProducerRecord<String,String>("test",json);// 用producer写入kafkaproducer.send(record);Thread.sleep(RandomUtils.nextInt(500,2000));}}
}@NoArgsConStructor
@AllArgConstructor
@Getter
@Setter
class UserEvent{private long guid;private String eventId;private long timeStamp;
}

Ack 应答机制

Ack 应答机制参数配置

在这里插入图片描述

  • 0 : 生产者发出消息后不等待服务端的确认
  • 1 : 生产者发出消息后要求服务端的分区 leader 确保数据存储成功后发送一个确认信息
  • -1: 生产者发出消息后要求服务端的分区的 ISR 副本全部同步成功后发送一个确认信息

生产者的 ack=all,也不能完全保证数据发送的 100%可靠性

为什么?因为,如果服务端目标 partition 的同步副本只有 leader 自己了,此时,它收到数据就会给生产者反馈成功!

可以修改服务端的一个参数(分区最小 ISR 数[min.insync.replicas]>=2),来避免此问题;

消费者重要参数

fetch.min.bytes=1B 一次拉取的最小字节数

fetch.max.bytes=50M 一次拉取的最大数据量

fetch.max.wait.ms=500ms 拉取时的最大等待时长

max.partition.fetch.bytes = 1MB 每个分区一次拉取的最大数据量

max.poll.records=500 一次拉取的最大条数

connections.max.idle.ms=540000ms 网络连接的最大闲置时长

request.timeout.ms=30000ms 一次请求等待响应的最大超时时间 consumer 等待请求响应的最长时间

metadata.max.age.ms=300000 元数据在限定时间内没有进行更新,则会被强制更新

reconnect.backoff.ms=50ms 尝试重新连接指定主机之前的退避时间

retry.backoff.ms=100ms 尝试重新拉取数据的重试间隔

isolation.level=read_uncommitted 隔离级别! 决定消费者能读到什么样的数据
read_uncommitted:可以消费到 LSO(LastStableOffset)位置;
read_committed:可以消费到 HW(High Watermark)位置

max.poll.interval.ms 超过时限没有发起 poll 操作,则消费组认为该消费者已离开消费组

enable.auto.commit=true 开启消费位移的自动提交

auto.commit.interval.ms=5000 自动提交消费位移的时间间隔

API 开发:topic 管理

如果希望将管理类的功能集成到公司内部的系统中,打造集管理、监控、运维、告警为一体的生态平台,那么就需要以程序调用 API 方式去实现。如下所示:

在这里插入图片描述

为什么在网页上,或者用命令,都能做到topic的创建、删除、列出、查看详情,其底层逻辑是什么?

其本质上是通过不同方式调用了kafka提供的创建topic api

在这里插入图片描述

底层逻辑:web的后端java程序中,去调用相关的api

工具类 KafkaAdminClient 可以用来管理 broker、配置和 ACL (Access Control List),管理 topic

在这里插入图片描述

构造一个 KafkaAdminClient

AdminClient adminClient = KafkaAdminClient.create(props);

列出主题

ListTopicsResult listTopicsResult = adminClient.listTopics();
Set<String> topics = listTopicsResult.names().get();
System.out.println(topics);

查看主题信息

DescribeTopicsResult describeTopicsResult =
adminClient.describeTopics(Arrays.asList("tpc_4", "tpc_3"));
Map<String, TopicDescription> res = describeTopicsResult.all().get();
Set<String> ksets = res.keySet();
for (String k : ksets) {System.out.println(res.get(k));
}

创建主题

// 参数配置
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"doit01:9092,doit02:9092,doit03:
9092");
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG,3000);// 创建 admin client 对象
AdminClient adminClient = KafkaAdminClient.create(props);
// 由服务端 controller 自行分配分区及副本所在 broker
NewTopic tpc_3 = new NewTopic("tpc_3", 2, (short) 1);
// 手动指定分区及副本的 broker 分配
HashMap<Integer, List<Integer>> replicaAssignments = new HashMap<>();
// 分区 0,分配到 broker0,broker1
replicaAssignments.put(0,Arrays.asList(0,1));
// 分区 1,分配到 broker0,broker2
replicaAssignments.put(1,Arrays.asList(0,1));NewTopic tpc_4 = new NewTopic("tpc_4", replicaAssignments);
CreateTopicsResult result = adminClient.createTopics(Arrays.asList(tpc_3,tpc_4));// 从 future 中等待服务端返回
try {result.all().get();
} catch (Exception e) {e.printStackTrace();
}
adminClient.close();

删除主题

DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList("tpc_1",
"tpc_1"));
Map<String, KafkaFuture<Void>> values = deleteTopicsResult.values();
System.out.println(values);

其他管理

除了进行 topic 管理外,KafkaAdminClient 也可进行诸如动态参数管理,分区管理等各类管理操作;

练习

需求:

写一个生产者,不断地去生成“用户行为事件” 并写入kafka

{“guid”:1,“eventId”:“pageview”,“timestamp”:163786834789}

{“guid”:1,“eventId”:“pageview”,“timestamp”:163786834789}

{“guid”:1,“eventId”:“pageview”,“timestamp”:163786834789}

写一个消费者,不断地从kafka种取消费如上 “ 用户行为事件” 数据,并做统计计算,每五分钟,输出一次截至到当时的数据中出现过的用户总数。 本质 去重guid数

消费者

首先分析思路,可以使用hashmap来进行统计思路,如下所示:

在这里插入图片描述

如果存在线程安全的问题,可以使用ConcurrentHashMap。

还有一个业务需求就是 每五分钟去输出一个结果,这个时机、节奏,如何去把控。

实际上就是一个线程源源不断的拉数据,一个线程定期的输出结果。

public class consumer{public static void main(String[] args){ConcurrentHashMap<Long, String> guidMap = new ConcurrentHashMap<>();// 启动数据消费线程new Thread(new ConsumeRunnable(guidMap)).start();// 启动统计及消费输出结果的线程(每5s输出一次)// 优雅一点实现定时调度,可以用各种定时调度器(有第三方的,也可以用JDK自己的,Timer)Timer timer = new Timer();timer.scheduleAtFixedRate(new StatisticTask(guidMap),0,1000);}
}class ConsumeRunnable implements Runnable{ConcurrentHashMap<Long, String> guidMap;public ConsumeRunnable(ConcurrentHashMap<Long, String> guidMap){this.guidMap = guidMap;}public void run(){Properties props = new Properties();// 定义 kakfa 服务的地址,不需要将所有 broker 指定上props.put("bootstrap.servers", "doitedu01);// key 的反序列化类props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");// value 的反序列化类props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");// 如果没有消费偏移量记录,则自动重设为起始 offset:latest, earliest, noneprops.put("auto.offset.reset","latest");// 制定 consumer groupprops.put("group.id", "g1");while (true) {// 读取数据,读取超时时间为 5000msConsumerRecords<String, String> records = consumer.poll(5000);for (ConsumerRecord<String, String> record : records)String eventJson = record.value();UserEvent userEvent = JSON.parseObject(eventJson,UserEvent.class);guidMap.put(userEvent.getGuid(), "");}}
}class StatisticTask extends TimerTask{ConcurrentHashMap<Long, String> guidMap;public StatisticTask(ConcurrentHashMap<Long, String> guidMap){this.guidMap = guidMap;}public void run(){System.out.println("截止到当前的用户总数为:"+guidMap.size());}
}

ConcurrentHashMap 中无论 key 还是 Value都不能填 null值,具体在源码中有体现:

其 put流程如下:

	public V put(K key, V value) {return putVal(key, value, false);// onlyIfAbsent如果是false,那么每次都会用新值替换掉旧值}final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();// 其中 spread 方法会综合高位低位, 具有更好的 hash 性int hash = spread(key.hashCode());int binCount = 0;// 死循环for (Node<K,V>[] tab = table;;) {// f 是链表头节点// fh 是链表头结点的 hash// i 是链表在 table 中的下标Node<K,V> f; int n, i, fh;// 要创建 tableif (tab == null || (n = tab.length) == 0)// 初始化 table 使用了 cas, 无需 synchronized 创建成功, 进入下一轮循环// 因为是懒惰初始化的,所以直到现在才开始创建 初始化使用cas 创建,其它失败得再次进入循环,没有用syn 我们得线程并没有被阻塞住tab = initTable();// 要创建链表头节点else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {// 添加链表头使用了 cas, 无需 synchronized// 用cas将头节点加进去,如果加入失败了,继续循环if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))break;}// 帮忙扩容// 其实就是看你的头结点是不是 ForwardingNode,其对应得MOVED是一个负数else if ((fh = f.hash) == MOVED)// 帮忙之后, 进入下一轮循环// 锁住当前的链表,帮助去扩容tab = helpTransfer(tab, f);// 能进入这个else,说明 table既不处于扩容中,也不是处于table的初始化过程中,而且这时肯定发生了锁下标的冲突else {V oldVal = null;// 锁住链表头节点// 并没有锁住整个tab,而是锁住这个桶链表的头节点synchronized (f) {// 再次确认链表头节点没有被移动if (tabAt(tab, i) == f) {// 链表// 链表的头节点hash码大于等于 0 if (fh >= 0) {binCount = 1;// 遍历链表for (Node<K,V> e = f;; ++binCount) {K ek;
// 找到相同的 keyif (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;// 更新if (!onlyIfAbsent)e.val = value;break;}Node<K,V> pred = e;// 已经是最后的节点了, 新增 Node, 追加至链表尾if ((e = e.next) == null) {pred.next = new Node<K,V>(hash, key,value, null);break;}}}// 红黑树else if (f instanceof TreeBin) {Node<K,V> p;binCount = 2;// putTreeVal 会看 key 是否已经在树中, 是, 则返回对应的 TreeNodeif ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}// 释放链表头节点的锁}if (binCount != 0) {if (binCount >= TREEIFY_THRESHOLD)// 如果链表长度 >= 树化阈值(8), 进行链表转为红黑树treeifyBin(tab, i);if (oldVal != null)return oldVal;break;}}}// 增加 size 计数addCount(1L, binCount);return null;}private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;// 这个hash有没有被创建while ((tab = table) == null || tab.length == 0) {if ((sc = sizeCtl) < 0)// 让出cpu的使用权,如果cpu的时间片没有其它线程了,那么还是会分给这个线程,只是让他不至于充分利用cpu,少占用一点cpu的时间。Thread.yield();// 尝试将 sizeCtl 设置为 -1(表示初始化 table)// 而其它的线程,再次进入循环,首先 不小于0了,其次,之前的 sc也已经变了,cas失败,再次循环的时候,发现 tab已经不为空了,结束循环else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {// 获得锁, 创建 table, 这时其它线程会在 while() 循环中 yield 直至 table 创建try {if ((tab = table) == null || tab.length == 0) {int n = (sc > 0) ? sc : DEFAULT_CAPACITY;Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];table = tab = nt;// 计算出下一次要扩容的阈值sc = n - (n >>> 2);}} finally {// 计算出下一次要扩容的阈值sizeCtl = sc;}break;}}return tab;}// check 是之前 binCount 的个数// 运用了 longadder 的思想private final void addCount(long x, int check) {CounterCell[] as; long b, s;if (// 已经有了 counterCells, 向 cell 累加// 累加单元数组不为空(as = counterCells) != null ||// 还没有, 向 baseCount 累加// 一个基础数值累加!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {CounterCell a; long v; int m;boolean uncontended = true;if (// 还没有 counterCellsas == null || (m = as.length - 1) < 0 ||// 还没有 cell(a = as[ThreadLocalRandom.getProbe() & m]) == null ||// cell cas 增加计数失败!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {// 创建累加单元数组和cell, 累加重试fullAddCount(x, uncontended);return;}if (check <= 1)return;// 获取元素个数s = sumCount();}if (check >= 0) {Node<K,V>[] tab, nt; int n, sc;// 看看元素的个数是否大于扩容的阈值while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&(n = tab.length) < MAXIMUM_CAPACITY) {int rs = resizeStamp(n);if (sc < 0) {if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||transferIndex <= 0)break;// newtable 已经创建了,帮忙扩容if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))// 首次调用,因为是懒惰初始化的,所以还没有创建transfer(tab, nt);}// 需要扩容,这时 newtable 未创建else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))transfer(tab, null);s = sumCount();}}}

但是这里使用一个hashmap去记录去重的guid是有弊端的。

利用map或者set来记录guid本身,比较占用内存还有性能。

guid是一个Long值,8个byte,如果咱们的用户规模达到了10亿,那你这个set或者map中存储的guid就将达到10亿个。

相当于8 * 1000000000 /1024/1024 = 7629MB相当于一个Hashmap中存储了7G数据

那么有一些经典的数据结构,bitmap、bloomfilter、hyperloglog都可以解决去统计个数判重的场景。

在这里插入图片描述

输出个数:有几个1,就是出现过几种 guid:

之前使用hashmap存,直接存guid,8个byte一个guid == 64bit 现在bitmap里面,记一个guid,只要一个bit

并且hashmap是按需扩容的,这个过程是很浪费性能的,而bigmap要一开始初始化10亿个bit,有点浪费空间。

但是api会对最原始的bitmap进行一些优化,比如说稀疏型的向量的场景中,例如roaringbitmap工具包进行了优化,慢慢的进行了扩容。

public class consumer{public static void main(String[] args){// 使用roaringbitmap来记录RoaringBitmap bitmap = RoaringBitmap.bigmapOf();// 启动数据消费线程new Thread(new ConsumeRunnable(bitmap)).start();// 启动统计及消费输出结果的线程(每5s输出一次)// 优雅一点实现定时调度,可以用各种定时调度器(有第三方的,也可以用JDK自己的,Timer)Timer timer = new Timer();timer.scheduleAtFixedRate(new StatisticTask(bitmap),0,1000);}
}class ConsumeRunnable implements Runnable{RoaringBitmap bitmap;public ConsumeRunnable(RoaringBitmap bitmap){this.bitmap = bitmap;}public void run(){Properties props = new Properties();// 定义 kakfa 服务的地址,不需要将所有 broker 指定上props.put("bootstrap.servers", "doitedu01);// key 的反序列化类props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");// value 的反序列化类props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");// 如果没有消费偏移量记录,则自动重设为起始 offset:latest, earliest, noneprops.put("auto.offset.reset","latest");// 制定 consumer groupprops.put("group.id", "g1");while (true) {// 读取数据,读取超时时间为 5000msConsumerRecords<String, String> records = consumer.poll(5000);for (ConsumerRecord<String, String> record : records)String eventJson = record.value();UserEvent userEvent = JSON.parseObject(eventJson,UserEvent.class);bitmap.add(userEvent.getGuid());}}
}class StatisticTask extends TimerTask{RoaringBitmap bitmap;public StatisticTask(RoaringBitmap bitmap){this.bitmap = bitmap;}public void run(){System.out.println("截止到当前的用户总数为:"+bitmap.getCardinality());}
}

但是如果涉及到了多维聚合,那么单纯的使用bitmap可能还不够

比如 省市区 用户数 、 省市 用户数

江苏省,南京市,鼓楼区,5

江苏省,南京市,下关区,6

从用户访问记录中统计各区域的用户数

江苏省,南京市,鼓楼区,[0,1,0,1,1,1,0,1,0,0,0,0] 5

江苏省,南京市,下关区,[1,1,0,0,0,0,0,0,1,1,1,1] 6

要到的:江苏省,南京市 [1,1,0,1,1,1,0,1,1,1,1,1] 10

这也就是使用bitmap来实现高效率的,维度再均衡

所以这也就是数据结构与应用解耦

消费者

需求2:写一个消费者,不断地从kafka中消费如上"用户行为事件",并做出如下加工处理

给每一条数据,添加一个字段来标识,该条数据所属的用户今天是否第一次出现,如是,则标注1,否则标注0

{“guid”:1,“eventId”:“pageview”,“timestamp”:163786834789,“flag”:1}

{“guid”:1,“eventId”:“pageview”,“timestamp”:163786834789,“flag”:0}

{“guid”:1,“eventId”:“pageview”,“timestamp”:163786834789,“flag”:1}

这个需求可以用HashMap来做

也可以用bitmap来做,启动bitmap.contain()能够判断是是否包含

如果是判重字符串,用bitmap就不太方便了,可以使用bloomfilter,但是bloomfilter会牺牲一定的准确率(因为bloomfilter会误判)

在判重方面,布隆过滤器比bitmap.contain()更适合。

布隆过滤器具有以下优势:

  1. 内存占用更小:布隆过滤器使用位数组和哈希函数来表示数据集合,相比于bitmap.contain()方法,它可以以较小的内存占用来存储大规模的数据集合。
  2. 高效的查询性能:布隆过滤器可以在常数时间内完成判重操作,即使数据集合非常庞大,也能以较高的速度进行查询。而bitmap.contain()方法可能需要遍历整个位图数组,时间复杂度较高。
  3. 可扩展性:布隆过滤器可以动态地添加新的元素,并且支持删除操作。相比之下,bitmap.contain()方法需要预先确定数据范围,不便于动态扩展。

虽然布隆过滤器具有以上优势,但也需要注意它存在一定的误判率(即可能将不存在的元素判断为存在)。因此,在判重的关键场景中,如果对结果的准确性要求非常高,可以使用其他更精确的去重方法进行验证。

public class consumer{public static void main(String[] args){  // 启动数据消费线程new Thread(new ConsumeRunnable()).start();}
}class ConsumeRunnable implements Runnable{BloomFilter<Long> bloomFilter;Properties props;public ConsumeRunnable(){bloomFilter = BloomFilter.create(Funnels.longFunnel(),1000000000,0.01);props = new Properties();// 定义 kakfa 服务的地址,不需要将所有 broker 指定上props.put("bootstrap.servers", "doitedu01);// key 的反序列化类props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");// value 的反序列化类props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");// 如果没有消费偏移量记录,则自动重设为起始 offset:latest, earliest, noneprops.put("auto.offset.reset","latest");// 制定 consumer groupprops.put("group.id", "g1");}public void run(){while (true) {// 读取数据,读取超时时间为 5000msConsumerRecords<String, String> records = consumer.poll(5000);for (ConsumerRecord<String, String> record : records)String eventJson = record.value();UserEvent userEvent = JSON.parseObject(eventJson,UserEvent.class);// 去布隆过滤器判断一下boolean mightContain = bloomFilter.mightContain(userEvent.getGuid());if(mightContain){userEvent.setFlag(0);}else{userEvent.setFlag(1);bloomFilter.put(userEvent.getGuid());}}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/189576.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[读论文]meshGPT

概述 任务&#xff1a;无条件生成mesh &#xff08;无颜色&#xff09;数据集&#xff1a;shapenet v2方法&#xff1a;先trian一个auto encoder&#xff0c;用来获得code book&#xff1b;然后trian一个自回归的transformermesh表达&#xff1a;face序列。face按规定的顺序&a…

SAP_ABAP_编程基础_列表_自定义列表 / 多页列表 / 列表页面设置

SAP ABAP 顾问&#xff08;开发工程师&#xff09;能力模型_Terry谈企业数字化的博客-CSDN博客文章浏览阅读494次。目标&#xff1a;基于对SAP abap 顾问能力模型的梳理&#xff0c;给一年左右经验的abaper 快速成长为三年经验提供超级燃料&#xff01;https://blog.csdn.net/j…

WebSocket 前端使用vue3+ts+elementplus 实现连接

1.配置连接 websocket.ts文件如下 import { ElMessage } from "element-plus";interface WebSocketProps {url: string; // websocket地址heartTime?: number; // 心跳时间间隔&#xff0c;默认为 50000 msheartMsg?: string; // 心跳信息&#xff0c;默认为pingr…

TCP 重传、滑动窗口、流量控制、拥塞控制

1&#xff1a;重传机制 超时重传 快速重传 SACK 方法 Duplicate SACK 1&#xff1a;重传机制 超时重传&#xff1a;重传机制的其中一个方式&#xff0c;就是在发送数据时&#xff0c;设定一个定时器&#xff0c;当超过指定的时间后&#xff0c;没有收到对方的ACK确认应答报文…

java:springboot3集成swagger(springdoc-openapi-starter-webmvc-ui)

背景 网上集成 swagger 很多都是 Springfox 那个版本的&#xff0c;但是那个版本已经不更新了&#xff0c;springboot3 集成会报错 Typejavax.servlet.http.HttpServletRequest not present&#xff0c;我尝试了很多才知道现在用 Springdoc 了&#xff0c;今天我们来入门一下 …

OCR原理解析

目录 1.概述 2.应用场景 3.发展历史 4.基于传统算法的OCR技术原理 4.1 图像预处理 4.1.1 灰度化 4.1.2 二值化 4.1.3 去噪 4.1.4 倾斜检测与校正 4.1.4.2 轮廓矫正 4.1.5 透视矫正 4.2 版面分析 4.2.1 连通域检测文本 4.2.2 MSER检测文本 4.3 字符切割 4.3.1 连…

TiDB 在咪咕云原生场景下的实践

导读 咪咕是中国移动旗下的视频科技公司&#xff0c;门户系统是其核心业务之一。 为满足用户的多样化需求&#xff0c;咪咕计划对其数据库进行升级。 经过对中国主流国产数据库的测试评估后&#xff0c;咪咕选择了 TiDB&#xff0c;并成功将其落地于门户系统云化项目。 TiDB 为…

Linux系统之centos7编译安装Python 3.8

前言 CentOS (Community Enterprise Operating System) 是一种基于 Red Hat Enterprise Linux (RHEL) 进行源代码再编译并免费提供给用户的 Linux 操作系统。 CentOS 7 采用了最新的技术和软件包&#xff0c;并提供了强大的功能和稳定性。它适用于各种服务器和工作站应用场景&a…

分布式架构demo

1、外层创建pom 版本管理器 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.15</version><relativePath/> <!-- lookup parent from repository…

鉴源实验室 | 汽车网络安全攻击实例解析(三)

作者 | 张璇 上海控安可信软件创新研究院工控网络安全组 来源 | 鉴源实验室 社群 | 添加微信号“TICPShanghai”加入“上海控安51fusa安全社区” 引言&#xff1a;随着现代汽车技术的迅速发展&#xff0c;车辆的进入和启动方式经历了显著的演变。传统的物理钥匙逐渐被无钥匙进…

certbot—30秒部署你的HTTPS,永久免费,自动续约

在之前我已经介绍过部署反向代理的2种方式了。第一种是通过宝塔的反向代理配置然后开启HTTPS。 第二种是通过nginxproxymanager。 今天要给大家分享的是一个 certbot。 Certbot 是一个由 Lets Encrypt 开发的免费开源工具&#xff0c;用于自动化部署和管理 SSL/TLS 证书。它具有…

SpringBoot3.x + mp代码生成器(更新系列)

小伙伴们&#xff0c;有没有这样一个体验&#xff0c;每次开始写一个项目时&#xff0c;搭建项目环境&#xff0c;建entity&#xff0c;mapper&#xff0c;service&#xff0c;controller层文件的感到繁琐&#xff0c;这属实体力活呀&#xff01;然而&#xff0c;自从有了Mybat…

【二分查找】LeetCode1970:你能穿过矩阵的最后一天

本文涉及的基础知识点 二分查找算法合集 作者推荐 动态规划LeetCode2552&#xff1a;优化了6版的1324模式 题目 给你一个下标从 1 开始的二进制矩阵&#xff0c;其中 0 表示陆地&#xff0c;1 表示水域。同时给你 row 和 col 分别表示矩阵中行和列的数目。 一开始在第 0 …

【开源】基于JAVA的大病保险管理系统

项目编号&#xff1a; S 031 &#xff0c;文末获取源码。 \color{red}{项目编号&#xff1a;S031&#xff0c;文末获取源码。} 项目编号&#xff1a;S031&#xff0c;文末获取源码。 目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 系统配置维护2.2 系统参保管理2.3 大…

uniapp是否可以用elementUI等前端UI库、使用步骤以及需要注意的问题

文章目录 uniapp是否可以用elementUI等前端UI库使用方法和步骤问题如何解决 uniapp是否可以用elementUI等前端UI库 在PC端开发uniapp&#xff0c;可以用elementUI&#xff0c;因为elementUI就是PC端的。 在使用uniapp&#xff0c;选择vue2.0时&#xff0c;实测可以用nodejs16的…

java企业财务管理系统springboot+jsp

1、基本内容 &#xff08;1&#xff09;搭建基础环境&#xff0c;下载JDK、开发工具eclipse/idea。 &#xff08;2&#xff09;通过HTML/CSS/JS搭建前端框架。 &#xff08;3&#xff09;下载MySql数据库&#xff0c;设计数据库表&#xff0c;用于存储系统数据。 &#xff08;4…

三、使用CRT连接三台虚拟机

目录 1、建立连接 2、参数配置 3、设置主题,颜色和仿真 1、建立连接

SQL server 2016安装

1、关系数据库的基本概念。 行&#xff1a;每行成为一条“记录”或“元组”&#xff0c;用于描述一个对象的信息。 列&#xff1a;每列称为一个“字段”或“属性”&#xff0c;用于描述对象的一个属性。 2、主键与外键。 主键&#xff1a;键&#xff0c;即关键字。主键由一个或…

Android--Jetpack--Lifecycle详解

富贵本无根&#xff0c;尽从勤里得 一&#xff0c;定义 Lifecycle 是一个具备宿主生命周期感知能力的组件。它持有组件&#xff08;Activity/Fragment&#xff09;生命周期状态信息&#xff0c;并且允许其观察者监听宿主生命周期状态变化。 顾名思义&#xff0c;Lifecycle的主…

TA-Lib学习研究笔记(八)——Momentum Indicators 上

TA-Lib学习研究笔记&#xff08;八&#xff09;——Momentum Indicators 上 Momentum Indicators 动量指标&#xff0c;是最重要的股票分析指标&#xff0c;能够通过数据量化分析价格、成交量&#xff0c;预测股票走势和强度&#xff0c;大部分指标都在股票软件中提供。 1. A…