Kafka消费流程

Kafka消费流程

消息是如何被消费者消费掉的。其中最核心的有以下内容。

1、多线程安全问题

2、群组协调

3、分区再均衡

1.多线程安全问题

当多个线程访问某个类时,这个类始终都能表现出正确的行为,那么就称这个类是线程安全的。

对于线程安全,还可以进一步定义:

当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些线程将如何交替进行,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么就称这个类是线程安全的。

那么如何避免生产者和消费者的线程安全问题呢?

1.1 生产者

KafkaProducer的实现是线程安全的。

KafkaProducer就是一个不可变类。线程安全的,可以在多个线程中共享单个KafkaProducer实例

所有字段用private final修饰,且不提供任何修改方法,这种方式可以确保多线程安全。

image-20240114224103193

如何节约资源的多线程使用KafkaProducer实例

import com.msb.selfserial.User;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** 类说明:多线程下使用生产者*/
public class KafkaConProducer {//发送消息的个数private static final int MSG_SIZE = 1000;//负责发送消息的线程池private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());private static CountDownLatch countDownLatch  = new CountDownLatch(MSG_SIZE);private static User makeUser(int id){User user = new User(id);String userName = "llp_"+id;user.setName(userName);return user;}/*发送消息的任务*/private static class ProduceWorker implements Runnable{private ProducerRecord<String,String> record;private KafkaProducer<String,String> producer;public ProduceWorker(ProducerRecord<String, String> record, KafkaProducer<String, String> producer) {this.record = record;this.producer = producer;}public void run() {final String ThreadName = Thread.currentThread().getName();try {producer.send(record, new Callback() {public void onCompletion(RecordMetadata metadata, Exception exception) {if(null!=exception){exception.printStackTrace();}if(null!=metadata){System.out.println(ThreadName+"|" +String.format("偏移量:%s,分区:%s", metadata.offset(),metadata.partition()));}}});//执行countDown方法,代表一个任务结束,对计数器 - 1countDownLatch.countDown();} catch (Exception e) {e.printStackTrace();}}}public static void main(String[] args) {// 设置属性Properties properties = new Properties();// 指定连接的kafka服务器的地址properties.put("bootstrap.servers","127.0.0.1:9092");// 设置String的序列化properties.put("key.serializer", StringSerializer.class);properties.put("value.serializer", StringSerializer.class);// 构建kafka生产者对象KafkaProducer<String,String> producer  = new KafkaProducer<String, String>(properties);try {for(int i=0;i<MSG_SIZE;i++){User user = makeUser(i);ProducerRecord<String,String> record = new ProducerRecord<String,String>("concurrent-ConsumerOffsets",null,System.currentTimeMillis(), user.getId()+"", user.toString());executorService.submit(new ProduceWorker(record,producer));}//执行await方法,代表等待计数器变为0时,再继续执行countDownLatch.await();System.out.println("生产者消息发送完毕");} catch (Exception e) {e.printStackTrace();} finally {producer.close();executorService.shutdown();}}}

1.2 消费者

KafkaConsumer的实现不是线程安全的

实现消费者多线程最常见的方式: 线程封闭 ——即为每个线程实例化一个 KafkaConsumer对象,各自消费分配的分区消息

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** 类说明:多线程下正确的使用消费者,一个线程一个消费者*/
public class KafkaConConsumer {public static final int CONCURRENT_PARTITIONS_COUNT = 2;private static ExecutorService executorService = Executors.newFixedThreadPool(CONCURRENT_PARTITIONS_COUNT);private static class ConsumerWorker implements Runnable{private KafkaConsumer<String,String> consumer;public ConsumerWorker(Map<String, Object> config, String topic) {Properties properties = new Properties();properties.putAll(config);//一个线程一个消费者this.consumer = new KafkaConsumer<String, String>(properties);consumer.subscribe(Collections.singletonList(topic));}public void run() {final String ThreadName = Thread.currentThread().getName();try {while(true){ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));for(ConsumerRecord<String, String> record:records){System.out.println(ThreadName+"|"+String.format("主题:%s,分区:%d,偏移量:%d," +"key:%s,value:%s",record.topic(),record.partition(),record.offset(),record.key(),record.value()));//do our work}}} finally {consumer.close();}}}public static void main(String[] args) {/*消费配置的实例*/Map<String,Object> properties = new HashMap<String, Object>();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);properties.put(ConsumerConfig.GROUP_ID_CONFIG,"c_test");properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");for(int i = 0; i<CONCURRENT_PARTITIONS_COUNT; i++){//一个线程一个消费者executorService.submit(new ConsumerWorker(properties, "concurrent-ConsumerOffsets"));}}}

测试结果

image-20240114224603931

image-20240114224633281

2.群组协调

消费者要加入群组时,会向群组协调器发送一个JoinGroup请求,第一个加入群主的消费者成为群主,群主会获得群组的成员列表,并负责给每一个消费者分配分区。分配完毕后,群主把分配情况发送给群组协调器,协调器再把这些信息发送给所有的消费者,每个消费者只能看到自己的分配信息,只有群主知道群组里所有消费者的分配信息。群组协调的工作会在消费者发生变化(新加入或者掉线),主题中分区发生了变化(增加)时发生。

image.png

2.1组协调器

组协调器是Kafka服务端自身维护的。

组协调器( GroupCoordinator )可以理解为各个消费者协调器的一个中央处理器, 每个消费者的所有交互都是和组协调器( GroupCoordinator )进行的。

  1. 选举Leader消费者客户端
  2. 处理申请加入组的客户端
  3. 再平衡后同步新的分配方案
  4. 维护与客户端的心跳检测
  5. 管理消费者已消费偏移量,并存储至 __consumer_offset

kafka上的组协调器( GroupCoordinator )协调器有很多,有多少个 __consumer_offset分区, 那么就有多少个组协调器( GroupCoordinator )

默认情况下, __consumer_offset有50个分区, 每个消费组都会对应其中的一个分区,对应的逻辑为 hash(group.id)%分区数。

2.2消费者协调器

每个客户端(消费者的客户端)都会有一个消费者协调器, 他的主要作用就是向组协调器发起请求做交互, 以及处理回调逻辑

  1. 向组协调器发起入组请求
  2. 向组协调器发起同步组请求(如果是Leader客户端,则还会计算分配策略数据放到入参传入)
  3. 发起离组请求
  4. 保持跟组协调器的心跳线程
  5. 向组协调器发送提交已消费偏移量的请求

2.3消费者加入分组的流程

1、客户端启动的时候, 或者重连的时候会发起JoinGroup的请求来申请加入的组中。

2、当前客户端都已经完成JoinGroup之后, 客户端会收到JoinGroup的回调, 然后客户端会再次向组协调器发起SyncGroup的请求来获取新的分配方案

3、当消费者客户端关机/异常 时, 会触发离组LeaveGroup请求。

当然有主动的消费者协调器发起离组请求,也有组协调器一直会有针对每个客户端的心跳检测, 如果监测失败,则就会将这个客户端踢出Group。

4、客户端加入组内后, 会一直保持一个心跳线程,来保持跟组协调器的一个感知。

并且组协调器会针对每个加入组的客户端做一个心跳监测,如果监测到过期, 则会将其踢出组内并再平衡。

2.4消费者消费的offset的存储

__consumer_offsets topic,并且默认提供了kafka_consumer_groups.sh脚本供用户查看consumer信息。

__consumer_offsets 是 kafka 自行创建的,和普通的 topic 相同。它存在的目的之一就是保存 consumer 提交的位移。

kafka-consumer-groups.bat --bootstrap-server :9092 --group c_test --describe

image.png

那么如何使用 kafka 提供的脚本查询某消费者组的元数据信息呢?

/*** 类说明:如何根据消费分组找ConsumerOffsets文件*/
public class ConsumerOffsets {public static void main(String[] args) {String groupID = "c_test";// 4System.out.println(Math.abs(groupID.hashCode()) % 50);}
}

image-20240115213202834
__consumer_offsets 的每条消息格式大致如图所示

可以想象成一个 KV 格式的消息,key 就是一个三元组:group.id+topic+分区号,而 value 就是 offset 的值

2.5分区再均衡

当消费者群组里的消费者发生变化,或者主题里的分区发生了变化,都会导致再均衡现象的发生。从前面的知识中,我们知道,Kafka中,存在着消费者对分区所有权的关系,

这样无论是消费者变化,比如增加了消费者,新消费者会读取原本由其他消费者读取的分区,消费者减少,原本由它负责的分区要由其他消费者来读取,增加了分区,哪个消费者来读取这个新增的分区,这些行为,都会导致分区所有权的变化,这种变化就被称为 再均衡

再均衡对Kafka很重要,这是消费者群组带来高可用性和伸缩性的关键所在。不过一般情况下,尽量减少再均衡,因为再均衡期间,消费者是无法读取消息的,会造成整个群组一小段时间的不可用。

消费者通过向称为群组协调器的broker(不同的群组有不同的协调器)发送心跳来维持它和群组的从属关系以及对分区的所有权关系。如果消费者长时间不发送心跳,群组协调器认为它已经死亡,就会触发一次再均衡。

心跳由单独的线程负责,相关的控制参数为max.poll.interval.ms。

2.6消费者提交偏移量导致的问题

当我们调用poll方法的时候,broker返回的是生产者写入Kafka但是还没有被消费者读取过的记录,消费者可以使用Kafka来追踪消息在分区里的位置,我们称之为 偏移量 。消费者更新自己读取到哪个消息的操作,我们称之为 提交

消费者是如何提交偏移量的呢?消费者会往一个叫做_consumer_offset的特殊主题发送一个消息,里面会包括每个分区的偏移量。发生了再均衡之后,消费者可能会被分配新的分区,为了能够继续工作,消费者者需要读取每个分区最后一次提交的偏移量,然后从指定的地方,继续做处理。

分区再均衡的例子:

某软件公司,有一个项目,有两块的工作,有两个码农,一个小王、一个小李,一个负责一块(分区消费),干得好好的。突然一天,小王桌子一拍不干了,老子中了5百万了,不跟你们玩了,立马收拾完电脑就走了。这个时候小李就必须承担两块工作,这个时候就是发生了分区再均衡。

过了几天,你入职,一个萝卜一个坑,你就入坑了,你承担了原来小王的工作。这个时候又会发生了分区再均衡。

1)如果提交的偏移量小于消费者实际处理的最后一个消息的偏移量,处于两个偏移量之间的消息会被重复处理,

2)如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失

image.png

2.7 再均衡监听器示例

我们创建一个分区数是3的主题rebalance

kafka-topics.bat --bootstrap-server localhost:9092  --create --topic rebalance --replication-factor 1 --partitions 3

image.png

在为消费者分配新分区或移除旧分区时,可以通过消费者API执行一些应用程序代码,在调用 subscribe()方法时传进去一个 ConsumerRebalancelistener实例就可以了。

ConsumerRebalancelistener有两个需要实现的方法。

  1. public void
    onPartitionsRevoked( Collection< TopicPartition> partitions)方法会在

再均衡开始之前和消费者停止读取消息之后被调用。如果在这里提交偏移量,下一个接管分区的消费者就知道该从哪里开始读取了

  1. public void
    onPartitionsAssigned( Collection< TopicPartition> partitions)方法会在重新分配分区之后和消费者开始读取消息之前被调用。

具体使用,我们先创建一个3分区的主题,然后实验一下,

在再均衡开始之前会触发onPartitionsRevoked方法

在再均衡开始之后会触发onPartitionsAssigned方法

生产者

/*** 类说明:多线程下使用生产者*/
public class RebalanceProducer {//发送消息的个数private static final int MSG_SIZE = 50;//负责发送消息的线程池private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());private static CountDownLatch countDownLatch  = new CountDownLatch(MSG_SIZE);private static User makeUser(int id){User user = new User(id);String userName = "llp_"+id;user.setName(userName);return user;}/*发送消息的任务*/private static class ProduceWorker implements Runnable{private ProducerRecord<String,String> record;private KafkaProducer<String,String> producer;public ProduceWorker(ProducerRecord<String, String> record, KafkaProducer<String, String> producer) {this.record = record;this.producer = producer;}public void run() {final String ThreadName = Thread.currentThread().getName();try {producer.send(record, new Callback() {public void onCompletion(RecordMetadata metadata, Exception exception) {if(null!=exception){exception.printStackTrace();}if(null!=metadata){System.out.println(ThreadName+"|" +String.format("偏移量:%s,分区:%s", metadata.offset(),metadata.partition()));}}});countDownLatch.countDown();} catch (Exception e) {e.printStackTrace();}}}public static void main(String[] args) {// 设置属性Properties properties = new Properties();// 指定连接的kafka服务器的地址properties.put("bootstrap.servers","127.0.0.1:9092");// 设置String的序列化properties.put("key.serializer", StringSerializer.class);properties.put("value.serializer", StringSerializer.class);// 构建kafka生产者对象KafkaProducer<String,String> producer  = new KafkaProducer<String, String>(properties);try {for(int i=0;i<MSG_SIZE;i++){User user = makeUser(i);ProducerRecord<String,String> record = new ProducerRecord<String,String>("rebalance",null,System.currentTimeMillis(), user.getId()+"", user.toString());executorService.submit(new RebalanceProducer.ProduceWorker(record,producer));Thread.sleep(600);}countDownLatch.await();} catch (Exception e) {e.printStackTrace();} finally {producer.close();executorService.shutdown();}}}

消费者

/*** 类说明:设置了再均衡监听器的消费者*/
public class RebalanceConsumer {public static final String GROUP_ID = "rebalance_consumer";private static ExecutorService executorService = Executors.newFixedThreadPool(3);public static void main(String[] args) throws InterruptedException {//先启动两个消费者new Thread(new ConsumerWorker(false)).start();new Thread(new ConsumerWorker(false)).start();Thread.sleep(5000);//再启动一个消费,这个消费者 运行几次后就会停止消费new Thread(new ConsumerWorker(true)).start();//Thread.sleep(5000000);}
}
/*** 类说明:消费者任务*/
public class ConsumerWorker implements Runnable{private final KafkaConsumer<String,String> consumer;/*用来保存每个消费者当前读取分区的偏移量*/private final Map<TopicPartition, OffsetAndMetadata> currOffsets;private final boolean isStop;public ConsumerWorker(boolean isStop) {// 设置属性Properties properties = new Properties();// 指定连接的kafka服务器的地址properties.put("bootstrap.servers","127.0.0.1:9092");// 设置String的反序列化properties.put("key.deserializer", StringDeserializer.class);properties.put("value.deserializer", StringDeserializer.class);properties.put(ConsumerConfig.GROUP_ID_CONFIG,RebalanceConsumer.GROUP_ID);/*取消自动提交*/properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);this.isStop = isStop;this.consumer = new KafkaConsumer<String, String>(properties);//保存  每个分区的消费偏移量this.currOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();System.out.println("consumer-hashcode:"+consumer.hashCode());consumer.subscribe(Collections.singletonList("rebalance"), new HandlerRebalance(currOffsets,consumer));}public void run() {final String id = Thread.currentThread().getId()+"";int count = 0;TopicPartition topicPartition = null;long offset = 0;try {while(true){ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));//业务处理//开始事务for(ConsumerRecord<String, String> record:records){System.out.println(id+"|"+String.format("处理主题:%s,分区:%d,偏移量:%d," +"key:%s,value:%s",record.topic(),record.partition(),record.offset(),record.key(),record.value()));topicPartition = new TopicPartition(record.topic(), record.partition());offset = record.offset()+1;//获取偏移量currOffsets.put(topicPartition,new OffsetAndMetadata(offset, "no"));count++;//执行业务sql}if(currOffsets.size()>0){for(TopicPartition topicPartitionkey:currOffsets.keySet()){HandlerRebalance.partitionOffsetMap.put(topicPartitionkey, currOffsets.get(topicPartitionkey).offset());}//提交事务,同时将业务和偏移量入库(使用HashMap替代)}if(isStop&&count>=5){ //监听线程System.out.println(id+"-将关闭,当前偏移量为:"+currOffsets);consumer.commitSync();//跳出这个循环,最终执行finally中的关闭,此时消费者关闭break;}consumer.commitSync();}} finally {consumer.close();}}
}

在均衡监听器

/*** 类说明:再均衡监听器*/
public class HandlerRebalance implements ConsumerRebalanceListener {/*模拟一个保存分区偏移量的数据库表*/public final static ConcurrentHashMap<TopicPartition,Long>partitionOffsetMap = new ConcurrentHashMap<TopicPartition,Long>();private final Map<TopicPartition, OffsetAndMetadata> currOffsets;private final KafkaConsumer<String,String> consumer;//private final Transaction  tr事务类的实例public HandlerRebalance(Map<TopicPartition, OffsetAndMetadata> currOffsets,KafkaConsumer<String, String> consumer) {this.currOffsets = currOffsets;this.consumer = consumer;}//分区再均衡之前public void onPartitionsRevoked(Collection<TopicPartition> partitions) {final String id = Thread.currentThread().getId()+"";System.out.println(id+"-onPartitionsRevoked参数值为:"+partitions);System.out.println(id+"-服务器准备分区再均衡,提交偏移量。当前偏移量为:"+currOffsets);//我们可以不使用consumer.commitSync(currOffsets);//提交偏移量到kafka,由我们自己维护*///开始事务//偏移量写入数据库System.out.println("分区偏移量表中:"+partitionOffsetMap);for(TopicPartition topicPartition:partitions){partitionOffsetMap.put(topicPartition, currOffsets.get(topicPartition).offset());}consumer.commitSync(currOffsets);//提交业务数和偏移量入库  tr.commit}//分区再均衡完成以后public void onPartitionsAssigned(Collection<TopicPartition> partitions) {final String id = Thread.currentThread().getId()+"";System.out.println(id+"-再均衡完成,onPartitionsAssigned参数值为:"+partitions);System.out.println("分区偏移量表中:"+partitionOffsetMap);for(TopicPartition topicPartition:partitions){System.out.println(id+"-topicPartition"+topicPartition);//模拟从数据库中取得上次的偏移量Long offset = partitionOffsetMap.get(topicPartition);if(offset==null) continue;consumer.seek(topicPartition,partitionOffsetMap.get(topicPartition));}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/625988.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

uni-app的学习【第三节】

五 运行环境判断与跨端兼容 uniapp为开发者提供了一系列基础组件,类似HTML里的基础标签元素,但uni-app的组件与HTML不同,而是与小程序相同,更适合手机端使用。 虽然不推荐使用 HTML 标签,但实际上如果开发者写了`div`等标签,在编译到非H5平台时也会被编译器转换为 `view`…

@RequiresApi(api = Build.VERSION_CODES.O)

问题 RequiresApi(api Build.VERSION_CODES.O) 详细问题 对于代码 // 格式化日期为MySQL的DATE类型格式private String formatDate(LocalDate date) {DateTimeFormatter formatter DateTimeFormatter.ofPattern("yyyy-MM-dd");return date.format(formatter);}o…

C# 面向切面编程之AspectCore初探

写在前面 AspectCore 是Lemon名下的一个国产Aop框架&#xff0c;提供了一个全新的轻量级和模块化的Aop解决方案。面向切面也可以叫做代码拦截&#xff0c;分为静态和动态两种模式&#xff0c;AspectCore 可以实现动态代理&#xff0c;支持程序运行时在内存中“临时”生成 AOP 动…

深入云原生—基于KubeWharf深度剖析-以公司实际应用场景为例深度解读

各位好&#xff0c;这里是难忘&#xff0c;本人对云原生也是研究了2年多了&#xff0c;算是略有所得&#xff0c;本次就来深入云原生—基于KubeWharf深度剖析场景与解读。我们需要先了解一下 KubeWharf&#xff0c;可能很多人都感觉到有点陌生吧&#xff0c;下面我们来一起学习…

助力工业焊缝质量检测,YOLOv7【tiny/l/x】不同系列参数模型开发构建工业焊接场景下钢材管道焊缝质量检测识别分析系统

焊接是一个不陌生但是对于开发来说相对小众的场景&#xff0c;在我们前面的博文开发实践中也有一些相关的实践&#xff0c;感兴趣的话可以自行移步阅读即可&#xff1a;《轻量级模型YOLOv5-Lite基于自己的数据集【焊接质量检测】从零构建模型超详细教程》 《基于DeepLabV3Plus…

必示科技助力中国联通智网创新中心通过智能化运维(AIOps)通用能力成熟度3级评估

2023年12月15日&#xff0c;中国信息通信研究院隆重公布了智能化运维AIOps系列标准最新批次评估结果。 必示科技与中国联通智网创新中心合作的“智能IT故障监控定位分析能力建设项目”通过了中国信息通信研究院开展的《智能化运维能力成熟度系列标准 第1部分&#xff1a;通用能…

PHP项目如何自动化测试

开发和测试 测试和开发具有同等重要的作用 从一开始&#xff0c;测试和开发就是相向而行的。测试是开发团队的一支独立的、重要的支柱力量。 测试要具备独立性 独立分析业务需求&#xff0c;独立配置测试环境&#xff0c;独立编写测试脚本&#xff0c;独立开发测试工具。没有…

STM32--7针0.96寸OLED屏幕显示(4线SPI)

本文介绍基于STM32F103C8T60.96寸OLED&#xff08;7针&#xff09;的显示&#xff08;完整程序代码见文末链接&#xff09; 一、简介 OLED&#xff0c;即有机发光二极管&#xff08; Organic Light Emitting Diode&#xff09;。 OLED 由于同时具备自发光&#xff0c;不需背光…

C++ 设计模式之桥接模式

【声明】本题目来源于卡码网&#xff08;题目页面 (kamacoder.com)&#xff09; 【提示&#xff1a;如果不想看文字介绍&#xff0c;可以直接跳转到C编码部分】 【简介】什么是桥接模式 桥接模式&#xff08;Bridge Pattern&#xff09;是⼀种结构型设计模式&#xff0c;它的U…

倍福嵌入式PLC开发团队建设

倍福嵌入式PLC开发工程师确实比较难找&#xff0c;这是因为这个领域需要具备丰富的专业知识和技能&#xff0c;而且经验越丰富的工程师越难找到。以下是一些可能导致倍福嵌入式PLC开发工程师难找的原因&#xff1a; 具备相关技能的工程师数量相对较少&#xff1a;嵌入式PLC开发…

XUbuntu22.04之免费思维导图工具(二百零六)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 优质专栏&#xff1a;多媒…

Kafka集群的安装与配置

一、安装JDK 1、在usr目录下新建Java目录&#xff0c;然后将下载的JDK拷贝到这个新建的Java目录中1 创建目录命令&#xff1a;mkdir /usr/java 2、进入到Java目录中解压下载的JDK 解压命令&#xff1a;tar -zxvf jdk-18_linux-x64_bin.tar.gz 在1主机上&#xff0c;将安装包…

SQL-用户管理与用户权限

&#x1f389;欢迎您来到我的MySQL基础复习专栏 ☆* o(≧▽≦)o *☆哈喽~我是小小恶斯法克&#x1f379; ✨博客主页&#xff1a;小小恶斯法克的博客 &#x1f388;该系列文章专栏&#xff1a;重拾MySQL &#x1f379;文章作者技术和水平很有限&#xff0c;如果文中出现错误&am…

FPGA之LUT

由于FPGA需要被反复烧写,它实现组合逻辑的基本结构不可能像ASIC那样通过固定的与非门来完成,而只能采用一种易于反复配置的结构。查找表可以很好地满足这一要求,目前主流FPGA都采用了基于SRAM工艺的查找表结构。LUT本质上就是一个RAM.它把数据事先写入RAM后,每当输入一个信号就…

【Python】tensor格式数据转为图像,并保存图像详解和示例

在项目中遇到一个tensor格式的数据&#xff0c;要保存为图像&#xff0c;此文对转换过程通过示例分享&#xff0c;以记录学习过程和帮助大家遇到同类问题时使用。 import torch import cv2 import numpy as np# 创建一个示例张量&#xff08;tensor&#xff09; input_tensor …

纯c实现顺序表 数据结构大全

我们已经知道数组是连续的内存地址&#xff0c;顺序表是由数组为基础的一种数据结构&#xff0c;拥有比数组更多的功能&#xff0c;在概念上属于线性结构&#xff0c;跟链表不同的是&#xff0c;顺序表在物理结构上也是线性的 什么是数据结构&#xff1f; 当我们想要使⽤⼤量使…

云计算平台建设总体技术方案详细参考

第1章. 基本情况 1.1. 项目名称 XX 公司 XX 云计算平台工程。 1.2. 业主公司 XX 公司。 1.3. 项目背景 1.3.1. XX 技术发展方向 XX&#xff0c;即运用计算机、网络和通信等现代信息技术手段&#xff0c;实现政府组织结构和工作流程的优化重组&#xff0c;超越时间、空间…

开源28181协议视频平台搭建流程

最近项目中用到流媒体平台&#xff0c;java平台负责信令部分&#xff0c;c平台负责流媒体处理&#xff0c;找了评分比较好的开源项目 https://gitee.com/pan648540858/wvp-GB28181-pro 流媒体服务基于 c写的 https://github.com/ZLMediaKit/ZLMediaKit 说明文档&#xff1a;h…

Visual Studio Code常用设置

此处用于记录下本人所使用 VScode 的使用习惯。其中主要包括&#xff1a;界面&#xff0c;主题&#xff0c;光标&#xff0c;文件保存等选项。 VSCode 用户区设置 相关介绍命令行方式进行配置可视化组件方式进行配置 更新 相关介绍 基本原理&#xff1a; Visual Studio Code 会…

【电子通识】各国电源插头标准和电压标准

在使用仪器时&#xff0c;通常会在使用之前去看下规格书。比如安装指南、快速使用指南等等来提取我们需要的信息。 一般大型的仪器供应商会卖往不同的国家&#xff0c;所以都会配置多种电源线。如下所示规格书中对仪器的电源线种类进行了说明。其中有中国、美国、加拿大、日本…