Pipeline流水线组件

文章目录

    • 1、新建pipeline流水线
    • 2、定义处理器
    • 3、定义处理器上下文
    • 4、pipeline流水线实现
    • 5、处理器抽象类实现
    • 6、pipeline流水线构建者
    • 7、具体处理器实现
    • 8、流水线测试
    • 9、运行结果

1、新建pipeline流水线

package com.summer.toolkit.model.chain;import java.util.List;
import java.util.concurrent.Executor;public interface Pipeline<T> {/*** 向pipeline中添加一个执行器** @param handler 执行器* @return 返回pipeline对象*/Pipeline<T> addLast(Handler<T> handler);/*** 向pipeline中添加一个执行器** @param name    执行器名称* @param handler 执行器* @return 返回pipeline对象*/Pipeline<T> addLast(String name, Handler<T> handler);/*** pipeline执行** @param list 数据集合* @return 返回值,执行完成返回true*/boolean execute(List<T> list);/*** pipeline并行执行** @param list     数据集合* @param executor 线程池* @return 返回值,执行完成返回true*/boolean parallelExecute(List<T> list, Executor executor);/*** pipeline执行** @param object 单个数据* @return 返回值,执行完成返回true*/boolean execute(T object);}

2、定义处理器

package com.summer.toolkit.model.chain;public interface Handler<T> {/*** 处理器处理方法** @param handlerContext 上下文* @param t              要处理的数据*/void doHandler(HandlerContext<T> handlerContext, T t);}

3、定义处理器上下文

package com.summer.toolkit.model.chain;import lombok.Data;@Data
public class HandlerContext<T> {/*** 执行器名称 */private String name;/*** 执行器 */private Handler<T> handler;/*** 链表的下一个节点,用来保存下一个执行器 */public HandlerContext<T> next;public HandlerContext(Handler<T> handler) {this.name = handler.getClass().getName();this.handler = handler;}public HandlerContext(String name, Handler<T> handler) {this.name = name;this.handler = handler;}/*** 调用该方法即调用上下文中处理器的执行方法** @param t 需要处理的数据*/public void handle(T t) {this.handler.doHandler(this, t);}/*** 执行下一个节点的处理器** @param t 待执行的数据*/public void runNext(T t) {if (this.next != null) {this.next.handle(t);}}
}

4、pipeline流水线实现

package com.summer.toolkit.model.chain;import com.summer.toolkit.util.CollectionUtils;
import com.summer.toolkit.util.StringUtils;
import lombok.extern.slf4j.Slf4j;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;@Slf4j
public class DefaultPipeline<T> implements Pipeline<T> {/*** 默认pipeline中有一个处理器上下文的头结点* 头结点无处理逻辑,直接执行下一个节点的处理器*/HandlerContext<T> head = new HandlerContext<>(HandlerContext::runNext);@Overridepublic Pipeline<T> addLast(Handler<T> handler) {this.addLast(null, handler);return this;}@Overridepublic Pipeline<T> addLast(String name, Handler<T> handler) {if (handler == null) {log.warn("处理器为空,不进行添加!");return this;}if (StringUtils.isEmpty(name)) {name = handler.getClass().getName();}// 将处理器添加到处理器上下文的尾节点HandlerContext<T> context = head;while (context.next != null) {context = context.next;}context.next = new HandlerContext<T>(name, handler);return this;}@Overridepublic boolean execute(List<T> list) {List<Object> result = list.stream().peek(this::execute).collect(Collectors.toList());return true;}@Overridepublic boolean parallelExecute(List<T> list, Executor executor) {Map<String, List<T>> parts = this.split(list);List<CompletableFuture<Boolean>> results = new ArrayList<>();for (Map.Entry<String, List<T>> entry : parts.entrySet()) {CompletableFuture<Boolean> completableFuture = CompletableFuture// 提交任务.supplyAsync(() -> this.execute(entry.getValue()), executor)// 打印异常信息.exceptionally(e -> {log.error("并行处理数据时发生异常!{}", e.getMessage(), e);return Boolean.FALSE;});results.add(completableFuture);}CompletableFuture.allOf(results.toArray(new CompletableFuture[0])).join();return true;}@Overridepublic boolean execute(T t) {this.head.handle(t);return true;}/*** 对集合进行分组拆分** @param list 集合* @return 返回值*/private Map<String, List<T>> split(List<T> list) {Map<String, List<T>> parts = new HashMap<>(8);if (CollectionUtils.isEmpty(list)) {return parts;}// 如果集合数量过少,则不进行分组int limit = 10;if (list.size() < limit) {String key = String.valueOf(0);parts.put(key, list);return parts;}// 固定分五个分组int group = 5;for (int i = 0, length = list.size(); i < length; i++) {int key = i % group;List<T> part = parts.computeIfAbsent(String.valueOf(key), k -> new ArrayList<>());T t = list.get(i);part.add(t);}return parts;}}

5、处理器抽象类实现

package com.summer.toolkit.model.chain;import lombok.extern.slf4j.Slf4j;@Slf4j
public abstract class AbstractHandler<T> implements Handler<T> {/*** 开始处理数据,通用方法** @param handlerContext 上下文* @param t              要处理的数据*/@Overridepublic void doHandler(HandlerContext<T> handlerContext, T t) {long start = System.currentTimeMillis();String threadName = Thread.currentThread().getName();String handlerName = handlerContext.getName();log.info("====={} 开始处理:{}=====", threadName, handlerName);try {// 此处处理异常,如果执行过程失败,则继续执行下一个handlerthis.handle(t);} catch (Throwable throwable) {log.error("====={} 处理异常:{},异常原因:{}=====", threadName, handlerName, throwable.getMessage(), throwable);this.handleException(t, throwable);}long end = System.currentTimeMillis();log.info("====={} 处理完成:{},耗时:{} 毫秒=====", threadName, handlerName, (end - start));// 处理完该上下文中的处理器逻辑后,调用上下文中的下一个执行器的执行方法handlerContext.runNext(t);}/*** 处理数据抽象方法,由子类实现具体细节** @param t 对象*/public abstract void handle(T t);/*** 处理数据抽象方法,由子类实现具体细节** @param t         对象* @param throwable 异常对象*/public void handleException(T t, Throwable throwable) {log.error("=====处理数据发生异常:{}", throwable.getMessage(), throwable);}}

6、pipeline流水线构建者

package com.summer.toolkit.model.chain;public class DefaultPipelineBuilder<T> {private final Pipeline<T> pipeline;public DefaultPipelineBuilder() {this.pipeline = new DefaultPipeline<>();}/*** 向pipeline中添加一个执行器** @param handler 执行器* @return 返回pipeline对象*/public DefaultPipelineBuilder<T> addLast(Handler<T> handler) {pipeline.addLast(handler);return this;}/*** 向pipeline中添加一个执行器** @param name 执行器名称* @return 返回pipeline对象*/public DefaultPipelineBuilder<T> addLast(String name, Handler<T> handler) {pipeline.addLast(name, handler);return this;}/*** 返回pipeline对象** @return 返回值*/public Pipeline<T> build() {return this.pipeline;}}

7、具体处理器实现

package com.summer.toolkit.model.chain;import lombok.extern.slf4j.Slf4j;import java.util.Objects;@Slf4j
public class StringHandler extends AbstractHandler<String> {@Overridepublic void handle(String s) {log.info("入参:{}", s);}@Overridepublic void handleException(String s, Throwable throwable) {if (Objects.nonNull(throwable)) {log.error("异常:{}", throwable.getMessage());}}}

8、流水线测试

package com.summer.toolkit.model;import com.summer.toolkit.model.chain.DefaultPipelineBuilder;
import com.summer.toolkit.model.chain.Pipeline;
import com.summer.toolkit.model.chain.StringHandler;public class Processor {public static void main(String[] args) {DefaultPipelineBuilder<String> builder = new DefaultPipelineBuilder<>();Pipeline<String> pipeline = builder.addLast("字符串信息", new StringHandler()).addLast("寄件人信息", new StringHandler()).addLast("收件人信息", new StringHandler()).build();pipeline.execute("1");}}

9、运行结果

20:03:00.285 [main] INFO com.summer.toolkit.model.chain.AbstractHandler - =====main 开始处理:字符串信息=====
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.StringHandler - 入参:1
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.AbstractHandler - =====main 处理完成:字符串信息,耗时:5 毫秒=====
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.AbstractHandler - =====main 开始处理:寄件人信息=====
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.StringHandler - 入参:1
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.AbstractHandler - =====main 处理完成:寄件人信息,耗时:0 毫秒=====
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.AbstractHandler - =====main 开始处理:收件人信息=====
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.StringHandler - 入参:1
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.AbstractHandler - =====main 处理完成:收件人信息,耗时:0 毫秒=====

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/26762.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spring Boot 中如何解决跨域问题、Spring Cloud 5大组件、微服务的优缺点是什么?

Spring Boot 中如何解决跨域问题 ? SpringMVC项目中使用CrossOrigin注解来解决跨域问题 , 本质是CORS RequestMapping("/hello")CrossOrigin(origins "*")//CrossOrigin(value "http://localhost:8081") //指定具体ip允许跨域public String …

区别五大数据可视化工具,有这一篇就够了

进入企业数字化时代&#xff0c;数据可视化工具的重要性被越来越多企业看到。这些企业都希望在短时间内找到适合自己的数据可视化工具。以下是针对帆软BI、奥威BI、思迈特BI&#xff08;Smartbi&#xff09;、永洪BI和亿信华辰BI的详细介绍&#xff0c;希望能帮助用户企业快速筛…

vue3框架基本使用(基础指令)

一、响应式数据 1.ref ref可以定义 基本类型的响应式数据&#xff0c; 也可以定义对象类型响应式数据 <template><h1>{{ name }}</h1><button click"test">修改姓名</button> </template><script setup lang"ts"…

原子阿波罗STM32F429程序的控制器改为STM32F407驱动LCD屏

原子大神的阿波罗开发板使用STM32F429IGT6控制器&#xff0c;编程风格也与探索者F407系列有了很大的不同&#xff0c;使用BSP功能模块编程了&#xff0c;也有点类似于安富莱的编程风格了。这种模块式程序风格的优点是更加方便移植&#xff0c;更方便泡系统。 但无奈手里只有F40…

wordpress旅游网站模板

旅行社wordpress主题 简洁实用的旅行社wordpress主题&#xff0c;适用于旅行社建网站的wordpress主题模板。 https://www.jianzhanpress.com/?p4296 旅游WordPress主题 简洁实用的旅游WordPress主题&#xff0c;适合做旅游公司网站的WordPress主题模板。 https://www.jian…

CSS实现3个圆点加载动画

加载动画主要使用了css的animation和transform属性&#xff0c;animation用来实现动画效果&#xff0c;transform实现过渡&#xff0c;让动画看起来更真实 一、html <div class"loadding-box"><div class"dot1"></div><div class&qu…

【网络编程】基于TCP的服务器端/客户端

TCP是Transmission Control Protocol(传输控制协议)简写。因为TCP套接字是面向连接的&#xff0c;因此又称为基于流的套接字。 把协议分为多个层次&#xff0c;设计更容易&#xff0c;通过标准化操作设计开放式系统 网络层介绍 链路层 链路层是物理连接领域标准化的结果&…

学习分享-FutureTask

前言 今天再改简历的时候回顾了之前实习用到的FutureTask&#xff0c;借此来回顾一下相关知识。 FutureTask 介绍 FutureTask 是 Java 并发包&#xff08;java.util.concurrent&#xff09;中的一个类&#xff0c;用于封装异步任务。它实现了 RunnableFuture 接口&#xff0…

Win快速删除node_modules

在Windows系统上删除 node_modules 文件夹通常是一个缓慢且耗时的过程。这主要是由于几个关键因素导致的&#xff1a; 主要原因 文件数量多且嵌套深&#xff1a; node_modules 文件夹通常包含成千上万的子文件夹和文件。由于其结构复杂&#xff0c;文件和文件夹往往嵌套得非常…

JavaScript的数组排序

天行健&#xff0c;君子以自强不息&#xff1b;地势坤&#xff0c;君子以厚德载物。 每个人都有惰性&#xff0c;但不断学习是好好生活的根本&#xff0c;共勉&#xff01; 文章均为学习整理笔记&#xff0c;分享记录为主&#xff0c;如有错误请指正&#xff0c;共同学习进步。…

函数式开发接口( Consumer、Function)在实际开发中的应用场景

之前有个扫码下载文件需求&#xff0c;由于要同时进行记录下载人的记录。一开始用的是异步进行日志记录。发现有的用户扫码下载了一次文件&#xff0c;日志记录了三条。这种很容易联想到是因为网络抖动造成的。 问题代码 由于日志记录是异步的&#xff0c;文件下载需要时间。同…

干货下载 |《数据治理:数据中台建设与能力提升策略》

在当今这个信息爆炸的时代&#xff0c;数据已经成为企业最宝贵的资产之一。数据不仅能帮助企业洞察市场趋势&#xff0c;还能优化业务流程&#xff0c;提升运营效率&#xff0c;进而在激烈的市场竞争中占据优势地位。然而&#xff0c;如何有效地管理和利用这些数据&#xff0c;…

(代数:解一元二次方程)可以使用下面的公式求一元二次方程 ax2+bx+c0 的两个根:

(代数:解一元二次方程)可以使用下面的公式求一元二次方程 ax2bxc0 的两个根: b2-4ac 称作一元二次方程的判别式。如果它是正值,那么一元二次方程就有两个实数根。 如果它为 0&#xff0c;方程式就只有一个根。如果它是负值&#xff0c;方程式无实根。 编写程序&#xff0c;提示…

推挽与开漏输出

一般来说&#xff0c;微控制器的引脚都会有一个驱动电路&#xff0c;可以配置不同类型的数字和模拟电路接口。输出模式一般会有推挽与开漏输出。 推挽输出 推挽输出&#xff08;Push-Pull Output&#xff09;&#xff0c;故名思意能输出两种电平&#xff0c;一种是推&#xf…

DOM-获取元素

获取元素的方法&#xff1a; 方法一&#xff1a;根据id获取元素document.getElementById <div id"time">2024-6-4</div> 在script标签中&#xff1a;注意getElementById括号里面必须要有引号&#xff0c;获得的是对象类型 var timer document.getEle…

设计模式——建造者模式(生成器模式)

建造者模式(生成器模式) 将一个复杂对象的构建与它的表示分离&#xff0c;使得同样的构建过程可以创建不同的表示的意图 用了建造者模式&#xff0c;那么用户就只需要指定需要构建的类型就可以得到它们&#xff0c;而具体构造的细节和过程不需要知道 概括地说&#xff0c;Bu…

2. 音视频H264

视频软件基本流程 1.什么是H264 H.264是由ITU-T视频编码专家组&#xff08;VCEG&#xff09;和ISO/IEC动态图像专家组&#xff08;MPEG&#xff09;联合组成的联合视频组&#xff08;JVT&#xff0c;Joint Video Team&#xff09;提出的高度压缩数字视频编解码器标准 H265又名高…

1004.最大连续1的个数

给定一个二进制数组 nums 和一个整数 k&#xff0c;如果可以翻转最多 k 个 0 &#xff0c;则返回 数组中连续 1 的最大个数 。 示例 1&#xff1a; 输入&#xff1a;nums [1,1,1,0,0,0,1,1,1,1,0], K 2 输出&#xff1a;6 解释&#xff1a;[1,1,1,0,0,1,1,1,1,1,1] 粗体数字…

算法题day43(补5.29日卡:动态规划03)

一、01背包问题基础&#xff1a; 有n件物品和一个最多能背重量为w的背包。第i件物品的重量是weight[i],得到的价值是value[i]。每件物品只能用一次&#xff0c;求解哪些物品装入背包里物品价值总和最大。 1.用dp的方法做&#xff1a; ①dp[i][j]的含义&#xff1a;从下标为[…

席卷的B站《植物大战僵尸杂交版》V2.0.88整合包,PC和手机可用,含通关存档和视频教程!

今天给大家安利一款席卷B站&#xff0c;火爆全网的游戏——《植物大战僵尸杂交版》2.0.88整合包。 这个是网络上现存植物大战僵尸杂交版的最全整合&#xff0c;包含了修改工具&#xff0c;超强通关存档和高清工具。工具包有安装视频教程&#xff0c;支持手机版和pc多端使用&am…