文章目录
- 一、前言
- 二、项目背景
- 三、实现方案
- 四、思路延伸
- 1. 优先级队列
- 1.1 concurrent 包下的 PriorityBlockingQueue
- 1.2 Redisson 的优先级阻塞队列
- 2. jvisualvm 远程连接
- 3. Jstack 高 CPU 排查
- 五、参考内容
一、前言
本系列用来记录一些在实际项目中的小东西,并记录在过程中想到一些小东西,因为是随笔记录,所以内容不会过于详细。
二、项目背景
项目存在一个功能:对PDF文件进行压缩,且要求PDF 每页大小小于400KB。由于无法判断PDF 每页的大小,所以项目实现方案是将PDF 每页读取后转成图片再进行压缩到合适大小,最后将压缩后的图片再重新生成为 PDF。
在一通实现(东抄西抄 )后,上述功能实现后便直接上线,但是上线后暴露出如下问题:
- 对于多页数 PDF 的压缩效率太低:由于无法判定PDF每页是否满足大小,所以只能将PDF每页都进行 转图片、压缩、转PDF的操作。对于客户动辄50+页数的PDF,处理效率太低。并且由于存在压缩超时的判定限制,大页数PDF极有可能被判定为压缩超时。
为了解决上述问题,准备开启多线程以页为维度进行压缩,提高多页数PDF的解析效率。但经过测试,上述的PDF处理过程极其耗费资源,本地在测试时直接OOM,因此也要控制并发量。
三、实现方案
基础实现依托于下面的工具类,调用 ImgToPdfUtils#compressPdf 方法可完成压缩功能。
package com.kingfish.springcommondemo.docs;import com.google.common.collect.Lists;
import com.itextpdf.text.Document;
import com.itextpdf.text.Image;
import com.itextpdf.text.PageSize;
import com.itextpdf.text.pdf.*;
import com.kingfish.common.api.CommonBizException;
import com.kingfish.common.utils.ThreadUtil;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.pdfbox.pdmodel.PDDocument;
import org.apache.pdfbox.rendering.PDFRenderer;import javax.imageio.ImageIO;
import java.awt.image.BufferedImage;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;/*** @Author : kingfish* @Email : kingfishx@163.com* @Date : 2024/1/28 15:16* @Desc :*/
@Slf4j
public class PdfCompressUtils {/*** 每页最大*/private static final int SINGLE_PDF_MAX_SIZE = 350 * 1000;/*** PDF 压缩** @param pdfBytes* @return*/public static byte[] syncCompressPdf(byte[] pdfBytes) throws Exception {final List<byte[]> imageBytesList = pdf2Images(pdfBytes);ByteArrayOutputStream bos = new ByteArrayOutputStream();Document doc = new Document();PdfCopy pdfCopy = new PdfCopy(doc, bos);pdfCopy.setFullCompression();pdfCopy.setCompressionLevel(PdfStream.BEST_COMPRESSION);doc.open();imageBytesList.forEach(imageBytes ->copyFileToPdf(pdfCopy, compressImage2Pdf(imageBytes, SINGLE_PDF_MAX_SIZE)));pdfCopy.close();doc.close();return bos.toByteArray();}/*** PDF 压缩** @param pdfBytes* @return*/public static byte[] asyncCompressPdf(byte[] pdfBytes) throws Exception {final List<byte[]> imageBytesList = pdf2Images(pdfBytes);ByteArrayOutputStream bos = new ByteArrayOutputStream();Document doc = new Document();PdfCopy pdfCopy = new PdfCopy(doc, bos);pdfCopy.setFullCompression();pdfCopy.setCompressionLevel(PdfStream.BEST_COMPRESSION);doc.open();final List<CompletableFuture<byte[]>> completableFutures =imageBytesList.stream().map(imageBytes ->CompletableFuture.supplyAsync(() ->compressImage2Pdf(imageBytes, SINGLE_PDF_MAX_SIZE),ThreadUtil.getIoPool())).collect(Collectors.toList());CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])).join();completableFutures.stream().filter(Objects::nonNull).map(CompletableFuture::join).forEach(pageBytes -> copyFileToPdf(pdfCopy, pdfBytes));pdfCopy.close();doc.close();return bos.toByteArray();}/*** pdf 转图片** @param pdfBytes* @return*/private static List<byte[]> pdf2Images(byte[] pdfBytes) {List<byte[]> results = Lists.newArrayList();try (PDDocument document = PDDocument.load(pdfBytes)) {PDFRenderer renderer = new PDFRenderer(document);for (int i = 0; i < document.getNumberOfPages(); ++i) {// DPI 越大,清晰度越高BufferedImage bufferedImage = renderer.renderImageWithDPI(i, 300);ByteArrayOutputStream out = new ByteArrayOutputStream();ImageIO.write(bufferedImage, "jpeg", out);results.add(out.toByteArray());}} catch (IOException e) {log.error("pdf转图片出错", e);}return results;}/*** 生成PDF** @param pdfCopy* @param pdfBytes* @throws IOException* @throws BadPdfFormatException*/@SneakyThrowspublic static void copyFileToPdf(PdfCopy pdfCopy, byte[] pdfBytes) {PdfReader reader = new PdfReader(pdfBytes);int totalPages = reader.getNumberOfPages();for (int j = 1; j <= totalPages; j++) {pdfCopy.addPage(pdfCopy.getImportedPage(reader, j));}reader.close();}/*** 图片压缩** @param imageBytes* @param maxSize* @return*/public static byte[] compressImage2Pdf(byte[] imageBytes, int maxSize) {try (ByteArrayOutputStream resultBos = new ByteArrayOutputStream()) {Document document;if (imageBytes.length > maxSize) {// 递归压缩imageBytes = compressImageCycle(imageBytes, maxSize, 0);}// 绘制图片转为 PDFImage image = Image.getInstance(imageBytes);image.setCompressionLevel(PdfStream.BEST_COMPRESSION);float scaledWidth = image.getScaledWidth();float scaledHeight = image.getScaledHeight();if (scaledWidth > scaledHeight) {image.scaleToFit(842.0F, 575.0F);document = new Document(PageSize.A4.rotate(), 0, 0, 0, 0);} else {image.scaleToFit(575.0F, 842.0F);document = new Document(PageSize.A4, 0, 0, 0, 0);}PdfWriter writer = PdfWriter.getInstance(document, resultBos);writer.setCompressionLevel(PdfStream.BEST_COMPRESSION);writer.setFullCompression();document.open();document.add(image);document.close();return resultBos.toByteArray();} catch (Exception exception) {throw new CommonBizException(exception);}}/*** @param bytes 原图片字节数组* @return*/private static byte[] compressImageCycle(byte[] bytes, int maxSize, int cycle) throws IOException {double accuracy = getAccuracy(bytes.length / 1000);//计算宽高BufferedImage bim = ImageIO.read(new ByteArrayInputStream(bytes));int imgWidth = bim.getWidth();int imgHeight = bim.getHeight();int desWidth = new BigDecimal(imgWidth).multiply(new BigDecimal(accuracy)).intValue();int desHeight = new BigDecimal(imgHeight).multiply(new BigDecimal(accuracy)).intValue();// 构造一个类型为预定义图像类型之一的 BufferedImageBufferedImage tag = new BufferedImage(desWidth, desHeight, BufferedImage.TYPE_INT_RGB);// 这边是压缩的模式设置tag.getGraphics().drawImage(bim.getScaledInstance(desWidth, desHeight, java.awt.Image.SCALE_SMOOTH), 0, 0,null);//将图片按JPEG压缩,保存到out中ByteArrayOutputStream baos = new ByteArrayOutputStream();ImageIO.write(tag, "jpeg", baos);cycle++;int srcSize = baos.size();if (srcSize > maxSize && cycle < 6) {
// log.info(srcSize / 1000 + "KB文件大于" + maxSize / 1000 + "KB,第" + (cycle + 1) + "次进行压缩");return compressImageCycle(baos.toByteArray(), maxSize, cycle);}return baos.toByteArray();}/*** 自动调节精度** @param size 源图片大小* @return 图片压缩质量比*/private static double getAccuracy(long size) {double accuracy;if (size < 900) {accuracy = 0.85;} else if (size < 2047) {accuracy = 0.6;} else if (size < 3275) {accuracy = 0.44;} else {accuracy = 0.4;}return accuracy;}
}
使用20个PDF 文件 模拟测试调用,方法如下:
public class PdfDemoMain {static {LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();List<Logger> loggerList = loggerContext.getLoggerList();loggerList.forEach(logger -> {logger.setLevel(Level.INFO);});}public static void main(String[] args) throws Exception {sync();async();}/*** 同步调用*/private static void sync() {final File[] files = new File("C:\\Users\\Administrator\\Desktop\\compress\\压缩前").listFiles();FileUtil.del("C:\\Users\\Administrator\\Desktop\\compress\\压缩后\\");StopWatch stopWatch = new StopWatch();stopWatch.start();// 模拟并发调用final CompletableFuture[] cfs = Arrays.stream(files).map(file ->CompletableFuture.runAsync(() -> {try {// 同步压缩byte[] compressBytes = PdfCompressUtils.syncCompressPdf(FileUtil.readBytes(file));FileUtil.writeBytes(compressBytes, "C:\\Users\\Administrator\\Desktop\\compress\\压缩后\\" + file.getName() + ".pdf");} catch (Exception e) {throw new RuntimeException(e);}})).toArray(CompletableFuture[]::new);CompletableFuture.allOf(cfs).join();stopWatch.stop();System.out.println("同步花费时长: " + stopWatch.getTotalTimeSeconds());}/*** 异步调用*/private static void async() {final File[] files = new File("C:\\Users\\Administrator\\Desktop\\compress\\压缩前").listFiles();FileUtil.del("C:\\Users\\Administrator\\Desktop\\compress\\压缩后\\");StopWatch stopWatch = new StopWatch();stopWatch.start();// 模拟并发调用final CompletableFuture[] cfs = Arrays.stream(files).map(file ->CompletableFuture.runAsync(() -> {try {// 异步压缩byte[] compressBytes = PdfCompressUtils.asyncCompressPdf(FileUtil.readBytes(file));FileUtil.writeBytes(compressBytes, "C:\\Users\\Administrator\\Desktop\\compress\\压缩后\\" + file.getName() + ".pdf");} catch (Exception e) {throw new RuntimeException(e);}})).toArray(CompletableFuture[]::new);CompletableFuture.allOf(cfs).join();stopWatch.stop();System.out.println("异步花费时长: " + stopWatch.getTotalTimeSeconds());}
}
测试结果如下:
可以看到效率具有非常明显的提升,但是需要注意的是:
- PdfCompressUtils 的异步方法并非是所有情况都适用。当观察机器的CPU,如果CPU本身已经接近满载,再使用异步方法可能并不会提升效率。
- PdfCompressUtils 压缩方法的目的是将PDF 每页大小压缩尽量接近400Kb,因此压缩后的PDF大小可能会变更大,因为原先一页可能只有20KB,压缩后可能变成了380KB。
- PdfCompressUtils 压缩过程中对于PDF 可能会存在多次压缩,因为无法把握合适的压缩系数。比如500KBPDF 压缩一次后可能是420KB,则需要再次压缩。
四、思路延伸
PdfCompressUtils 中提供的压缩功能个人感觉并非最优解,效率低且占用资源非常高,在本地测试的时候因为过高的并发度导致 OOM 的发生,因此还需要控制 PdfCompressUtils 方法请求的并发度,否则可能会导致OOM的发送。而并发度的控制有两个方面:
- 异步压缩开启的线程数量:这个可以直接通过 ThreadUtil.getIoPool() 的线程池来控制。
- 服务接口被请求的并发控制:这个首先想到的就是队列,对于有优先级要求的情况下,单节点的情况下可以使用java.util.concurrent.PriorityBlockingQueue,而多节点则可以使用 Redisson 提供的 PriorityBlockingQueue。
1. 优先级队列
1.1 concurrent 包下的 PriorityBlockingQueue
下面以 java.util.concurrent.PriorityBlockingQueue 为例,在 PdfCompressUtils 中增加如下内容:
@Slf4j
public class PdfCompressUtils {.../*** 文件队列*/private static final PriorityBlockingQueue<PriorityFile> FILE_QUEUES = Queues.newPriorityBlockingQueue();static {// 可以通过线程池的线程数量来控制队列消费的并发度,这里使用单线程是为了方便测试final ExecutorService executorService =Executors.newSingleThreadExecutor();executorService.submit(PdfCompressUtils::runCompressTask);}private static void runCompressTask() {while (true) {try {// 睡眠10s 也是为了方便测试,让所有PDF都入队后再出队Thread.sleep(10000);// 从队列中取出final PriorityFile priorityFile = FILE_QUEUES.take();log.info("文件 {} 优先级为 {} 从队列中取出", priorityFile.getFile().getName(), priorityFile.getPriority());// 压缩final byte[] results = asyncCompressPdf(FileUtil.readBytes(priorityFile.getFile()));// 压缩完成回调priorityFile.getCallback().accept(results);} catch (Exception e) {// TODO : 文件压缩失败之后的处理}}}/*** PDF 压缩* @param file 要压缩的PDF* @param priority 优先级,越大优先级越高* @param callback 压缩回调,因为附件的优先级可能比较低导致一直没有压缩,因此使用回调的方式,当附件压缩完成时调用 callback 方法* @throws Exception*/public static void asyncCompressPdf(File file, int priority, Consumer<byte[]> callback) throws Exception {log.info("文件 {} 优先级为 {} 投递到队列", file.getName(), priority);FILE_QUEUES.offer(new PriorityFile(file, priority, callback));}/*** 优先级文件*/@Getterstatic class PriorityFile implements Comparable<PriorityFile> {/*** 文件*/private File file;/*** 优先级*/private int priority;/*** 结果回调*/private Consumer<byte[]> callback;public PriorityFile(File file, int priority, Consumer<byte[]> callback) {this.file = file;this.priority = priority;this.callback = callback;}@Overridepublic int compareTo(PriorityFile o) {return o.getPriority() - this.priority;}}...}
可以看到压缩结果是按照优先级的顺序压缩的
总结:
- 通过静态代码块中的线程池来控制压缩的并发度,防止OOM
- 通过FILE_QUEUES 来控制压缩的优先级。
- 这里其实存在一个问题,当PDF入队后,服务宕机重启,队列中就没有该PDF记录了,解决方式可以是在当PDF入队后将PDF记录到Redis 或数据库中,当压缩成功后再移除,每次服务启动时加载Redis或数据库中的PDF压缩记录即可。
1.2 Redisson 的优先级阻塞队列
如果要使用 Redisson 的 优先级阻塞队列,则进行如下改造:
@Slf4j
public class PdfCompressUtils {...private static RPriorityBlockingQueue<PriorityFile> FILE_QUEUES = null;private static RedissonClient REDISSON_CLIENT = null;static {Config config = new Config();config.useSingleServer().setTimeout(1000000).setDatabase(0).setAddress("redis://127.0.0.1:6379");REDISSON_CLIENT = Redisson.create(config);FILE_QUEUES = REDISSON_CLIENT.getPriorityBlockingQueue("FILE_QUEUE");// 可以通过线程池的线程数量来控制队列消费的并发度,这里使用单线程是为了方便测试final ExecutorService executorService =Executors.newSingleThreadExecutor();executorService.submit(PdfCompressUtils::runCompressTask);}private static void runCompressTask() {while (true) {try {// 睡眠10s 也是为了方便测试,让所有PDF都入队后再出队Thread.sleep(10000);final PriorityFile priorityFile = FILE_QUEUES.take();final File file = priorityFile.getFile();log.info("文件 {} 优先级为 {} 从队列中取出", file.getName(), priorityFile.getPriority());final byte[] results = asyncCompressPdf(FileUtil.readBytes(file));// 因为队列内容需要序列化到Redis中,所以无法使用回调函数,因此结果处理直接在这里处理FileUtil.writeBytes(results, "C:\\Users\\Administrator\\Desktop\\compress\\压缩后\\" + file.getName() + ".pdf");log.info("文件 {} 压缩完成", file.getName());} catch (Exception e) {// TODO : 文件压缩失败之后的处理log.info("文件压缩失败", e);}}}/*** PDF 压缩** @param file* @param priority* @throws Exception*/public static void asyncCompressPdf(File file, int priority) throws Exception {log.info("文件 {} 优先级为 {} 投递到队列", file.getName(), priority);FILE_QUEUES.offer(new PriorityFile(file, priority));}/*** 优先级文件*/@Getterstatic class PriorityFile implements Comparable<PriorityFile>, Serializable {/*** 文件*/private File file;/*** 优先级*/private int priority;public PriorityFile() {}public PriorityFile(File file, int priority) {this.file = file;this.priority = priority;}@Overridepublic int compareTo(PriorityFile o) {return o.getPriority() - this.priority;}}...}
总结:
-
如果真的要使用,Redis 中建议保存的是文件上传到 OSS的文件地址或者数据库中Id,而不是直接保存 File 或 byte[]
-
Redisson 的优先级阻塞队列使用的并不是 sorted set 做数据结构, 而是使用 list 结构。这一点可以在元素入队时看到,如下:
@Overridepublic boolean offer(V e) {return add(e);}@Overridepublic boolean add(V value) {lock.lock();try {checkComparator();// 二分查找,根据优先级确定当前元素入队的位置BinarySearchResult<V> res = binarySearch(value, codec);int index = 0;if (res.getIndex() < 0) {index = -(res.getIndex() + 1);} else {index = res.getIndex() + 1;}// lua 语句保证并发性在队列指定位置插入元素commandExecutor.evalWrite(getName(), RedisCommands.EVAL_VOID, "local len = redis.call('llen', KEYS[1]);"+ "if tonumber(ARGV[1]) < len then "+ "local pivot = redis.call('lindex', KEYS[1], ARGV[1]);"+ "redis.call('linsert', KEYS[1], 'before', pivot, ARGV[2]);"+ "return;"+ "end;"+ "redis.call('rpush', KEYS[1], ARGV[2]);", Arrays.<Object>asList(getName()), index, encode(value));return true;} finally {lock.unlock();}}
2. jvisualvm 远程连接
jvisualvm 是 JDK 提供的监控 Java 程序的工具,当使用如下命令进行启动服务时,可以使用 jvisualvm 远程连接服务。(这块内容如有需要可详参
https://blog.csdn.net/zhou920786312/article/details/123572662)
java - jar -Djava.rmi.server.hostname=[serverIp] -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=[serverPort] -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false demo.jar
这里注意两个变量:
- serverIp 指的是当前服务部署的机器的ip地址
- serverPort 指的是当前服务部署的机器暴露给外部的端口,jvisualvm 将通过此端口来远程连接。所以服务器需要开放serverPort 端口,否则也是无法连接。
以下为举例:
-
使用该命令启动服务, 其中 192.168.72.128 为服务器本地地址, 10081 为服务器暴露的端口
[root@localhost app]# java -jar -Djava.rmi.server.hostname=192.168.72.128 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=10081 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false spring-simple-demo-1.0-SNAPSHOT.jar
-
jvisualvm 建立连接 : 如下,直接建立连接,名称可以随便写
建立后再新增 JMX 连接,如下,连接内容填写服务器地址和暴露的端口。
连接后的效果如下:
3. Jstack 高 CPU 排查
在使用上述 PDF 压缩时,会出现 CPU 使用率过高的情况,借此再总结下通过 jstack 命令排查 CPU使用率过高的情况,如下(这块内容如有需要详参 https://blog.csdn.net/weixin_44588186/article/details/124680586):
-
top
:通过 top 命令确定 服务器上 CPU 占用较高的进程是哪个, top 命令默认按照CPU 排序,可以铜鼓 top c 可以更清晰的看到进程信息,如下:
-
top -Hp pid
:确定好哪个进程 CPU 占用高后,可以通过 top -Hp pid 命令查看指定进程的每个线程的 CPU 占用情况。需要注意的是top -Hp pid
中的 pid 指的是第一步中确定的进程的pid,而 命令输出中的 pid 则是指的进程中的 线程id。如下图中
top -Hp 2150
, 这里的 2150 指的是进程id,而输出中CPU 占用最高的 PID 为 2183,这个PID 为 线程id。 -
jstack pid
: 通过 jstack pid 可以查看线程的具体信息( 可以通过jstack pid >/tmp/log.txt
命令将内容输出到文件)。但我在本地测试的时候输出如下信息,无法输出正常数据,但所幸服务本身日志输出了对应内容 (因此次问题这里不做深究)。
jstack 命令正常输出线程信息如下:
-
分析堆栈信息 :将
top -Hp pid
记录下来的pid 转为十六进制,去 jstack日志文件中找,可以找到对应线程的代码,从而修改代码。
如 上面通过top -Hp 2150
命令确定 pid 为 2183 的线程CPU占用较高,所以将 2183 转为十六进制为887,在 jstack 的日志中搜索 887 ,便可以根据搜索结果可以确定问题代码
五、参考内容
https://blog.csdn.net/zhou920786312/article/details/123572662
https://blog.csdn.net/weixin_44588186/article/details/124680586