在实现线程安全和持久化的消息队列时,异常处理和性能优化是非常重要的。以下是一些改进措施:
异常处理:确保在读写文件、队列操作等过程中,能够妥善处理可能出现的异常。
性能优化:使用缓冲区和异步写入来减少磁盘I/O操作的开销。
资源管理:确保所有资源(如文件句柄)在使用后都能被正确关闭,以避免资源泄露。
下面是改进后的代码示例:
java
复制
import java.io.*;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class PersistentQueue {
private LinkedBlockingQueue queue;
private String filePath;
private BufferedWriter writer; // 用于异步写入
public PersistentQueue(String filePath) {this.filePath = filePath;this.queue = new LinkedBlockingQueue<>();initializeWriter();recoverMessages();
}// 异步写入消息到文件
private void initializeWriter() {try {writer = new BufferedWriter(new FileWriter(filePath, true));} catch (IOException e) {throw new RuntimeException("Unable to initialize writer", e);}
}// 将消息添加到队列,并异步写入到文件
public void enqueue(String message) {try {queue.offer(message);new Thread(() -> {try {writer.write(message);writer.newLine();writer.flush(); // 可以选择在此处或在消息消费时flush} catch (IOException e) {e.printStackTrace();}}).start();} catch (Exception e) {e.printStackTrace();}
}// 从队列中取出消息
public String dequeue() {try {return queue.poll(1, TimeUnit.SECONDS); // 1秒超时} catch (InterruptedException e) {Thread.currentThread().interrupt();return null;}
}// 从文件中恢复消息
private void recoverMessages() {File file = new File(filePath);if (file.exists()) {try (BufferedReader reader = new BufferedReader(new FileReader(file))) {String line;while ((line = reader.readLine()) != null) {queue.offer(line);}} catch (IOException e) {throw new RuntimeException("Unable to recover messages", e);}}
}// 确保资源被正确关闭
public void close() throws IOException {queue.clear();if (writer != null) {writer.close();}
}
}
public class Producer implements Runnable {
private PersistentQueue queue;
public Producer(PersistentQueue queue) {this.queue = queue;
}@Override
public void run() {try {for (int i = 0; i < 10; i++) {String message = "Message " + (i + 1);queue.enqueue(message);Thread.sleep(1000);}queue.close(); // 确保资源被关闭} catch (InterruptedException e) {Thread.currentThread().interrupt();} catch (IOException e) {e.printStackTrace();}
}
}
// Consumer 类似于 Producer,使用 queue.dequeue() 来消费消息
public class Main {
public static void main(String[] args) {
String filePath = “messages.txt”;
PersistentQueue queue = new PersistentQueue(filePath);
Producer producer = new Producer(queue);new Thread(producer).start();// 启动消费者线程等
}
}
在这个改进的版本中,我们添加了以下特性:
异步写入:使用单独的线程来处理消息的写入,以避免阻塞主线程。
资源管理:通过 initializeWriter 和 close 方法来确保 BufferedWriter 资源被正确管理。
超时机制:在 dequeue 方法中添加了超时参数,避免无限期地等待消息。
异常处理:在关键操作中添加了异常处理,确保在发生错误时能够记录并继续执行。
请注意,这个示例代码仍然保持简单,以便于理解。在实际应用中,你可能需要根据具体需求进一步优化和调整异常处理逻辑。