flink重温笔记(十二): flink 高级特性和新特性(1)——End-to-End Exactly-Once(端到端精确一致性语义)

Flink学习笔记

前言:今天是学习 flink 的第 12 天啦!学习了 flink 高级特性和新特性之 End-to-End Exactly-Once(端到端精确一致性语义),主要是解决大数据领域数据从数据源到数据落点的一致性,不会容易造成数据丢失的问题,结合自己实验猜想和代码实践,总结了很多自己的理解和想法,希望和大家多多交流!

Tips:端到端的一致性语义,说明每一步都算术,每一天的努力都不会白费,明天也要继续努力!


文章目录

  • Flink学习笔记
    • 四、Flink 高级特性和新特性
      • 1. End-to-End Exactly-Once
        • 1.1 流处理的数据语义
          • 1.1.1 At most once(最多一次)
          • 1.1.2 At least once(至少一次)
          • 1.1.3 Exactly once(精确一次)
          • 1.1.4 End to End Exactly once(端到端精确一次)
          • 1.1.5 流计算系统如何支持一次性语义
            • (1) At least once + 去重
            • (2) At least once + 幂等
            • (3) 分布式快照
            • (4) 方法汇总
        • 1.2 End-to-End Exactly-Once 实现
          • 1.2.1 Source
          • 1.2.2 Transformation
          • 1.2.3 sink
        • 1.3 Flink + Kafka 的 End-to-End Exactly Once
          • 1.3.1 版本声明
          • 1.3.2 两阶段提交-API
          • 1.3.3 两阶段提交-流程
        • 1.4 案例演示
          • 1.4.1 Flink + Kafka 实现 End-to-End Exactly Once
          • 1.4.2 Flink + MySQL 实现 End-to-End Exactly Once

四、Flink 高级特性和新特性

1. End-to-End Exactly-Once

1.1 流处理的数据语义

顺序:At most once(最多一次)< At least once(至少一次)< Exactly once(精确一次)< End to End Exactly once(端到端一次)


1.1.1 At most once(最多一次)

最简单的恢复方式,直接从失败的下个数据恢复程序,丢失刚刚失败的数据。


1.1.2 At least once(至少一次)

由于事件是可以重传的,可能造成数据重复。


1.1.3 Exactly once(精确一次)

依赖 checkpoint 机制,回滚恢复数据,保持所有记录仅影响内部状态一次,即不考虑部分数据泄露到下游。


1.1.4 End to End Exactly once(端到端精确一次)

Flink 应用从 Source 端开始到 Sink 端结束,保持所有记录影响内部和外部状态一次,即考虑部分数据泄露到下游。


1.1.5 流计算系统如何支持一次性语义
(1) At least once + 去重


(2) At least once + 幂等


(3) 分布式快照


(4) 方法汇总
Exactly Once 实现方式优点缺点
At least once + 去重1. 故障对性能的影响是局部的;
2. 故障的影响不一定随着拓扑大小而增加
1. 可能需要大量的存储和基础设施来支持;
2. 每个算子的每个事件都有资源开销
At least once + 幂等1. 实现简单,开销较低1. 依赖存储特性和数据特征
分布式快照1. 较小的性能和资源开销1. barrier 同步;
2. 任何算子发生故障都需要全局暂停和状态回滚;
3. 拓扑越大,对性能的潜在影响越大

1.2 End-to-End Exactly-Once 实现
1.2.1 Source

发生故障时需要支持重设数据的读取位置,如Kafka可以通过offset来实现(其他的没有offset系统,可以自己实现累加器计数)


1.2.2 Transformation
  • 分布式快照机制

    • 同 Spark 相比,Spark 仅仅是针对 Driver 的故障恢复 Checkpoint,
    • 而 Flink 的快照可以到算子级别,并且对全局数据也可以做快照,
    • Flink 的分布式快照受到 Chandy-Lamport 分布式快照算法启发,同时进行了量身定做。
  • Barrier

    • 数据栅栏是一个标记,不会干扰正常数据处理,
    • 一个数据源可以有多个 barrier,
    • 多个数据源,快流等慢流。
  • 异步和增量

    • 异步快照不会阻塞任务,
    • 增量快照,每次进行的全量快照是根据上一次更新的。

1.2.3 sink
  • 幂等写入

    • 任意多次向一个系统写入数据,只对目标系统产生一次结果影响。
    • key,和 value 可以控制不重复
  • 事务写入

    • 借鉴数据库的事务机制,结合自身 checkpoint 机制,

    • 分阶段快照,先保存数据不向外部系统提交,checkpoint 确认过上下游一致后,才向外部系统 commit。

    • 实现方式:

      • 1- 预写日志(Write-Ahead-Log,WAL)

        通用性强,但不能保证百分比,因为要写入内存这个易失介质。

      • 2- 两阶段提交(Two-Phase-Commit,2PC)

        如果外部系统自身支持事务(比如MySQL、Kafka),可以使用2PC方式,百分百端到端的Exactly-Once。

    • 缺点:

      • 牺牲了延迟
      • 输出不是实时写入,而是分批写入

1.3 Flink + Kafka 的 End-to-End Exactly Once
1.3.1 版本声明

Flink 1.4 版本之前,支持 Exactly Once 语义,仅限于应用内部。

Flink 1.4 版本之后,通过两阶段提交 (TwoPhaseCommitSinkFunction) 支持 End-To-End Exactly Once,而且要求 Kafka 0.11+。


1.3.2 两阶段提交-API

实现方法封装在抽象类:TwoPhaseCommitSinkFunction ,重写方法:

  • beginTransaction:

    开启事务前,在目标文件系统的临时目录中创建一个临时文件,处理数据时将数据写入此文件;

  • preCommit:

    在预提交阶段,刷写(flush)文件,然后关闭文件,之后就不能写入到文件了,将为下一检查点的任何后续写入启动新事务;

  • commit:

    在提交阶段,将预提交的文件原子性移动到真正的目标目录中,注意,会增加输出数据可见性的延迟;

  • abort:

    在中止阶段,删除临时文件。


1.3.3 两阶段提交-流程
  • 1- 数据源阶段

    对接数据源系统

  • 2- 预提交阶段(pre-commit)-内部状态

    Flink 开始 checkpoint,就会进入 pre-commit 阶段,同时 JobManager 的 Coordinator 会将 Barrier 注入数据流中

  • 3- 预提交阶段(pre-commit)-外部状态

    当所有的 barrier 在算子中成功进行一遍传递(就是 Checkpoint 完成),并完成快照后,则“预提交”阶段完成;

  • 4- commit 阶段

    所有算子完成“预提交”,就会发起一个commit“提交”动作,任何一个“预提交”失败都会回滚到最近的 checkpoint;


1.4 案例演示
1.4.1 Flink + Kafka 实现 End-to-End Exactly Once

例子1:普通方式——内部一致性语义,重点在生产者 API 设置上,只是简单序列化为字节流 SimpleStringSchema

package cn.itcast.day12.endtoend;import org.apache.commons.lang3.SystemUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaSerializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;import java.util.Properties;/*** @author lql* @time 2024-03-07 14:51:04* @description TODO:topic:test3 终端生产生数据,控制台打印 topic:test4数据*/
public class Kafka_Flink_Kafka_EndToEnd_ExactlyOnce {public static void main(String[] args) throws Exception {//todo 1)初始化flink流处理环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(5000);//todo 2)判断当前的环境env.setStateBackend(new HashMapStateBackend());if(SystemUtils.IS_OS_WINDOWS || SystemUtils.IS_OS_MAC){env.getCheckpointConfig().setCheckpointStorage("file:///D:\\checkpoint");}else{env.getCheckpointConfig().setCheckpointStorage(args[0]);}//todo 3)设置checkpoint的其他参数//设置checkpoint的超时时间env.getCheckpointConfig().setCheckpointTimeout(2000L);//同一个时间只能有一个栅栏在运行env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//设置checkpoint的执行模式。仅执行一次env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//设置checkpoint最小时间间隔env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000L);//todo 4)接入数据源//指定topic的名称String topicName = "test";//实例化kafkaConsumer对象Properties props = new Properties();props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test001");props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // 消费最新的数据props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // 自动提交偏移量offsetprops.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000"); // 提交偏移量的时间间隔props.setProperty("flink.partition-discovery.interval-millis", "5000");//开启一个后台线程每隔5s检测一次kafka的分区情况FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(topicName, new SimpleStringSchema(), props);//在开启checkpoint以后,offset的递交会随着checkpoint的成功而递交,从而实现一致性语义,默认就是truekafkaSource.setCommitOffsetsOnCheckpoints(true);DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);//todo 5)单词计数操作SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = kafkaDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] words = value.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1));}}});//todo 6)单词分组操作SingleOutputStreamOperator<Tuple2<String, Integer>> result_1 = wordAndOne.keyBy(t -> t.f0).sum(1);//todo 7)打印计算结果result_1.print();SingleOutputStreamOperator<String> result = result_1.map(new MapFunction<Tuple2<String, Integer>, String>() {@Overridepublic String map(Tuple2<String, Integer> value) throws Exception {return value.f0 + "_" + value.f1;}});result.printToErr();//todo 8)创建kafka的生产者实例//指定topic的名称String distTopicName = "test1";//实例化FlinkKafkaProducer对象Properties distProps = new Properties();distProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<String>(distTopicName,new SimpleStringSchema(),distProps);// 容错//todo 4)将数据写入到kafkaresult.addSink(myProducer);//todo 8)启动作业env.execute();}
}

结果:在 node1 的 kafka 生产者模式终端输入数据到 test,词频统计结果写入到 topic:test1,但不保证外部一致性语义


例子2:超级方式——内部外部一致性语义

package cn.itcast.day12.endtoend;import org.apache.commons.lang3.SystemUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;import java.util.Properties;/*** @author lql* @time 2024-03-07 14:51:04* @description TODO:topic:test3 终端生产生数据,控制台打印 topic:test4数据*/
public class Kafka_Flink_Kafka_EndToEnd_ExactlyOnce_pro {public static void main(String[] args) throws Exception {//todo 1)初始化flink流处理环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 在这里就不能开启了,因为 kafka//env.enableCheckpointing(5000);//todo 2)判断当前的环境env.setStateBackend(new HashMapStateBackend());if(SystemUtils.IS_OS_WINDOWS || SystemUtils.IS_OS_MAC){env.getCheckpointConfig().setCheckpointStorage("file:///D:\\checkpoint");}else{env.getCheckpointConfig().setCheckpointStorage(args[0]);}//todo 3)设置checkpoint的其他参数//设置checkpoint的超时时间env.getCheckpointConfig().setCheckpointTimeout(2000L);//同一个时间只能有一个栅栏在运行env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//设置checkpoint的执行模式。仅执行一次env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//设置checkpoint最小时间间隔env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000L);//todo 4)接入数据源//指定topic的名称String topicName = "test";//实例化kafkaConsumer对象Properties props = new Properties();props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test001");props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // 消费最新的数据props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // 自动提交偏移量offsetprops.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000"); // 提交偏移量的时间间隔props.setProperty("flink.partition-discovery.interval-millis", "5000");//开启一个后台线程每隔5s检测一次kafka的分区情况FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(topicName, new SimpleStringSchema(), props);//在开启checkpoint以后,offset的递交会随着checkpoint的成功而递交,从而实现一致性语义,默认就是truekafkaSource.setCommitOffsetsOnCheckpoints(true);DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);//todo 5)单词计数操作SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = kafkaDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] words = value.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1));}}});//todo 6)单词分组操作SingleOutputStreamOperator<Tuple2<String, Integer>> result_1 = wordAndOne.keyBy(t -> t.f0).sum(1);//todo 7)打印计算结果result_1.print();SingleOutputStreamOperator<String> result = result_1.map(new MapFunction<Tuple2<String, Integer>, String>() {@Overridepublic String map(Tuple2<String, Integer> value) throws Exception {return value.f0 + "_" + value.f1;}});result.printToErr();//todo 8)创建kafka的生产者实例//指定topic的名称String distTopicName = "test1";//实例化FlinkKafkaProducer对象Properties distProps = new Properties();distProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<String>(distTopicName,new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()),distProps,FlinkKafkaProducer.Semantic.EXACTLY_ONCE);//todo 4)将数据写入到kafkaresult.addSink(myProducer);//todo 8)启动作业env.execute();}
}

结果:在 node1 的 kafka 生产者模式终端输入数据到 test,词频统计结果写入到 topic:test1,保证了内外部一致性语义

总结:

  • 在普通模式设置下,需要提前开启 checkpoint 模式
  • 在超级模式设置下,不要提前开启 checkpoint 模式,不然写不进数据
  • 在超级模式设置下,不是简单序列化而是事务写入:
    • new KeyedSerializationSchemaWrapper(new SimpleStringSchema()),
    • FlinkKafkaProducer.Semantic.EXACTLY_ONCE

1.4.2 Flink + MySQL 实现 End-to-End Exactly Once

例子:读取 socket 数据,写入 MySQL 数据库,删除数据库数据,也能继续累加结果,实现端到端一致性。

SQL建表:

create table test.t_wordcount
(word   varchar(255) not null primary key,counts int default 0 null
);

代码:

package cn.itcast.day12.endtoend;import org.apache.commons.lang3.SystemUtils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
import org.apache.flink.util.Collector;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;public class Kafka_Flink_MySQL_EndToEnd_ExactlyOnce {public static void main(String[] args) throws Exception {//todo 1)初始化flink流处理环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//todo 2)如果实现端对端一次性语义,必须要开启checkpointenv.enableCheckpointing(5000L);//todo 3)判断当前的环境env.setStateBackend(new HashMapStateBackend());if(SystemUtils.IS_OS_WINDOWS || SystemUtils.IS_OS_MAC){env.getCheckpointConfig().setCheckpointStorage("file:///D:\\checkpoint");}else{env.getCheckpointConfig().setCheckpointStorage(args[0]);}//todo 4)设置checkpoint的其他参数//设置checkpoint的超时时间env.getCheckpointConfig().setCheckpointTimeout(2000L);//同一个时间只能有一个栅栏在运行env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//设置checkpoint的执行模式。仅执行一次env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//设置checkpoint最小时间间隔env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000L);//todo 5)接入数据源,读取文件获取数据DataStreamSource<String> lines = env.socketTextStream("node1", 9999);//todo 3)数据处理//  3.1:使用flatMap对单词进行拆分SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String line, Collector<String> out) throws Exception {String[] words = line.split(" ");//返回数据for (String word : words) {out.collect(word);}}});//  3.2:对拆分后的单词进行记一次数SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = words.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String word) throws Exception {return Tuple2.of(word, 1);}});//  3.3:使用分组算子对key进行分组KeyedStream<Tuple2<String, Integer>, String> grouped = wordAndOne.keyBy(t -> t.f0);//  3.4:对分组后的key进行聚合操作SingleOutputStreamOperator<Tuple2<String, Integer>> sumed = grouped.sum(1);//todo 6)将消费到的数据实时写入mysqlsumed.addSink(new MysqlTwoPhaseCommitSink());//todo 7)运行作业env.execute();}/*** 通过两端递交的方式实现数据写入mysql*/public static class MysqlTwoPhaseCommitSink extends TwoPhaseCommitSinkFunction<Tuple2<String, Integer>, ConnectionState, Void> {public MysqlTwoPhaseCommitSink() {super(new KryoSerializer<>(ConnectionState.class, new ExecutionConfig()), VoidSerializer.INSTANCE);}/*** 每条数据执行一次该方法* @param connectionState* @param value* @param context* @throws Exception*/@Overrideprotected void invoke(ConnectionState connectionState, Tuple2<String, Integer> value, Context context) throws Exception {System.err.println("start invoke.......");Connection connection = connectionState.connection;// 插入一条记录,但如果该记录的主键或唯一键已经存在,则更新该记录。PreparedStatement pstm = connection.prepareStatement("INSERT INTO t_wordcount (word, counts) VALUES (?, ?) ON DUPLICATE KEY UPDATE counts = ?");pstm.setString(1, value.f0);pstm.setInt(2, value.f1);pstm.setInt(3, value.f1);// 插入数据一定是 executeUpdatepstm.executeUpdate();pstm.close();//手动制造异常if(value.f0.equals("hive")) {System.out.println(1/0);}}/*** 开启事务* @return* @throws Exception*/@Overrideprotected ConnectionState beginTransaction() throws Exception {System.out.println("=====> beginTransaction... ");Class.forName("com.mysql.jdbc.Driver");//closing inbound before receiving peer's close_notify,链接地址中追加参数:useSSL=falseConnection connection = DriverManager.getConnection("jdbc:mysql://node1:3306/test?characterEncoding=UTF-8&useSSL=false", "root", "123456");connection.setAutoCommit(false);return new ConnectionState(connection);}/*** 预递交* @param connectionState* @throws Exception*/@Overrideprotected void preCommit(ConnectionState connectionState) throws Exception {System.out.println("start preCommit...");}/*** 递交操作* @param connectionState*/@Overrideprotected void commit(ConnectionState connectionState) {System.out.println("start transaction...");Connection connection = connectionState.connection;try {connection.commit();connection.close();} catch (SQLException e) {throw new RuntimeException("提交事物异常");}}/*** 回滚操作* @param connectionState*/@Overrideprotected void abort(ConnectionState connectionState) {System.out.println("start abort...");Connection connection = connectionState.connection;try {connection.rollback();connection.close();} catch (SQLException e) {throw new RuntimeException("回滚事物异常");}}}static class ConnectionState {// transient 的变量能被忽略序列化private final transient Connection connection;ConnectionState(Connection connection) {this.connection = connection;}}
}

结果:mysql 数据库中删除数据后,再次添加数据后,仍会叠加数据。

总结:

  • 1- 两段递交:自定义 sink 中 需要继承 TwoPhaseCommitSinkFunction
  • 2- kyro 序列化连接状态,VoidSerializer 需要接上 INSTANCE 作为 Void 的序列化
  • 3- 开启事务时,要放弃自动提交
  • 4- transient 的变量能被忽略序列化,此处用于连接变量
  • 5- 数据库插入计算时,要使用 executeUpdate

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/730223.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

官宣!百度智能云千帆产品发布会3月21日北京见!

回望2023大模型狂奔的一年&#xff0c;百度智能云千帆大模型平台无疑是浓墨重彩的一笔。自2023年3月27日正式问世后&#xff0c;百度智能云千帆大模型平台以突飞猛进的速度持续发展。从模型、应用到生态&#xff0c;“千帆”书写着自身在大模型时代的答卷。 作为全球首个一站式…

web开发踩坑记

问题&#xff1a;npm install 在纯ipv6环境先无法安装包 现象&#xff1a; ping -6 registry.npmjs.org 是通的&#xff0c;说明可以解析到ipv6地址 解决方法&#xff1a; 在hosts文件中强制添加 registry.npmjs.org 解析到IPV6地址的记录 原因分析&#xff1a; 怀疑是npm的bug…

指针的学习5

目录 sizeof和strlen的区别 sizeof strlen 数组和指针笔试题解析 一维数组 字符数组 二维数组 指针运算笔试题解析 题目1&#xff1a; 题目2&#xff1a; 题目3&#xff1a; 题目4&#xff1a; 题目5&#xff1a; 题目6&#xff1a; 题目7&#xff1a; sizeof和…

Jmeter二次开发实现rsa加密

jmeter函数助手提供了大量的函数&#xff0c;像 counter、digest、random、split、strLen&#xff0c;这些函数在接口测试、性能测试中大量被使用&#xff0c;但是大家在实际工作&#xff0c;形形色色的测试需求不同&#xff0c;导致jmeter自带或者扩展插件给我们提供的函数无法…

MySQL GTID 简介 原理 应用场景 优点 注意事项

GTID&#xff08;Global Transaction Identifier&#xff09;是MySQL数据库中用于唯一标识事务的一种机制。GTID的引入旨在简化复制和故障恢复过程&#xff0c;确保数据一致性。在分布式系统和数据库复制中&#xff0c;GTID提供了一种跨多个数据库实例跟踪事务的方法。 GTID的…

vue3没有this怎么办?

在vue3中,新的组合式API中没有this,那我们如果需要用到this怎么办? 解决方法: getCurrentInstance 方法获取当前组件的实例&#xff0c;然后通过 ctx 或 proxy 属性获得当前上下文&#xff0c;这样我们就能在setup中使用router和vuex了 import { getCurrentInstance } from …

【无标题】linux 命令不生效,卡死

内核直接重启&#xff0c;不需要读取硬盘 [rootlocalhost opt]# ll-bash: /usr/bin/ls: Input/output error 解决方法 [rootlocalhost ~]# echo 1 > /proc/sys/kernel/sysrq [rootlocalhost opt]# echo b > /proc/sysrq-trigger

Redis中的SCAN渐进式扫描底层原理

Scan渐进式扫描原理 概述 由于Redis是单线程再处理用户的命令&#xff0c;而Keys命令会一次性遍历所有key&#xff0c;于是在命令执行过程中&#xff0c;无法执行其他命令。这就导致如果Redis中的key比较多&#xff0c;那么Keys命令执行时间就会比较长&#xff0c;从而阻塞Re…

代码随想录三刷 day17 | 二叉树之110.平衡二叉树 257. 二叉树的所有路径 404.左叶子之和

三刷day17 110.平衡二叉树257. 二叉树的所有路径404.左叶子之和 110.平衡二叉树 题目链接 解题思路&#xff1a; 求高度用后续遍历&#xff0c;先得到左子树和右子树的情况&#xff0c;然后再进行判断&#xff08;左右中&#xff09;使用递归遍历 代码如下&#xff1a; class…

即插即用篇 | YOLOv8 引入 ParNetAttention 注意力机制 | 《NON-DEEP NETWORKS》

论文名称:《NON-DEEP NETWORKS》 论文地址:https://arxiv.org/pdf/2110.07641.pdf 代码地址:https://github.com/imankgoyal/NonDeepNetworks 文章目录 1 原理2 源代码3 添加方式4 模型 yaml 文件template-backbone.yamltemplate-small.yamltemplate-large.yaml

程序员常用的几种算法

程序员常用的几种算法 一、程序员算法汇总二、程序员常用的几种算法1.选择排序算法1.1 选择排序算法解析&#xff1a;1.2 示例代码&#xff1a; 2.插入排序算法2.1 插入排序算法解析&#xff1a;2.2 示例代码&#xff1a; 3.冒泡排序算法3.1 冒泡排序算法解析&#xff1a;3.2 示…

LeetCode买卖股票的最佳时机

文章目录 LeetCode买卖股票的最佳时机121 买卖股票的最佳时机Ⅰ题目描述代码 122 买卖股票的最佳时间Ⅱ题目描述代码 123 买卖股票的最佳时机Ⅲ题目描述代码一&#xff08;会超时&#xff09;代码二&#xff08;dp&#xff09; 188 买卖股票的最佳时机Ⅳ题目描述代码 LeetCode买…

题记(51)--L1-023 输出GPLT

目录 一、题目内容 二、输入描述 三、输出描述 四、输入输出示例 五、完整C语言代码 一、题目内容 给定一个长度不超过10000的、仅由英文字母构成的字符串。请将字符重新调整顺序&#xff0c;按GPLTGPLT....这样的顺序输出&#xff0c;并忽略其它字符。当然&#xff0c;四…

【PyTorch】进阶学习:探索BCEWithLogitsLoss的正确使用---二元分类问题中的logits与标签形状问题

【PyTorch】进阶学习&#xff1a;探索BCEWithLogitsLoss的正确使用—二元分类问题中的logits与标签形状问题 &#x1f308; 个人主页&#xff1a;高斯小哥 &#x1f525; 高质量专栏&#xff1a;Matplotlib之旅&#xff1a;零基础精通数据可视化、Python基础【高质量合集】、Py…

微服务架构 | 多级缓存

INDEX 通用设计概述2 优势3 最佳实践 通用设计概述 通用设计思路如下图 内容分发网络&#xff08;CDN&#xff09; 可以理解为一些服务器的副本&#xff0c;这些副本服务器可以广泛的部署在服务器提供服务的区域内&#xff0c;并存有服务器中的一些数据。 用户访问原始服务器…

(未解决)macOS matplotlib 中文是方框

reference&#xff1a; Mac OS系统下实现python matplotlib包绘图显示中文(亲测有效)_mac plt 中文值-CSDN博客 module ‘matplotlib.font_manager‘ has no attribute ‘_rebuild‘解决方法_font_manager未解析-CSDN博客 # 问题描述&#xff08;笑死 显而易见 # solve 找到…

C++从零开始的打怪升级之路(day46)

这是关于一个普通双非本科大一学生的C的学习记录贴 在此前&#xff0c;我学了一点点C语言还有简单的数据结构&#xff0c;如果有小伙伴想和我一起学习的&#xff0c;可以私信我交流分享学习资料 那么开启正题 今天分享的是关于二叉树的题目 1.从前序与中序遍历序列构造二叉…

自编码器(Autoencoder, AE)

自编码器(Autoencoder, AE)是一种无监督学习算法,它利用神经网络来学习数据的高效表示(即编码)。自编码器的目标是能够通过输入数据学习到一个压缩的、分布式的表示,然后通过这种表示重构出原始数据。自编码器主要由两部分组成:编码器(Encoder)和解码器(Decoder)。 …

测试岗最好用的——十大软件测试工具

前言 目前由于软件测试工作在软件的生产过程中越来越重要&#xff0c;很多软件测试工具应运而生&#xff0c;这里介绍一下目前最流行的一些软件测试工具&#xff0c;一个十个&#xff0c;介绍如下&#xff1a;一、企业级自动化测试工具WinRunner 这款软件是Mercury Interacti…

【Linux】 yum —— Linux 的软件包管理器

Linux 的软件包管理器 yum yum 是什么什么是软件包查看软件包 yum 命令行工具yum 配置文件yum 凭什么可以支持下载呢&#xff1f;yum 生态yum 社区yum 的故障排除和资源支持yum 的持续集成和持续交付 yum 是什么 Yum&#xff08;Yellowdog Updater Modified&#xff09;是一个…