程序员过关斩将--自定义线程池来实现文档转码

背景

我司在很久之前,一位很久之前的同事写过一个文档转图片的服务,具体业务如下:

1. 用户在客户端上传文档,可以是ppt,word,pdf 等格式,用户上传完成可以在客户端预览上传的文档,预览的时候采用的是图片形式(不要和我说用别的方式预览,现在已经来不及了)

2. 当用户把文档上传到云端之后(阿里云),把文档相关的信息记录在数据库,然后等待转码完成

3. 服务器有一个转码服务(其实就是一个windows service)不停的在轮训待转码的数据,如果有待转码的数据,则从数据库取出来,然后根据文档的网络地址下载到本地进行转码(转成多张图片)

4. 当文档转码完毕,把转码出来的图片上传到云端,并把云端图片的信息记录到数据库

5. 客户端有预览需求的时候,根据数据库来判断有没有转码成功,如果成功,则获取数据来显示。

文档预览的整体过程如以上所说,老的转码服务现在什么问题呢?

1. 由于一个文档同时只能被一个线程进行转码操作,所以老的服务采用了把待转码数据划分管道的思想,一共有六个管道,映射到数据库大体就是 Id=》管道ID 这个样子。

2. 一个控制台程序,根据配置文件信息,读取某一个管道待转码的文档,然后单线程进行转码操作

3. 一共有六个管道,所以服务器上起了六个cmd的黑窗口......

4. 有的时候个别文档由于格式问题或者其他问题 转码过程中会卡住,具体的表现为:停止了转码操作。

5. 如果程序卡住了,需要运维人员重新启动转码cmd窗口(这种维护比较蛋疼)

后来机缘巧合,这个程序的维护落到的菜菜头上,维护了一周左右,大约重启了10多次,终于忍受不了了,重新搞一个吧。仔细分析过后,刨除实际文档转码的核心操作之外,整个转码流程其实还有很多注意点

1. 需要保证转码服务不被卡住,如果和以前一样就没有必要重新设计了

2. 尽量避免开多个进程的方式,其实在这个业务场景下,多个进程和多个线程作用是一致的。

3. 每个文档只能被转码一次,如果一个文档被转码多次,不仅浪费了服务器资源,而且还有可能会有数据不一致的情况发生

4. 转码失败的文档需要有一定次数的重试,因为一次失败不代表第二次失败,所以一定要给失败的文档再次被操作的机会

5. 因为程序不停的把文档转码成本地图片,所以需要保证这些文件在转码完成在服务器上删除,不然的话,时间长了会生成很多无用的文件

说了这么多,其实需要注意的点还是很多的。以整个的转码流程来说,本质上是一个任务池的生产和消费问题,任务池中的任务就是待转码的文档,生产者不停的把待转码文档丢进任务池,消费者不停的把任务池中文档转码完成。

线程池

这很显然和线程池很类似,菜菜之前就写过一个线程池的文章,有兴趣的同学可以去翻翻历史。今天我们就以这个线程池来解决这个转码问题。线程池的本质是初始化一定数目的线程,不停的执行任务。

 //线程池定义 public class LXThreadPool:IDisposable{bool PoolEnable = true; //线程池是否可用 List<Thread> ThreadContainer = null; //线程的容器ConcurrentQueue<ActionData> JobContainer = null; //任务的容器int _maxJobNumber; //线程池最大job容量ConcurrentDictionary<string, DateTime> JobIdList = new ConcurrentDictionary<string, DateTime>(); //job的副本,用于排除某个job 是否在运行中public LXThreadPool(int threadNumber,int maxJobNumber=1000){if(threadNumber<=0 || maxJobNumber <= 0){throw new Exception("线程池初始化失败");}_maxJobNumber = maxJobNumber;ThreadContainer = new List<Thread>(threadNumber);JobContainer = new ConcurrentQueue<ActionData>();for (int i = 0; i < threadNumber; i++){var t = new Thread(RunJob);t.Name = $"转码线程{i}";ThreadContainer.Add(t);t.Start();}//清除超时任务的线程var tTimeOutJob = new Thread(CheckTimeOutJob);tTimeOutJob.Name = $"清理超时任务线程";tTimeOutJob.Start();}//往线程池添加一个线程,返回线程池的新线程数public int AddThread(int number=1){if(!PoolEnable || ThreadContainer==null || !ThreadContainer.Any() || JobContainer==null|| !JobContainer.Any()){return 0;}while (number <= 0){var t = new Thread(RunJob);ThreadContainer.Add(t);t.Start();number -= number;}return ThreadContainer?.Count ?? 0;}//向线程池添加一个任务,返回0:添加任务失败   1:成功public int AddTask(Action<object> job, object obj,string actionId, Action<Exception> errorCallBack = null){if (JobContainer != null){if(JobContainer.Count>= _maxJobNumber){return 0;}//首先排除10分钟还没转完的var timeoOutJobList = JobIdList.Where(s => s.Value.AddMinutes(10) < DateTime.Now);if(timeoOutJobList!=null&& timeoOutJobList.Any()){foreach (var timeoutJob in timeoOutJobList){JobIdList.TryRemove(timeoutJob.Key,out DateTime v);}}if (!JobIdList.Any(s => s.Key == actionId)){if(JobIdList.TryAdd(actionId, DateTime.Now)){JobContainer.Enqueue(new ActionData { Job = job, Data = obj, ActionId = actionId, ErrorCallBack = errorCallBack });return 1;}else{return 101;}}else{return 100;}            }return 0;}  private void RunJob(){while (JobContainer != null  && PoolEnable){//任务列表取任务ActionData job = null;JobContainer?.TryDequeue(out job);if (job == null){//如果没有任务则休眠Thread.Sleep(20);continue;}try{//执行任务job.Job.Invoke(job.Data);}catch (Exception error){//异常回调if (job != null&& job.ErrorCallBack!=null){job?.ErrorCallBack(error);}}finally{if (!JobIdList.TryRemove(job.ActionId,out DateTime v)){}}}}//终止线程池public void Dispose(){PoolEnable = false;JobContainer = null;if (ThreadContainer != null){foreach (var t in ThreadContainer){//强制线程退出并不好,会有异常t.Join();}ThreadContainer = null;}}//清理超时的任务private void CheckTimeOutJob(){//首先排除10分钟还没转完的var timeoOutJobList = JobIdList.Where(s => s.Value.AddMinutes(10) < DateTime.Now);if (timeoOutJobList != null && timeoOutJobList.Any()){foreach (var timeoutJob in timeoOutJobList){JobIdList.TryRemove(timeoutJob.Key, out DateTime v);}}System.Threading.Thread.Sleep(60000);}}public class ActionData{//任务的id,用于排重public string ActionId { get; set; }//执行任务的参数public object Data { get; set; }//执行的任务public Action<object> Job { get; set; }//发生异常时候的回调方法public Action<Exception> ErrorCallBack { get; set; }}

以上就是一个线程池的具体实现,和具体的业务无关,完全可以用于任何适用于线程池的场景,其中有一个注意点,我新加了任务的标示,主要用于排除重复的任务被投放多次(只排除正在运行中的任务)。当然代码不是最优的,有需要的同学可以自己去优化

使用线程池

接下来,我们利用以上的线程池来完成我们的文档转码任务,首先我们启动的时候初始化一个线程池,并启动一个独立线程来不停的往线程池来输送任务,顺便起了一个监控线程去监视发送任务的线程

string lastResId = null;string lastErrorResId = null;Dictionary<string, int> ResErrNumber = new Dictionary<string, int>(); //转码失败的资源重试次数int MaxErrNumber = 5;//最多转码错误的资源10次Thread tPutJoj = null;LXThreadPool pool = new LXThreadPool(4,100);public void OnStart(){//初始化一个线程发送转码任务tPutJoj = new Thread(PutJob);tPutJoj.IsBackground = true;tPutJoj.Start();//初始化 监控线程var tMonitor = new Thread(MonitorPutJob);tMonitor.IsBackground = true;tMonitor.Start();}//监视发放job的线程private void MonitorPutJob(){while (true){if(tPutJoj == null|| !tPutJoj.IsAlive){Log.Error($"发送转码任务线程停止==========");tPutJoj = new Thread(PutJob);tPutJoj.Start();Log.Error($"发送转码任务线程重新初始化并启动==========");}System.Threading.Thread.Sleep(5000);}}private void PutJob(){           while (true){try{//先搜索等待转码的var fileList = DocResourceRegisterProxy.GetFileList(new int[] { (int)FileToImgStateEnum.Wait }, 30, lastResId);Log.Error($"拉取待转码记录===总数:lastResId:{lastResId},结果:{fileList?.Count() ?? 0}");if (fileList == null || !fileList.Any()){lastResId = null;Log.Error($"待转码数量为0,开始拉取转码失败记录,重新转码==========");//如果无待转,则把出错的 尝试fileList = DocResourceRegisterProxy.GetFileList(new int[] { (int)FileToImgStateEnum.Error, (int)FileToImgStateEnum.TimeOut, (int)FileToImgStateEnum.Fail }, 1, lastErrorResId);if (fileList == null || !fileList.Any()){lastErrorResId = null;}else{// Log.Error($"开始转码失败记录:{JsonConvert.SerializeObject(fileList)}");List<DocResourceRegister> errFilter = new List<DocResourceRegister>();foreach (var errRes in fileList){if (ResErrNumber.TryGetValue(errRes.res_id, out int number)){if (number > MaxErrNumber){Log.Error($"资源:{errRes.res_id} 转了{MaxErrNumber}次不成功,放弃===========");continue;}else{errFilter.Add(errRes);ResErrNumber[errRes.res_id] = number + 1;}}else{ResErrNumber.Add(errRes.res_id, 1);errFilter.Add(errRes);}}fileList = errFilter;if (fileList.Any()){lastErrorResId = fileList.Select(s => s.res_id).Max();}}}else{lastResId = fileList.Select(s => s.res_id).Max();}if (fileList != null && fileList.Any()){foreach (var file in fileList){//如果 任务投放线程池失败,则等待一面继续投放int poolRet = 0;while (poolRet <= 0){poolRet = pool.AddTask(s => {AliFileService.ConvertToImg(file.res_id + $".{file.res_ext}", FileToImgFac.Instance(file.res_ext));}, file, file.res_id);if (poolRet <= 0 || poolRet > 1){Log.Error($"发放转码任务失败==========线程池返回结果:{poolRet}");System.Threading.Thread.Sleep(1000);}}}}//每一秒去数据库取一次数据System.Threading.Thread.Sleep(3000);}catch{continue;}}}

以上就是发放任务,线程池执行任务的所有代码,由于具体的转码代码涉及到隐私,这里不在提供,如果有需要可以私下找菜菜索要,虽然我深知还有更优的方式,但是我觉得线程池这样的思想可能会对部分人有帮助,其中任务超时的核心代码如下(采用了polly插件):

var policy= Policy.Timeout(TimeSpan.FromSeconds(this.TimeOut), onTimeout: (context, timespan, task) =>{ret.State=Enum.FileToImgStateEnum.TimeOut;                   });policy.Execute(s=>{.....});

把你的更优方案写在留言区吧,2020年大家越来越好

●程序员修神之路--打通Docker镜像发布容器运行流程

●程序员修神之路--容器技术为什么会这么流行(记得去抽奖)

●程序员修神之路--kubernetes是微服务发展的必然产物

●程序员过关斩将--要想获取我的用户信息,就得按照规矩来

●程序员过关斩将--更加优雅的Token认证方式JWT

●程序员过关斩将--cookie和session的关系其实很简单

●程序员修神之路--用NOSql给高并发系统加速

●程序员修神之路--高并发系统设计负载均衡架构

●程序员修神之路--做好分库分表其实很难之一(继续送书)

●程序员修神之路--做好分库分表其实很难之二(送书继续)

●程序员过关斩将--你为什么还在用存储过程?

●程序员过关斩将--小小的分页引发的加班血案

●程序员修神之路--问世间异步为何物?

●程序员修神之路--提高网站的吞吐量????

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/312368.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

UnitTest in .NET(Part 4)

Photo &#xff1a;Unit Test in Visual Studio文 | Edison Zhou上一篇我们学习了如何使用模拟对象进行交互测试。这一篇我们则会进一步使用隔离框架支持适应未来和可用性的功能。为何使用模拟框架&#xff1f; 对于复杂的交互场景&#xff0c;可能手工编写模拟对象和存根就会变…

Xamarin.Forms弹出对话框插件

微信公众号&#xff1a;Dotnet9&#xff0c;网站&#xff1a;Dotnet9&#xff0c;问题或建议&#xff0c;请网站留言&#xff1b;如果您觉得Dotnet9对您有帮助&#xff0c;欢迎赞赏。Dotnet9.com内容目录实现效果业务场景编码实现本文参考源码下载1.实现效果弹出动画 2.业务场景…

Pycharm安装第三方库

转载地址&#xff1a; https://www.cnblogs.com/bwjblogs/p/12839463.html 今晚想安装一些第三方库但是pip版本低&#xff0c;安装一直报错&#xff0c;输入升级的命令也一直不行。于是在pycharm上安装&#xff0c;但是还是一直失败&#xff0c;下面提出解决办法。 然后在搜索…

CAP 3.0 版本正式发布

前言大家好&#xff0c;我们很高兴宣布 CAP 发布了 3.0 版本正式版。自从上次 CAP 2.6 版本发布 以来&#xff0c;已经过去了几个月的时间&#xff0c;关注的朋友可能知道&#xff0c;在这几个月的时间里&#xff0c;也发布了几个预览版的 3.0 版本的NuGet包。3.0 是一个主要版…

mysql字符集变为gbk_MYSQL数据库默认latin1字符集转换为GBK或UTF8

可以采用下面的方法latin1字符集转换为gbk字符集或utf8字符集。具体的转换步骤如下&#xff1a;一、latin1转gbk1、导出数据库mysqldump --default-character-setlatin1 -h 数据库连接ip -u root -P 3306 -p数据库密码 db_name table_name > /usr/home/test/table_name.sql2…

微服务统计,分析,图表,监控一体化的HttpReports项目在.Net Core 中的使用

简单介绍HttpReports 是 .Net Core 下的一个Web项目, 适用于WebAPI&#xff0c;Ocelot网关应用&#xff0c;MVC项目&#xff0c;非常适合针对微服务应用使用&#xff0c;通过中间件的形式集成到您的项目中&#xff0c;可以让开发人员快速的搭建出一个 数据统计&#xff0c;分析…

查看node的位置_升级Node版本RN项目运行报错cb.apply is not a function

今日打算安装一下ReactNative官方推荐的脚手架工具Ignite。infinitered/ignite​github.comIgnite是一套整合了 Redux 以及一些常见 UI 组件的脚手架。它带有一个命令行可以生成 app、组件或是容器。在安装的过程中&#xff0c;提示当前系统安装的node版本过低&#xff0c;无法…

C++构造函数调用规则

1.拷贝函数的值拷贝&#xff1a; #include <iostream> using namespace std;//构造函数的调用规则&#xff1a; //1,创建一个类&#xff0c;C编译器会给每个类都添加至少3个函数 //默认函数(空实现)&#xff0c;析构函数(空实现)&#xff0c;拷贝函数(值拷贝)class Pers…

HTTP Strict Transport Security (HSTS) in ASP.NET Core

本文是《2020年了&#xff0c;再不会HTTPS就老了》的后篇&#xff0c;本文着重聊一聊HTTP Strict Transport Security协议的概念和应用。启用 HTTPS 还不够安全现在很多站点通过HTTPS对外提供服务&#xff0c;用户在访问某站点&#xff0c;往往会直接输入站点域名&#xff08;b…

mysql支持的平台和操作系统_MySQL 数据库所支持的操作系统_MySQL

MySQL数据库所支持的操作系统&#xff1a;我们使用GNU Autoconf&#xff0c;因此将MySQL移植到所有使用Posix线程和C编译器的现代系统是可能的。(要求服务器支持线程。如果只是编译客户端代码&#xff0c;则只需要C编译器)。我们主要在Linux(SuSE和Red Hat)、FreeBSD和Sun Sola…

C++深拷贝与浅拷贝

浅拷贝&#xff1a; 简单的赋值拷贝操作。 深拷贝&#xff1a; 在堆区重新申请空间&#xff0c;进行拷贝操作。 首先我们先写这样的一段代码&#xff1a; #include <iostream> using namespace std; //深拷贝与浅拷贝class Person {public:Person() {cout << &qu…

BeetleX轻松搭建HTTP和Weboskcet网关

在新版本的BeetleX.Bumblebee中实现了对Weboskcet代理的支持&#xff0c;因此使用BeetleX搭建同时支持HTTP和Weboskcet的网关只需要几行代码的工作就能完成&#xff1b;接下来构建一个简单的网关程序并测试一下对asp.net core SignalR进行代理的应用。引用组件使用BeetleX构建网…

[功能发布]Excel与PowerBI互通互联升级版连接SSAS和AzureAS

Excel催化剂发自内心地热爱着PowerBI社区&#xff0c;从最开始提供了PowerBIDeskTop的互通互联功能&#xff0c;到无偿奉献所有此功能的核心原代码&#xff0c;再到今天的高潮&#xff0c;献上最具商业价值的高级功能&#xff0c;让企业级商业智能BI项目插上翅膀&#xff0c;最…

window oracle 只有bak文件怎么恢复_一起来学习Oracle的备份恢复基础吧-4

基于backup controlfile的恢复使用备份的控制文件在实际工作中的两种情况&#xff1a;当前控制文件全部损坏&#xff0c;而数据文件备份、控制文件备份及当前的日志处在不同的SCN版本&#xff0c;它们之间又增加过表空间(数据文件)。当前控制文件没有损坏&#xff0c;但是想恢复…

微服务、容器和Kubernetes的2020你怎么看?

历史上&#xff0c;有些年份比其他年份容易预测。因为市场出现的稳定性使追踪趋势线变得更加容易。2020年将是企业向微服务迁移的关键一年&#xff1a;稳定并逐步地向主流应用过渡。毫无疑问&#xff0c;IT组织正在转向微服务架构。微服务将应用程序分解为许多小部分&#xff0…

C++类对象作为类成员

C类中的成员可以是另一个类的对象&#xff0c;我们称该成员为对象成员 代码如下&#xff1a; #include <iostream> using namespace std; #include <cstring>//类对象作为类成员 class Phone {public:Phone(string PName) {cout << "Phone函数的调用&…

.NET 大数据实时计算--学习笔记

摘要纯 .Net 自研大数据实时计算平台&#xff0c;在中通快递服务数百亿包裹&#xff0c;处理数据万亿计&#xff01;将分享大数据如何落地以及设计思路&#xff0c;技术重难点。目录背景介绍计算平台架构项目实战背景介绍计算平台架构分片实时计算计算平台数据统计模型开源项目…

asp.net core 实现支持多语言

asp.net core 实现支持多语言Intro最近有一个外国友人通过邮件联系我&#xff0c;想用我的活动室预约&#xff0c;但是还没支持多语言&#xff0c;基本上都是写死的中文&#xff0c;所以最近想支持一下更多语言&#xff0c;于是有了多语言方面的一些实践国际化/本地化介绍国际化…

C++this指针的用途

this指针的用途&#xff1a; 1.当形参和成员变量同名时&#xff0c;可用this指针来区分。 2.在类的非静态成员函数中返回对象本身&#xff0c;可使用return *this 每一个非静态成员函数只会诞生一份函数实例&#xff0c;也就是说多个同类型的对象会共用一块代码&#xff0c;那…

【实战 Ids4】小技巧篇:自定义登录页操作

今天的内容很简单&#xff0c;1分钟就能看完&#xff0c;5分钟就能学会&#xff0c;但是却是在我们平时开发中必须要学会的一个小知识点&#xff0c;我就不让大家走弯路了&#xff0c;直接看操作。在平时的IdentityServer4开发中呢&#xff0c;我们都是根据官方的Demo来操作一遍…