Java 7:在不丢失数据的情况下关闭NIO.2文件通道

关闭异步文件通道可能非常困难。 如果您将I / O任务提交到异步通道,则需要确保正确执行了任务。 实际上,出于多种原因,这对于异步通道可能是一个棘手的要求。 默认的通道组使用守护进程线程作为工作线程,这不是一个好选择,因为如果JVM退出,这些线程就会被放弃。 如果将自定义线程池执行程序与非守护线程一起使用,则需要自己管理线程池的生命周期。 如果不这样做,当主线程退出时,线程仅保持活动状态。 因此,JVM实际上根本不会退出,您可以执行的操作是杀死JVM。

关闭异步通道时,另一个问题在AsynchronousFileChannel的javadoc中提到:“在通道打开时关闭执行程序服务会导致未指定的行为。” 这是因为close()上的操作AsynchronousFileChannel问题的任务是模拟与挂起的I / O操作(在同一个线程池)的故障相关的执行服务AsynchronousCloseException 。 因此,如果您在先前关闭关联的执行程序服务时在异步文件通道实例上执行close() ,则会得到RejectedExecutionException

综上所述,安全配置文件通道并关闭该通道的建议方法如下:

public class SimpleChannelClose_AsynchronousCloseException {private static final String FILE_NAME = "E:/temp/afile.out";private static AsynchronousFileChannel outputfile;private static AtomicInteger fileindex = new AtomicInteger(0);private static ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());public static void main(String[] args) throws InterruptedException, IOException, ExecutionException {outputfile = AsynchronousFileChannel.open(Paths.get(FILE_NAME),new HashSet<StandardOpenOption>(Arrays.asList(StandardOpenOption.WRITE, StandardOpenOption.CREATE,StandardOpenOption.DELETE_ON_CLOSE)), pool);List<Future<Integer>> futures = new ArrayList<>();for (int i = 0; i < 10000; i++) {futures.add(outputfile.write(ByteBuffer.wrap("Hello".getBytes()), fileindex.getAndIncrement() * 5));}outputfile.close();pool.shutdown();pool.awaitTermination(60, TimeUnit.SECONDS);for (Future<Integer> future : futures) {try {future.get();} catch (ExecutionException e) {System.out.println("Task wasn't executed!");}}}
}

在第6和7行中定义了定制线程池执行程序服务。在第10至13行中定义了文件通道。在第18至20行中,异步通道以有序方式关闭。 首先关闭通道本身,然后关闭执行程序服务,最后最重要的一点是线程等待线程池执行程序的终止。

尽管这是使用自定义执行程序服务关闭通道的安全方法,但还是引入了新问题。 客户端提交了异步写入任务(第16行),并且可能希望确保一旦成功提交了这些任务,这些任务肯定会被执行。 始终不等待Future.get()返回(第23行),因为在许多情况下,这将导致*异步*文件通道adurdum。 上面的代码段将返回很多“未执行任务!” 消息导致将写操作提交到通道后立即关闭通道(第18行)。 为了避免这种“数据丢失”,您可以实现自己的CompletionHandler并将其传递给请求的写操作。

public class SimpleChannelClose_CompletionHandler {
...public static void main(String[] args) throws InterruptedException, IOException, ExecutionException {
...outputfile.write(ByteBuffer.wrap("Hello".getBytes()), fileindex.getAndIncrement() * 5, "", defaultCompletionHandler);
...}private static CompletionHandler<integer, string=""> defaultCompletionHandler = new CompletionHandler<Integer, String>() {@Overridepublic void completed(Integer result, String attachment) {// NOP}@Overridepublic void failed(Throwable exc, String attachment) {System.out.println("Do something to avoid data loss ...");}};
}

CompletionHandler.failed()方法(第16行)在任务处理期间捕获任何运行时异常。 您可以在此处实施任何补偿代码,以避免数据丢失。 处理关键任务数据时,最好使用CompletionHandler 。 但是*仍然*还有另一个问题。 客户端可以提交任务,但是他们不知道池是否将成功处理这些任务。 在这种情况下,成功表示已提交的字节实际上到达了目的地(硬盘上的文件)。 如果您想确保所有提交的任务在关闭前都已得到实际处理,则会有些棘手。 您需要一个“优美的”关闭机制,该机制要等到工作队列为空时才*实际上*先关闭通道和关联的执行程序服务(使用标准生命周期方法无法实现)。

引入GracefulAsynchronousChannel

我的最后一个片段介绍了GracefulAsynchronousFileChannel 。 您可以在我的Git存储库中获取完整的代码。 该通道的行为是这样的:保证处理所有成功提交的写操作,如果通道准备关闭,则抛出NonWritableChannelException 。 实现该行为需要两件事。 首先,您需要在ThreadPoolExecutor的扩展中实现afterExecute() ,该扩展在队列为空时发送信号。 这就是DefensiveThreadPoolExecutor所做的。

private class DefensiveThreadPoolExecutor extends ThreadPoolExecutor {public DefensiveThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,LinkedBlockingQueue<Runnable> workQueue, ThreadFactory factory, RejectedExecutionHandler handler) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, factory, handler);}/*** "Last" task issues a signal that queue is empty after task processing was completed.*/@Overrideprotected void afterExecute(Runnable r, Throwable t) {if (state == PREPARE) {closeLock.lock(); // only one thread will pass when closer thread is awaiting signaltry {if (getQueue().isEmpty() && state < SHUTDOWN) {System.out.println("Issueing signal that queue is empty ...");isEmpty.signal();state = SHUTDOWN; // -> no other thread can issue empty-signal}} finally {closeLock.unlock();}}super.afterExecute(r, t);}
}

afterExecute()方法(第12行)在每个处理的任务之后由处理给定任务的线程执行。 该实现在第18行中发送isEmpty信号。第二个需要您优雅地关闭一个通道的部分是AsynchronousFileChannelclose()方法的自定义实现。

/*** Method that closes this file channel gracefully without loosing any data.*/
@Override
public void close() throws IOException {AsynchronousFileChannel writeableChannel = innerChannel;System.out.println("Starting graceful shutdown ...");closeLock.lock();try {state = PREPARE;innerChannel = AsynchronousFileChannel.open(Paths.get(uri),new HashSet<StandardOpenOption>(Arrays.asList(StandardOpenOption.READ)), pool);System.out.println("Channel blocked for write access ...");if (!pool.getQueue().isEmpty()) {System.out.println("Waiting for signal that queue is empty ...");isEmpty.await();System.out.println("Received signal that queue is empty ... closing");} else {System.out.println("Don't have to wait, queue is empty ...");}} catch (InterruptedException e) {Thread.interrupted();throw new RuntimeException("Interrupted on awaiting Empty-Signal!", e);} catch (Exception e) {throw new RuntimeException("Unexpected error" + e);} finally {closeLock.unlock();writeableChannel.force(false);writeableChannel.close(); // close the writable channelinnerChannel.close(); // close the read-only channelSystem.out.println("File closed ...");pool.shutdown(); // allow clean up tasks from previous close() operation to finish safelytry {pool.awaitTermination(1, TimeUnit.MINUTES);} catch (InterruptedException e) {Thread.interrupted();throw new RuntimeException("Could not terminate thread pool!", e);}System.out.println("Pool closed ...");}
}

研究该代码一段时间。 有趣的位在第11行中,其中的innerChannel被只读通道替换。 这将导致任何后续的异步写入请求均由于NonWritableChannelException而失败。 在第16行中, close()方法等待isEmpty信号发生。 在上一个写任务之后发送此信号时, close()方法将继续执行有序的关闭过程(第27页及其后的内容)。 基本上,代码在文件通道和关联的线程池之间添加了共享的生命周期状态。 这样,两个对象都可以在关闭过程中进行通信,并避免数据丢失。

这是使用GracefulAsynchronousFileChannel的日志记录客户端。

public class MyLoggingClient {private static AtomicInteger fileindex = new AtomicInteger(0);private static final String FILE_URI = "file:/E:/temp/afile.out";public static void main(String[] args) throws IOException {new Thread(new Runnable() { // arbitrary thread that writes stuff into an asynchronous I/O data sink@Overridepublic void run() {try {for (;;) {GracefulAsynchronousFileChannel.get(FILE_URI).write(ByteBuffer.wrap("Hello".getBytes()),fileindex.getAndIncrement() * 5);}} catch (NonWritableChannelException e) {System.out.println("Deal with the fact that the channel was closed asynchronously ... "+ e.toString());} catch (Exception e) {e.printStackTrace();}}}).start();Timer timer = new Timer(); // asynchronous channel closertimer.schedule(new TimerTask() {public void run() {try {GracefulAsynchronousFileChannel.get(FILE_URI).close();long size = Files.size(Paths.get("E:/temp/afile.out"));System.out.println("Expected file size (bytes): " + (fileindex.get() - 1) * 5);System.out.println("Actual file size (bytes): " + size);if (size == (fileindex.get() - 1) * 5)System.out.println("No write operation was lost!");Files.delete(Paths.get("E:/temp/afile.out"));} catch (IOException e) {e.printStackTrace();}}}, 1000);}
}

客户端启动两个线程,一个线程在无限循环中(第6行以下)发出写操作。 在处理一秒钟后,另一个线程异步关闭文件通道(第25 ff行)。 如果运行该客户端,那么将产生以下输出:

Starting graceful shutdown ...
Deal with the fact that the channel was closed asynchronously ... java.nio.channels.NonWritableChannelException
Channel blocked for write access ...
Waiting for signal that queue is empty ...
Issueing signal that queue is empty ...
Received signal that queue is empty ... closing
File closed ...
Pool closed ...
Expected file size (bytes): 400020
Actual file size (bytes): 400020
No write operation was lost!

输出显示参与线程的有序关闭过程。 日志记录线程需要处理通道异步关闭的事实。 处理排队的任务后,将关闭通道资源。 没有数据丢失,客户端发出的所有内容均已真正写入文件目标位置。 在这种正常的关闭过程中,没有AsynchronousClosedExceptionRejectedExecutionException

这就是安全关闭异步文件通道的全部方法。 完整的代码在我的Git存储库中 。 希望您喜欢它。 期待您的评论。

参考:来自我们JCG合作伙伴 Niklas的“ Java 7:关闭NIO.2文件通道而不会丢失数据”。

翻译自: https://www.javacodegeeks.com/2012/05/java-7-closing-nio2-file-channels.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/372912.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JavaScript封装方法,兼容参数类型为Number和String

/*** 依据Kind确定跳转到目标列表页面。* param kind*/function gobackByKind(kind) {var kindStr String(kind);switch(kindStr){case "1"://跳转到客户列表页面window.location.href/biz/customer/list;break;case "2"://跳转到代理机构列表页面window.…

#获得请求来源ip_以太网数据包TCP、IP、ICMP、UDP、ARP协议头结构详解

以太网首部目地MAC地址(8字节)源MAC地址(8字节)类型(2字节)1、IP头的结构版本(4位)头长度(4位)服务类型(8位)封包总长度(16位)封包标识(16位)标志(3位)片断偏移地址(13位)存活时间(8位)协议(8位)校验和(16位)来源IP地址(32位)目的IP地址(32位)选项(可选)填充(可选)数据(1)字节和…

团队项目第二次冲刺Ⅶ

今天将整体代码的编码方式改了&#xff0c;作业模块基本修改完成 遇到的问题是对于添加问答模块无从下手转载于:https://www.cnblogs.com/brucekun/p/5573312.html

编写Play 2的模块,第2部分:拦截器

在本教程的第一部分中&#xff0c;我们介绍了创建&#xff0c;发布和调用模块的基本知识。 我们创建的模块并没有真正做很多事情&#xff0c;因此现在是时候使用Play的某些功能来扩展功能了。 1.拦截器 拦截器使您可以拦截对控制器的调用&#xff0c;并增强或阻止其行为。 在第…

c# ef报错_C# EF调用MySql出现“未将对象引用设置到对象的实例”错误解决方案

C# EF调用MySql出现“未将对象引用设置到对象的实例”错误解决方案---修改步骤---1.打开Nuget管理包&#xff0c;把Mysql.Data替换为6.10.0以下任意版本。这里选择的是6.8.82.修改完毕后&#xff0c;继续把Mysql.Data.Entity也修改为对应版本6.8.8。3.安装完成后可以看到App.Co…

js格式化时间

Date.prototype.format function(fmt) {var o { "M" : this.getMonth()1, //月份 "d" : this.getDate(), //日 "h" : this.getHours(), //小时 "m" : this.getMinu…

PHP---函数

一.函数定义的四个要素 返回类型&#xff0c;函数名&#xff0c;参数列表&#xff0c;函数体 //1.最简单的定义方式/*function show(){ echo "hello";}show();*///2.有参数的函数定义/*function show($a){ echo $a;}show("bbbbb");*///3.有默认值的函数定义…

ServletRequest startAsync()的有用性有限

前段时间我遇到了Servlet 3.0中AsyncContext.start&#xff08;…&#xff09;的目的是什么&#xff1f; 题。 引用上述方法的Javadoc &#xff1a; 使容器调度线程&#xff08;可能从托管线程池中&#xff09;运行指定的Runnable 。 提醒大家&#xff0c; AsyncContext是Servl…

mysql所支持的比较运算符_mysql比较运算符有哪些?Mysql比较运算符详解

比较运算符可用于比较数字和字符串。今天发一篇Mysql比较运算符详解&#xff0c;希望对初学者有所帮助&#xff0c;虽然现在流行NoSQL&#xff0c;但是MYSQL还是很有用的&#xff0c;数字作为浮点值进行比较&#xff0c;字符串以不区为例进行比较&#xff0c;运算符用于比较表达…

数据结构0类模板的使用

类模板的使用 #include <iostream> #include <conio.h> #include <string> #define N 3 using namespace std;template <class numtype> class Swap{public :Swap(numtype a,numtype b){xa;yb;}numtype ___(){tempx;xy;ytemp;return x;}//testnumtype …

JavaScript 函数

函数 由于JavaScript的函数也是一个对象&#xff0c;所以类似function abs(v){}函数实际上是一个函数对象&#xff0c;而函数名abs可以视为指向该函数的变量。 因此&#xff0c;第二种定义函数的方式如下&#xff1a; var abs function (x) {if (x > 0) {return x;} else {…

Http Invoker的Spring Remoting支持

Spring HTTP Invoker是Java到Java远程处理的重要解决方案。 该技术使用标准的Java序列化机制通过HTTP公开服务&#xff0c;并且可以被视为替代解决方案&#xff0c;而不是Hessian和Burlap中的自定义序列化。 而且&#xff0c;它仅由Spring提供&#xff0c;因此客户端和服务器应…

mysql 日期列表_MySQL 生成日期表

1、创建一个num表&#xff0c;用来存储数字0~9CREATE TABLE num (i int);2、在num表中生成0~9INSERT INTO num (i) VALUES (0), (1), (2), (3), (4), (5), (6), (7), (8), (9);3、生成一个存储日期的表&#xff0c;datalist是字段名CREATE TABLE if not exists calendar(dateli…

学习后缀自动机想法

小序&#xff1a;学习后缀自动机是要有耐心的&#xff0c;clj的论文自己看真心酸爽&#xff01;&#xff08;还是自己太弱&#xff0c;ls&#xff0c;oyzx好劲啊&#xff0c;狂膜不止&#xff09; 刚刚在写博客之前又看了篇论文&#xff0c;终于看懂了&#xff0c;好开心 正文&…

【BZOJ】3575: [Hnoi2014]道路堵塞

题目链接&#xff1a;http://www.lydsy.com/JudgeOnline/problem.php?id3575 大概的做法是&#xff0c;按照顺序枚举每一条要删去的边&#xff0c;(假设当前点为$u$&#xff0c;在最短路径上的下一个点是$v$)然后强制不走${u->v}$这条边&#xff0c;将$u$入队&#xff0c;做…

结合使用slf4j和Logback教程

在当前文章中&#xff0c;我将向您展示如何配置您的应用程序以使用slf4j和logback作为记录器解决方案。 Java简单日志记录外观&#xff08;slf4j&#xff09;是各种日志记录框架的简单外观&#xff0c;例如JDK日志记录&#xff08;java.util.logging&#xff09;&#xff0c;lo…

mysql 分组top_MySQL:如何查询出每个分组中的 top n 条记录?

问题描述需求&#xff1a;查询出每月 order_amount(订单金额) 排行前3的记录。例如对于2019-02&#xff0c;查询结果中就应该是这3条&#xff1a;解决方法MySQL 5.7 和 MySQL 8.0 有不同的处理方法。1. MySQL 5.7我们先写一个查询语句。根据 order_date 中的年、月&#xff0c;…

ACM第四站————最小生成树(普里姆算法)

对于一个带权的无向连通图&#xff0c;其每个生成树所有边上的权值之和可能不同&#xff0c;我们把所有边上权值之和最小的生成树称为图的最小生成树。 普里姆算法是以其中某一顶点为起点&#xff0c;逐步寻找各个顶点上最小权值的边来构建最小生成树。 其中运用到了回溯&#…

利用jenkins的api来完成相关工作流程的自动化

[本文出自天外归云的博客园] 背景 1. 实际工作中涉及到安卓客户端方面的测试&#xff0c;外推或运营部门经常会有很多的渠道&#xff0c;而每个渠道都对应着一个app的下载包&#xff0c;这些渠道都记录在安卓项目下的一个渠道列表文件中。外推或运营部门经常会有新的渠道产生&a…

拥有成本分析:Oracle WebLogic Server与JBoss

Crimson Consulting Group 撰写的非常有趣的白皮书 &#xff0c;比较了Weblogic和JBoss之间的拥有成本 。 尽管JBoss是免费的&#xff0c;但该白皮书却严肃地宣称&#xff0c;从长远来看&#xff0c;Weblogic更便宜。 尽管此研究是由Oracle赞助的&#xff0c;但它看起来非常严肃…