flink sink kafka的事务提交现象猜想

现象 

查看flink源码时 sink kafka有事务提交机制,查看源码发现是使用两阶段提交策略,而事务提交是checkpoint完成后才执行,那么如果checkpoint设置间隔时间比较长时,事务未提交之前,后端应该消费不到数据,而观察实际现象为写入kafka的消费数据可以立马消费。

测试用例

测试流程

  1. 编写任务1,设置较长的checkpoint时间,并且指定 CheckpointingMode.EXACTLY_ONCE,输出输出到kafka。
  2. 编写任务2消费任务的结果topic,打印控制台,验证结果。
  3. 根据现象查看源码,分析原因。

测试用例

测试任务1

 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.enableCheckpointing(1000*60l, CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig().setCheckpointStorage("file:///flink/checkpoint");//        超时时间,checkpoint没在时间内完成则丢弃env.getCheckpointConfig().setCheckpointTimeout(50000L); //10秒env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);//最小间隔时间(前一次结束时间,与下一次开始时间间隔)env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
//        当 Flink 任务取消时,保留外部保存的 checkpoint 信息KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("127.0.0.1:9092").setTopics("test001").setGroupId("my-group")
//                .setStartingOffsets(OffsetsInitializer()).setStartingOffsets(OffsetsInitializer.committedOffsets()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");// 从文件读取数据
//        DataStream<SensorReading> dataStream = env.addSource( new SourceTest4.MySensorSource() );DataStream<String> map = kafkaSource.map(new MapFunction<String, String>() {@Overridepublic String map(String s) throws Exception {return s;}});Properties properties = new Properties();
// 根据上面的介绍自己计算这边的超时时间,满足条件即可properties.setProperty("transaction.timeout.ms","900000");
//        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");KafkaSink<String> sink = KafkaSink.<String>builder().setBootstrapServers("192.168.65.128:9092").setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic("test002").setValueSerializationSchema(new SimpleStringSchema()).build()).setKafkaProducerConfig(properties).setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
//                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE).setTransactionalIdPrefix("flink-xhaodream-").build();map.sinkTo(sink);// 打印输出env.execute();

测试任务2

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.enableCheckpointing(1000*150l, CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig().setCheckpointStorage("file:///flink/checkpoint");
//        当 Flink 任务取消时,保留外部保存的 checkpoint 信息Properties properties1 = new Properties();
//        properties1.put("isolation.level","read_committed");KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("127.0.0.1:9092").setTopics("test002").setGroupId("my-group2").setProperties(properties1).setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");kafkaSource.print(" test2接受数据");// 打印输出env.execute();

测试结果分析

测试结果:

任务1开启后,无论是否执行checkpoint,任务checkpoint都可以正常消费数据,与预期不符合。

原因排查

查看kafkaSink 的源码,找到跟与两阶段提交相关的代码,1.18源码中TwoPhaseCommittingSink有重构。kafkasink实现TwoPhaseCommittingSink接口实现,创建Commiter和Writer。

@PublicEvolving
public interface TwoPhaseCommittingSink<InputT, CommT> extends Sink<InputT> {PrecommittingSinkWriter<InputT, CommT> createWriter(Sink.InitContext var1) throws IOException;Committer<CommT> createCommitter() throws IOException;SimpleVersionedSerializer<CommT> getCommittableSerializer();@PublicEvolvingpublic interface PrecommittingSinkWriter<InputT, CommT> extends SinkWriter<InputT> {Collection<CommT> prepareCommit() throws IOException, InterruptedException;}
}--------------------------------------
public class KafkaSink<IN>implements StatefulSink<IN, KafkaWriterState>,TwoPhaseCommittingSink<IN, KafkaCommittable> {private final DeliveryGuarantee deliveryGuarantee;private final KafkaRecordSerializationSchema<IN> recordSerializer;private final Properties kafkaProducerConfig;private final String transactionalIdPrefix;KafkaSink(DeliveryGuarantee deliveryGuarantee,Properties kafkaProducerConfig,String transactionalIdPrefix,KafkaRecordSerializationSchema<IN> recordSerializer) {this.deliveryGuarantee = deliveryGuarantee;this.kafkaProducerConfig = kafkaProducerConfig;this.transactionalIdPrefix = transactionalIdPrefix;this.recordSerializer = recordSerializer;}/*** Create a {@link KafkaSinkBuilder} to construct a new {@link KafkaSink}.** @param <IN> type of incoming records* @return {@link KafkaSinkBuilder}*/public static <IN> KafkaSinkBuilder<IN> builder() {return new KafkaSinkBuilder<>();}
-- 创建Committer@Internal@Overridepublic Committer<KafkaCommittable> createCommitter() throws IOException {return new KafkaCommitter(kafkaProducerConfig);}@Internal@Overridepublic SimpleVersionedSerializer<KafkaCommittable> getCommittableSerializer() {return new KafkaCommittableSerializer();}
-- 创建writer@Internal@Overridepublic KafkaWriter<IN> createWriter(InitContext context) throws IOException {return new KafkaWriter<IN>(deliveryGuarantee,kafkaProducerConfig,transactionalIdPrefix,context,recordSerializer,context.asSerializationSchemaInitializationContext(),Collections.emptyList());}@Internal@Overridepublic KafkaWriter<IN> restoreWriter(InitContext context, Collection<KafkaWriterState> recoveredState) throws IOException {return new KafkaWriter<>(deliveryGuarantee,kafkaProducerConfig,transactionalIdPrefix,context,recordSerializer,context.asSerializationSchemaInitializationContext(),recoveredState);}@Internal@Overridepublic SimpleVersionedSerializer<KafkaWriterState> getWriterStateSerializer() {return new KafkaWriterStateSerializer();}@VisibleForTestingprotected Properties getKafkaProducerConfig() {return kafkaProducerConfig;}
}
KafkaWriter和KafkaCommitter源码,

在KafkaWriter中snapshotState方法中发现如果deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE的开启事务的判断逻辑。

class KafkaWriter<IN>implements StatefulSink.StatefulSinkWriter<IN, KafkaWriterState>,TwoPhaseCommittingSink.PrecommittingSinkWriter<IN, KafkaCommittable> {
.... 省略代码  @Overridepublic Collection<KafkaCommittable> prepareCommit() {if (deliveryGuarantee != DeliveryGuarantee.EXACTLY_ONCE) {return Collections.emptyList();}// only return a KafkaCommittable if the current transaction has been written some dataif (currentProducer.hasRecordsInTransaction()) {final List<KafkaCommittable> committables =Collections.singletonList(KafkaCommittable.of(currentProducer, producerPool::add));LOG.debug("Committing {} committables.", committables);return committables;}// otherwise, we commit the empty transaction as is (no-op) and just recycle the producercurrentProducer.commitTransaction();producerPool.add(currentProducer);return Collections.emptyList();}@Overridepublic List<KafkaWriterState> snapshotState(long checkpointId) throws IOException {
-- 开启事务判断        
if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {currentProducer = getTransactionalProducer(checkpointId + 1);currentProducer.beginTransaction();}return Collections.singletonList(kafkaWriterState);}
。。。。。
}

 查看 KafkaCommitter的commit()方法发现producer.commitTransaction();操作

/*** Committer implementation for {@link KafkaSink}** <p>The committer is responsible to finalize the Kafka transactions by committing them.*/
class KafkaCommitter implements Committer<KafkaCommittable>, Closeable {private static final Logger LOG = LoggerFactory.getLogger(KafkaCommitter.class);public static final String UNKNOWN_PRODUCER_ID_ERROR_MESSAGE ="because of a bug in the Kafka broker (KAFKA-9310). Please upgrade to Kafka 2.5+. If you are running with concurrent checkpoints, you also may want to try without them.\n"+ "To avoid data loss, the application will restart.";private final Properties kafkaProducerConfig;@Nullable private FlinkKafkaInternalProducer<?, ?> recoveryProducer;KafkaCommitter(Properties kafkaProducerConfig) {this.kafkaProducerConfig = kafkaProducerConfig;}@Overridepublic void commit(Collection<CommitRequest<KafkaCommittable>> requests)throws IOException, InterruptedException {for (CommitRequest<KafkaCommittable> request : requests) {final KafkaCommittable committable = request.getCommittable();final String transactionalId = committable.getTransactionalId();LOG.debug("Committing Kafka transaction {}", transactionalId);Optional<Recyclable<? extends FlinkKafkaInternalProducer<?, ?>>> recyclable =committable.getProducer();FlinkKafkaInternalProducer<?, ?> producer;try {producer =recyclable.<FlinkKafkaInternalProducer<?, ?>>map(Recyclable::getObject).orElseGet(() -> getRecoveryProducer(committable));--- 事务提交producer.commitTransaction();producer.flush();recyclable.ifPresent(Recyclable::close);} catch (RetriableException e) {LOG.warn("Encountered retriable exception while committing {}.", transactionalId, e);request.retryLater();} catch (ProducerFencedException e) {......}}}
。。。。
}
分析结果

发现除了设置checkpoint还需要kafkasink单独设置.才会实现输出端的开启事务,因此在任务1中添加设置setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)

 KafkaSink<String> sink = KafkaSink.<String>builder().setBootstrapServers("192.168.65.128:9092").setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic("test002").setValueSerializationSchema(new SimpleStringSchema()).build()).setKafkaProducerConfig(properties).setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
//                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE).setTransactionalIdPrefix("flink-xhaodream-").build();

再次验证任务任务2依然可以正常消费。这是有一点头大,不明白为什么?想到既然开启事务肯定有事务的隔离级别,查询了kafka的事务隔离级别,有两种,分别是读已提交和读未提交,默认消费事务是读未提交。

​
kafka的事务隔离级别:
读已提交(Read committed):此隔离级别保证消费者只能读取已经提交的消息。这意味着事务中的消息在提交之前对消费者是不可见的。使用此隔离级别可以避免消费者读取到未提交的事务消息,确保消费者只读取到已经持久化的消息。读未提交(Read Uncommitted):此隔离级别允许消费者读取未提交的消息。这意味着事务中的消息在提交之前就对消费者可见。使用此隔离级别可以实现更低的延迟,但可能会导致消费者读取到未提交的事务消息。​

在任务2中添加isolation.level="read_committed",设定读取消费事务级别为读已提交,再次测试,发现任务1执行完checkpoint前任务2消费不到数据。而命令行可以及时消费任务1的输出topic可可以消费到数据。结果与预期相同。

 Properties properties1 = new Properties();properties1.put("isolation.level","read_committed");KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("127.0.0.1:9092").setTopics("test002").setGroupId("my-group2").setProperties(properties1)

注意事项

Kafka | Apache Flink

FlinkKafkaProducer 已被弃用并将在 Flink 1.15 中移除,请改用 KafkaSink

官网文档信息

Kafka | Apache Flink

Kafka Consumer 提交 Offset 的行为配置 #

Flink Kafka Consumer 允许有配置如何将 offset 提交回 Kafka broker 的行为。请注意:Flink Kafka Consumer 不依赖于提交的 offset 来实现容错保证。提交的 offset 只是一种方法,用于公开 consumer 的进度以便进行监控。

配置 offset 提交行为的方法是否相同,取决于是否为 job 启用了 checkpointing。

  • 禁用 Checkpointing: 如果禁用了 checkpointing,则 Flink Kafka Consumer 依赖于内部使用的 Kafka client 自动定期 offset 提交功能。 因此,要禁用或启用 offset 的提交,只需将 enable.auto.commit 或者 auto.commit.interval.ms 的Key 值设置为提供的 Properties 配置中的适当值。

  • 启用 Checkpointing: 如果启用了 checkpointing,那么当 checkpointing 完成时,Flink Kafka Consumer 将提交的 offset 存储在 checkpoint 状态中。 这确保 Kafka broker 中提交的 offset 与 checkpoint 状态中的 offset 一致。 用户可以通过调用 consumer 上的 setCommitOffsetsOnCheckpoints(boolean) 方法来禁用或启用 offset 的提交(默认情况下,这个值是 true )。 注意,在这个场景中,Properties 中的自动定期 offset 提交设置会被完全忽略。

kafkasink支持语义保证

kafkaSink 总共支持三种不同的语义保证(DeliveryGuarantee)。对于 DeliveryGuarantee.AT_LEAST_ONCE 和 DeliveryGuarantee.EXACTLY_ONCE,Flink checkpoint 必须启用。默认情况下 KafkaSink 使用 DeliveryGuarantee.NONE。 以下是对不同语义保证的解释:

  • DeliveryGuarantee.NONE 不提供任何保证:消息有可能会因 Kafka broker 的原因发生丢失或因 Flink 的故障发生重复。
  • DeliveryGuarantee.AT_LEAST_ONCE: sink 在 checkpoint 时会等待 Kafka 缓冲区中的数据全部被 Kafka producer 确认。消息不会因 Kafka broker 端发生的事件而丢失,但可能会在 Flink 重启时重复,因为 Flink 会重新处理旧数据。
  • DeliveryGuarantee.EXACTLY_ONCE: 该模式下,Kafka sink 会将所有数据通过在 checkpoint 时提交的事务写入。因此,如果 consumer 只读取已提交的数据(参见 Kafka consumer 配置 isolation.level),在 Flink 发生重启时不会发生数据重复。然而这会使数据在 checkpoint 完成时才会可见,因此请按需调整 checkpoint 的间隔。请确认事务 ID 的前缀(transactionIdPrefix)对不同的应用是唯一的,以保证不同作业的事务 不会互相影响!此外,强烈建议将 Kafka 的事务超时时间调整至远大于 checkpoint 最大间隔 + 最大重启时间,否则 Kafka 对未提交事务的过期处理会导致数据丢失。

推荐查看1.14版本和1.18版本结合起来看,在一些细节处理上有差异。

Kafka | Apache Flink

其他源码简介

如果查看1.18版本源码不太好理解两阶段提交,可以查看1.14.5的源码,发现FlinkKafkaProducer被标记废除请改用 KafkaSink,并将在 Flink 1.15 中移除, 在1.14.5中TwoPhaseCommitSinkFunction为抽象类,有明确定开启事务、预提交和提交的抽象方法,比较好理解。

 查看1.14.5版本的KafkaSink 的依赖,发现没有直接使用TwoPhaseCommitSinkFunction,但是查看源码可以看到使用了commiter和kafkawriter对象

public class KafkaSink<IN> implements Sink<IN, KafkaCommittable, KafkaWriterState, Void> {   public static <IN> KafkaSinkBuilder<IN> builder() {return new KafkaSinkBuilder<>();}
-- KafkaWriter 中会判断是否需要开启事务@Overridepublic SinkWriter<IN, KafkaCommittable, KafkaWriterState> createWriter(InitContext context, List<KafkaWriterState> states) throws IOException {final Supplier<MetricGroup> metricGroupSupplier =() -> context.metricGroup().addGroup("user");return new KafkaWriter<>(deliveryGuarantee,kafkaProducerConfig,transactionalIdPrefix,context,recordSerializer,new InitContextInitializationContextAdapter(context.getUserCodeClassLoader(), metricGroupSupplier),states);}-- 事务提交在kafkaCommitter@Overridepublic Optional<Committer<KafkaCommittable>> createCommitter() throws IOException {return Optional.of(new KafkaCommitter(kafkaProducerConfig));}@Overridepublic Optional<GlobalCommitter<KafkaCommittable, Void>> createGlobalCommitter()throws IOException {return Optional.empty();}...
}

KafkaWriter源码

@Overridepublic List<KafkaCommittable> prepareCommit(boolean flush) {if (deliveryGuarantee != DeliveryGuarantee.NONE || flush) {currentProducer.flush();}if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {final List<KafkaCommittable> committables =Collections.singletonList(KafkaCommittable.of(currentProducer, producerPool::add));LOG.debug("Committing {} committables, final commit={}.", committables, flush);return committables;}return Collections.emptyList();}
-- 快照状态开启事务@Overridepublic List<KafkaWriterState> snapshotState(long checkpointId) throws IOException {if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {currentProducer = getTransactionalProducer(checkpointId + 1);currentProducer.beginTransaction();}return ImmutableList.of(kafkaWriterState);}

1.14.5 版本TwoPhaseCommitSinkFunction是一个抽象类 在1.18 中是接口

/*** Flink Sink to produce data into a Kafka topic. By default producer will use {@link* FlinkKafkaProducer.Semantic#AT_LEAST_ONCE} semantic. Before using {@link* FlinkKafkaProducer.Semantic#EXACTLY_ONCE} please refer to Flink's Kafka connector documentation.** @deprecated Please use {@link org.apache.flink.connector.kafka.sink.KafkaSink}.*/
@Deprecated
@PublicEvolving
public class FlinkKafkaProducer<IN>extends TwoPhaseCommitSinkFunction<IN,FlinkKafkaProducer.KafkaTransactionState,FlinkKafkaProducer.KafkaTransactionContext> {。。。}
-- 1.14 版本TwoPhaseCommitSinkFunction 为抽象类@PublicEvolving
public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> extends RichSinkFunction<IN>implements CheckpointedFunction, CheckpointListener { }-- 1.18 版本
@PublicEvolving
public interface TwoPhaseCommittingSink<InputT, CommT> extends Sink<InputT> {PrecommittingSinkWriter<InputT, CommT> createWriter(Sink.InitContext var1) throws IOException;Committer<CommT> createCommitter() throws IOException;SimpleVersionedSerializer<CommT> getCommittableSerializer();@PublicEvolvingpublic interface PrecommittingSinkWriter<InputT, CommT> extends SinkWriter<InputT> {Collection<CommT> prepareCommit() throws IOException, InterruptedException;}
}

FlinkKafkaProducer继承TwoPhaseCommitSinkFunction,会重写其中的方法,查看重写开启事务的方法
  -- FlinkKafkaProducer 中重写beginTransaction 方法@Overrideprotected FlinkKafkaProducer.KafkaTransactionState beginTransaction()throws FlinkKafkaException {switch (semantic) {case EXACTLY_ONCE:FlinkKafkaInternalProducer<byte[], byte[]> producer = createTransactionalProducer();
-- 开启kafka的procder的事务producer.beginTransaction();return new FlinkKafkaProducer.KafkaTransactionState(producer.getTransactionalId(), producer);case AT_LEAST_ONCE:case NONE:// Do not create new producer on each beginTransaction() if it is not necessaryfinal FlinkKafkaProducer.KafkaTransactionState currentTransaction =currentTransaction();if (currentTransaction != null && currentTransaction.producer != null) {return new FlinkKafkaProducer.KafkaTransactionState(currentTransaction.producer);}return new FlinkKafkaProducer.KafkaTransactionState(initNonTransactionalProducer(true));default:throw new UnsupportedOperationException("Not implemented semantic");}}

只有当FlinkKafkaProducer.Semantic 为EXACTLY_ONCE时才会开启事务,查看其构造方法

 public FlinkKafkaProducer(String topicId,SerializationSchema<IN> serializationSchema,Properties producerConfig,@Nullable FlinkKafkaPartitioner<IN> customPartitioner,FlinkKafkaProducer.Semantic semantic,int kafkaProducersPoolSize) {this(topicId,null,null,new KafkaSerializationSchemaWrapper<>(topicId, customPartitioner, false, serializationSchema),producerConfig,semantic,kafkaProducersPoolSize);}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/62732.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

leetcode 3224. 使差值相等的最少数组改动次数

题目链接&#xff1a;3224. 使差值相等的最少数组改动次数 题目&#xff1a; 给你一个长度为 n 的整数数组 nums &#xff0c;n 是偶数 &#xff0c;同时给你一个整数 k 。 你可以对数组进行一些操作。每次操作中&#xff0c;你可以将数组中任一元素替换为 0 到 k 之间的任一…

Y3编辑器文档4:触发器1(对话、装备、特效、行为树、排行榜、不同步问题)

文章目录 一、触发器简介1.1 触发器界面1.2 ECA语句编辑及快捷键1.3 参数设置1.4 变量设置1.5 实体触发器1.6 函数库与触发器复用 二、触发器的多层结构2.1 子触发器&#xff08;在游戏内对新的事件进行注册&#xff09;2.2 触发器变量作用域2.3 复合条件2.4 循环2.5 计时器2.6…

前端WebSocket应用——聊天实时通信的基本配置

使用 WebSocket 实现实时通信的 Vue 应用 前言1. WebSocketService 类 1.1 类属性1.2 构造函数和连接初始化1.3 WebSocket 连接1.4 事件处理方法1.5 发送和关闭 WebSocket 消息1.6 状态查询与回调注册1.7 完整代码 2. 在 Vue 组件中使用 WebSocketService 2.1 定义 WebSocket …

【开源】A065—基于SpringBoot的库存管理系统的设计与实现

&#x1f64a;作者简介&#xff1a;在校研究生&#xff0c;拥有计算机专业的研究生开发团队&#xff0c;分享技术代码帮助学生学习&#xff0c;独立完成自己的网站项目。 代码可以查看项目链接获取⬇️&#xff0c;记得注明来意哦~&#x1f339; 赠送计算机毕业设计600个选题ex…

基于python实现自动化的验证码识别:探索与实践

基于python实现自动化的验证码识别&#xff1a;探索与实践 一、验证码的类型及特点&#xff08;一&#xff09;图像验证码&#xff08;二&#xff09;短信验证码&#xff08;三&#xff09;语音验证码 二、验证码识别的方法*&#xff08;一&#xff09;传统图像处理方法&#x…

Vue vs. React:两大前端框架的深度对比与分析(一)

前言 在当今快速发展的前端领域中&#xff0c;Vue和React作为两个备受瞩目的前端框架&#xff0c;已经成为许多开发者的首选。这两个框架凭借其出色的设计和强大的功能&#xff0c;在构建现代化、高效性能的Web应用方面扮演着重要角色。 Vue和React都以其独特的特点吸引了众多开…

windows安装使用conda

在Windows系统上安装和使用Conda的详细步骤如下&#xff1a; 一、下载Conda安装包 访问Conda的官方网站Anaconda | The Operating System for AI&#xff0c;点击“Downloads”按钮。在下载页面&#xff0c;选择适合您系统的安装包。通常&#xff0c;对于Windows系统&#xf…

websocket 服务 pinia 全局配置

websocket 方法类 // stores/webSocketStore.ts import { defineStore } from "pinia";interface WebSocketStoreState {ws: WebSocket | null; // WebSocket 实例callbacks: ((message: string) > void)[]; // 消息回调函数列表connected: boolean; // 连接状态…

Ariba Procurement: Administration_Cloud Basics

# SAP Ariba Procurement: Administration_Cloud Basics 认识Ariba Cloud SAP Ariba Procurement 是一个云计算平台… The Ariba Cloud 平台需要简单理解的概念: Datacenter数据中心:SAP Ariba在世界各地有许多数据中心。这些数据中心构成了Ariba云的基本物理基础设施。 …

vulnhub靶场【shenron】--1

前言 靶机&#xff1a;shenron-1 攻击&#xff1a;kali 都采用虚拟机&#xff0c;网卡为桥接模式 主机发现 使用arp-scan -l或netdiscover -r 192.168.1.1/24扫描 信息收集 使用nmap扫描端口 网站信息探测 查看页面&#xff0c;发现是apache2的默认界面&#xff0c;查看…

等保2.0数据库测评之SQL server数据库测评

一、SQL server数据库介绍 SQL server美国Microsoft公司推出的一种关系型数据库系统。SQL Server是一个可扩展的、高性能的、为分布式客户机/服务器计算所设计的数据库管理系统。 本次安装环境为Windows10专业版操作系统&#xff0c;数据库版本为Microsoft SQL Server 2019 (…

无人机之报警器的工作原理!

一、电量监测技术 电量监测是无人机电量指示和报警功能的基础。通过实时监测无人机的电池电量&#xff0c;系统能够准确判断电池的剩余使用时间&#xff0c;并在电量不足时发出报警。电量监测技术通常包括以下几个方面&#xff1a; 电压检测&#xff1a;无人机电池内部通常配…

【pyspark学习从入门到精通23】机器学习库_6

目录 分割连续变量 标准化连续变量 分类 分割连续变量 我们经常处理高度非线性的连续特征&#xff0c;而且只用一个系数很难拟合到我们的模型中。 在这种情况下&#xff0c;可能很难只通过一个系数来解释这样一个特征与目标之间的关系。有时&#xff0c;将值划分到离散的桶中…

解密时序数据库的未来:TDengine Open Day技术沙龙精彩回顾

在数字化时代&#xff0c;开源已成为推动技术创新和知识共享的核心力量&#xff0c;尤其在数据领域&#xff0c;开源技术的涌现不仅促进了行业的快速发展&#xff0c;也让更多的开发者和技术爱好者得以参与其中。随着物联网、工业互联网等技术的广泛应用&#xff0c;时序数据库…

QT 使用共享内存 实现进程间通讯

QSharedMemory&#xff1a;如果两个进程运行在同一台机器上&#xff0c;且对性能要求非常高&#xff08;如实时数据共享、图像渲染等&#xff09;&#xff0c;建议使用共享内存。 优点&#xff1a; 高性能&#xff1a; 共享内存是进程间通信的最快方式之一&#xff0c;因为数…

在Scala中对隐式转换格式与作用

隐式对象 格式&#xff1a;implicit object 作用&#xff1a;给函数的默认参数提供隐式值 object Scala12______10 { // case class DataBase(driver: String, url: String) // // implicit object mySql extends DataBase("mysql", "localhost:300") //…

go build command

文章目录 1.简介2.格式3.选项4.示例5.小结参考文献 1.简介 go build 是 Go 语言工具链中的一个命令&#xff0c;它用于编译 Go 源代码并生成可执行文件。 2.格式 go build [-o output] [build flags] [packages]可选的 -o 选项强制 build 将生成的可执行文件或对象写入指定的…

OpenCV实验:图片加水印

第二篇&#xff1a;图片添加水印&#xff08;加 logo&#xff09; 1. 实验原理 水印原理&#xff1a; 图片添加水印是图像叠加的一种应用&#xff0c;分为透明水印和不透明水印。水印的实现通常依赖于像素值操作&#xff0c;将水印图片融合到目标图片中&#xff0c;常用的方法…

WinDbg 中使用 !process 命令

PROCESS 81a979d0 SessionId: 0 Cid: 0210 Peb: 7ffda000 ParentCid: 063cDirBase: 145b9000 ObjectTable: e12fed70 HandleCount: 53.Image: Dbgview.exe 1. PROCESS 81a979d0 意义&#xff1a;PROCESS 是该进程对象的内核地址。用途&#xff1a;可以使用这个地址获…

深入解析下oracle的number底层存储格式

oracle数据库中&#xff0c;number数据类型用来存储数值数据&#xff0c;它既可以存储负数数值&#xff0c;也可以存储正数数值。相对于其他类型数据&#xff0c;number格式的数据底层存储格式要复杂得多。今天我们就详细探究下oracle的number底层存储格式。 一、环境搭建 1.…