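Project scenario:
Product repricing: a thread pool batch-processes the sublists produced by splitting a collection with Lists.partition.
Problem description
While the sublists produced by Lists.partition are being processed by multiple threads during repricing, the following exception is thrown intermittently: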
java.util.ConcurrentModificationException: null
    at java.util.ArrayList$SubList.checkForComodification(ArrayList.java:1231)
    at java.util.ArrayList$SubList.spliterator(ArrayList.java:1235)
    at java.util.Collection.stream(Collection.java:581)
Code logic
for (Map.Entry<Integer, List<PriceHandle>> partnerIdProductId : priceHandleGroup.entrySet()) {
    Integer partnerId = partnerIdProductId.getKey();
    List<PriceHandle> priceHandles = partnerIdProductId.getValue();
    log.info("current shard={}, start syncing partnerId={} size={}", shardingItem, partnerId, priceHandles.size());
    List<List<PriceHandle>> pidsGroup = Lists.partition(priceHandles, splitSize);
    log.info("partnerId={}, split into {} groups of {} items each", partnerId, pidsGroup.size(), splitSize);
    CountDownLatch countDownLatch = new CountDownLatch(pidsGroup.size());
    for (List<PriceHandle> priceHandlesLittle : pidsGroup) {
        executorService.submit(() -> {
            try {
                List<Long> productIds = priceHandlesLittle.stream().map(PriceHandle::getProductId).collect(Collectors.toList());
                List<ProductMapEntity> productMapEntities = productMappingAllPartnerService.getProductMapListByPartnerIdAndProductIds(partnerId, productIds);
                // Items without a product mapping are not processed
                List<Long> mapProductIds = productMapEntities.stream().map(ProductMapEntity::getProductId).collect(Collectors.toList());
                List<PriceHandle> noMapPriceHandles = priceHandlesLittle.stream().filter(e -> !mapProductIds.contains(e.getProductId())).collect(Collectors.toList());
                if (CollectionUtils.isNotEmpty(noMapPriceHandles)) {
                    jingdongPriceSyncService.updateBatch(noMapPriceHandles, EventStatus.HANDLED_NO_NEED.eventStatus, "product has no mapping");
                }
                // Problem: priceHandlesLittle is a view of priceHandles, so this removal
                // structurally modifies the list shared by all tasks
                priceHandlesLittle.removeAll(noMapPriceHandles);
                // Filter out duplicate products that do not exist in productPool
                filterRepeatProductId(productMapEntities, priceHandlesLittle);
                if (CollectionUtils.isEmpty(productMapEntities) || CollectionUtils.isEmpty(priceHandlesLittle)) {
                    log.info("filtered out duplicate products that do not exist in productPool");
                    return;
                }
                PriceSyncService.synPrice(partnerId, productMapEntities, false, priceHandlesLittle);
            } catch (Exception e) {
                log.error("price sync failed, current shop: {}", partnerId, e);
            } finally {
                countDownLatch.countDown();
            }
        });
    }
    try {
        countDownLatch.await();
    } catch (InterruptedException e) {
        log.error("countDownLatch.await error", e);
        Thread.currentThread().interrupt();
    }
}
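Root cause analysis:
Lists.partition in Google Guava returns views of the original list, not copies. When the original is an ArrayList, each partition is an ArrayList$SubList, which is the class that appears in the stack trace. A structural modification to the original list, whether made directly or through one of the views, invalidates the other views, and their next access throws ConcurrentModificationException.
In our code, the problem is that each task calls removeAll on its own partition (priceHandlesLittle). The removal writes through to the shared priceHandles list and changes its modification count, so a task that later calls stream() on a different partition fails in checkForComodification. This happens only when a batch actually contains unmapped products and the tasks interleave in that order, which is why the exception is intermittent.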
Reproduction
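The failure can be reproduced on a single thread, without Guava, because it comes from the subList views themselves. The sketch below is a minimal stand-in under stated assumptions: the class and method names are hypothetical, and two `subList` calls on an `ArrayList` take the place of two partitions returned by `Lists.partition`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;

// Hypothetical demo class; subList views stand in for Lists.partition results.
class SubListCmeDemo {

    /** Removes one element through a view and returns the size of the backing list. */
    static int sourceSizeAfterRemovalThroughView() {
        List<Integer> source = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5, 6));
        List<Integer> first = source.subList(0, 3);
        // removeAll on the view writes through to the backing list.
        first.removeAll(Arrays.asList(2));
        return source.size();
    }

    /** Returns true if reading the second view fails after the first view removed an element. */
    static boolean secondViewFailsAfterRemoval() {
        List<Integer> source = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5, 6));
        // Two views over the same backing list, like two partitions.
        List<Integer> first = source.subList(0, 3);
        List<Integer> second = source.subList(3, 6);

        // Structural modification through the first view.
        first.removeAll(Arrays.asList(2));

        try {
            // stream() asks the view for a spliterator, which checks for comodification.
            second.stream().count();
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("source size after removal: " + sourceSizeAfterRemovalThroughView());
        System.out.println("second view fails: " + secondViewFailsAfterRemoval());
    }
}
```

In the multithreaded job the same sequence happens across tasks: one task removes through its partition, and another task then opens a stream on its own partition.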
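Solution:
Method 1: do not modify the original list through a view inside the for loop.
Method 2: inside the for loop, copy each sublist into a new list object and work on the copy.
Implementation code (method 2)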
for (Map.Entry<Integer, List<PriceHandle>> partnerIdProductId : priceHandleGroup.entrySet()) {
    Integer partnerId = partnerIdProductId.getKey();
    List<PriceHandle> priceHandles = partnerIdProductId.getValue();
    log.info("current shard={}, start syncing partnerId={} size={}", shardingItem, partnerId, priceHandles.size());
    List<List<PriceHandle>> pidsGroup = Lists.partition(priceHandles, splitSize);
    log.info("partnerId={}, split into {} groups of {} items each", partnerId, pidsGroup.size(), splitSize);
    CountDownLatch countDownLatch = new CountDownLatch(pidsGroup.size());
    for (List<PriceHandle> priceHandlesLittle : pidsGroup) {
        // Tasks remove elements concurrently, so copy the view into a new ArrayList
        // to avoid the ConcurrentModificationException raised by ArrayList$SubList
        List<PriceHandle> copyPriceHandlesLittle = new ArrayList<>(priceHandlesLittle);
        executorService.submit(() -> {
            try {
                List<Long> productIds = copyPriceHandlesLittle.stream().map(PriceHandle::getProductId).collect(Collectors.toList());
                List<ProductMapEntity> productMapEntities = productMappingAllPartnerService.getProductMapListByPartnerIdAndProductIds(partnerId, productIds);
                // Items without a product mapping are not processed
                List<Long> mapProductIds = productMapEntities.stream().map(ProductMapEntity::getProductId).collect(Collectors.toList());
                List<PriceHandle> noMapPriceHandles = copyPriceHandlesLittle.stream().filter(e -> !mapProductIds.contains(e.getProductId())).collect(Collectors.toList());
                if (CollectionUtils.isNotEmpty(noMapPriceHandles)) {
                    PriceSyncService.updateBatch(noMapPriceHandles, EventStatus.HANDLED_NO_NEED.eventStatus, "product has no mapping");
                }
                copyPriceHandlesLittle.removeAll(noMapPriceHandles);
                // Filter out duplicate products that do not exist in productPool
                filterRepeatProductId(productMapEntities, copyPriceHandlesLittle);
                if (CollectionUtils.isEmpty(productMapEntities) || CollectionUtils.isEmpty(copyPriceHandlesLittle)) {
                    log.info("filtered out duplicate products that do not exist in productPool");
                    return;
                }
                PriceSyncService.synJPrice(partnerId, productMapEntities, false, copyPriceHandlesLittle);
            } catch (Exception e) {
                log.error("price sync failed, current shop: {}", partnerId, e);
            } finally {
                countDownLatch.countDown();
            }
        });
    }
    try {
        countDownLatch.await();
    } catch (InterruptedException e) {
        log.error("countDownLatch.await error", e);
        Thread.currentThread().interrupt();
    }
}
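For comparison, method 1 can be sketched as follows: instead of calling removeAll on the view, each task builds a new filtered list and leaves the view untouched. This is only an illustration under assumptions. The class name, `keepMapped`, and the `Long` ids are hypothetical stand-ins for the PriceHandle objects, and any other helper that mutates the list it receives would need the same treatment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical demo class illustrating a non-mutating filter over subList views.
class NonMutatingFilterDemo {

    /** Returns the ids in batch that are present in mappedIds, without modifying batch. */
    static List<Long> keepMapped(List<Long> batch, Set<Long> mappedIds) {
        return batch.stream()
                .filter(mappedIds::contains)
                .collect(Collectors.toList());
    }

    /** Filters two views of the same list and checks that the backing list is unchanged. */
    static boolean viewsStayValid() {
        List<Long> source = new ArrayList<>(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L));
        List<Long> first = source.subList(0, 3);
        List<Long> second = source.subList(3, 6);
        Set<Long> mappedIds = new HashSet<>(Arrays.asList(1L, 3L, 4L));

        // Neither call modifies source, so both views remain readable.
        List<Long> keptFirst = keepMapped(first, mappedIds);
        List<Long> keptSecond = keepMapped(second, mappedIds);

        return keptFirst.equals(Arrays.asList(1L, 3L))
                && keptSecond.equals(Arrays.asList(4L))
                && source.size() == 6;
    }

    public static void main(String[] args) {
        System.out.println("views stay valid: " + viewsStayValid());
    }
}
```

Compared with method 2, this avoids copying every partition up front, but it only works if nothing downstream modifies the view.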
References
https://blog.csdn.net/weixin_48321825/article/details/121012733?utm_medium=distribute.pc_relevant.none-task-blog-2defaultbaidujs_baidulandingword~default-0-121012733-blog-102456357.235v38pc_relevant_default_base3&spm=1001.2101.3001.4242.1&utm_relevant_index=3