2、Kafka 生产者

3.1 生产者消息发送流程
3.1.1 发送原理
在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程。在 main 线程
中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulator,
Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。
在这里插入图片描述
3.1.2 生产者重要参数列表

在这里插入图片描述
3.2 异步发送 API
3.2.1 普通异步发送
1)需求:创建 Kafka 生产者,采用异步的方式发送到 Kafka Broker
在这里插入图片描述
2)代码编写
(1)创建工程 kafka
(2)导入依赖

<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version></dependency>
</dependencies>

(3)创建包名:com.atguigu.kafka.producer
(4)编写不带回调函数的 API 代码

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducer {public static void main(String[] args) throws 
InterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"hadoop102:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer");// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new 
KafkaProducer<String, String>(properties);// 4. 调用 send 方法,发送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(new 
ProducerRecord<>("first","atguigu " + i));}// 5. 关闭资源kafkaProducer.close();}
}

测试:
①在 hadoop102 上开启 Kafka 消费者。

[hadoop103 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first

②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息

[hadoop102 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first
atguigu 0
atguigu 1
atguigu 2
atguigu 3
atguigu 4

3.2.2 带回调函数的异步发送
回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元
数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发
送成功,如果 Exception 不为 null,说明消息发送失败。
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class CustomProducerCallback {public static void main(String[] args) throws 
InterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"hadoop102:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new 
KafkaProducer<String, String>(properties);// 4. 调用 send 方法,发送消息for (int i = 0; i < 5; i++) {// 添加回调kafkaProducer.send(new ProducerRecord<>("first", 
"prince " + i), new Callback() {
// 该方法在 Producer 收到 ack 时调用,为异步调用@Overridepublic void onCompletion(RecordMetadata metadata, 
Exception exception) {if (exception == null) {// 没有异常,输出信息到控制台System.out.println(" 主题: " + 
metadata.topic() + "->" + "分区:" + metadata.partition());} else {// 出现异常打印exception.printStackTrace();}}});// 延迟一会会看到数据发往不同分区Thread.sleep(2);}// 5. 关闭资源kafkaProducer.close();}
}

测试:
①在 hadoop102 上开启 Kafka 消费者。

[hadoop103 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first

②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。

[hadoop102 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first
prince 0
prince 1
prince 2
prince 3
prince 4

③在 IDEA 控制台观察回调信息。

主题:first->分区:0
主题:first->分区:0
主题:first->分区:1
主题:first->分区:1
主题:first->分区:1

3.3 同步发送 API
只需在异步发送的基础上,再调用一下 get()方法即可。

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class CustomProducerSync {public static void main(String[] args) throws
InterruptedException, ExecutionException {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102
:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new 
KafkaProducer<String, String>(properties);
// 4. 调用 send 方法,发送消息for (int i = 0; i < 10; i++) {// 异步发送 默认
// kafkaProducer.send(new 
ProducerRecord<>("first","kafka" + i));// 同步发送kafkaProducer.send(new 
ProducerRecord<>("first","kafka" + i)).get();}// 5. 关闭资源kafkaProducer.close();}
}

测试:
①在 hadoop102 上开启 Kafka 消费者。

[hadoop103 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first

②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。

[hadoop102 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first
atguigu 0
atguigu 1
atguigu 2
atguigu 3
atguigu 4

3.4 生产者分区
3.4.1 分区好处
在这里插入图片描述
3.4.2 生产者发送消息的分区策略
1)默认的分区器 DefaultPartitioner
在 IDEA 中 ctrl +n,全局查找 DefaultPartitioner。

/**
* The default partitioning strategy:
* <ul>
* <li>If a partition is specified in the record, use it
* <li>If no partition is specified but a key is present choose a 
partition based on a hash of the key
* <li>If no partition or key is present choose the sticky 
partition that changes when the batch is full.
* 
* See KIP-480 for details about sticky partitioning.
*/
public class DefaultPartitioner implements Partitioner {… …
}

在这里插入图片描述
2)案例一
将数据发往指定 partition 的情况下,例如,将所有数据发往分区 1 中。

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class CustomProducerCallbackPartitions {public static void main(String[] args) {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102
:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());KafkaProducer<String, String> kafkaProducer = new 
KafkaProducer<>(properties);for (int i = 0; i < 5; i++) {// 指定数据发送到 1 号分区,key 为空(IDEA 中 ctrl + p 查看参数)kafkaProducer.send(new ProducerRecord<>("first", 
1,"","prince " + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, 
Exception e) {if (e == null){System.out.println(" 主题: " + 
metadata.topic() + "->" + "分区:" + metadata.partition());}else {e.printStackTrace();}}});}kafkaProducer.close();}
}

测试:
①在 hadoop102 上开启 Kafka 消费者。

[hadoop103 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first

②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。

[hadoop102 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first
prince 0
prince 1
prince 2
prince 3
prince 4

③在 IDEA 控制台观察回调信息。

主题:first->分区:1
主题:first->分区:1
主题:first->分区:1
主题:first->分区:1
主题:first->分区:1

3)案例二
没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取
余得到 partition 值。

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class CustomProducerCallback {public static void main(String[] args) {Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102
:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());KafkaProducer<String, String> kafkaProducer = new 
KafkaProducer<>(properties);for (int i = 0; i < 5; i++) {// 依次指定 key 值为 a,b,f ,数据 key 的 hash 值与 3 个分区求余,
分别发往 120kafkaProducer.send(new ProducerRecord<>("first", 
"a","prince " + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, 
Exception e) {if (e == null){System.out.println(" 主题: " + 
metadata.topic() + "->" + "分区:" + metadata.partition());}else {e.printStackTrace();}}});}kafkaProducer.close();}
}

测试:
①key="a"时,在控制台查看结果。

主题:first->分区:1
主题:first->分区:1
主题:first->分区:1
主题:first->分区:1
主题:first->分区:1

②key="b"时,在控制台查看结果。

主题:first->分区:2
主题:first->分区:2
主题:first->分区:2
主题:first->分区:2
主题:first->分区:2

③key="f"时,在控制台查看结果。

主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0

3.4.3 自定义分区器
如果研发人员可以根据企业需求,自己重新实现分区器。
1)需求
例如我们实现一个分区器实现,发送过来的数据中如果包含 atguigu,就发往 0 号分区,
不包含 atguigu,就发往 1 号分区。
2)实现步骤
(1)定义类实现 Partitioner 接口。
(2)重写 partition()方法。

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
* 1. 实现接口 Partitioner
* 2. 实现 3 个方法:partition,close,configure
* 3. 编写 partition 方法,返回分区号
*/
public class MyPartitioner implements Partitioner {/* 返回信息对应的分区* @param topic 主题* @param key 消息的 key* @param keyBytes 消息的 key 序列化后的字节数组* @param value 消息的 value* @param valueBytes 消息的 value 序列化后的字节数组* @param cluster 集群元数据可以查看分区信息* @return*/@Overridepublic int partition(String topic, Object key, byte[] 
keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 获取消息String msgValue = value.toString();// 创建 partitionint partition;// 判断消息是否包含 atguiguif (msgValue.contains("atguigu")){partition = 0;}else {partition = 1;}// 返回分区号return partition;}// 关闭资源@Overridepublic void close() {}// 配置方法@Overridepublic void configure(Map<String, ?> configs) {}
}

(3)使用分区器的方法,在生产者的配置中添加分区器参数。

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class CustomProducerCallbackPartitions {public static void main(String[] args) throws 
InterruptedException {
Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102
:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());// 添加自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.atgui
gu.kafka.producer.MyPartitioner");KafkaProducer<String, String> kafkaProducer = new 
KafkaProducer<>(properties);for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("first", 
"prince " + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, 
Exception e) {if (e == null){System.out.println(" 主题: " + 
metadata.topic() + "->" + "分区:" + metadata.partition());}else {e.printStackTrace();}}});}kafkaProducer.close();}
}

(4)测试
①在 hadoop102 上开启 Kafka 消费者。

[hadoop103 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first

②在 IDEA 控制台观察回调信息。

主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0

3.5 生产经验——生产者如何提高吞吐量
在这里插入图片描述

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducerParameters {public static void main(String[] args) throws 
InterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"hadoop102:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer");// batch.size:批次大小,默认 16Kproperties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);// linger.ms:等待时间,默认 0properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);// RecordAccumulator:缓冲区大小,默认 32M:buffer.memoryproperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);// compression.type:压缩,默认 none,可配置值 gzip、snappy、
lz4 和 zstd
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new 
KafkaProducer<String, String>(properties);// 4. 调用 send 方法,发送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(new 
ProducerRecord<>("first","prince " + i));}// 5. 关闭资源kafkaProducer.close();}
} 

测试
①在 hadoop102 上开启 Kafka 消费者。

[hadoop103 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first

②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。

[hadoop102 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first
prince 0
prince 1
prince 2
prince 3
prince 4

3.6 生产经验——数据可靠性
1)ack 应答原理
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
2)代码配置

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducerAck {public static void main(String[] args) throws 
InterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers102:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());// 设置 acksproperties.put(ProducerConfig.ACKS_CONFIG, "all");// 重试次数 retries,默认是 int 最大值,2147483647properties.put(ProducerConfig.RETRIES_CONFIG, 3);// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new 
KafkaProducer<String, String>(properties);// 4. 调用 send 方法,发送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(new 
ProducerRecord<>("first","prince " + i));}// 5. 关闭资源kafkaProducer.close();}
}

3.7 生产经验——数据去重
3.7.1 数据传递语义
在这里插入图片描述
3.7.2 幂等性
1)幂等性原理
在这里插入图片描述
2)如何使用幂等性
开启参数 enable.idempotence 默认为 true,false 关闭。

3.7.3 生产者事务
1)Kafka 事务原理
在这里插入图片描述
2)Kafka 的事务一共有如下 5 个 API

// 1 初始化事务
void initTransactions();
// 2 开启事务
void beginTransaction() throws ProducerFencedException;
// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,String consumerGroupId) throws 
ProducerFencedException;
// 4 提交事务
void commitTransaction() throws ProducerFencedException;
// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

3)单个 Producer,使用事务保证消息的仅一次发送

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducerTransactions {public static void main(String[] args) throws 
InterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");// key,value 序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());// 设置事务 id(必须),事务 id 任意起名properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
"transaction_id_0");// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new 
KafkaProducer<String, String>(properties);// 初始化事务kafkaProducer.initTransactions();// 开启事务kafkaProducer.beginTransaction();try {// 4. 调用 send 方法,发送消息for (int i = 0; i < 5; i++) {// 发送消息kafkaProducer.send(new ProducerRecord<>("first", 
"prince " + i));}
// int i = 1 / 0;// 提交事务kafkaProducer.commitTransaction();} catch (Exception e) {// 终止事务kafkaProducer.abortTransaction();} finally {// 5. 关闭资源kafkaProducer.close();}}
}

3.8 生产经验——数据有序
在这里插入图片描述
3.9 生产经验——数据乱序
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/112531.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

OpenCV 笔记(2):图像的属性以及像素相关的操作

Part11. 图像的属性 11.1 Mat 的主要属性 在前文中&#xff0c;我们大致了解了 Mat 的基本结构以及它的创建与赋值。接下来我们通过一个例子&#xff0c;来看看 Mat 所包含的常用属性。 先创建一个 3*4 的四通道的矩阵&#xff0c;并打印出其相关的属性&#xff0c;稍后会详细…

安装Sentinel

大家好今天来安装Sentinel . 安装Sentinel 下载 : 大家可以选择相应版本(最新版本1.8.6) 官网下载地址 : Release v1.8.6 alibaba/Sentinel GitHub 链接&#xff1a;Sentinel_免费高速下载|百度网盘-分享无限制 (baidu.com) 提取码&#xff1a;8eh9 运行 : 将jar包放到任…

Youtrack Linux 安装

我们考虑最后应该使用的是 ZIP 方式的安装。 按照官方的说法如何设置运行 YouTrack 应该是非常简单的。 准备环境 根据官方的说法&#xff0c;我们需要做的就是下载 Zip 包&#xff0c;然后把 Zip 包解压到指定的目录中就可以了。 下载 当前官方的下载地址为&#xff1a;Ge…

基于nodejs+vue中学信息技术线上学习系统

目 录 摘 要 I ABSTRACT II 目 录 II 第1章 绪论 1 1.1背景及意义 1 1.2 国内外研究概况 1 1.3 研究的内容 1 第2章 相关技术 3 2.1 nodejs简介 4 2.2 express框架介绍 6 2.4 MySQL数据库 4 第3章 系统分析 5 3.1 需求分析 5 3.2 系统可行性分析 5 3.2.1技术可行性&#xff1a;…

单片机郭天祥(02)

1&#xff1a;解决keil5软件的乱码问题&#xff0c;修改编码为UTF-8 2&#xff1a;打开keil5使用debug对编写好的程序进行调试 给程序打上断点 使用仿真芯片 更改设备管理器相关设置 接通电源后点击debug连接到51单片机 使用stc-isp获取延时函数 将延时函数添加进入创建好的…

17-spring aop调用过程概述

文章目录 1.源码2. debug过程 1.源码 public class TestAop {public static void main(String[] args) throws Exception {saveGeneratedCGlibProxyFiles(System.getProperty("user.dir") "/proxy");ApplicationContext ac new ClassPathXmlApplication…

在JavaScript中,如何创建一个数组或对象?

在JavaScript中,可以使用以下方式创建数组和对象: 一:创建数组(Array): 1:使用数组字面量(Array Literal)语法,使用方括号 [] 包裹元素,并用逗号分隔: let array1 = []; // 空数组 let array2 = [1, 2, 3]; // 包含三个数字的数组 let array3 = [apple, banana,…

Nautilus Chain 与 Coin98 生态达成合作,加速 Zebec 生态亚洲战略进程

目前&#xff0c;行业内首个模块化 Layer3 架构公链 Nautilus Chain 已经上线主网&#xff0c;揭示了模块化区块链领域迎来了全新的进程。在主网上线后&#xff0c;Nautilus Chain 将扮演 Zebec 生态中最重要的底层设施角色&#xff0c;并将为 Zebec APP 以及 Zebec Payroll 规…

ESP32C3 LuatOS TM1650②动态显示累加整数

--注意:因使用了sys.wait()所有api需要在协程中使用 -- 用法实例 PROJECT "ESP32C3_TM1650" VERSION "1.0.0" _G.sys require("sys") local tm1650 require "tm1650"-- 拆分整数&#xff0c;并把最低位数存放在数组最大索引处 loc…

用Nginx搭建一个具备缓存功能的反向代理服务

在同一台服务器上&#xff0c;使用nginx提供服务&#xff0c;然后使用openresty提供反向代理服务。 参考《Ubuntu 20.04使用源码安装nginx 1.14.0》安装nginx。 参考《用Nginx搭建一个可用的静态资源Web服务器》搭建静态资源Web服务器&#xff0c;但是/nginx/conf/nginx.conf里…

Uniapp软件库源码 全新带勋章功能(包含前后端源码)

Uniapp软件库全新带勋章功能&#xff0c;搭建好后台 在前端找到 util 这个文件 把两个js文件上面的填上自己的域名&#xff0c; 电脑需要下载&#xff1a;HBuilderX 登录账号 没有账号就注册账号&#xff0c;然后上传文件&#xff0c;打包选择 “发行” 可以打包app h5等等。…

异常数据检测 | Python基于Hampel的离群点检测

文章目录 文章概述模型描述源码分享文章概述 在时间序列数据分析领域,识别和处理异常点是至关重要的任务。异常点或离群点是明显偏离预期模式的数据点,可能表明存在错误、欺诈或有价值的见解。 应对这一挑战的一种有效技术是汉普尔过滤器(Hampel Filter)。 模型描述 汉…

spark获取hadoop服务token

spark 作业一直卡在accepted 问题现象问题排查1.查看yarn app日志2.问题分析与原因 问题现象 通过yarn-cluster模式提交spark作业&#xff0c;客户端日志一直卡在submit app&#xff0c;没有运行 问题排查 1.查看yarn app日志 appid已生成&#xff0c;通过yarn查看app状态为…

Note——torch.size() umr_maximum() array.max() itertools.product()

torch.size Problem TypeError: ‘torch.Size’ object is not callable Reason Analysis torch.Size函数不可调用 因为torch只可以.size() 或 shape Solution 将y.shape()替换为y.size() 或 y.shape ytorch.normal(0,0.01,y.size())2 return umr_maximum(a, axis, None…

uniapp接入萤石微信小程序插件

萤石官方提供了一些适用于uniapp / 小程序的方案 如 小程序半屏 hls rtmp 等 都TM有坑 文档写的依托答辩 本文参考了uniapp小程序插件 以及 萤石微信小程序插件接入文档 效果如下 1. 插件申请 登录您的小程序微信公众平台&#xff0c;点击左侧菜单栏&#xff0c;进入设置页…

盒式交换机堆叠配置

目录 1.配置环形拓扑堆叠 2.设备组建堆叠 3.设备组件堆叠 堆叠 istack&#xff0c;是指将多台支持堆叠特性的交换机设备组合在一起&#xff0c;从逻辑上组合成一台交换设备。如图所示&#xff0c;SwitchA与 SwitchB 通过堆叠线缆连接后组成堆叠 istack&#xff0c;对于上游和…

百度地图API:JavaScript开源库几何运算判断点是否在多边形内(电子围栏)

百度地图JavaScript开源库&#xff0c;是一套基于百度地图API二次开发的开源的代码库。目前提供多个lib库&#xff0c;帮助开发者快速实现在地图上添加Marker、自定义信息窗口、标注相关开发、区域限制设置、几何运算、实时交通、检索与公交驾车查询、鼠标绘制工具等功能。 判…

网站批量替换关键词方法

注意替换操作之前先对文件做好备份 1.下载http://downinfo.myhostadmin.net/ultrareplace5.02.rar 解压出来,运行UltraReplace.exe 2.点击菜单栏中的配置&#xff0c;全选所有文件类型,或者根据自己的需求选择部分,如htm、html、php、asp等 3.若替换单个文件,点击文件,若是要…

html 按钮点击倒计时,限制不可点击

html 按钮点击倒计时&#xff0c;限制不可点击 e94cbabd25cfc7f3f53a50a235734c22.jpg <!DOCTYPE html> <html><head><meta http-equiv"Content-Type" content"text/html; charsetutf-8" /><title></title></head&…

飞速(FS)MTP®光纤跳线系列——数据中心布线理想选择

数据中心的重要定位要求其使用的光纤跳线具有高性能和高可靠性。飞速&#xff08;FS&#xff09;MTP光纤产品系列能够以简单的安装方式快速部署高密度链路&#xff0c;优化线缆管理&#xff0c;确保充分利用通道空间&#xff0c;显著减少安装时间和成本。 飞速&#xff08;FS&…