实验3-实时数据流处理-Flink

1.前期准备

(1)Flink基础环境安装

参考文章:

利用docker-compose来搭建flink集群-CSDN博客

显示为这样就成功了

(2)把docker,docker-compose,kafka集群安装配置好

参考文章:

利用docker搭建kafka集群并且进行相应的实践-CSDN博客

这篇文章里面有另外两篇文章的链接,点进去就能够看到

(3)在windows上面,创建一个数据库mysql1(如果没有的话就需要创建),接着在这个数据库里面建一个表min_table

具体代码如下

create database if not exists mysql1; -- 注释符为‘-- '注意有个空格

use mysql1;

CREATE TABLE min_table (

    id INT AUTO_INCREMENT PRIMARY KEY,

    timestamp TIMESTAMP NOT NULL,

    quantity INT NOT NULL,

    amount DOUBLE NOT NULL,

    UNIQUE KEY unique_timestamp (timestamp)

);

create database if not exists mysql1; -- 注释符为‘-- '注意有个空格use mysql1;CREATE TABLE min_table (id INT AUTO_INCREMENT PRIMARY KEY,timestamp TIMESTAMP NOT NULL,quantity INT NOT NULL,amount DOUBLE NOT NULL,UNIQUE KEY unique_timestamp (timestamp));

(4)接着在安装配置了flink的linux虚拟机上面安装好mysql

参考文章:黑马大数据学习笔记4-Hive部署和基本操作_黑马大数据 hive笔记-CSDN博客

 (5)然后同样的在linux虚拟机上面的mysql中创建一个数据库mysql1(如果没有的话就需要创建),接着在这个数据库里面建一个表min_table

具体代码如下

create database if not exists mysql1; -- 注释符为‘-- '注意有个空格

use mysql1;

CREATE TABLE min_table (

    id INT AUTO_INCREMENT PRIMARY KEY,

    timestamp TIMESTAMP NOT NULL,

    quantity INT NOT NULL,

    amount DOUBLE NOT NULL,

    UNIQUE KEY unique_timestamp (timestamp)

);

create database if not exists mysql1; -- 注释符为‘-- '注意有个空格use mysql1;CREATE TABLE min_table (id INT AUTO_INCREMENT PRIMARY KEY,timestamp TIMESTAMP NOT NULL,quantity INT NOT NULL,amount DOUBLE NOT NULL,UNIQUE KEY unique_timestamp (timestamp));

(6)在idea里面新建一个Maven项目,名字叫做FlinkDemo然后往pom.xml中添加以下配置

<dependencies><!-- Flink 的核心库 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.18.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>1.18.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>1.18.0</version></dependency><!-- Flink Kafka Connector --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>3.0.1-1.18</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.1-1.17</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version></dependency></dependencies>
<build><plugins><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins>
</build>

这个和上面的是一个东西,就看你喜欢一键复制还是分别复制了

<dependencies>
    <!-- Flink 的核心库 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.18.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>1.18.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>1.18.0</version>
    </dependency>

    <!-- Flink Kafka Connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>3.0.1-1.18</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc</artifactId>
        <version>3.1.1-1.17</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.33</version>
    </dependency>


</dependencies>
<build>
    <plugins>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

(7)在该项目的com.examle目录下创建三个文件

     目录结构如下

DatabaseSink.java
package com.example;import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.types.Row;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;import java.sql.PreparedStatement;
import java.sql.Timestamp;public class DatabaseSink {private String url;private String username;private String password;public DatabaseSink(String url, String username, String password) {this.url = url;this.username = username;this.password = password;}public void addSink(DataStream<Tuple3<Timestamp, Long, Double>> stream) {stream.addSink(JdbcSink.sink("INSERT INTO min_table (timestamp, quantity, amount) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE quantity = quantity + VALUES(quantity), amount = amount + VALUES(amount)",(ps, t) -> {ps.setTimestamp(1, t.f0);ps.setLong(2, t.f1);ps.setDouble(3, t.f2);},new JdbcExecutionOptions.Builder().withBatchSize(5000).withBatchIntervalMs(200).withMaxRetries(5).build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl(this.url).withDriverName("com.mysql.jdbc.Driver").withUsername(this.username).withPassword(this.password).build()));}
}
LocalFlinkTest.java
package com.example;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;public class LocalFlinkTest {public static void main(String[] args) throws Exception {SimpleDateFormat sdf = new SimpleDateFormat(("yyyy-MM-dd HH:mm"));SimpleDateFormat sdf_hour = new SimpleDateFormat("yyyy-MM-dd HH");final StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.setRestartStrategy(RestartStrategies.fixedDelayRestart(333, // 尝试重启的次数org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS) // 延迟));env.setRestartStrategy(RestartStrategies.noRestart());KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("192.168.88.101:19092,192.168.88.101:29092,192.168.88.101:39092") // 你的 Kafka 服务器地址.setGroupId("testGroup") // 你的消费者组 ID.setTopics("foo") // 你的主题.setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) // 从消费者组的最新偏移量开始消费.build();DataStream<String> stream = env.fromSource(source,WatermarkStrategy.noWatermarks(), "Kafka Source");
// flatMap 函数,它接收一个输入元素,并可以输出零个、一个或多个元素。
// 在这个函数中,输入元素是从 Kafka 中读取的一行数据,输出元素是一个包含交易量的元组。
// 近 1 分钟与当天累计的总交易金额、交易数量
//                DataStream<String> stream = env.readTextFile("D:\\idea\\flinkTest\\src\\main\\java\\com\\springbootdemo\\2.csv", "GBK");DataStream<Tuple3<Timestamp, Long, Double>> transactionVolumes = stream.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String value) throws Exception {// 假设文件的第一行是表头,这里跳过它return !value.startsWith("time");}}).flatMap(new FlatMapFunction<String, Tuple3<Timestamp, Long,Double>>() {@Overridepublic void flatMap(String line, Collector<Tuple3<Timestamp, Long,Double>> out) {try {String[] fields = line.split(",");String s = fields[0];
// 解析时间字符串后,将日期时间对象的秒字段设置为 0Date date = sdf.parse(s);Timestamp sqlTimestamp = new Timestamp(date.getTime());double price = Double.parseDouble(fields[3]);long quantity = Long.parseLong(fields[4]);double amount = price * quantity;out.collect(Tuple3.of(sqlTimestamp, quantity, amount));
// System.out.println(line);} catch (Exception e) {System.out.println(line);                        }}}); // 过滤掉解析失败的记录;// 计算每 500 毫秒的数据
// keyBy(t -> t.f0)代表以第一个字段 Timestamp 为键,确保一个窗口内的时间都是相同的DataStream<Tuple3<Timestamp,Long ,Double>> oneSecondAmounts =transactionVolumes.keyBy(t -> t.f0).windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))).reduce((Tuple3<Timestamp,Long ,Double> value1,Tuple3<Timestamp,Long ,Double> value2) -> {
//                            System.out.println(Tuple3.of(value1.f0,value1.f1 + value2.f1, value1.f2 + value2.f2));return Tuple3.of(value1.f0,value1.f1 + value2.f1, value1.f2 +value2.f2);});oneSecondAmounts.print();DatabaseSink dbSink = new DatabaseSink("jdbc:mysql://localhost:3306/mysql1", "root", "123456");dbSink.addSink(oneSecondAmounts);env.execute("Kafka Flink Demo");}
}
DatabaseSink dbSink = new DatabaseSink("jdbc:mysql://localhost:3306/mysql1", "root", "123456");

这里的密码应该改成你自己的。(当然博主本人的是123456)

FlinkTest.java
package com.example;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;public class FlinkTest {public static void main(String[] args) throws Exception {SimpleDateFormat sdf = new SimpleDateFormat(("yyyy-MM-dd HH:mm"));SimpleDateFormat sdf_hour = new SimpleDateFormat("yyyy-MM-dd HH");final StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.setRestartStrategy(RestartStrategies.fixedDelayRestart(333, // 尝试重启的次数org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS) // 延迟));env.setRestartStrategy(RestartStrategies.noRestart());KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("192.168.88.101:19092,192.168.88.101:29092,192.168.88.101:39092") // 你的 Kafka 服务器地址.setGroupId("testGroup") // 你的消费者组 ID.setTopics("foo") // 你的主题.setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) // 从消费者组的最新偏移量开始消费.build();DataStream<String> stream = env.fromSource(source,WatermarkStrategy.noWatermarks(), "Kafka Source");
// flatMap 函数,它接收一个输入元素,并可以输出零个、一个或多个元素。
// 在这个函数中,输入元素是从 Kafka 中读取的一行数据,输出元素是一个包含交易量的元组。
// 近 1 分钟与当天累计的总交易金额、交易数量
//                DataStream<String> stream = env.readTextFile("D:\\idea\\flinkTest\\src\\main\\java\\com\\springbootdemo\\2.csv", "GBK");DataStream<Tuple3<Timestamp, Long, Double>> transactionVolumes = stream.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String value) throws Exception {// 假设文件的第一行是表头,这里跳过它return !value.startsWith("time");}}).flatMap(new FlatMapFunction<String, Tuple3<Timestamp, Long,Double>>() {@Overridepublic void flatMap(String line, Collector<Tuple3<Timestamp, Long,Double>> out) {try {String[] fields = line.split(",");String s = fields[0];
// 解析时间字符串后,将日期时间对象的秒字段设置为 0Date date = sdf.parse(s);Timestamp sqlTimestamp = new Timestamp(date.getTime());double price = Double.parseDouble(fields[3]);long quantity = Long.parseLong(fields[4]);double amount = price * quantity;out.collect(Tuple3.of(sqlTimestamp, quantity, amount));
// System.out.println(line);} catch (Exception e) {System.out.println(line);                        }}}); // 过滤掉解析失败的记录;// 计算每 500 毫秒的数据
// keyBy(t -> t.f0)代表以第一个字段 Timestamp 为键,确保一个窗口内的时间都是相同的DataStream<Tuple3<Timestamp,Long ,Double>> oneSecondAmounts =transactionVolumes.keyBy(t -> t.f0).windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))).reduce((Tuple3<Timestamp,Long ,Double> value1,Tuple3<Timestamp,Long ,Double> value2) -> {
//                            System.out.println(Tuple3.of(value1.f0,value1.f1 + value2.f1, value1.f2 + value2.f2));return Tuple3.of(value1.f0,value1.f1 + value2.f1, value1.f2 +value2.f2);});oneSecondAmounts.print();DatabaseSink dbSink = new DatabaseSink("jdbc:mysql://192.168.88.101:3306/mysql1", "root", "123456");dbSink.addSink(oneSecondAmounts);env.execute("Kafka Flink Demo");}
}
DatabaseSink dbSink = new DatabaseSink("jdbc:mysql://192.168.88.101:3306/mysql1", "root", "123456");

这里的密码和主机号(192.168.88.101)应该改成你自己的密码和主机号

2.开始实验,分为本地测试和flink测试

(1)启动node1,打开Finalshell,启动docker,启动kafka集群,flink集群

systemctl start docker
cd /export/server
docker-compose -f kafka.yml up -d
docker-compose -f flink.yml up -d
docker ps

效果如下

(2)先进行本地测试(这里只需要用到kafka集群)

打开两个node1的窗口
在第二个窗口进入kafka2容器,启动消费者进程

代码

docker exec -it kafka2 /bin/bash
cd /opt/bitnami/kafka/bin
kafka-console-consumer.sh --bootstrap-server 172.23.0.11:9092,172.23.0.12:9092,172.23.0.13:9092 --topic foo

 效果如下

进入idea,运行这个文件LocalFlinkTest.java

在第一个窗口进入kafka1容器,发送文件的前5行

[root@node1 server]# docker exec -it kafka1 /bin/bash

root@a2f7152188c1:/#  cd /opt/bitnami/kafka/bin

root@a2f7152188c1:/opt/bitnami/kafka/bin# head -n 5 /bitnami/kafka/stock-part10.csv | kafka-console-producer.sh --broker-list 172.23.0.11:9092,172.23.0.12:9092,172.23.0.13:9092 --topic foo

root@a2f7152188c1:/opt/bitnami/kafka/bin#

代码

docker exec -it kafka1 /bin/bash
cd /opt/bitnami/kafka/bin
head -n 5 /bitnami/kafka/stock-part10.csv | kafka-console-producer.sh --broker-list 172.23.0.11:9092,172.23.0.12:9092,172.23.0.13:9092 --topic foo

接着在idea里面查看

在mysql里查看

到这里,本地测试就已经成功了!

(3)再进行flink测试,先在idea这里双击packge,然后去target目录看看有没有多出这两个文件(先运行文件FlinkTest.java先)

运行文件FlinkTest.java

在idea这里双击packge,然后去target目录看看有没有多出这两个文件 

进入网页node1:8081,上传这个名字更长的jar包

输入这个路径
D:\JetBrains\idea-project\FlinkDemo\target
(反正就是target目录的位置)

添加成功后

点一下那个玩意儿填入如下内容com.example.FlinkTest

这个com.example.FlinkTest是FlinkTest.java在项目中的路径

以及选择输入3

然后点击submit提交即可,结果显示正常运行

再回到node1的第一个窗口,
在这个位置
root@41d3910fe6c9:/opt/bitnami/kafka/bin#输入以下代码(kafka1的/opt/bitnami/kafka/bin目录下)来发个文件过去

代码

cat /bitnami/kafka/stock-part10.csv | kafka-console-producer.sh --broker-list 172.23.0.11:9092,172.23.0.12:9092,172.23.0.13:9092 --topic foo

任意点开一个,在监控参数中选择numRecordsInPerSecond可以查看每秒处理数据速度。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/888570.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

javaweb-Mybaits

1.Mybaits入门 &#xff08;1&#xff09;介绍 &#xff08;2&#xff09; 2.Mybaits VS JDBC 3.数据库连接池 &#xff08;1&#xff09;SpringBoot默认连接池为hikari&#xff0c;切换为Druid有两种方式 方式一&#xff1a;加依赖 方式二&#xff1a;直接修改配置文件 …

Mybatis 关联查询

在 MyBatis 中&#xff0c;关联查询&#xff08;也称为复杂映射&#xff09;是指将多个表的数据通过 SQL 查询和结果映射的方式&#xff0c;组合成一个或多个 Java 对象。这种查询方式用于处理实体之间的关系&#xff0c;如一对一、一对多和多对多关系。通过关联查询&#xff0…

GPS模块/SATES-ST91Z8LR:电路搭建;直接用电脑的USB转串口进行通讯;模组上报定位数据转换地图识别的坐标手动查询地图位置

从事嵌入式单片机的工作算是符合我个人兴趣爱好的,当面对一个新的芯片我即想把芯片尽快搞懂完成项目赚钱,也想着能够把自己遇到的坑和注意事项记录下来,即方便自己后面查阅也可以分享给大家,这是一种冲动,但是这个或许并不是原厂希望的,尽管这样有可能会牺牲一些时间也有哪天原…

分布式光伏电站如何实现监控及集中运维管理?

安科瑞戴婷 Acrel-Fanny 前言 今年以来&#xff0c;在政策利好推动下光伏、风力发电、电化学储能及抽水蓄能等新能源行业发展迅速&#xff0c;装机容量均大幅度增长&#xff0c;新能源发电已经成为新型电力系统重要的组成部分&#xff0c;同时这也导致新型电力系统比传统的电…

大模型分类1—按应用类型

版权声明 本文原创作者:谷哥的小弟作者博客地址:http://blog.csdn.net/lfdfhl根据应用领域,大模型可分为自然语言处理、计算机视觉和多模态大模型。 1. 自然语言处理大模型(NLP) 1.1 应用领域与技术架构 自然语言处理大模型(NLP)的应用领域广泛,包括但不限于文本分类、…

2024 32kstar 的目前最佳开源RAG框架之一的 Langchain-Chatchat开源项目实践(一)

2024 32kstar 的目前最佳开源RAG框架之一的 Langchain-Chatchat开源项目实践&#xff08;一&#xff09; 文章目录 2024 32kstar 的目前最佳开源RAG框架之一的 Langchain-Chatchat开源项目实践&#xff08;一&#xff09;一、前言二、实践步骤1、软硬件要求&#xff08;1&#…

网络安全应急响应流程图

一、网络安全应急响应建设的背景和现状 当前&#xff0c;许多地区和单位已经初步建立了网络安全预警机制&#xff0c;实现了对一般网络安全事件的预警和处置。但是&#xff0c;由于网络与信息安全技术起步相对较晚&#xff0c;发展时间较短&#xff0c;与其他行业领域相比&…

2024 阿里云Debian12.8安装apach2【图文讲解】

1. 更新系统&#xff0c;确保您的系统软件包是最新的 sudo apt update sudo apt upgrade -y 2. 安装 Apache Web 服务器 apt install apache2 -y 3. 安装 PHP 及常用的扩展 apt install php libapache2-mod-php -y apt install php-mysql php-xml php-mbstring php-curl php…

本地windows环境下,在vscode里将go项目打成docker镜像,并运行访问

此处只展示一个简单go代码实例. #前提:需要装好docker和golang环境,本地docker启动且配置好镜像源地址: # 容器镜像加速服务-云港网络 1.首先在vscode中写一个简单输出的demo go mod init &#xff0b;go mod tidy编译一下,命令运行如下: 2.使用命令生成Dockerfile文件 $ g…

GIT的使用方法以及汉化方法

1.下载git软件&#xff0c;可以从官网下载 下载后默认安装即可。 2.找到一个文件夹&#xff0c;或者直接打开gitbash gitbash可以使用cd指令切换目录的 打开后输入 git clone https:[git仓库的网页]即可克隆仓库 就是这个地址 克隆后即可使用代码 如果忘记了命令可以使用 -…

前缀和(四)除自身以外数组的乘积

238. 除自身以外数组的乘积 给你一个整数数组 nums&#xff0c;返回 数组 answer &#xff0c;其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积 。 题目数据 保证 数组 nums之中任意元素的全部前缀元素和后缀的乘积都在 32 位 整数范围内。 请 不要使用除法&…

图解RabbitMQ七种工作模式生产者消费者模型的补充

文章目录 1.消费者模型2.生产者-消费者模型注意事项2.1资源释放顺序问题2.2消费者的声明问题2.3虚拟机和用户的权限问题 3.七种工作模式3.1简单模式3.2工作模式3.3发布/订阅模式3.4路由模式3.5通配符模式3.6RPC通信3.7发布确认 1.消费者模型 之前学习的这个消息队列的快速上手…

头歌 Linux之线程管理

第1关&#xff1a;创建线程 任务描述 通常我们编写的程序都是单进程&#xff0c;如果在一个进程中没有创建新的线程&#xff0c;则这个单进程程序也就是单线程程序。本关我们将介绍如何在一个进程中创建多个线程。 本关任务&#xff1a;学会使用C语言在Linux系统中使用pthrea…

BioDeepAV:一个多模态基准数据集,包含超过1600个深度伪造视频,用于评估深度伪造检测器在面对未知生成器时的性能。

2024-11-29, 由罗马尼亚布加勒斯特大学创建BioDeepAV数据集&#xff0c;它专门设计来评估最先进的深度伪造检测器在面对未见过的深度伪造生成器时的泛化能力&#xff0c;这对于提高检测器的鲁棒性和适应性具有重要意义。 数据集地址&#xff1a;biodeep 一、研究背景&#xff1…

工业—使用Flink处理Kafka中的数据_ChangeRecord1

使用 Flink 消费 Kafka 中 ChangeRecord 主题的数据,当某设备 30 秒状态连续为 “ 预警 ” ,输出预警 信息。当前预警信息输出后,最近30

Flink四大基石之State(状态) 的使用详解

目录 一、有状态计算与无状态计算 &#xff08;一&#xff09;概念差异 &#xff08;二&#xff09;应用场景 二、有状态计算中的状态分类 &#xff08;一&#xff09;托管状态&#xff08;Managed State&#xff09;与原生状态&#xff08;Raw State&#xff09; 两者的…

opencv-android编译遇到的相关问题处理

1、opencv-android sdk下载 下载地址&#xff1a;https://opencv.org/releases/ 下载安卓SDK即可 2、解压下载好的SDK 3、导入opencv的SDK到安卓项目中 导入步骤在/OpenCV-android-sdk/sdk/build.gradle文件的注释中写的非常详细&#xff0c;大家可安装官方给出的步骤导入。…

OpenSSH-9.9p1 OpenSSL-3.4.0 升级步骤详细

前言 收到漏洞扫描通知 OpenSSH 安全漏洞(CVE-2023-38408) OpenSSH 安全漏洞(CVE-2023-51385) OpenSSH 安全漏洞(CVE-2023-51384) OpenSSH 安全漏洞(CVE-2023-51767) OpenSSH 安全漏洞(CVE-2023-48795) OpenSSH&#xff08;OpenBSD SecureShell&#xff09;是加拿大OpenBSD计划…

Python毕业设计选题:基于Flask的医疗预约与诊断系统

开发语言&#xff1a;Python框架&#xff1a;flaskPython版本&#xff1a;python3.7.7数据库&#xff1a;mysql 5.7数据库工具&#xff1a;Navicat11开发软件&#xff1a;PyCharm 系统展示 系统首页 疾病信息 就诊信息 个人中心 管理员登录界面 管理员功能界面 用户界面 医生…

sql删除冗余数据

工作或面试中经常能遇见一种场景题&#xff1a;删除冗余的数据&#xff0c;以下是举例介绍相应的解决办法。 举例&#xff1a; 表结构&#xff1a; 解法1&#xff1a;子查询 获取相同数据中id更小的数据项&#xff0c;再将id不属于其中的数据删除。-- 注意&#xff1a;mysql中…