flink读取kafka数据存储iceberg

1、说明

使用flink实时的读取kafka的数据,并且实时的存储到iceberg中。好处是可以一边存数据,一边查询数据。当然使用clickhouse也可以实现数据的既存既取。而hive数据既存既读则会有问题。iceberg中数据读写数据都是从快照中开始的,读和写对应的不同快照,所以读写互不影响。而hive中写的时候数据就不能读。

下面是使用flink读取kafka数据存储到iceberg的例子。本案例,可以直接在本地直接运行,无需搭建hadoop,hive集群。其中遇到的问题及解决思路。用到kafka,可以直接使用docker,来搞一个,跑起来。

2、实现步骤

1)确保flink和iceberg的版本对应

这里使用的是(flink:1.13.5,iceberg: 0.12.1 )

2) 创建流式执行环境

  • 使用getExecutionEnvironment()的静态方法可以自动识别是本地环境还是集群服务环境。当然也可以使用createLocalEnvironment()方法创建本地环境。

    该环境变量类似于上下文,可以配置一些基本的信息,如并行度(默认是CPU数)、检查时间间隔(默认不检查)。

  • 这里设置检查为5000毫秒,到检查时间的时候 ,Flink向Iceberg中写入数据时当checkpoint发生后,才会commit数据(必须设置checkpoint)

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);

3)指定kafka数据源创建数据流

  • 设置读取kafka的一些参数,值的序列化使用的是字符串序列化模式(SimpleStringSchema);开始读取kafka数据,偏移量设置,则从最新的数据开始读取(latest)。

  • 从指定的source(源)中创建一个数据流。这里的 source 可能是Kafka读取数据,当然还可以从其他外部源如socketKinesis 或其他数据源读取的数据。WatermarkStrategy.noWatermarks(),这是一个水印策略。用于处理事件时间(event-time)和处理有延迟或乱序的数据。WatermarkStrategy.noWatermarks() 表示我们不想为这个数据流生成任何水印。这意味着我们可能是在做纯粹的基于处理时间(processing-time)的流处理,或者我们不关心事件时间的顺序。"kafka_source",这是给这个数据源分配的名称,主要用于日志记录和调试。

        KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("node1:9094,node2:9094").setTopics("topic_users").setGroupId("flink-test-1").setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.latest()).build();DataStreamSource<String> kafkaDs = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka_source");

4)流式数据转换器

  • 上一步获取到的数据流,将数据流中的json字符串转为RowData . 当然输入的数据格式可能是其它格式,根据实际情况进行修改。
  • 注意GenericRowData中字符串格式必须使用 StringData.fromString(session.userId)来转换一下,否则无法存储到iceberg表中。
        SingleOutputStreamOperator<RowData> dataStream = kafkaDs.map((MapFunction<String, RowData>) value -> {Gson gson = new Gson();Sessions session = gson.fromJson(value, Sessions.class);GenericRowData row = new GenericRowData(9);row.setField(0, session.version);row.setField(1, StringData.fromString(session.userId));row.setField(2, StringData.fromString(session.appType));row.setField(3, session.loginTime);row.setField(4, StringData.fromString(session.clientIp));row.setField(5, StringData.fromString(session.service));row.setField(6, session.status);row.setField(7, StringData.fromString(session.channel));row.setField(8, StringData.fromString(timeStamp2DateStr(session.loginTime)));return row;});

5)创建表(创建Catalog、表Id,表Schema、表分区、表文件存储格式等)

这里内容看起来比较多,但也就是一件事,建表。

  • 创建 Hadoop 配置: 加载系统默认配置(core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml)

  • 创建 Iceberg 的 Hadoop Catalog: 用来管理表创建、删除,以及元数据存储位置。这里使用hadoop存储元数据和实际数据。

  • 指定表标识符: 对应数据库名称和表的名称。对应的数据也会存在iceberg_db/flink_iceberg_tbl目录下。

  • 定义表的 Schema:
    这部分代码定义了表的结构,指定了每个字段的名称和类型。

  • 定义分区: PartitionSpec.builderFor(schema).identity("hour_p").build();按照hour_p字段来分区数据. 如果不见分许则使用 PartitionSpec.unpartitioned()

PartitionSpec spec = PartitionSpec.builderFor(schema).identity("hour_p").build();
  • 设置表属性: 设置了默认的文件格式为PARQUET

  • 检查表是否存在,如果不存在则创建

  • 创建 TableLoader: TableLoader是用于加载Iceberg表的工具,这个目录是本地的数据存储的目录,以及数据库、数据表对应的名称。如果使用hdfs,则改为相应目录结构即可。

TableLoader tableLoader = TableLoader.fromHadoopTable("data/flinkwarehouse/iceberg_db/flink_iceberg_tbl");
        // 创建默认的配置类,会自动加载hadoop相关的配置文件Configuration hadoopConf = new Configuration();// 设置Catalog的存储位置Catalog catalog = new HadoopCatalog(hadoopConf, "data/flinkwarehouse");// iceberg 数据库名称,数据表名称TableIdentifier name = TableIdentifier.of("iceberg_db", "flink_iceberg_tbl");// 数据表明模式,以及字段名称,字段类型Schema schema = new Schema(Types.NestedField.required(1, "version", Types.IntegerType.get()),Types.NestedField.required(2, "userId", Types.StringType.get()),Types.NestedField.required(3, "appType", Types.StringType.get()),Types.NestedField.required(4, "loginTime", Types.LongType.get()),Types.NestedField.required(5, "clientIp", Types.StringType.get()),Types.NestedField.required(6, "service", Types.StringType.get()),Types.NestedField.required(7, "status", Types.IntegerType.get()),Types.NestedField.required(8, "channel", Types.StringType.get()),Types.NestedField.required(9, "hour_p", Types.StringType.get()));// 设置分区 PartitionSpec spec = PartitionSpec.unpartitioned();PartitionSpec spec = PartitionSpec.builderFor(schema).identity("hour_p").build();// 设置 默认文件存储格式 parquet格式Map<String, String> props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name());Table table = null;if (!catalog.tableExists(name)) {table = catalog.createTable(name, schema, spec, props);} else {table = catalog.loadTable(name);}TableLoader tableLoader = TableLoader.fromHadoopTable("data/flinkwarehouse/iceberg_db/flink_iceberg_tbl");

6)创建FlinkSink读取流式数据存储到表中

  • 接收一个数据流
  • 指定相应的表
  • 数据写入加载器中

7)执行环境开始执行任务

这里给任务一个简短的描述。


env.execute("iceberg api and flink ");

3、全部代码如下

1)build.gradle依赖如下配置

def flink = [version: '1.13.5'
]
def hadoop = [version: '3.2.2'
]def iceberg = [version: '0.12.1'
]dependencies {implementation 'com.alibaba.ververica:ververica-connector-iceberg:1.13-vvr-4.0.7'implementation "org.apache.iceberg:iceberg-flink-runtime:${iceberg.version}"implementation "org.apache.flink:flink-java:${flink.version}"implementation "org.apache.flink:flink-streaming-java_2.11:${flink.version}"implementation "org.apache.flink:flink-clients_2.11:${flink.version}"implementation "org.apache.flink:flink-streaming-scala_2.11:${flink.version}"implementation "org.apache.flink:flink-connector-kafka_2.11:${flink.version}"implementation "org.apache.flink:flink-connector-base:${flink.version}"implementation "org.apache.hadoop:hadoop-client:${hadoop.version}"implementation "org.apache.flink:flink-table-runtime-blink_2.11:${flink.version}"implementation "org.apache.flink:flink-table:${flink.version}"implementation "org.apache.flink:flink-table-common:${flink.version}"implementation "org.apache.flink:flink-table-api-java:${flink.version}"implementation "org.apache.flink:flink-table-api-java-bridge_2.11:${flink.version}"implementation "org.apache.flink:flink-table-planner_2.11:${flink.version}"implementation "org.apache.flink:flink-table-planner-blink_2.11:${flink.version}"testImplementation 'junit:junit:4.11'// log4j and slf4j dependencies testImplementation 'org.slf4j:slf4j-log4j12:1.7.25'testImplementation 'log4j:log4j:1.2.17'implementation 'org.slf4j:slf4j-api:1.7.25'testImplementation 'org.slf4j:slf4j-nop:1.7.25'testImplementation 'org.slf4j:slf4j-simple:1.7.5'implementation 'com.google.code.gson:gson:2.3.1'// 没有这个会出现报错,使用compilecompile 'com.google.guava:guava:28.2-jre'}

2)案例代码

package com.subao.flink;import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.*;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.types.Types;import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Map;public class FlinkIceberg {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// Flink向Iceberg中写入数据时当checkpoint发生后,才会commit数据(必须设置checkpoint)env.enableCheckpointing(5000);KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("124.70.194.33:9094,124.71.180.217:9094").setTopics("sessions").setGroupId("flink-test-1").setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.latest()).build();DataStreamSource<String> kafkaDs = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka_source");SingleOutputStreamOperator<RowData> dataStream = kafkaDs.map((MapFunction<String, RowData>) value -> {Gson gson = new Gson();Sessions session = gson.fromJson(value, Sessions.class);GenericRowData row = new GenericRowData(9);row.setField(0, session.version);row.setField(1, StringData.fromString(session.userId));row.setField(2, StringData.fromString(session.appType));row.setField(3, session.loginTime);row.setField(4, StringData.fromString(session.clientIp));row.setField(5, StringData.fromString(session.service));row.setField(6, session.status);row.setField(7, StringData.fromString(session.channel));row.setField(8, StringData.fromString(timeStamp2DateStr(session.loginTime)));System.out.println(row);return row;});// 创建默认的配置类,会自动加载hadoop相关的配置文件Configuration hadoopConf = new Configuration();// 设置Catalog的存储位置Catalog catalog = new HadoopCatalog(hadoopConf, "data/flinkwarehouse");// iceberg 数据库名称,数据表名称TableIdentifier name = TableIdentifier.of("iceberg_db", "flink_iceberg_tbl");// 数据表明模式,以及字段名称,字段类型Schema schema = new Schema(Types.NestedField.required(1, "version", Types.IntegerType.get()),Types.NestedField.required(2, "userId", Types.StringType.get()),Types.NestedField.required(3, "appType", Types.StringType.get()),Types.NestedField.required(4, "loginTime", Types.LongType.get()),Types.NestedField.required(5, "clientIp", Types.StringType.get()),Types.NestedField.required(6, "service", Types.StringType.get()),Types.NestedField.required(7, "status", Types.IntegerType.get()),Types.NestedField.required(8, "channel", Types.StringType.get()),Types.NestedField.required(9, "hour_p", Types.StringType.get()));// 设置分区 PartitionSpec spec = PartitionSpec.unpartitioned();PartitionSpec spec = PartitionSpec.builderFor(schema).identity("hour_p").build();// 设置 默认文件存储格式 parquet格式Map<String, String> props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name());Table table = null;if (!catalog.tableExists(name)) {table = catalog.createTable(name, schema, spec, props);} else {table = catalog.loadTable(name);}TableLoader tableLoader = TableLoader.fromHadoopTable("data/flinkwarehouse/iceberg_db/flink_iceberg_tbl");FlinkSink.forRowData(dataStream).table(table).tableLoader(tableLoader).overwrite(false).build();env.execute("iceberg api and flink ");}private static String timeStamp2DateStr(long timestamp) {// 将时间戳转为LocalDateTime对象LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp * 1000), ZoneId.systemDefault());// 定义日期时间格式DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMddHH");// 格式化日期时间return dateTime.format(formatter);}class Sessions {private int version;private String userId;private String appType;private long loginTime;private String clientIp;private String service;private int status;private String channel;public Sessions(int version, String userId, String appType, long loginTime, String clientIp, String service, int status, String channel) {this.version = version;this.userId = userId;this.appType = appType;this.loginTime = loginTime;this.clientIp = clientIp;this.service = service;this.status = status;this.channel = channel;}public int getVersion() {return version;}public void setVersion(int version) {this.version = version;}public String getUserId() {return userId;}public void setUserId(String userId) {this.userId = userId;}public String getAppType() {return appType;}public void setAppType(String appType) {this.appType = appType;}public long getLoginTime() {return loginTime;}public void setLoginTime(long loginTime) {this.loginTime = loginTime;}public String getClientIp() {return clientIp;}public void setClientIp(String clientIp) {this.clientIp = clientIp;}public String getService() {return service;}public void setService(String service) {this.service = service;}public int getStatus() {return status;}public void setStatus(int status) {this.status = status;}public String getChannel() {return channel;}public void setChannel(String channel) {this.channel = channel;}@Overridepublic String toString() {return "Sessions{" +"version=" + version +", userId='" + userId + '\'' +", appType='" + appType + '\'' +", loginTime=" + loginTime +", clientIp='" + clientIp + '\'' +", service='" + service + '\'' +", status=" + status +", channel='" + channel + '\'' +'}';}}
}

4、遇到的问题

1)找不到方法 java.lang.NoSuchMethodError:com.google.common.base.Preconditions.checkState。

Exception in thread "main" java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkState(ZLjava/lang/String;I)V

这个主要是相应的com.google.guava的版本不兼容,在build.gradle中加入如下即可

compile 'com.google.guava:guava:28.2-jre'

排查思路:

a) 看到这个我就想应该是相应的包没有正常添加到依赖中。

可以百度一下看看是哪个包,发现是com.google.guava包。

b) 检查现有依赖中是否包含该包

因为使用的是gradle,所以使用gradle的命令:gradle dependencies,从结果中查看,发现使用的hadoop中有多个包,版本不同,最大的是27.0的包。

c) 尝试使用高版本的guava的包

compile 'com.google.guava:guava:28.2-jre'

经测试是可以的。但这里主要要使用compile关键字。

compileimplementation都表示依赖文件在引入项目的编译期、运行期和测试期使用。但是compile标识的包可被其它依赖包使用,implementation则不可以。

  • compile 关键字会将依赖项传递到项目的所有模块和依赖项中。这意味着如果模块 A 依赖于模块 B,并且模块 B 使用了compile 关键字引入了一些依赖项,那么这些依赖项也会传递给模块 A。

  • implementation 关键字只将依赖项传递到当前模块中,不会传递给依赖模块。这意味着如果模块 A 依赖于模块 B,并且模块 B 使用了implementation 关键字引入了一些依赖项,那么这些依赖项不会传递给模块 A。

由于compile 关键字会引入依赖的传递性,可能导致不可预期的副作用和冲突。为了解决这个问题,从 Gradle 3.4 版本开始,建议使用implementation 关键字代替compile 关键字,以减少依赖项传递引起的问题。

2) 数据文件为空

flink能够读取kafka的数据,但是不能将数据写到文件中。怎么办?于是问了问GPT,它说看看日志。我没有配置log4j的日志配置,当然看不到错误日志了。于是我就加上log4j.properties的配置文件。然后发现了如下错误:

2023-08-08 19:38:16 DEBUG JobMaster:658 - Archive local failure causing attempt 8d8beb11aa845d873ebbc317b21bf435 to fail: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.flink.table.data.StringDataat org.apache.flink.table.data.GenericRowData.getString(GenericRowData.java:169)at org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$1(RowData.java:221)at org.apache.iceberg.flink.data.FlinkParquetWriters$RowDataWriter.get(FlinkParquetWriters.java:453)

原来是数据格式不兼容的问题,对于Flink中的RowData中使用的是StringData表示字符串,因此在存入的时候需要把String类型转为StringData即可。如:

 row.setField(2, StringData.fromString(session.appType));

总结:flink读取kafak数据用户的应该是比较多的,但是保存到iceberg中,这个相对用的比较少。flink中比较常用flink-sql来处理和数据表相关的数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/32854.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Tensorrt导出engine文件加密(WindowsC++版)

扯皮的话:为了优化模型的inference,TensorRT会根据网络的定义执行优化包括特定平台的优化并生成inference engine。此过程被称为构建阶段,尤其是在嵌入式平台上会消耗大量的时间,因此,一个典型的应用程序只会被构建一次engine,然后将其序列化为plane file以供后续使用。需…

使用 ESP32 Arduino 和机器学习实现WIFI室内定位

在这个 Arduino 机器学习项目中,我们将使用附近的 WiFi 接入点来定位我们所在的位置。为了使该项目正常运行,您需要一块配备 WiFi 的板,例如 ESP8266、ESP32 或 MKR WiFI 1010。 什么是室内定位? 我们都习惯了 GPS 定位,我们的设备将使用卫星来跟踪我们在地球上的位置。GP…

数学建模—分类模型

本讲将介绍分类模型。对于而分类模型&#xff0c;我们将介绍逻辑回归&#xff08;logistic regression&#xff09;和Fisher线性判别分析两种分类算法&#xff1b;对于多分类模型&#xff0c;我们将简单介绍Spss中的多分类线性判别分析和多分类逻辑回归的操作步骤下。 本题按水…

PostgreSQL jsonb

PostgreSQL jsonb jsonb 函数以及操作符 在PostgreSQL中&#xff0c;有许多用于处理JSONB数据类型的内置函数和操作符。下面列出了一些常用的JSONB函数和操作符&#xff1a; jsonb_pretty(jsonb) 该函数将JSONB数据格式化为易读的多行字符串。jsonb_typeof(jsonb) 该函数返回…

什么是DDL、MDL?

DDL和MDL是与数据库相关的术语&#xff0c;它们有一些不同的含义。 DDL&#xff08;Data Definition Language&#xff0c;数据定义语言&#xff09;&#xff1a; DDL用于定义和管理数据库中的对象&#xff0c;如表、索引、视图等。它包含用于创建、修改、删除和管理数据库对象…

Paddle OCR V4 测试Demo

效果 项目 VS2022.net4.8OCRV4 代码 using OpenCvSharp; using Sdcb.PaddleInference; using Sdcb.PaddleOCR; using Sdcb.PaddleOCR.Models; using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; usin…

Springboot开发常用注解

文章目录 1.RestController2.Data3.RequestMapping4.Builder 1.RestController RestController注解其实就是将 return 中的内容以 JSON字符串的形式返回客户端 controller的详解 2.Data Data详解 3.RequestMapping RequestMapping 4.Builder Builder

【Wamp】安装 | 局域网内设备访问

安装教程&#xff1a; https://wampserver.site/article/1.html 下载 https://www.wampserver.com/en/ 安装路径上不能有中文 安装好之后图标呈绿色 放入网页文件 将网页文件放置于wamp文件夹的www子文件夹 例如&#xff1a;\Wamp\program\www 修改http端口 WAMP服务器…

编写简单的.gitlab-ci.yml打包部署项目

服务器说明&#xff1a; 192.168.192.120&#xff1a;项目服务器 192.168.192.121&#xff1a;GitLab 为了可以使用gitlab的cicd功能&#xff0c;我们需要先安装GitLab Runner 安装GitLab Runner参考&#xff1a; GitLab实现CICD自动化部署_gitlab cidi_程序员xiaoQ的博客-CS…

【MFC】05.MFC六大机制:程序启动机制-笔记

MFC程序开发所谓是非常简单&#xff0c;但是对于我们逆向人员来说&#xff0c;如果想要逆向MFC程序&#xff0c;那么我们就必须了解它背后的机制&#xff0c;这样我们才能够清晰地逆向出MFC程序&#xff0c;今天这篇文章就来带领大家了解MFC的第一大机制&#xff1a;程序启动机…

AI:02-基于深度学习的动物图像检索算法的研究

文章目录 一、算法原理二、代码实现三、实验结果四、总结深度学习在计算机视觉领域中的应用越来越广泛,其中动物图像检索算法是一个重要的应用场景。本文将介绍一种基于深度学习的动物图像检索算法,并提供相应的代码实现。 一、算法原理 本算法采用卷积神经网络(Convolutio…

Sqlite3简介

SQLite3 简介 SQLite3 是一种轻量级的嵌入式数据库引擎&#xff0c;被广泛应用于各种应用程序中&#xff0c;包括移动设备、桌面应用程序和嵌入式系统。它以其简单、高效和零配置的特点而受到开发者的喜爱。 以下是 SQLite3 的一些重要特点&#xff1a; 嵌入式数据库引擎&…

前端项目打包报错JavaScript heap out of memory(js堆内存不足)

项目打包出现类似以下报错 <--- JS stacktrace --->FATAL ERROR: Reached heap limit Allocation failed - JavaScript heap out of memory1: 00007FF646E9B1EF v8::internal::CodeObjectRegistry::~CodeObjectRegistry1235992: 00007FF646E28BA6 v8::internal::Microta…

数据安全加固:深入解析滴滴ES安全认证技术方案

前文分别介绍了滴滴自研的ES强一致性多活是如何实现的、以及如何提升ES的性能潜力。由于ES具有强大的搜索和分析功能&#xff0c;同时也因其开源和易于使用而成为黑客攻击的目标。近些年&#xff0c;业界ES数据泄露事件频发, 以下是一些比较严重的数据泄露案件&#xff1a; 202…

Golang函数以及函数和方法的区别

在接触到go之前&#xff0c;我认为函数和方法只是同一个东西的两个名字而已&#xff08;在我熟悉的c/c&#xff0c;python&#xff0c;java中没有明显的区别&#xff09;&#xff0c;但是在golang中者完全是两个不同的东西。官方的解释是&#xff0c;方法是包含了接收者的函数。…

基于Dlib库+SVM+Tensorflow+PyQT5智能面相分析-机器学习算法应用(含全部工程源码)+训练及测试数据集

目录 前言总体设计系统整体结构图系统流程图模型流程 运行环境Python 环境TensorFlow环境界面编程环境 模块实现1. 数据预处理2. 模型构建1&#xff09;定义模型结构2&#xff09;交叉验证模型优化 3. 模型训练及保存4. 模型测试1&#xff09;摄像头调用2&#xff09;模型导入及…

题解 | #A.Alive Fossils# 2023牛客暑期多校8

A.Alive Fossils 签到 题目大意 给定 n n n 个字符串集&#xff0c;求它们的交集&#xff0c;按字典序输出 解题思路 逐一处理字符串集&#xff0c;开个 map 记录此前的交集&#xff0c;从当前集合中选走元素即可 时间复杂度 O ( n ) O(n) O(n) 参考代码 参考代码为已A…

neo4j电影库-关系查询

关系类型数量源数据目标数据属性ACTED_IN172演员电影roles&#xff08;角色扮演&#xff09;属性&#xff0c;属性值为数组DIRECTED44导演电影无PRODUCED15制片商电影无WROTE10作家电影无FOLLOWS3影评人影评人无REVIEWED9影评人电影summary&#xff08;影评摘要&#xff09;和 …

1036:A×B问题

【题目描述】 输入两个正整数A和B&#xff0c;求A*B的值。注意乘积的范围和数据类型的选择。 【输入】 一行&#xff0c;包含两个正整数A和B&#xff0c;中间用单个空格隔开。1<A,B<50000。 【输出】 一个整数&#xff0c;即A*B的值。 【输入样例】 3 4 【输出样…

Linux C++网络编程基础(1):TCP服务端与客户端的实现

目录 一、OSI七层网络模型二、TCP的特点三、TCP服务端代码服务端代码相关函数介绍sockaddr_inhtonssocketbindlistenacceptreadsendclose四、TCP客户端代码客户端相关代码介绍inet_ptonconnect一、OSI七层网络模型 网络协议是计算机网络中的规则,它们定义了计算机如何发送和接…