springboot整合ELK+kafka采集日志

一、背景介绍

在分布式的项目中,各功能模块产生的日志比较分散,同时为满足性能要求,同一个微服务会集群化部署,当某一次业务报错后,如果不能确定产生的节点,那么只能逐个节点去查看日志文件;logback中RollingFileAppender,ConsoleAppender这类同步化记录器也降低系统性能,综上一些问题,可能考虑采用ELK (elasticsearch+logstash+kibana)配合消息中间件去异步采集,统一展示去解决。

这里之所以要加入kafka是因为

  1. 如果直接利用logstash同步日志,则每个节点都需要部署logstash,且logstash会严重消耗性能、浪费资源;
  2. 当访问量特别高时,产生的日志速度也会特别快,kafka可以削峰限流、降低logstash的压力;
  3. 当logstash故障时消息可以存储到kafka中不会丢失。

二、 整体流程图

在这里插入图片描述

三、搭建kafka+zk环境

1、创建文件夹

mkdir /usr/elklog/kafka

2、在创建好的文件夹下创建文件docker-compose.yml

version: "2"services:zookeeper:image: docker.io/bitnami/zookeeper:3.8ports:- "2181:2181"environment:- ALLOW_ANONYMOUS_LOGIN=yesnetworks:- es_defaultkafka:image: docker.io/bitnami/kafka:3.2user: rootports:- "9092:9092"environment:- ALLOW_PLAINTEXT_LISTENER=yes- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.3.22:9092  #这里替换为你宿主机IP或host,在集群下,各节点会把这个地址注册到集群,并把主节点的暴露给客户端,不要注册localhost
#      - KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092depends_on:- zookeepernetworks:- es_default
networks:es_default:name: es_default
#    external: true
volumes:zookeeper_data:driver: localkafka_data:driver: local

3、在docker-compose.yml同级目录中输入启动命令

docker-compose up -d  

这里用的是docker-compose方式安装,安装之前需要先安装好docker和docker-compose

docker安装方式:https://blog.csdn.net/qq_38639813/article/details/129384923

docker-compose安装方式:https://blog.csdn.net/qq_38639813/article/details/129751441

四、搭建elk环境

1、拉取elk所需镜像

docker pull elasticsearch:7.10.1
docker pull kibana:7.10.1
docker pull elastic/metricbeat:7.10.1
docker pull elastic/logstash:7.10.1

2、创建文件夹:

mkdir /usr/elklog/elk
mkdir /usr/elklog/elk/logstash
mkdir /usr/elklog/elk/logstash/pipelinemkdir /usr/elklog/elk/es
mkdir /usr/elklog/elk/es/data

3、给data文件夹授权

chmod 777 /usr/elklog/elk/es/data

4、在/usr/elklog/elk/logstash/pipeline中创建logstash.conf

logstash.conf文件作用是将kafka中的日志消息获取出来 ,再推送给elasticsearch

input {kafka {bootstrap_servers => "192.168.3.22:9092"  #kafka的地址,替换为你自己的 client_id => "logstash"  auto_offset_reset => "latest"  consumer_threads => 5topics => ["demoCoreKafkaLog","webapiKafkaApp"]  #获取哪些topic,在springboot项目的logback-spring.xml中指定type => demo   #自定义
#    codec => "json"}
}output {stdout {  }elasticsearch {hosts => ["http://192.168.3.22:9200"]   #es的地址index => "demolog-%{+YYYY.MM.dd}"    #这里将会是创建的索引名,后续 kibana将会用不同索引区别#user => "elastic"#password => "changeme"}
}

5、在/usr/elklog/elk中创建docker-compose.yml

version: "2"services:elasticsearch:image: elasticsearch:7.10.1restart: alwaysprivileged: trueports:- "9200:9200"- "9300:9300"volumes:- /usr/elklog/elk/es/data:/usr/share/elasticsearch/dataenvironment:- discovery.type=single-nodenetworks:- es_defaultkibana:image: kibana:7.10.1restart: alwaysprivileged: trueports:- "5601:5601"environment:- ELASTICSEARCH_URL=http://192.168.3.22:9200depends_on:- elasticsearchnetworks:- es_defaultmetricbeat:image: elastic/metricbeat:7.10.1restart: alwaysuser: rootenvironment:- ELASTICSEARCH_HOSTS=http://192.168.3.22:9200depends_on:- elasticsearch- kibanacommand:  -E setup.kibana.host="192.168.3.22:5601" -E setup.dashboards.enabled=true -E setup.template.overwrite=false -E output.elasticsearch.hosts=["192.168.3.22:9200"] -E setup.ilm.overwrite=truenetworks:- es_defaultlogstash:image: elastic/logstash:7.10.1restart: alwaysuser: rootvolumes:- /usr/elklog/elk/logstash/pipeline:/usr/share/logstash/pipeline/  depends_on:- elasticsearch- kibananetworks:- es_default
networks:es_default:driver: bridgename: es_default

6、启动服务

docker-compose up -d

检验es是否安装成功:http://192.168.3.22:9200

在这里插入图片描述

检验kibana是否安装成功:192.168.3.22:5601
在这里插入图片描述

7、kibana设置中文

从容器中复制出kibana.yml,修改该文件,再复制回去,重启容器:

docker cp elk-kibana-1:/usr/share/kibana/config/kibana.yml kibana.yml在这个文件最后加上:     i18n.locale: "zh-CN"docker cp kibana.yml elk-kibana-1:/usr/share/kibana/config/kibana.yml重启kibana容器便可

五、springboot代码

1、引入依赖

<!-- Kafka资源的引入 -->
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId>
</dependency>
<dependency><groupId>com.github.danielwegener</groupId><artifactId>logback-kafka-appender</artifactId><version>0.2.0-RC1</version>
</dependency>
<dependency><groupId>net.logstash.logback</groupId><artifactId>logstash-logback-encoder</artifactId><version>6.4</version>
</dependency>

2、创建KafkaOutputStream

package com.elk.log;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;public class KafkaOutputStream extends OutputStream {Producer logProducer;String topic;public KafkaOutputStream(Producer producer, String topic) {this.logProducer = producer;this.topic = topic;}@Overridepublic void write(int b) throws IOException {this.logProducer.send(new ProducerRecord<>(this.topic, b));}@Overridepublic void write(byte[] b) throws IOException {this.logProducer.send(new ProducerRecord<String, String>(this.topic, new String(b, Charset.defaultCharset())));}@Overridepublic void flush() throws IOException {this.logProducer.flush();}
}

3、创建KafkaAppender

package com.elk.log;import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.Layout;
import ch.qos.logback.core.OutputStreamAppender;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.util.StringUtils;import java.io.OutputStream;
import java.util.Properties;public class KafkaAppender<E> extends OutputStreamAppender<E> {private Producer logProducer;private  String bootstrapServers;private 	Layout<E> layout;private  String topic;public void setLayout(Layout<E> layout) {this.layout = layout;}public void setBootstrapServers(String bootstrapServers) {this.bootstrapServers = bootstrapServers;}public void setTopic(String topic) {this.topic = topic;}@Overrideprotected void append(E event) {if (event instanceof ILoggingEvent) {String msg = layout.doLayout(event);ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, 0,((ILoggingEvent) event).getLevel().toString(), msg);logProducer.send(producerRecord);}}@Overridepublic void start() {if (StringUtils.isEmpty(topic)) {topic = "Kafka-app-log";}if (StringUtils.isEmpty(bootstrapServers)) {bootstrapServers = "localhost:9092";}logProducer = createProducer();OutputStream targetStream = new KafkaOutputStream(logProducer, topic);super.setOutputStream(targetStream);super.start();}@Overridepublic void stop() {super.stop();if (logProducer != null) {logProducer.close();}}//创建生产者private Producer createProducer() {synchronized (this) {if (logProducer != null) {return logProducer;}Properties props = new Properties();props.put("bootstrap.servers", bootstrapServers);//判断是否成功,我们指定了“all”将会阻塞消息 0.关闭 1.主broker确认 -1(all).所在节点都确认props.put("acks", "0");//失败重试次数props.put("retries", 0);//延迟100ms,100ms内数据会缓存进行发送props.put("linger.ms", 100);//超时关闭连接//props.put("connections.max.idle.ms", 10000);props.put("batch.size", 16384);props.put("buffer.memory", 33554432);//该属性对性能影响非常大,如果吞吐量不够,消息生产过快,超过本地buffer.memory时,将阻塞1000毫秒,等待有空闲容量再继续props.put("max.block.ms",1000);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");return new KafkaProducer<String, String>(props);}}}

4、创建logback-spring.xml,放到application.yml的同级目录

<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="60 seconds"><!--    <include resource="org/springframework/boot/logging/logback/base.xml"/>--><logger name="com.elk" level="info"/><!-- 定义日志文件 输入位置 --><property name="logPath" value="logs" />
<!--    <property name="logPath" value="D:/logs/truckDispatch" />--><!-- 控制台输出日志 --><appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"><encoder><pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger -%msg%n</pattern><charset class="java.nio.charset.Charset">UTF-8</charset></encoder></appender><!-- INFO日志文件 --><appender name="infoAppender" class="ch.qos.logback.core.rolling.RollingFileAppender"><filter class="ch.qos.logback.classic.filter.LevelFilter"><level>INFO</level><onMatch>ACCEPT</onMatch><onMismatch>DENY</onMismatch></filter><rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"><!-- 文件名称 --><fileNamePattern>${logPath}\%d{yyyyMMdd}\info.log</fileNamePattern><!-- 文件最大保存历史天数 --><MaxHistory>30</MaxHistory></rollingPolicy><encoder><pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger - %msg%n</pattern><charset class="java.nio.charset.Charset">UTF-8</charset></encoder></appender><!-- DEBUG日志文件 --><appender name="debugAppender" class="ch.qos.logback.core.rolling.RollingFileAppender"><filter class="ch.qos.logback.classic.filter.LevelFilter"><level>DEBUG</level><onMatch>ACCEPT</onMatch><onMismatch>DENY</onMismatch></filter><rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"><!-- 文件名称 --><fileNamePattern>${logPath}\%d{yyyyMMdd}\debug.log</fileNamePattern><!-- 文件最大保存历史天数 --><MaxHistory>30</MaxHistory></rollingPolicy><encoder><pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger - %msg%n</pattern><charset class="java.nio.charset.Charset">UTF-8</charset></encoder></appender><!-- WARN日志文件 --><appender name="warnAppender" class="ch.qos.logback.core.rolling.RollingFileAppender"><filter class="ch.qos.logback.classic.filter.LevelFilter"><level>WARN</level><onMatch>ACCEPT</onMatch><onMismatch>DENY</onMismatch></filter><rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"><!-- 文件名称 --><fileNamePattern>${logPath}\%d{yyyyMMdd}\warn.log</fileNamePattern><!-- 文件最大保存历史天数 --><MaxHistory>30</MaxHistory></rollingPolicy><encoder><pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger - %msg%n</pattern><charset class="java.nio.charset.Charset">UTF-8</charset></encoder></appender><!-- ERROR日志文件 --><appender name="errorAppender" class="ch.qos.logback.core.rolling.RollingFileAppender"><filter class="ch.qos.logback.classic.filter.LevelFilter"><level>ERROR</level><onMatch>ACCEPT</onMatch><onMismatch>DENY</onMismatch></filter><rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"><!-- 文件名称 --><fileNamePattern>${logPath}\%d{yyyyMMdd}\error.log</fileNamePattern><!-- 文件最大保存历史天数 --><MaxHistory>30</MaxHistory></rollingPolicy><encoder><pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger - %msg%n</pattern><charset class="java.nio.charset.Charset">UTF-8</charset></encoder></appender><!--  往kafka推送日志  --><appender name="kafkaAppender" class="com.elk.log.KafkaAppender"><!-- kafka地址 --><bootstrapServers>192.168.3.22:9092</bootstrapServers><!-- 配置topic --><topic>demoCoreKafkaLog</topic><!-- encoder负责两件事,一是将一个event事件转换成一组byte数组,二是将转换后的字节数据输出到文件中 --><encoder><pattern>${HOSTNAME} %date [%thread] %level %logger{36} [%file : %line] %msg%n</pattern><charset>utf8</charset></encoder><!-- layout主要的功能就是:将一个event事件转化为一个String字符串 --><layout class="ch.qos.logback.classic.PatternLayout"><pattern>${HOSTNAME} %date [%thread] %level %logger{36} [%file : %line] %msg%n</pattern></layout></appender><!--  指定这个包的日志级别为error  --><logger name="org.springframework" additivity="false"><level value="ERROR" /><!-- 控制台输出 -->
<!--        <appender-ref ref="STDOUT" />--><appender-ref ref="errorAppender" /></logger><!-- 由于启动的时候,以下两个包下打印debug级别日志很多 ,所以调到ERROR--><!--  指定这个包的日志级别为error  --><logger name="org.apache.tomcat.util" additivity="false"><level value="ERROR"/><!-- 控制台输出 -->
<!--        <appender-ref ref="STDOUT"/>--><appender-ref ref="errorAppender"/></logger><!-- 默认spring boot导入hibernate很多的依赖包,启动的时候,会有hibernate相关的内容,直接去除 --><!--  指定这个包的日志级别为error  --><logger name="org.hibernate.validator" additivity="false"><level value="ERROR"/><!-- 控制台输出 -->
<!--        <appender-ref ref="STDOUT"/>--><appender-ref ref="errorAppender"/></logger><!--  监控所有包,日志输入到以下位置,并设置日志级别  --><root level="WARN"><!--INFO--><!-- 控制台输出 --><appender-ref ref="STDOUT"/><!-- 这里因为已经通过kafka往es中导入日志,所以就没必要再往日志文件中写入日志,可以注释掉下面四个,提高性能 --><appender-ref ref="infoAppender"/><appender-ref ref="debugAppender"/><appender-ref ref="warnAppender"/><appender-ref ref="errorAppender"/><appender-ref ref="kafkaAppender"/></root>
</configuration>

5、配置文件无需任何修改

server:tomcat:uri-encoding: UTF-8max-threads: 1000min-spare-threads: 30port: 8087connection-timeout: 5000msservlet:context-path: /

6、编写测试类

package com.elk.log;import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@Slf4j
@RestController
@RequestMapping("/test")
public class TestController {@GetMapping("/testLog")public String testLog() {log.warn("gotest");return "ok";}@GetMapping("/testLog1")public Integer testLog1() {int i = 1/0;return i;}
}

六、利用kibana查看日志

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

注意:这里的索引名字就是logstash.conf中创建的索引名,出现这个也意味着整个流程成功

在这里插入图片描述

在这里插入图片描述
此时索引模式创建完毕,我创建的索引模式名字是demo*

在这里插入图片描述
在这里插入图片描述
这时就可以看到日志了,可以进一步调用测试接口去验证,我这里不在展示,至此全部完毕

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/10151.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

工业自动化数据485采集网关支持modbus协议

S475/S475E系列是一种功能丰富的设备&#xff0c;支持多种通信方式和输入输出功能。以下是对其功能的详细介绍&#xff1a; 通信方式&#xff1a; S475/S475E系列支持多种通信方式&#xff0c;包括短信、RS485、语音拨号、GPRS/3G/4G无线数据网络四种告警方式。这意味着用户可…

AXI协议之AXILite开发设计(二)

微信公众号上线&#xff0c;搜索公众号小灰灰的FPGA,关注可获取相关源码&#xff0c;定期更新有关FPGA的项目以及开源项目源码&#xff0c;包括但不限于各类检测芯片驱动、低速接口驱动、高速接口驱动、数据信号处理、图像处理以及AXI总线等 二、AXI-Lite关键代码分析 1、时钟与…

营销系统积分数据库设计

营销系统总体数据-业务功能模型 在当今日益竞争的市场中&#xff0c;如何提高客户留存率和忠诚度&#xff0c;已成为各大企业迫切需要解决的问题。而积分商城/系统作为一种新型的营销方式&#xff0c;受到青睐。 积分商城/系统是指将用户在使用产品或服务时产生的积分&#xf…

X - Transformer

回顾 Transformer 的发展 Transformer 最初是作为机器翻译的序列到序列模型提出的&#xff0c;而后来的研究表明&#xff0c;基于 Transformer 的预训练模型&#xff08;PTM&#xff09; 在各项任务中都有最优的表现。因此&#xff0c;Transformer 已成为 NLP 领域的首选架构&…

mac m1 触控栏TouchBar功能栏异常

电脑可能在高温下运行时间过长&#xff0c;导致TouchBar之前正常显示的调整屏幕亮度与调整声音等功能的按钮均丢失&#xff0c;然后看了一眼键盘设置&#xff0c;设置也是正常的&#xff0c;已勾选显示功能栏 下面请看 如何在MacBook Pro&#xff08;macOS Monterey&#xff0…

基于Java+SpringBoot+vue前后端分离社区医院信息平台设计实现

博主介绍&#xff1a;✌全网粉丝30W,csdn特邀作者、博客专家、CSDN新星计划导师、Java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专…

【C++】智能指针

文章目录 1. 为什么需要智能指针&#xff1f;2. 智能指针的使用智能指针的常见问题1.使用对象的生命周期去控制资源2. 像指针一样使用3. 拷贝问题auto_ptr ——管理权转移unique_ptr ——防拷贝C98版本C11版本 shared_ptr (根本解决拷贝问题)赋值代码实现 weak_ptr —— 循环引…

RocketMQ教程-(4)-领域模型-消费者(Consumer)

本文介绍 Apache RocketMQ 中消费者&#xff08;Consumer&#xff09;的定义、模型关系、内部属性、行为约束、版本兼容性及使用建议。 定义​ 消费者是 Apache RocketMQ 中用来接收并处理消息的运行实体。 消费者通常被集成在业务系统中&#xff0c;从 Apache RocketMQ 服务…

【JVM】详解对象的创建过程

文章目录 1、创建对像的几种方式1、new关键字2、反射3、clone4、反序列化 2、创建过程步骤 1、检查类是否已经被加载步骤 2、 为对象分配内存空间1、指针碰撞针对指针碰撞线程不安全&#xff0c;有两种方案&#xff1a; 2、空闲列表选择哪种分配方式 步骤3、将内存空间初始化为…

如何在armv6 armv7 armv8(aarch64)嵌入式板子上面安装nginx服务器,支持H265码流

如何在armv6 armv6 armv8 aarch64 嵌入式板子上面安装nginx服务器支持推送H265的视频流 开始吧 一&#xff0c;准备工作二&#xff0c;configure时遇到的出错问题1、checking for C compiler … found but is not working2&#xff0c;error: can not detect int size3&#xf…

【博客682】k8s apiserver bookmarks机制以更高效检测变更

k8s apiserver bookmarks机制以更高效检测变更 list-watch背景&#xff1a; List-Watch 是kubernetes中server和client通信的最核心的机制&#xff0c; 比如说api-server监听etcd&#xff0c; kubelet监听api-server&#xff0c; scheduler监听api-server等等&#xff0c;其实…

Jmeter场景设置与监听

Jmeter场景设置 场景的概念: 场景是用来尽量真实模拟用户操作的工作单元&#xff0c;场景设计源自于用户真实操作。 场景设计: 场景设计是根据收集分析用户的实际操作而定义的Jmeter脚本的执行策略。 性能测试中涉及的基本场景有两种&#xff0c;即单一业务场景和混合业务场景…

链表是否有环、环长度、环起点

问题引入 如何检测一个链表是否有环&#xff0c;如果有&#xff0c;那么如何确定环的长度及起点。 引自博客&#xff1a;上述问题是一个经典问题&#xff0c;经常会在面试中被问到。我之前在杭州一家网络公司的电话面试中就很不巧的问到&#xff0c;当时是第一次遇到那个问题&…

RocketMQ集群4.9.2升级4.9.6版本

本文主要记录生产环境短暂停机升级RocketMQ版本的过程 一、整体思路 1.将生产环境MQ4.9.2集群同步到测试环境&#xff0c;并启动&#xff0c;确保正常运行。 2.参照4.9.2配置4.9.6集群 3.停掉4.9.2集群&#xff0c;启动4.9.6集群&#xff0c;测试确保正常运行。 4.停掉4.9.6集…

git使用教程

一 创建环境 参考 Git 安装配置 | 菜鸟教程 (runoob.com)https://www.runoob.com/git/git-install-setup.html 1.1 配置 $ git config --global user.name "runoob" $ git config --global user.email test@runoob.com 1.2 创建一个新文件夹 在新的文件夹执行(…

文件上传漏洞总结2

文件上传的大体都已经学习过了 这个假期在给他强化一下 什么是webshell webshell是web入侵的脚本攻击工具。webshell就是一个asp或php木马后门&#xff0c;黑客在入侵了一个网站后&#xff0c;常常在将这些asp或php木马后门文件放置在网站服务器的web目录中&#xff0c;与正常…

本地缓存LoadingCache

引入依赖 <!-- https://mvnrepository.com/artifact/com.google.guava/guava --> <dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>32.1.1-jre</version> </dependency>主要代…

(三)InfluxDB入门(借助Web UI)

以下内容来自 尚硅谷&#xff0c;写这一系列的文章&#xff0c;主要是为了方便后续自己的查看&#xff0c;不用带着个PDF找来找去的&#xff0c;太麻烦&#xff01; 第 3 章 InfluxDB入门&#xff08;借助Web UI&#xff09; 借助Web UI&#xff0c;我们可以更好地理解InfluxD…

C语言入门篇(八)

前言   本篇分享的是部分操作符的概念与用法&#xff0c;从经典例题入手&#xff0c;带你快速了解和掌握。   收录专栏&#xff1a;浅谈C语言 操作符详解上 1. 操作符分类2. 算术操作符3. 移位操作符3.1 左移操作符3.2 右移操作符 4. 位操作符5. 赋值操作符6. 单目操作符6.…

Excel双向柱状图的绘制

Excel双向柱状图在绘制增减比较的时候经常用到&#xff0c;叫法繁多&#xff0c;双向柱状图、上下柱状图、增减柱状图都有。 这里主要介绍一下Excel的基础绘制方法和复杂一点的双向柱状图的绘制 基础双向柱状图的绘制 首先升降的数据如下&#xff1a; 月份上升下降20220359-…