Go操作Kafka之kafka-go

Kafka是一种高吞吐量的分布式发布订阅消息系统,本文介绍了如何使用kafka-go这个库实现Go语言与kafka的交互。

Go社区中目前有三个比较常用的kafka客户端库 , 它们各有特点。

首先是IBM/sarama(这个库已经由Shopify转给了IBM),之前我写过一篇使用sarama操作Kafka的教程,相较于sarama, kafka-go 更简单、更易用。

segmentio/kafka-go 是纯Go实现,提供了与kafka交互的低级别和高级别两套API,同时也支持Context。

此外社区中另一个比较常用的confluentinc/confluent-kafka-go,它是一个基于cgo的librdkafka包装,在项目中使用它会引入对C库的依赖。

准备Kafka环境

这里推荐使用Docker Compose快速搭建一套本地开发环境。

以下docker-compose.yml文件用来搭建一套单节点zookeeper和单节点kafka环境,并且在8080端口提供kafka-ui管理界面。

version: '2.1'services:zoo1:image: confluentinc/cp-zookeeper:7.3.2hostname: zoo1container_name: zoo1ports:- "2181:2181"environment:ZOOKEEPER_CLIENT_PORT: 2181ZOOKEEPER_SERVER_ID: 1ZOOKEEPER_SERVERS: zoo1:2888:3888kafka1:image: confluentinc/cp-kafka:7.3.2hostname: kafka1container_name: kafka1ports:- "9092:9092"- "29092:29092"- "9999:9999"environment:KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXTKAFKA_INTER_BROKER_LISTENER_NAME: INTERNALKAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"KAFKA_BROKER_ID: 1KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1KAFKA_JMX_PORT: 9999KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizerKAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"depends_on:- zoo1kafka-ui:container_name: kafka-uiimage: provectuslabs/kafka-ui:latestports:- 8080:8080depends_on:- kafka1environment:DYNAMIC_CONFIG_ENABLED: "TRUE"

将上述docker-compose.yml文件在本地保存,在同一目录下执行以下命令启动容器。

docker-compose up -d

容器启动后,使用浏览器打开127.0.0.1:8080 即可看到如下kafka-ui界面。

在这里插入图片描述

点击页面右侧的“Configure new cluster”按钮,配置kafka服务连接信息。

在这里插入图片描述
填写完信息后,点击页面下方的“Submit”按钮提交即可。

在这里插入图片描述

安装kafka-go

执行以下命令下载 kafka-go依赖。

go get github.com/segmentio/kafka-go

注意:kafka-go 需要 Go 1.15或更高版本。

kafka-go使用指南

kafka-go 提供了两套与Kafka交互的API。

  • 低级别( low-level):基于与 Kafka 服务器的原始网络连接实现。
  • 高级别(high-level):对于常用读写操作封装了一套更易用的API。

通常建议直接使用高级别的交互API。

Connection

Conn 类型是 kafka-go 包的核心。它代表与 Kafka broker之间的连接。基于它实现了一套与Kafka交互的低级别 API。

发送消息

下面是连接至Kafka之后,使用Conn发送消息的代码示例。

// writeByConn 基于Conn发送消息
func writeByConn() {topic := "my-topic"partition := 0// 连接至Kafka集群的Leader节点conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)if err != nil {log.Fatal("failed to dial leader:", err)}// 设置发送消息的超时时间conn.SetWriteDeadline(time.Now().Add(10 * time.Second))// 发送消息_, err = conn.WriteMessages(kafka.Message{Value: []byte("one!")},kafka.Message{Value: []byte("two!")},kafka.Message{Value: []byte("three!")},)if err != nil {log.Fatal("failed to write messages:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close writer:", err)}
}
消费消息
// readByConn 连接至kafka后接收消息
func readByConn() {// 指定要连接的topic和partitiontopic := "my-topic"partition := 0// 连接至Kafka的leader节点conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)if err != nil {log.Fatal("failed to dial leader:", err)}// 设置读取超时时间conn.SetReadDeadline(time.Now().Add(10 * time.Second))// 读取一批消息,得到的batch是一系列消息的迭代器batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max// 遍历读取消息b := make([]byte, 10e3) // 10KB max per messagefor {n, err := batch.Read(b)if err != nil {break}fmt.Println(string(b[:n]))}// 关闭batchif err := batch.Close(); err != nil {log.Fatal("failed to close batch:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close connection:", err)}
}

使用batch.Read更高效一些,但是需要根据消息长度选择合适的buffer(上述代码中的b),如果传入的buffer太小(消息装不下)就会返回io.ErrShortBuffer错误。

如果不考虑内存分配的效率问题,也可以按以下代码使用batch.ReadMessage读取消息。

for {msg, err := batch.ReadMessage()if err != nil {break}fmt.Println(string(msg.Value))
}
创建topic

当Kafka关闭自动创建topic的设置时,可按如下方式创建topic。

// createTopicByConn 创建topic
func createTopicByConn() {// 指定要创建的topic名称topic := "my-topic"// 连接至任意kafka节点conn, err := kafka.Dial("tcp", "localhost:9092")if err != nil {panic(err.Error())}defer conn.Close()// 获取当前控制节点信息controller, err := conn.Controller()if err != nil {panic(err.Error())}var controllerConn *kafka.Conn// 连接至leader节点controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))if err != nil {panic(err.Error())}defer controllerConn.Close()topicConfigs := []kafka.TopicConfig{{Topic:             topic,NumPartitions:     1,ReplicationFactor: 1,},}// 创建topicerr = controllerConn.CreateTopics(topicConfigs...)if err != nil {panic(err.Error())}
}
通过非leader节点连接leader节点

下面的示例代码演示了如何通过已有的非leader节点的Conn,连接至 leader节点。

conn, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {panic(err.Error())
}
defer conn.Close()
// 获取当前控制节点信息
controller, err := conn.Controller()
if err != nil {panic(err.Error())
}
var connLeader *kafka.Conn
connLeader, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {panic(err.Error())
}
defer connLeader.Close()
获取topic列表
conn, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {panic(err.Error())
}
defer conn.Close()partitions, err := conn.ReadPartitions()
if err != nil {panic(err.Error())
}m := map[string]struct{}{}
// 遍历所有分区取topic
for _, p := range partitions {m[p.Topic] = struct{}{}
}
for k := range m {fmt.Println(k)
}

Reader

Reader是由 kafka-go 包提供的另一个概念,对于从单个主题-分区(topic-partition)消费消息这种典型场景,使用它能够简化代码。Reader 还实现了自动重连和偏移量管理,并支持使用 Context 支持异步取消和超时的 API。

注意: 当进程退出时,必须在 Reader 上调用 Close() 。Kafka服务器需要一个优雅的断开连接来阻止它继续尝试向已连接的客户端发送消息。如果进程使用 SIGINT (shell 中的 Ctrl-C)或 SIGTERM (如 docker stop 或 kubernetes start)终止,那么下面给出的示例不会调用 Close()。当同一topic上有新Reader连接时,可能导致延迟(例如,新进程启动或新容器运行)。在这种场景下应使用signal.Notify处理程序在进程关闭时关闭Reader。

消费消息

下面的代码演示了如何使用Reader连接至Kafka消费消息。

// readByReader 通过Reader接收消息
func readByReader() {// 创建Readerr := kafka.NewReader(kafka.ReaderConfig{Brokers:   []string{"localhost:9092", "localhost:9093", "localhost:9094"},Topic:     "topic-A",Partition: 0,MaxBytes:  10e6, // 10MB})r.SetOffset(42) // 设置Offset// 接收消息for {m, err := r.ReadMessage(context.Background())if err != nil {break}fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))}// 程序退出前关闭Readerif err := r.Close(); err != nil {log.Fatal("failed to close reader:", err)}
}
消费者组

kafka-go支持消费者组,包括broker管理的offset。要启用消费者组,只需在 ReaderConfig 中指定 GroupID。

使用消费者组时,ReadMessage 会自动提交偏移量。

// 创建一个reader,指定GroupID,从 topic-A 消费消息
r := kafka.NewReader(kafka.ReaderConfig{Brokers:  []string{"localhost:9092", "localhost:9093", "localhost:9094"},GroupID:  "consumer-group-id", // 指定消费者组idTopic:    "topic-A",MaxBytes: 10e6, // 10MB
})// 接收消息
for {m, err := r.ReadMessage(context.Background())if err != nil {break}fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
}// 程序退出前关闭Reader
if err := r.Close(); err != nil {log.Fatal("failed to close reader:", err)
}

在使用消费者组时会有以下限制:

(*Reader).SetOffset 当设置了GroupID时会返回错误
(*Reader).Offset 当设置了GroupID时会永远返回 -1
(*Reader).Lag 当设置了GroupID时会永远返回 -1
(*Reader).ReadLag 当设置了GroupID时会返回错误
(*Reader).Stats 当设置了GroupID时会返回一个-1的分区

显式提交

kafka-go 也支持显式提交。当需要显式提交时不要调用 ReadMessage,而是调用 FetchMessage获取消息,然后调用 CommitMessages 显式提交。

ctx := context.Background()
for {// 获取消息m, err := r.FetchMessage(ctx)if err != nil {break}// 处理消息fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))// 显式提交if err := r.CommitMessages(ctx, m); err != nil {log.Fatal("failed to commit messages:", err)}
}

在消费者组中提交消息时,具有给定主题/分区的最大偏移量的消息确定该分区的提交偏移量的值。例如,如果通过调用 FetchMessage 获取了单个分区的偏移量为 1、2 和 3 的消息,则使用偏移量为3的消息调用 CommitMessages 也将导致该分区的偏移量为 1 和 2 的消息被提交。

管理提交间隔

默认情况下,调用CommitMessages将同步向Kafka提交偏移量。为了提高性能,可以在ReaderConfig中设置CommitInterval来定期向Kafka提交偏移。

// 创建一个reader从 topic-A 消费消息
r := kafka.NewReader(kafka.ReaderConfig{Brokers:        []string{"localhost:9092", "localhost:9093", "localhost:9094"},GroupID:        "consumer-group-id",Topic:          "topic-A",MaxBytes:       10e6, // 10MBCommitInterval: time.Second, // 每秒刷新一次提交给 Kafka
})

Writer

向Kafka发送消息,除了使用基于Conn的低级API,kafka-go包还提供了更高级别的 Writer 类型。大多数情况下使用Writer即可满足条件,它支持以下特性。

  • 对错误进行自动重试和重新连接。
  • 在可用分区之间可配置的消息分布。
  • 向Kafka同步或异步写入消息。
  • 使用Context的异步取消。
  • 关闭时清除挂起的消息以支持正常关闭。
  • 在发布消息之前自动创建不存在的topic。
发送消息
// 创建一个writer 向topic-A发送消息
w := &kafka.Writer{Addr:         kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),Topic:        "topic-A",Balancer:     &kafka.LeastBytes{}, // 指定分区的balancer模式为最小字节分布RequiredAcks: kafka.RequireAll,    // ack模式Async:        true,                // 异步
}err := w.WriteMessages(context.Background(),kafka.Message{Key:   []byte("Key-A"),Value: []byte("Hello World!"),},kafka.Message{Key:   []byte("Key-B"),Value: []byte("One!"),},kafka.Message{Key:   []byte("Key-C"),Value: []byte("Two!"),},
)
if err != nil {log.Fatal("failed to write messages:", err)
}if err := w.Close(); err != nil {log.Fatal("failed to close writer:", err)
}
创建不存在的topic

如果给Writer配置了AllowAutoTopicCreation:true,那么当发送消息至某个不存在的topic时,则会自动创建topic。


// 创建不存在的topic
// 如果给Writer配置了AllowAutoTopicCreation:true,那么当发送消息至某个不存在的topic时,则会自动创建topic。
func writeByWriter2() {writer := kafka.Writer{Addr:                   kafka.TCP("192.168.2.204:9092"),Topic:                  "kafka-test-topic",AllowAutoTopicCreation: true, //自动创建topic}messages := []kafka.Message{{Key:   []byte("Key-A"),Value: []byte("Hello World!"),},{Key:   []byte("Key-B"),Value: []byte("One!"),},{Key:   []byte("Key-C"),Value: []byte("Tow!"),},}const retries = 3//重试3次for i := 0; i < retries; i++ {ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)defer cancel()err := writer.WriteMessages(ctx, messages...)if errors.Is(err, kafka.LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) {time.Sleep(time.Millisecond * 250)continue}if err != nil {log.Fatal("unexpected error %v", err)}break}//关闭Writerif err := writer.Close(); err != nil {log.Fatal("failed to close writer:", err)}
}
写入多个topic

通常,WriterConfig.Topic用于初始化单个topic的Writer。通过去掉WriterConfig中的Topic配置,分别设置每条消息的message.topic,可以实现将消息发送至多个topic。

w := &kafka.Writer{Addr:     kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),// 注意: 当此处不设置Topic时,后续的每条消息都需要指定TopicBalancer: &kafka.LeastBytes{},
}err := w.WriteMessages(context.Background(),// 注意: 每条消息都需要指定一个 Topic, 否则就会报错kafka.Message{Topic: "topic-A",Key:   []byte("Key-A"),Value: []byte("Hello World!"),},kafka.Message{Topic: "topic-B",Key:   []byte("Key-B"),Value: []byte("One!"),},kafka.Message{Topic: "topic-C",Key:   []byte("Key-C"),Value: []byte("Two!"),},
)
if err != nil {log.Fatal("failed to write messages:", err)
}if err := w.Close(); err != nil {log.Fatal("failed to close writer:", err)
}

注意:Writer中的Topic和Message中的Topic是互斥的,同一时刻有且只能设置一处。

其他配置

TLS

对于基本的 Conn 类型或在 Reader/Writer 配置中,可以在Dialer中设置TLS选项。如果 TLS 字段为空,则它将不启用TLS 连接。

注意:不在Conn/Reder/Writer上配置TLS,连接到启用TLS的Kafka集群,可能会出现io.ErrUnexpectedEOF错误。

Connection
dialer := &kafka.Dialer{Timeout:   10 * time.Second,DualStack: true,TLS:       &tls.Config{...tls config...},  // 指定TLS配置
}conn, err := dialer.DialContext(ctx, "tcp", "localhost:9093")
Reader
dialer := &kafka.Dialer{Timeout:   10 * time.Second,DualStack: true,TLS:       &tls.Config{...tls config...},  // 指定TLS配置
}r := kafka.NewReader(kafka.ReaderConfig{Brokers:        []string{"localhost:9092", "localhost:9093", "localhost:9094"},GroupID:        "consumer-group-id",Topic:          "topic-A",Dialer:         dialer,
})
Writer

创建Writer时可以按如下方式指定TLS配置。

w := kafka.Writer{Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), Topic:   "topic-A",Balancer: &kafka.Hash{},Transport: &kafka.Transport{TLS: &tls.Config{},  // 指定TLS配置},}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/810689.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Triton Server Python 后端优化

接上文 不使用 Docker 构建 Triton 服务器并在 Google Colab 平台上部署 HuggingFace 模型 MultiGPU && Multi Instance Config 追加 instance_group [{count: 4kind: KIND_GPUgpus: [ 0, 1 ]} ]Python Backend Triton 会根据配置信息启动四个实例&#xff0c;…

物联网数据服务平台

随着物联网技术的迅猛发展&#xff0c;海量数据的产生和应用成为推动工业数字化转型的核心动力。在这个数据为王的时代&#xff0c;如何高效地收集、处理、分析并应用这些数据&#xff0c;成为了企业关注的焦点。物联网数据服务平台应运而生&#xff0c;为企业提供了全面、高效…

CLR学习

视频链接&#xff1a;《CLR十分钟》系列之CLR运行模型_哔哩哔哩_bilibili 什么是 CLR 公共语言运行时&#xff08;Common Language Runtime CLR&#xff09; 是一个可有多种编程语言使用的 运行时&#xff0c;CLR 的核心功能&#xff08;比如 内存管理&#xff0c;程序集加载…

耐受强酸碱PFA试剂瓶高纯实验级进口聚四氟乙烯材质取样瓶

PFA取样瓶作为实验室中常备器皿耗材之一&#xff0c;主要用来盛放、储存和运输样品&#xff0c;根据使用条件不同&#xff0c;也可叫特氟龙试剂瓶、样品瓶、储样瓶、广口瓶、进样瓶等。广泛应用于半导体、新材料、多晶硅、硅材、微电子等行业。近年来随着新兴行业的快速发展&am…

【ARM 裸机】开发环境搭建

1、Ubuntu 和 Windows 文件互传 使用过程中&#xff0c;要频繁进行 Ubuntu 和 Windows 的文件互传&#xff0c;需要使用 FTP 服务&#xff1b; 1.1、开启 Ubuntu 下的 FTP 服务 //安装 FTP 服务 sudo apt-get install vsftpd //修改配置文件 sudo vi /etc/vsftpd.conf//重启…

某想主站的短信轰炸漏洞

很难想象主站居然还有这漏洞 某天的一个晚上&#xff0c;默默的打开了电脑&#xff0c;娴熟的打开了Burp suite, 看到一个很熟悉的注册登录页面&#xff0c;开始测试。 很难想象&#xff0c;还有验证码时效性&#xff0c;于是怼了半刻钟&#xff0c;终于让我逮到了他的数据包…

TechTool Pro for Mac v19.0.3中文激活版 硬件监测和系统维护工具

TechTool Pro for Mac是一款专为Mac用户设计的强大系统维护和故障排除工具。它凭借全面的功能、高效的性能以及友好的操作界面&#xff0c;赢得了广大用户的信赖和好评。 软件下载&#xff1a;TechTool Pro for Mac v19.0.3中文激活版 作为一款专业的磁盘和系统维护工具&#x…

IDEA 设置类注释模板作者、日期、描述等信息(推荐标准!)

idea注释模版配置 idea作为越来越多程序员使用的开发工具&#xff0c;平时的代码注释也非常的关键&#xff0c;类上注释和方法上注释每次换电脑或者新同事入职都要统一修改&#xff0c;找了网上好多教程都写的乱七八糟的啥都有&#xff0c;为方便统一就自己写一个操作方法&…

制氧机生产厂家如何确保氧气管道安全高效

制氧机作为生产氧气的关键设备&#xff0c;其安全性与高效性受到了广泛关注。作为制氧机生产厂家&#xff0c;确保氧气管道的安全高效运行&#xff0c;不仅是责任所在&#xff0c;更是对用户生命财产安全的有力保障。那么&#xff0c;制氧机生产厂家如何确保氧气管道安全高效生…

期货量化交易软件:MQL5 中的范畴论 (第 15 部分)函子与图论

概述 在上一篇文章中&#xff0c;我们目睹了前期文章中涵盖的概念&#xff08;如线性序&#xff09;如何视作范畴&#xff0c;以及为什么它们的“态射”在与其它范畴相关时即构成函子。在本文中&#xff0c;我们赫兹量化软件将阐述来自前期文章中的概括&#xff0c;即通过查看…

浙大恩特客户资源管理系统 i0004_openFileByStream.jsp 任意文件读取漏洞复现

0x01 产品简介 浙大恩特客户资源管理系统是一款针对企业客户资源管理的软件产品。该系统旨在帮助企业高效地管理和利用客户资源,提升销售和市场营销的效果。 0x02 漏洞概述 浙大恩特客户资源管理系统 i0004_openFileByStream.jsp接口处存在任意文件读取漏洞,未经身份验证攻…

数字证书在网络安全中的关键作用与日常应用

在当今数字化的时代&#xff0c;网络安全问题日益凸显&#xff0c;保护数据安全和用户隐私成为了人们关注的焦点。数字证书作为一种重要的网络安全技术&#xff0c;其在网络安全中扮演着关键的角色&#xff0c;并且在我们的日常生活中有着广泛的应用。现在给大家介绍简单介绍下…

在抖音做“奸商”!虽说不光彩,但能“发大财”!

打工人都知道的一句话&#xff1a;“做老板的都是周扒皮&#xff01;公司最赚钱的就是老板” 虽然手底下的员工都在骂老板压榨员工&#xff0c;但如果有一个让员工当老板的机会&#xff0c;我相信没有人会选择继续当牛做马 今天我就来给大家介绍一个&#xff1a;我正在做的“…

Flask快速搭建文件上传服务与接口

说明&#xff1a;仅供学习使用&#xff0c;请勿用于非法用途&#xff0c;若有侵权&#xff0c;请联系博主删除 作者&#xff1a;zhu6201976 一、需求背景 前端通过浏览器&#xff0c;访问后端服务器地址&#xff0c;将目标文件进行上传。 访问地址&#xff1a;http://127.0.0…

ChromeOS 中自启动 Fcitx5 和托盘 stalonetray

ChromeOS 更新的飞快&#xff0c;旧文章的方法也老是不好用&#xff0c;找遍了也没找到很好的可以开机自启动 Linux VM 和输入法、托盘的方法。 研究了一下&#xff08;不&#xff0c;是很久&#xff09;&#xff0c;终于找到个丑陋的实现。 方法基于 ChromeOS 123.0.6312.94…

淄博、哈尔滨、天水…社交媒体助推下的网红城市能“长红”吗?

烧烤卷饼带火山东传统工业小镇淄博&#xff1b; 冰雪狂欢让东北的哈尔滨在寒冬爆火&#xff1b; 一碗麻辣烫让西北天水小城变“网红”…… 在刚刚过去的清明假期&#xff0c;甘肃天水可谓是“热辣滚烫”&#xff0c;在春暖花开时节&#xff0c;迎来了属于它的春天。而被人们逐…

在Spring Boot中使用POI完成一个excel报表导入数据到MySQL的功能

最近看了自己玩过的很多项目&#xff0c;忽然发现有一个在实际开发中我们经常用到的功能&#xff0c;但是我没有正儿八经的玩过这个功能&#xff0c;那就是在Spring Boot中实现一个excel报表的导入导出功能&#xff0c;这篇博客&#xff0c;主要是围绕excel报表数据导入进行&am…

《由浅入深学习SAP财务》:第2章 总账模块 - 2.6 定期处理 - 2.6.1 月末操作:自动清账

2.6.1 月末操作&#xff1a;自动清账 清账是指会计科目的借贷挂账后的核销&#xff0c;包括客户、供应商和实行未清项管理的总账科目等。 总账模块实行未清项管理的科目有GR/IR&#xff08;Goods Receipt/Invoice Receipt&#xff09;、银行存款-清账&#xff08;较少使…

中国历年GDP统计-探数API统计

数据介绍 时间维度&#xff1a;1978年-2021年 单位&#xff1a;亿元 该数据来源于国家统计局发布的中国统计年鉴2021&#xff0c;为按当年价格计算的中国历年GDP以及人均GDP。 数据说明&#xff1a; 数据来源于国家统计局。

【教程】7代核显直通HDMI成功输出 PVE下玩AIO最有性价比的机器

大家好&#xff0c;我是村雨Mura&#xff0c;好久没写教程了&#xff0c;本期是7代核显直通&#xff0c;重点在于HDMI输出画面 本教程理论上适用于4代以后intel带核显CPU&#xff0c;如果你有直通成功经验欢迎评论区分享 前面有点啰嗦&#xff0c;想直接看教程&#xff0c;直…