【Java万花筒】解锁Java并发之门:深入理解多线程

Java并发编程艺术:深度剖析多线程利器

前言

在当今软件开发的世界中,多线程编程已经变得愈发重要。面对多核处理器的普及和复杂的系统架构,开发人员需要深入了解并发编程的原理和实践,以充分发挥硬件的性能潜力。本文将带您深入探讨Java中的并发与多线程编程,介绍一系列强大的Java库和框架,助您更好地处理并发挑战。

文章目录

  • Java并发编程艺术:深度剖析多线程利器
    • 前言
    • 1. java.util.concurrent 包
      • 1.1 Executor 框架
      • 1.2 并发集合
      • 1.3 同步器
      • 1.4 原子变量
      • 1.5 CompletableFuture
      • 1.6 Phaser
      • 1.7 LinkedTransferQueue
    • 2. Akka
      • 2.1 Actor 模型
      • 2.2 并发与分布式
        • 2.2.1 Clustering
        • 2.2.2 Sharding
        • 2.2.3 Distributed Data
        • 2.2.4 Akka Streams
        • 2.2.5 Akka HTTP
    • 3. RxJava
      • 3.1 Observable 与 Observer
      • 3.2 操作符
      • 3.3 背压处理
      • 3.4 任务调度器
      • 3.5 错误处理
      • 3.6 异步与并行
      • 3.7 扩展与自定义操作符
    • 4. ForkJoin 框架
      • 4.1 工作窃取算法
      • 4.2 ForkJoinPool 类
      • 4.3 RecursiveTask 与 RecursiveAction 类
      • 4.4 ForkJoinTask 类
      • 4.5 RecursiveTask 与 RecursiveAction 类(续)
        • 4.5.1 RecursiveTask
        • 4.5.2 RecursiveAction
    • 5. Disruptor
      • 5.1 高性能无锁并发框架
      • 5.2 RingBuffer 数据结构
      • 5.3 生产者-消费者模式
      • 5.4 应用于低延迟的金融交易系统
    • 6. Guava 并发库
      • 6.1 ListenableFuture 接口
      • 6.2 Futures 工具类
      • 6.3 SettableFuture 类
      • 6.4 ListeningExecutorService 接口
      • 6.5 RateLimiter 类
      • 6.6 Cache 类
    • 总结

1. java.util.concurrent 包

1.1 Executor 框架

Executor 框架是 Java 提供的用于管理线程的框架,它包含一组接口和类,用于简化多线程编程。其中,Executor 接口是整个框架的核心,定义了执行任务的基本协议。下面是一个简单的使用 ThreadPoolExecutor 的例子:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class ExecutorExample {public static void main(String[] args) {// 创建线程池ExecutorService executorService = Executors.newFixedThreadPool(2);// 提交任务executorService.submit(() -> {System.out.println("Task 1 executed by " + Thread.currentThread().getName());});executorService.submit(() -> {System.out.println("Task 2 executed by " + Thread.currentThread().getName());});// 关闭线程池executorService.shutdown();}
}

1.2 并发集合

并发集合是为了在多线程环境中提供安全的数据操作而设计的。ConcurrentHashMap 是一个线程安全的哈希表实现,CopyOnWriteArrayList 是一个线程安全的动态数组实现。以下是它们的简单应用:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;public class ConcurrentCollectionExample {public static void main(String[] args) {// 使用 ConcurrentHashMapMap<String, String> concurrentMap = new ConcurrentHashMap<>();concurrentMap.put("key1", "value1");concurrentMap.put("key2", "value2");// 使用 CopyOnWriteArrayListCopyOnWriteArrayList<String> copyOnWriteList = new CopyOnWriteArrayList<>();copyOnWriteList.add("Item1");copyOnWriteList.add("Item2");}
}

1.3 同步器

同步器用于协调多个线程的执行。CountDownLatch 允许一个或多个线程等待其他线程完成操作,CyclicBarrier 用于多线程之间的同步,Semaphore 用于控制同时访问的线程数量。下面是一个使用 CountDownLatch 的例子:

import java.util.concurrent.CountDownLatch;public class CountDownLatchExample {public static void main(String[] args) throws InterruptedException {// 创建 CountDownLatch,设置计数器为2CountDownLatch latch = new CountDownLatch(2);// 启动两个线程new Thread(() -> {System.out.println("Task 1 executed");latch.countDown();}).start();new Thread(() -> {System.out.println("Task 2 executed");latch.countDown();}).start();// 等待两个线程执行完毕latch.await();System.out.println("Both tasks completed");}
}

1.4 原子变量

原子变量提供了一种无锁的线程安全的操作方式。AtomicIntegerAtomicLongAtomicReference 分别用于整数、长整数和对象的原子操作。以下是 AtomicInteger 的使用示例:

import java.util.concurrent.atomic.AtomicInteger;public class AtomicIntegerExample {public static void main(String[] args) {AtomicInteger atomicInteger = new AtomicInteger(0);// 原子地增加值int result = atomicInteger.incrementAndGet();System.out.println("Incremented value: " + result);// 原子地减少值result = atomicInteger.decrementAndGet();System.out.println("Decremented value: " + result);}
}

1.5 CompletableFuture

CompletableFuture 是 Java 8 引入的一个强大的异步编程工具,它提供了一种简单而灵活的方式来处理异步操作的结果。相较于传统的 FutureCompletableFuture 允许你以声明式的方式构建异步操作流水线,轻松地进行组合和转换。下面是一个使用 CompletableFuture 的示例:

import java.util.concurrent.CompletableFuture;public class CompletableFutureExample {public static void main(String[] args) {// 异步执行任务CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {System.out.println("Task executed by " + Thread.currentThread().getName());return "Result";});// 注册回调函数future.thenAccept(result -> System.out.println("Async result: " + result));// 等待任务完成future.join();}
}

在上述示例中,通过 CompletableFuture.supplyAsync 异步执行任务,使用 thenAccept 注册回调函数,实现了异步任务的执行和结果处理。

1.6 Phaser

Phaser 是 Java 7 引入的同步辅助类,它允许线程在多阶段并发算法中协同工作。Phaser 提供了更灵活的同步机制,比传统的 CountDownLatchCyclicBarrier 更强大。以下是一个简单的 Phaser 使用示例:

import java.util.concurrent.Phaser;public class PhaserExample {public static void main(String[] args) {// 创建 Phaser,设置参与的线程数目Phaser phaser = new Phaser(3);// 启动三个线程new Thread(() -> {System.out.println("Thread 1 arrived");phaser.arriveAndAwaitAdvance();}).start();new Thread(() -> {System.out.println("Thread 2 arrived");phaser.arriveAndAwaitAdvance();}).start();new Thread(() -> {System.out.println("Thread 3 arrived");phaser.arriveAndAwaitAdvance();}).start();}
}

在上述示例中,通过 Phaser 控制三个线程同时到达同一阶段,实现了更加灵活的线程同步。

1.7 LinkedTransferQueue

LinkedTransferQueuejava.util.concurrent 包中的一个并发队列实现,它具有高性能和可伸缩性。相较于其他阻塞队列,LinkedTransferQueue 具有更好的性能特征,特别适用于高并发场景。以下是一个简单的 LinkedTransferQueue 使用示例:

import java.util.concurrent.LinkedTransferQueue;public class LinkedTransferQueueExample {public static void main(String[] args) {// 创建 LinkedTransferQueueLinkedTransferQueue<String> transferQueue = new LinkedTransferQueue<>();// 生产者线程new Thread(() -> {try {// 将元素传输给消费者transferQueue.transfer("Message from producer");System.out.println("Message sent by producer");} catch (InterruptedException e) {e.printStackTrace();}}).start();// 消费者线程new Thread(() -> {try {// 接收生产者传输的元素String message = transferQueue.take();System.out.println("Message received by consumer: " + message);} catch (InterruptedException e) {e.printStackTrace();}}).start();}
}

在上述示例中,LinkedTransferQueue 实现了生产者与消费者之间的消息传输,具有更好的性能表现。

2. Akka

2.1 Actor 模型

Actor 模型是一种并发计算的模型,其中的 Actor 是并发执行的基本单位。在 Akka 中,ActorSystem 是整个 Actor 模型的入口,ActorRef 用于在 Actor 之间传递消息。以下是一个简单的 Actor 示例:

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;public class ActorExample {public static void main(String[] args) {// 创建 Actor 系统ActorSystem system = ActorSystem.create("MySystem");// 创建 ActorActorRef myActor = system.actorOf(Props.create(MyActor.class), "myActor");// 发送消息给 ActormyActor.tell("Hello, Actor!", ActorRef.noSender());}// 定义一个简单的 Actorstatic class MyActor extends AbstractActor {@Overridepublic Receive createReceive() {return receiveBuilder().match(String.class, message -> {System.out.println("Received message: " + message);}).build();}}
}

2.2 并发与分布式

Akka 提供了强大的并发和分布式支持。Clustering 允许将多个 ActorSystem 组成集群,Sharding 用于将 Actor 分布到多个节点,而 Distributed Data 提供了分布式数据结构的实现。

2.2.1 Clustering

Clustering 是 Akka 中用于构建集群的核心模块。通过 Cluster 模块,可以将多个运行在不同 JVM 中的 ActorSystem 组成一个分布式集群,实现节点之间的通信和协同工作。以下是一个简单的集群示例:

import akka.actor.AbstractActor;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.cluster.Cluster;public class ClusterExample {public static void main(String[] args) {// 创建 Actor 系统ActorSystem system1 = ActorSystem.create("ClusterSystem");ActorSystem system2 = ActorSystem.create("ClusterSystem");// 将两个 ActorSystem 加入同一个集群Cluster.get(system1).join(Cluster.get(system2).selfAddress());// 创建一个运行在集群中的 Actorsystem1.actorOf(Props.create(ClusterActor.class), "clusterActor");}// 集群中的 Actorstatic class ClusterActor extends AbstractActor {@Overridepublic Receive createReceive() {return receiveBuilder().matchAny(message -> {System.out.println("Received message: " + message + " by " + self().path());}).build();}}
}

上述示例中,ClusterActor 在两个不同的 ActorSystem 中运行,并通过 Cluster 模块加入了同一个集群。这样,它们可以相互通信,形成一个分布式集群。

2.2.2 Sharding

Sharding 是 Akka 中用于分片管理的模块。它允许将大量的 Actor 实例分布到多个节点上,每个节点负责一部分数据。这有助于提高并发性能和分布式扩展性。以下是一个简单的 Sharding 示例:

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.cluster.sharding.ClusterSharding;
import akka.cluster.sharding.ClusterShardingSettings;
import akka.cluster.sharding.ShardRegion;public class ShardingExample {public static void main(String[] args) {// 创建 Actor 系统ActorSystem system = ActorSystem.create("ShardingSystem");// 创建 Sharding 区域ActorRef shardingRegion = ClusterSharding.get(system).start("MyShardingActor", Props.create(ShardingActor.class), ClusterShardingSettings.create(system), new ShardingMessageExtractor(), new ShardingMessageExtractor());// 发送消息给 Sharding ActorshardingRegion.tell(new ShardingMessageExtractor.ShardingMessage("shard-1", "Hello, Sharding Actor!"), ActorRef.noSender());}// Sharding Actorstatic class ShardingActor extends AbstractActor {@Overridepublic Receive createReceive() {return receiveBuilder().match(ShardingMessageExtractor.ShardingMessage.class, message -> {System.out.println("Received sharded message: " + message.getMessage());}).build();}}// Sharding 消息提取器static class ShardingMessageExtractor implements ShardRegion.MessageExtractor {@Overridepublic Object entityMessage(Object message) {return message;}@Overridepublic String entityId(Object message) {if (message instanceof ShardingMessage) {return ((ShardingMessage) message).getShardId();}return null;}@Overridepublic String shardId(Object message) {if (message instanceof ShardingMessage) {return ((ShardingMessage) message).getShardId();}return null;}static class ShardingMessage {private final String shardId;private final String message;public ShardingMessage(String shardId, String message) {this.shardId = shardId;this.message = message;}public String getShardId() {return shardId;}public String getMessage() {return message;}}}
}

在上述示例中,通过 ClusterSharding 模块,创建了一个名为 “MyShardingActor” 的 Sharding 区域。消息通过 ShardingMessageExtractor 进行分片,并由相应的 ShardingActor 处理。

2.2.3 Distributed Data

Distributed Data 是 Akka 中用于处理分布式数据的模块。它提供了一系列的分布式数据结构,如 ReplicatorLWWMap 等,使得在分布式系统中更容易实现数据的一致性和复制。以下是一个简单的 Replicator 示例:

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.cluster.Cluster;
import akka.cluster.ddata.*;public class DistributedDataExample {public static void main(String[] args) {// 创建 Actor 系统ActorSystem system1 = ActorSystem.create("DistributedDataSystem");ActorSystem system2 = ActorSystem.create("DistributedDataSystem");// 加入同一个集群Cluster.get(system1).join(Cluster.get(system2).selfAddress());// 创建 ReplicatorActorRef replicator1 = DistributedData.get(system1).replicator();ActorRef replicator2 = DistributedData.get(system2).replicator();// 创建分布式 MapReplicatedDataKey<String> dataKey = ReplicatedDataKey.create("myData", Replicators.CausalDiamond);// 在系统1中更新值replicator1.tell(new Replicator.Update<>(dataKey, Replicators.writeLocal(), "key", new Replicator.UpdateData<>("value", Replicators.writeLocal())),ActorRef.noSender());// 在系统2中读取值replicator2.tell(new Replicator.Get<>(dataKey, Replicators.readLocal()), ActorRef.noSender());}// Actor 处理分布式数据的更新和读取static class DistributedDataActor extends AbstractActor {@Overridepublic Receive createReceive() {return receiveBuilder().match(Replicator.GetSuccess.class, success -> {System.out.println("Read value: " + success.get(dataKey).get("key"));}).build();}}
}

在上述示例中,通过 DistributedData 模块创建了两个 ActorSystem,并在集群中加入了相同的地址。通过Replicator 进行分布式数据的复制和同步。在示例中,通过 ReplicatedDataKey 创建了一个名为 “myData” 的分布式 Map,并在系统1中更新了键值对 “key” 和 “value”,然后在系统2中读取了该值。

2.2.4 Akka Streams

Akka Streams 是 Akka 中用于处理流数据的模块。它提供了一种声明式的方式来操作和处理数据流,适用于异步、高并发的场景。以下是一个简单的 Akka Streams 示例:

import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;import java.util.Arrays;public class AkkaStreamsExample {public static void main(String[] args) {// 创建 Actor 系统ActorSystem system = ActorSystem.create("AkkaStreamsSystem");// 创建流执行环境ActorMaterializer materializer = ActorMaterializer.create(system);// 创建数据源Source<Integer, ?> source = Source.from(Arrays.asList(1, 2, 3, 4, 5));// 定义数据处理流程source.map(value -> value * 2).filter(value -> value > 5).to(Sink.foreach(System.out::println)).run(materializer);}
}

在上述示例中,通过 Akka Streams 创建了一个数据源,并定义了一个数据处理流程,包括将每个元素乘以2、过滤掉小于等于5的元素,最后将结果打印出来。

2.2.5 Akka HTTP

Akka HTTP 是 Akka 中用于构建高性能、可伸缩的 HTTP 服务的模块。它提供了一套强大而灵活的 API,支持异步和流式处理。以下是一个简单的 Akka HTTP 服务示例:

import akka.actor.ActorSystem;
import akka.http.javadsl.Http;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.http.javadsl.server.AllDirectives;
import akka.http.javadsl.server.Route;
import akka.stream.ActorMaterializer;public class AkkaHttpExample extends AllDirectives {public static void main(String[] args) {// 创建 Actor 系统ActorSystem system = ActorSystem.create("AkkaHttpSystem");// 创建流执行环境ActorMaterializer materializer = ActorMaterializer.create(system);// 定义路由Route route = path("hello", () ->get(() ->complete("Hello, Akka HTTP!")));// 启动 HTTP 服务Http.get(system).bindAndHandle(route.flow(system, materializer), ConnectHttp.toHost("localhost", 8080), materializer);}
}

在上述示例中,通过 Akka HTTP 定义了一个简单的路由,当访问 “/hello” 路径时,返回 “Hello, Akka HTTP!”。

3. RxJava

3.1 Observable 与 Observer

RxJava 是响应式编程库,基于观察者模式。Observable 代表一个可被观察的对象,而 Observer 则是观察者。以下是一个简单的 RxJava 示例:

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;public class RxJavaExample {public static void main(String[] args) {// 创建 ObservableObservable<String> observable = Observable.just("Hello", "RxJava");// 创建 ObserverObserver<String> observer = new Observer<String>() {@Overridepublic void onSubscribe(Disposable d) {System.out.println("Subscribed");}@Overridepublic void onNext(String value) {System.out.println("Received: " + value);}@Overridepublic void onError(Throwable e) {System.err.println("Error: " + e.getMessage());}@Overridepublic void onComplete() {System.out.println("Completed");}};// 订阅 Observableobservable.subscribe(observer);}
}

3.2 操作符

RxJava 提供了丰富的操作符,用于对发射的数据进行变换、过滤和合并等操作。以下是一些常用的操作符示例:

import io.reactivex.Observable;
import io.reactivex.functions.Function;public class RxJavaOperatorsExample {public static void main(String[] args) {// 转换操作符:mapObservable<Integer> numbers = Observable.just(1, 2, 3, 4, 5);numbers.map(value -> value * 2).subscribe(System.out::println);// 过滤操作符:filterObservable<String> fruits = Observable.just("Apple", "Banana", "Orange", "Grape");fruits.filter(fruit -> fruit.length() > 5).subscribe(System.out::println);// 合并操作符:zipObservable<Integer> integers = Observable.just(1, 2, 3);Observable<String> strings = Observable.just("A", "B", "C");Observable.zip(integers, strings, (num, str) -> num + str).subscribe(System.out::println);}
}

3.3 背压处理

在处理大量数据或者处理速度较快的情况下,为了避免产生过多的数据导致内存溢出,需要使用背压处理机制。RxJava 提供了 Flowable 类来支持背压处理。以下是一个简单的背压处理示例:

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;public class RxJavaBackpressureExample {public static void main(String[] args) throws InterruptedException {Flowable<Integer> flowable = Flowable.create(emitter -> {for (int i = 1; i <= 1000; i++) {emitter.onNext(i);}emitter.onComplete();}, BackpressureStrategy.BUFFER);flowable.observeOn(Schedulers.io()).subscribe(System.out::println);Thread.sleep(1000); // 等待异步线程执行}
}

3.4 任务调度器

RxJava 提供了不同的调度器(Scheduler)来控制任务在不同线程上的执行。例如,IoScheduler 用于执行 I/O 操作,ComputationScheduler 用于执行计算密集型任务。以下是一个简单的任务调度器示例:

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;public class RxJavaSchedulersExample {public static void main(String[] args) throws InterruptedException {Observable.just("Task 1", "Task 2", "Task 3").observeOn(Schedulers.io()).map(task -> {System.out.println("Executing " + task + " on thread " + Thread.currentThread().getName());return task;}).subscribe();Thread.sleep(1000); // 等待异步线程执行}
}

3.5 错误处理

在 RxJava 中,错误处理是一个重要的方面,可以通过 onError 回调来处理发生的异常。以下是一个简单的错误处理示例:

import io.reactivex.Observable;public class RxJavaErrorHandlingExample {public static void main(String[] args) {Observable<Integer> numbers = Observable.just(1, 2, 3, 4, 5);numbers.map(value -> {if (value == 3) {throw new RuntimeException("Error at value 3");}return value;}).subscribe(System.out::println,throwable -> System.err.println("Error: " + throwable.getMessage()));}
}

3.6 异步与并行

RxJava 提供了多种方式来实现异步和并行操作。以下是一个简单的异步与并行示例:

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;public class RxJavaAsyncParallelExample {public static void main(String[] args) throws InterruptedException {Observable.just("Task 1", "Task 2", "Task 3").observeOn(Schedulers.io()).map(task -> {System.out.println("Executing " + task + " on thread " + Thread.currentThread().getName());return task;}).subscribe();Thread.sleep(1000); // 等待异步线程执行}
}

3.7 扩展与自定义操作符

RxJava 允许开发者扩展和自定义操作符,以满足特定的需求。以下是一个简单的自定义操作符示例:

import io.reactivex.Observable;
import io.reactivex.ObservableOperator;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;public class RxJavaCustomOperatorExample {public static void main(String[] args) {Observable.just(1, 2, 3, 4, 5).lift(upperCaseOperator()).subscribe(System.out::println);}private static ObservableOperator<String, Integer> upperCaseOperator() {return observer -> new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {observer.onSubscribe(d);}@Overridepublic void onNext(Integer value) {observer.onNext(String.valueOf(value).toUpperCase());}@Overridepublic void onError(Throwable e) {observer.onError(e);}@Overridepublic void onComplete() {observer.onComplete();}};}
}

4. ForkJoin 框架

4.1 工作窃取算法

ForkJoin 框架采用工作窃取算法(Work-Stealing),使得任务可以在多个线程之间高效地分配和执行。工作窃取算法允许线程在执行完自己的任务后,主动去窃取其他线程的任务执行,从而实现任务的动态负载均衡。这样,当某个线程执行完自己的任务后,它可以帮助其他线程执行任务,提高整体的并发效率。

4.2 ForkJoinPool 类

ForkJoinPool 是 ForkJoin 框架的核心类,负责管理工作线程池。它提供了一个通用的线程池,用于执行 ForkJoinTask。在一般情况下,可以使用默认的无参构造函数创建一个 ForkJoinPool,也可以通过构造函数指定并行度(Parallelism),即并发执行的线程数目。

下面是一个使用 ForkJoin 框架计算数组元素和的示例:

import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ForkJoinPool;public class ForkJoinExample {public static void main(String[] args) {int[] array = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};ForkJoinPool forkJoinPool = new ForkJoinPool();// 创建一个计算任务SumTask task = new SumTask(array, 0, array.length);// 提交任务并获取结果int result = forkJoinPool.invoke(task);System.out.println("Sum of array elements: " + result);}static class SumTask extends RecursiveTask<Integer> {private final int[] array;private final int start;private final int end;public SumTask(int[] array, int start, int end) {this.array = array;this.start = start;this.end = end;}@Overrideprotected Integer compute() {// 如果任务足够小,直接计算结果if (end - start <= 2) {int sum = 0;for (int i = start; i < end; i++) {sum += array[i];}return sum;}// 否则,拆分任务int middle = (start + end) / 2;SumTask leftTask = new SumTask(array, start, middle);SumTask rightTask = new SumTask(array, middle, end);// 并行执行子任务leftTask.fork();rightTask.fork();// 合并子任务的结果return leftTask.join() + rightTask.join();}}
}

上述示例中,我们通过 ForkJoinPool 创建了一个工作线程池,然后定义了一个 SumTask 继承自 RecursiveTask,用于计算数组元素的和。在 compute 方法中,我们判断任务是否足够小,如果是,则直接计算结果;否则,将任务拆分成两个子任务并并行执行。最后,合并子任务的结果得到最终的计算结果。

4.3 RecursiveTask 与 RecursiveAction 类

RecursiveTask 用于表示有返回值的任务,而 RecursiveAction 用于表示无返回值的任务。在上面的例子中,我们使用了 RecursiveTask,因为我们希望计算出数组元素的和,并返回一个结果。

4.4 ForkJoinTask 类

ForkJoinTask 是 ForkJoin 框架中所有任务的基类,它提供了一些用于管理任务执行的方法。在前面的例子中,RecursiveTask 继承了 ForkJoinTask,并通过 fork 方法实现了任务的拆分与并行执行,通过 join 方法实现了子任务结果的合并。

这样,通过 ForkJoin 框架,我们能够方便地实现复杂的任务拆分与并行执行,从而充分利用多核处理器的性能。

4.5 RecursiveTask 与 RecursiveAction 类(续)

ForkJoinTask 中,RecursiveTaskRecursiveAction 分别用于表示有返回值的任务和无返回值的任务。

4.5.1 RecursiveTask

RecursiveTask 是一个泛型类,用于表示有返回值的任务。通过继承 RecursiveTask,可以实现自定义的有返回值的任务。以下是一个简单的示例,计算斐波那契数列的第 n 项:

import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ForkJoinPool;public class FibonacciExample {public static void main(String[] args) {int n = 10;ForkJoinPool forkJoinPool = new ForkJoinPool();// 创建一个计算任务FibonacciTask task = new FibonacciTask(n);// 提交任务并获取结果int result = forkJoinPool.invoke(task);System.out.println("Fibonacci number at position " + n + ": " + result);}static class FibonacciTask extends RecursiveTask<Integer> {private final int n;public FibonacciTask(int n) {this.n = n;}@Overrideprotected Integer compute() {if (n <= 1) {return n;}FibonacciTask leftTask = new FibonacciTask(n - 1);leftTask.fork();FibonacciTask rightTask = new FibonacciTask(n - 2);rightTask.fork();return leftTask.join() + rightTask.join();}}
}

在上述示例中,通过继承 RecursiveTask,实现了一个计算斐波那契数列第 n 项的任务。任务在计算过程中拆分为两个子任务,分别计算第 n - 1 和第 n - 2 项,然后合并子任务的结果得到最终结果。

4.5.2 RecursiveAction

RecursiveAction 是一个泛型类,用于表示无返回值的任务。通过继承 RecursiveAction,可以实现自定义的无返回值的任务。以下是一个简单的示例,打印斐波那契数列的前 n 项:

import java.util.concurrent.RecursiveAction;
import java.util.concurrent.ForkJoinPool;public class FibonacciPrintExample {public static void main(String[] args) {int n = 10;ForkJoinPool forkJoinPool = new ForkJoinPool();// 创建一个打印任务FibonacciPrintTask task = new FibonacciPrintTask(n);// 提交任务forkJoinPool.invoke(task);}static class FibonacciPrintTask extends RecursiveAction {private final int n;public FibonacciPrintTask(int n) {this.n = n;}@Overrideprotected void compute() {int[] fib = new int[n];computeFibonacci(fib, n);for (int i = 0; i < n; i++) {System.out.print(fib[i] + " ");}}private void computeFibonacci(int[] fib, int n) {fib[0] = 0;if (n > 1) {fib[1] = 1;computeFibonacci(fib, n, 2);}}private void computeFibonacci(int[] fib, int n, int current) {if (current < n) {fib[current] = fib[current - 1] + fib[current - 2];computeFibonacci(fib, n, current + 1);}}}
}

在上述示例中,通过继承 RecursiveAction,实现了一个打印斐波那契数列前 n 项的任务。任务在计算过程中递归调用自身,直到计算完成。由于该任务无返回值,因此 compute 方法不需要返回结果。

5. Disruptor

5.1 高性能无锁并发框架

Disruptor 是一个专注于高性能、无锁并发的框架,主要应用于金融领域的低延迟系统。它的核心思想是通过环形缓冲区(RingBuffer)实现高效的事件发布与订阅,避免了传统锁机制可能带来的性能瓶颈。

5.2 RingBuffer 数据结构

RingBuffer 是 Disruptor 中的核心数据结构,采用环形缓冲区的形式存储事件。它通过使用预分配的数组,避免了链式结构的内存分配,减少了垃圾回收的压力。多个生产者可以同时向 RingBuffer 中发布事件,多个消费者可以同时订阅并处理事件。这种设计使得 Disruptor 能够以极低的延迟处理大量的事件。

5.3 生产者-消费者模式

Disruptor 基于生产者-消费者模式,通过无锁的方式实现了高效的事件处理。生产者负责向 RingBuffer 中发布事件,消费者负责订阅并处理事件。由于 RingBuffer 的环形结构,生产者和消费者之间不存在竞争关系,不需要加锁,从而避免了传统并发编程中锁带来的性能开销。

5.4 应用于低延迟的金融交易系统

Disruptor 的高性能和低延迟特性使其在金融领域的高频交易系统中得到广泛应用。在这些系统中,对事件的处理速度要求极高,而 Disruptor 的设计理念正好满足了这些需求。通过有效地利用现代计算机硬件的特性,避免了传统锁机制可能引入的性能问题,使得 Disruptor 成为处理金融交易等对低延迟要求极高的场景的理想选择。

以下是一个简单的使用 Disruptor 的示例,演示了生产者和消费者之间的协作:

import com.lmax.disruptor.*;import java.util.concurrent.Executors;public class DisruptorExample {public static void main(String[] args) {// 创建 Disruptor 环境Disruptor<Event> disruptor = new Disruptor<>(Event::new, 1024, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy());// 设置事件处理器disruptor.handleEventsWith(new EventHandler<Event>() {@Overridepublic void onEvent(Event event, long sequence, boolean endOfBatch) {// 处理事件System.out.println("Event: " + event.getData() + " processed by " + Thread.currentThread().getName());}});// 启动 Disruptordisruptor.start();// 创建生产者RingBuffer<Event> ringBuffer = disruptor.getRingBuffer();EventProducer producer = new EventProducer(ringBuffer);// 发布事件for (int i = 0; i < 10; i++) {producer.publishEvent(i);}// 关闭 Disruptordisruptor.shutdown();}static class Event {private int data;public int getData() {return data;}public void setData(int data) {this.data = data;}}static class EventProducer {private final RingBuffer<Event> ringBuffer;public EventProducer(RingBuffer<Event> ringBuffer) {this.ringBuffer = ringBuffer;}public void publishEvent(int data) {// 获取下一个可用的序号long sequence = ringBuffer.next();try {// 获取序号对应的事件对象Event event = ringBuffer.get(sequence);// 设置事件数据event.setData(data);} finally {// 发布事件ringBuffer.publish(sequence);}}}
}

上述示例中,我们使用了 Disruptor 框架创建了一个环境,定义了一个事件类 Event,并设置了事件处理器。然后创建了一个生产者 EventProducer,通过调用 publishEvent 发布事件。通过 Disruptor 的内部机制,生产者和消费者之间的通信实现了高效的事件处理。在实际应用中,可以根据具体场景进一步定制事件处理逻辑。

6. Guava 并发库

6.1 ListenableFuture 接口

ListenableFuture 是 Guava 提供的接口,扩展了 JDK 的 Future 接口,允许注册回调函数。这个接口的设计目的是为了在异步操作完成时执行特定操作,而不需要显式地等待异步操作的完成。以下是一个简单的使用示例:

import com.google.common.util.concurrent.*;import java.util.concurrent.Executors;public class ListenableFutureExample {public static void main(String[] args) {// 创建 ListeningExecutorServiceListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));// 提交异步任务ListenableFuture<String> future = executorService.submit(() -> {System.out.println("Task executed by " + Thread.currentThread().getName());return "Result";});// 注册回调函数Futures.addCallback(future, new FutureCallback<String>() {@Overridepublic void onSuccess(String result) {System.out.println("Success! Result: " + result);}@Overridepublic void onFailure(Throwable t) {System.err.println("Failure: " + t.getMessage());}}, executorService);// 关闭 executorServiceexecutorService.shutdown();}
}

在上述示例中,我们使用 Guava 的 ListeningExecutorService 包装了 JDK 的 ExecutorService,并通过 Futures.addCallback 注册了回调函数。这样,当异步任务完成时,将自动执行注册的回调函数。

6.2 Futures 工具类

Guava 的 Futures 工具类提供了一系列用于处理 ListenableFuture 的静态方法。除了 addCallback,还有其他一些方法,例如 transformtransformAsync 等,用于对异步操作的结果进行转换或组合。这些方法的设计目的是为了简化异步编程的复杂性,使代码更加清晰。

import com.google.common.util.concurrent.*;import java.util.concurrent.Executors;public class FuturesExample {public static void main(String[] args) {ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));// 提交异步任务ListenableFuture<Integer> future = executorService.submit(() -> {System.out.println("Task executed by " + Thread.currentThread().getName());return 42;});// 使用 transform 转换结果ListenableFuture<String> transformedFuture = Futures.transform(future, Object::toString, executorService);// 注册回调函数Futures.addCallback(transformedFuture, new FutureCallback<String>() {@Overridepublic void onSuccess(String result) {System.out.println("Transformed Result: " + result);}@Overridepublic void onFailure(Throwable t) {System.err.println("Failure: " + t.getMessage());}}, executorService);// 关闭 executorServiceexecutorService.shutdown();}
}

在上述示例中,我们使用 Futures.transform 将异步任务的结果从整数转换为字符串。通过使用 Guava 的工具方法,我们能够更加方便地对异步操作的结果进行处理。

6.3 SettableFuture 类

SettableFuture 是 Guava 提供的一个实现了 ListenableFuture 接口的类,可以手动设置异步任务的结果。它允许在异步任务的执行体中,根据实际情况设置成功或失败的结果,从而更加灵活地控制异步任务的执行。

以下是一个使用 SettableFuture 的示例:

import com.google.common.util.concurrent.*;import java.util.concurrent.Executors;public class SettableFutureExample {public static void main(String[] args) {ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));// 创建 SettableFutureSettableFuture<String> settableFuture = SettableFuture.create();// 手动设置异步任务的结果executorService.submit(() -> {try {Thread.sleep(1000); // 模拟异步操作settableFuture.set("Success Result");} catch (InterruptedException e) {settableFuture.setException(e);}});// 注册回调函数Futures.addCallback(settableFuture, new FutureCallback<String>() {@Overridepublic void onSuccess(String result) {System.out.println("Success! Result: " + result);}@Overridepublic void onFailure(Throwable t) {System.err.println("Failure: " + t.getMessage());}}, executorService);// 关闭 executorServiceexecutorService.shutdown();}
}

在上述示例中,我们使用 SettableFuture 创建了一个实现了 ListenableFuture 接口的对象,并在异步任务的执行体中手动设置了成功的结果。这样,我们就可以灵活地在异步操作中控制结果的设置。

6.4 ListeningExecutorService 接口

Guava 的 ListeningExecutorService 接口是对 JDK 的 ExecutorService 的扩展,允许执行异步任务,并提供了一些用于处理 ListenableFuture 的方法。它提供了 submit 方法用于提交异步任务,以及 shutdown 方法用于关闭 executor。通过使用 ListeningExecutorService,我们可以更方便地处理异步任务的结果。

以上是 Guava 并发库的一些基本用法和实例,Guava 提供了丰富的工具类和接口,能够简化并发编程的复杂性,提高代码的可读性和可维护性。在实际应用中,可以根据具体的需求选择合适的工具类和接口,以便更高效地处理并发操作。

6.5 RateLimiter 类

RateLimiter 是 Guava 提供的一个用于令牌桶算法的实现类,用于控制某个资源访问的速度。通过 RateLimiter,我们可以限制对资源的访问频率,以便更好地控制系统的并发性。

以下是一个使用 RateLimiter 的简单示例:

import com.google.common.util.concurrent.RateLimiter;public class RateLimiterExample {public static void main(String[] args) {// 创建一个每秒发放两个令牌的 RateLimiterRateLimiter rateLimiter = RateLimiter.create(2.0);// 模拟请求for (int i = 0; i < 10; i++) {// 尝试获取令牌double waitTime = rateLimiter.acquire();// 执行业务逻辑System.out.println("Request " + i + " served after waiting for " + waitTime + " seconds");}}
}

在上述示例中,我们创建了一个每秒发放两个令牌的 RateLimiter,然后模拟了一系列请求。通过 acquire 方法,我们可以获取令牌,并在获取令牌的过程中进行阻塞,以控制请求的速率。

6.6 Cache 类

Cache 是 Guava 提供的缓存实现类,用于将键值对存储在内存中,以便更快地检索数据。Cache 提供了一系列的方法,用于将数据放入缓存、从缓存中获取数据以及清理缓存等操作。

以下是一个使用 Cache 的简单示例:

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;import java.util.concurrent.TimeUnit;public class CacheExample {public static void main(String[] args) {// 创建一个最大容量为 100,过期时间为 5 分钟的缓存Cache<String, String> cache = CacheBuilder.newBuilder().maximumSize(100).expireAfterWrite(5, TimeUnit.MINUTES).build();// 将数据放入缓存cache.put("key1", "value1");cache.put("key2", "value2");// 从缓存中获取数据String value1 = cache.getIfPresent("key1");String value3 = cache.getOrDefault("key3", "default");System.out.println("Value 1: " + value1);System.out.println("Value 3: " + value3);}
}

在上述示例中,我们使用 CacheBuilder 创建了一个最大容量为 100,过期时间为 5 分钟的缓存。然后,我们将数据放入缓存,并通过 getIfPresentgetOrDefault 方法从缓存中获取数据。

Guava 并发库中还有其他许多有用的类和接口,用于处理并发编程中的各种场景。在实际应用中,可以根据具体的需求选择合适的工具类和接口,以便更高效地处理并发操作。

总结

通过学习本文提供的内容,读者将掌握Java中并发编程的核心概念和高级工具。了解多线程编程的基本原理、并发库的使用方法以及适用场景,将使开发者能够更加自信地构建高性能、可伸缩且稳定的应用程序。在竞争激烈的软件开发领域,具备并发编程的深入知识将成为区分优秀开发者的重要标志。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/611428.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【C++进阶05】AVL树的介绍及模拟实现

一、AVL树的概念 二叉搜索树的缺点 二叉搜索树虽可以缩短查找效率 但如果数据有序或接近有序 二叉搜索树将退化为单支树 查找元素相当于在顺序表中搜索元素&#xff0c;效率低下 AVL树便是解决此问题 向二叉搜索树中插入新结点 并保证每个结点的左右子树 高度之差的绝对值不超…

新能源汽车的三电指的是什么,作用有什么区别。

问题描述&#xff1a;新能源汽车的三电指的是什么&#xff0c;作用有什么区别。 问题解答&#xff1a; "新能源汽车的三电"通常指的是新能源汽车中的三大核心电气系统&#xff0c;包括&#xff1a;高压电池系统、电动机驱动系统和电子控制系统。这三者协同工作&…

Java诊断利器Arthas

https://arthas.aliyun.com/doc/https://arthas.aliyun.com/doc/ 原理 利用java.lang.instrument(容器类) 做动态 Instrumentation(执行容器) 是 JDK5 的新特性。使用 Instrumentation&#xff0c;开发者可以构建一个独立于应用程序的代理程序&#xff08;Agent&#xff09;&…

汽车IVI中控开发入门及进阶(十二):手机投屏

前言: 汽车座舱有车载中控大屏、仪表/HUD多屏的显示能力,有麦克风/喇叭等车载环境更好的音频输入输出能力,有方控按键、旋钮等方便的反向控制输入能力,还有高精度的车辆数据等。但汽车座舱中控主机硬件计算能力升级迭代周期相对较长,汽车的应用和服务不够丰富。现在很多汽…

.NetCore部署微服务(二)

目录 前言 概念 一 Consul注册服务中心 1.1 consul下载 1.2 consul运行 二 服务注册 2.1 安装Consul包 2.2 修改配置文件 2.3 注入Consul服务 2.3 修改Controller&#xff0c;增加HealthCheck方法 三 运行服务 3.1 docker运行服务 前言 上一篇讲到微服务要灵活伸缩…

「超级细菌」魔咒或将打破,MIT 利用深度学习发现新型抗生素

作者&#xff1a;加零 编辑&#xff1a;李宝珠、三羊 MIT 利用图神经网络 Chemprop 识别潜在抗生素&#xff0c;特异性杀死鲍曼不动杆菌。 自然界中充满了各种各样的微生物&#xff0c;例如结核杆菌&#xff08;导致肺结核&#xff09;、霍乱弧菌&#xff08;导致霍乱&#…

数据结构实验4:链表的基本操作

目录 一、实验目的 二、实验原理 1. 节点 2. 指针 3.链表的类型 3.1 单向链表 3.2 双向链表 3.3 单向循环链表 3.4 双向循环链表 4. 单链表的插入 4.1 头插法 4.2 尾插法 4.3 在指定位置插入元素 5. 单链表的删除 5.1 删除指定数值的节点 5.2 删除指定位置的节点 …

Pytorch从零开始实战16

Pytorch从零开始实战——ResNeXt-50算法的思考 本系列来源于365天深度学习训练营 原作者K同学 对于上次ResNeXt-50算法&#xff0c;我们同样有基于TensorFlow的实现。具体代码如下。 引入头文件 import numpy as np from tensorflow.keras.preprocessing.image import Ima…

Rust基础类型之布尔类型和字符

布尔类型 Rust 中的布尔类型为 bool&#xff0c;仅仅有两个值&#xff0c;true 和 false。比如下方代码&#xff1a; let flag1 true; let flag2: bool false;字符 Rust 中的字符类型是 char&#xff0c;值用单引号括起来。 fn main() {let char1 z;let char2: char ℤ…

TensorRt(5)动态尺寸输入的分割模型测试

文章目录 1、固定输入尺寸逻辑2、动态输入尺寸2.1、模型导出2.2、推理测试2.3、显存分配问题2.4、完整代码 这里主要说明使用TensorRT进行加载编译优化后的模型engine进行推理测试&#xff0c;与前面进行目标识别、目标分类的模型的网络输入是固定大小不同&#xff0c;导致输入…

Java单元测试 1.SpringBoot 2.普通java工程

1)SpringBoot工程 1.注意&#xff1a;新建SpringBoot工程时&#xff0c;一定要根据你当前的jdk版本选择好SpringBoot的版本&#xff0c;比如&#xff1a;jdk为11&#xff0c;则不能创建3.x的SpringBoot工程&#xff0c;有时候2021版本的idea强制我创建3.x版本的SpringBoot&…

【打卡】牛客网:BM79 打家劫舍(二)

资料&#xff1a; dp.clear()会把dp的size变为0。 assign和insert的对比&#xff1a; v1.assign(v2.begin(), v2.end()); v1.insert(pos,n,elem); //在pos位置插入n个elem数据&#xff0c;无返回值。 v1.insert(pos,beg,end); //在pos位置插入[beg,end)区间的数据&#xff0…

【现代密码学】笔记3.4-3.7--构造安全加密方案、CPA安全、CCA安全 《introduction to modern cryphtography》

【现代密码学】笔记3.4-3.7--构造安全加密方案、CPA安全、CCA安全 《introduction to modern cryphtography》 写在最前面私钥加密与伪随机性 第二部分流加密与CPA多重加密 CPA安全加密方案CPA安全实验、预言机访问&#xff08;oracle access&#xff09; 操作模式伪随机函数PR…

Java微服务系列之 ShardingSphere - ShardingSphere-JDBC

&#x1f339;作者主页&#xff1a;青花锁 &#x1f339;简介&#xff1a;Java领域优质创作者&#x1f3c6;、Java微服务架构公号作者&#x1f604; &#x1f339;简历模板、学习资料、面试题库、技术互助 &#x1f339;文末获取联系方式 &#x1f4dd; 系列专栏目录 [Java项…

复习python从入门到实践——类

复习python从入门到实践——类 学好继承&#xff0c;你就初步学习了程序面向对象啦&#xff5e;本章我依旧为大家整理好了运行的步骤&#xff0c;以及举了尽可能清晰简单的例子为大家展示。 目录 复习python从入门到实践——类1. Syntax注意&#xff1a; 2.修改属性值3.继承4…

报错解决:No module named ‘pytorch_lightning‘ 安装pytorch_lightning

报错记录 执行如下代码&#xff1a; import pytorch_lightning报错&#xff1a; No module named ‘pytorch_lightning’ 解决方式 安装pytorch_lightning包即可。 一般情况下&#xff0c;缺失的包通过pip安装&#xff0c;即&#xff1a; pip install pytorch_lightning然…

C++性能优化简单总结

什么样的代码是高度优化的&#xff1f; 我们先出去数据结构和算法本身的使用。C 的高效代码通常是利用了各种编译器优化和语言特性来最大程度地提高执行效率和资源利用率的代码。我们需要编写编译器友好的代码来让编译器优化或者说编写出不用编译器高度优化优化也能达到同样效果…

1 快速前端开发

1 前端开发 目的&#xff1a;开发一个平台&#xff08;网站&#xff09;- 前端开发&#xff1a;HTML、CSS、JavaScript- Web框架&#xff1a;接收请求并处理- MySQL数据库&#xff1a;存储数据地方快速上手&#xff1a;基于Flask Web框架让你快速搭建一个网站出来。1.快速开发…

HarmonyOS应用开发学习笔记 应用上下文Context 获取文件夹路径

1、 HarmoryOS Ability页面的生命周期 2、 Component自定义组件 3、HarmonyOS 应用开发学习笔记 ets组件生命周期 4、HarmonyOS 应用开发学习笔记 ets组件样式定义 Styles装饰器&#xff1a;定义组件重用样式 Extend装饰器&#xff1a;定义扩展组件样式 5、HarmonyOS 应用开发…

14-股票K线图功能-个股日K线SQL分析__ev

需求&#xff1a;统计个股日K线数据&#xff0c;也就是把某只股票每天的最高价&#xff0c;开盘价&#xff0c;收盘价&#xff0c;最低价形成K线图。