将java.util.concurrent.BlockingQueue用作rx.Observable

在Java中,经典的生产者-消费者模式相对简单,因为我们有java.util.concurrent.BlockingQueue 。 为了避免繁忙的等待和容易出错的手动锁定,我们只需利用put()take() 。 如果队列已满或为空,它们都将阻塞。 我们需要的是一堆线程共享对同一队列的引用:一些正在生产而其他正在消耗。 当然,队列必须具有有限的容量,否则,如果生产者的表现优于消费者,我们很快就会用光内存。 格雷格·扬(Greg Young)在波兰Devoxx期间对这条规则的强调不够:

永远不要创建无限队列

使用

这是最简单的例子。 首先,我们需要一个将对象放在共享队列中的生产者:

import lombok.Value;
import lombok.extern.slf4j.Slf4j;@Slf4j
@Value
class Producer implements Runnable {private final BlockingQueue<User> queue;@Overridepublic void run() {try {while (!Thread.currentThread().isInterrupted()) {final User user = new User("User " + System.currentTimeMillis());log.info("Producing {}", user);queue.put(user);TimeUnit.SECONDS.sleep(1);}} catch (Exception e) {log.error("Interrupted", e);}}
}

生产者只需每秒将User类的实例(无论它是什么)发布到给定队列。 显然,在现实生活中,将User在队列中是系统中某些操作(例如用户登录)的结果。 同样,消费者从队列中获取新项目并进行处理:

@Slf4j
@Value
class Consumer implements Runnable {private final BlockingQueue<User> queue;@Overridepublic void run() {try {while (!Thread.currentThread().isInterrupted()) {final User user = queue.take();log.info("Consuming: {}", user);}} catch (Exception e) {log.error("Interrupted", e);}}
}

再次,在现实生活中,处理将意味着存储在数据库中或对用户运行某些欺诈检测。 我们使用队列将处理线程与消耗线程解耦,例如减少延迟。 为了运行一个简单的测试,让我们启动几个生产者和消费者线程:

BlockingQueue<User> queue = new ArrayBlockingQueue<>(1_000);
final List<Runnable> runnables = Arrays.asList(new Producer(queue),new Producer(queue),new Consumer(queue),new Consumer(queue),new Consumer(queue)
);final List<Thread> threads = runnables.stream().map(runnable -> new Thread(runnable, threadName(runnable))).peek(Thread::start).collect(toList());TimeUnit.SECONDS.sleep(5);
threads.forEach(Thread::interrupt);//...private static String threadName(Runnable runnable) {return runnable.getClass().getSimpleName() + "-" + System.identityHashCode(runnable);
}

我们有2个生产者和3个消费者,似乎一切正常。 在现实生活中,您可能会有一些隐式生产者线程,例如HTTP请求处理线程。 在使用者方面,您很可能会使用线程池。 这种模式效果很好,但是特别是在消费方面是很底层的。

介绍

本文的目的是介绍一种抽象,其行为类似于生产者方的队列,但表现为来自消费者方的RxJava的Observable 。 换句话说,我们可以将添加到队列中的对象视为可以在客户端映射,过滤,撰写等的流。 有趣的是,这不再是排在后面的队列。 ObservableQueue<T>仅将所有新对象直接转发给订阅的使用者,并且在没有人监听(“可观察到的” )的情况下不缓冲事件。 ObservableQueue<T>本身并不是队列,它只是一个API与另一个API之间的桥梁。 它类似于java.util.concurrent.SynchronousQueue ,但是如果没有人对使用感兴趣,则将对象简单地丢弃。

这是第一个实验性实现。 这只是一个玩具代码,不要认为它已准备就绪。 另外,我们稍后将对其进行简化:

public class ObservableQueue<T> implements BlockingQueue<T>, Closeable {private final Set<Subscriber<? super T>> subscribers = Collections.newSetFromMap(new ConcurrentHashMap<>());private final Observable<T> observable = Observable.create(subscriber -> {subscriber.add(new Subscription() {@Overridepublic void unsubscribe() {subscribers.remove(subscriber);}@Overridepublic boolean isUnsubscribed() {return false;}});subscribers.add(subscriber);});public Observable<T> observe() {return observable;}@Overridepublic boolean add(T t) {return offer(t);}@Overridepublic boolean offer(T t) {subscribers.forEach(subscriber -> subscriber.onNext(t));return true;}@Overridepublic T remove() {return noSuchElement();}@Overridepublic T poll() {return null;}@Overridepublic T element() {return noSuchElement();}private T noSuchElement() {throw new NoSuchElementException();}@Overridepublic T peek() {return null;}@Overridepublic void put(T t) throws InterruptedException {offer(t);}@Overridepublic boolean offer(T t, long timeout, TimeUnit unit) throws InterruptedException {return offer(t);}@Overridepublic T take() throws InterruptedException {throw new UnsupportedOperationException("Use observe() instead");}@Overridepublic T poll(long timeout, TimeUnit unit) throws InterruptedException {return null;}@Overridepublic int remainingCapacity() {return 0;}@Overridepublic boolean remove(Object o) {return false;}@Overridepublic boolean containsAll(Collection<?> c) {return false;}@Overridepublic boolean addAll(Collection<? extends T> c) {c.forEach(this::offer);return true;}@Overridepublic boolean removeAll(Collection<?> c) {return false;}@Overridepublic boolean retainAll(Collection<?> c) {return false;}@Overridepublic void clear() {}@Overridepublic int size() {return 0;}@Overridepublic boolean isEmpty() {return true;}@Overridepublic boolean contains(Object o) {return false;}@Overridepublic Iterator<T> iterator() {return Collections.emptyIterator();}@Overridepublic Object[] toArray() {return new Object[0];}@Overridepublic <T> T[] toArray(T[] a) {return a;}@Overridepublic int drainTo(Collection<? super T> c) {return 0;}@Overridepublic int drainTo(Collection<? super T> c, int maxElements) {return 0;}@Overridepublic void close() throws IOException {subscribers.forEach(rx.Observer::onCompleted);}
}

关于它有两个有趣的事实:

  1. 我们必须跟踪所有订户,即愿意接收新商品的消费者。 如果其中一个订阅者不再感兴趣,我们必须删除该订阅者,否则会发生内存泄漏(请继续阅读!)
  2. 此队列的行为就好像它始终为空。 它永远不会保存任何项目–当您将某些内容放入此队列时,它会自动传递给订阅者并被遗忘
  3. 从技术上讲,此队列是无界的(!),这意味着您可以根据需要放置任意数量的项目。 但是,由于将项目传递给所有订户(如果有)并立即丢弃,因此此队列实际上始终为空(请参见上文)
  4. 生产者可能仍会生成太多事件,而消费者可能无法跟上这一步– RxJava现在具有背压支持,本文未介绍。

假设我正确实现了队列协定,生产者可以像使用其他BlockingQueue<T>一样使用ObservableQueue<T> 。 但是,消费者看起来更轻巧,更聪明:

final ObservableQueue<User> users = new ObservableQueue<>();
final Observable<User> observable = users.observe();users.offer(new User("A"));
observable.subscribe(user -> log.info("User logged in: {}", user));
users.offer(new User("B"));
users.offer(new User("C"));

上面的代码仅打印"B""C" 。 由于ObservableQueue会在没有人监听的情况下丢弃项目,因此设计会丢失"A" 。 显然, Producer类现在使用users队列。 一切正常,您可以随时调用users.observe()并应用数十个Observable运算符之一。 但是有一个警告:默认情况下,RxJava不执行任何线程处理,因此消耗与产生线程在同一线程中发生! 我们失去了生产者-消费者模式的最重要特征,即线程去耦。 幸运的是,RxJava中的所有内容都是声明性的,线程调度也是如此:

users.observe().observeOn(Schedulers.computation()).forEach(user ->log.info("User logged in: {}", user));

现在让我们看一下RxJava的真正功能。 假设您要计算每秒登录的用户数,其中每个登录都作为事件放入队列中:

users.observe().map(User::getName).filter(name -> !name.isEmpty()).window(1, TimeUnit.SECONDS).flatMap(Observable::count).doOnCompleted(() -> log.info("System shuts down")).forEach(c -> log.info("Logins in last second: {}", c));

性能也是可以接受的,这样的队列每秒可以在我的一个订户的笔记本电脑上接受约300万个对象。 将此类视为使用队列到现代反应世界的旧系统的适配器。 可是等等! 使用ObservableQueue<T>很容易,但是使用subscribers同步集的实现似乎太底层了。 幸运的是有Subject<T, T>SubjectObservable “另一面” –您可以将事件推送到Subject但是它仍然实现Observable ,因此您可以轻松地创建任意Observable 。 使用Subject实现之一, ObservableQueue外观如何:

public class ObservableQueue<T> implements BlockingQueue<T>, Closeable {private final Subject<T, T> subject = PublishSubject.create();public Observable<T> observe() {return subject;}@Overridepublic boolean add(T t) {return offer(t);}@Overridepublic boolean offer(T t) {subject.onNext(t);return true;}@Overridepublic void close() throws IOException {subject.onCompleted();}@Overridepublic T remove() {return noSuchElement();}@Overridepublic T poll() {return null;}@Overridepublic T element() {return noSuchElement();}private T noSuchElement() {throw new NoSuchElementException();}@Overridepublic T peek() {return null;}@Overridepublic void put(T t) throws InterruptedException {offer(t);}@Overridepublic boolean offer(T t, long timeout, TimeUnit unit) throws InterruptedException {return offer(t);}@Overridepublic T take() throws InterruptedException {throw new UnsupportedOperationException("Use observe() instead");}@Overridepublic T poll(long timeout, TimeUnit unit) throws InterruptedException {return null;}@Overridepublic int remainingCapacity() {return 0;}@Overridepublic boolean remove(Object o) {return false;}@Overridepublic boolean containsAll(Collection<?> c) {return false;}@Overridepublic boolean addAll(Collection<? extends T> c) {c.forEach(this::offer);return true;}@Overridepublic boolean removeAll(Collection<?> c) {return false;}@Overridepublic boolean retainAll(Collection<?> c) {return false;}@Overridepublic void clear() {}@Overridepublic int size() {return 0;}@Overridepublic boolean isEmpty() {return true;}@Overridepublic boolean contains(Object o) {return false;}@Overridepublic Iterator<T> iterator() {return Collections.emptyIterator();}@Overridepublic Object[] toArray() {return new Object[0];}@Overridepublic <T> T[] toArray(T[] a) {return a;}@Overridepublic int drainTo(Collection<? super T> c) {return 0;}@Overridepublic int drainTo(Collection<? super T> c, int maxElements) {return 0;}}

上面的实现更加简洁,我们完全不必担心线程同步。

翻译自: https://www.javacodegeeks.com/2015/07/consuming-java-util-concurrent-blockingqueue-as-rx-observable.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/357801.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Mac OS X 10.10如何打开虚拟内存

转载请注明出处&#xff1a;http://www.cnblogs.com/blazer/p/5103608.html 禁用虚拟内存sudo launchctl unload -w /System/Library/LaunchDaemons/com.apple.metadata.mds.plist删除交换文件sudo rm /private/var/vm/swapfile*启用虚拟内存sudo launchctl load -w /System/L…

java 1.5.0 gcj_CentOS安装JAVA后JAVA版本不对的问题

今天用CentOS安装JDK&#xff0c;发觉在安装完成后&#xff0c;输入java命令来验证是否安装成功时&#xff0c;出现Usage: gij [OPTION] ... CLASS [ARGS] ...to invoke CLASS.main, orgij -jar [OPTION] ... JARFILE [ARGS] ...to execute a jar fileTry gij --help‘ for mor…

java开发一款模拟写字板系统

导读:目前,很多新的技术领域都涉及到了Java语言,Java语言是面向对象编程,并且涉及到网络、多线程等重要的基础知识,因此Java语言也是学习面向对象编程和网络编程的首选语言。此简易JAVA写字板程序,使用Java程序编写,能够进行输入文字操作,并具有新建文件,打开文件,保…

java枚举的特点_Java中有些好的特性(二):枚举

前言我在写上一篇时&#xff0c;开始选了一个很土的名字“Java超过C#的地方”&#xff0c;然后引起了某些同学的不满&#xff0c;后来修改了个名。我在这里再次申明一下&#xff0c;我没有贬低任何语言之意&#xff0c;纯粹是从一个.NET程序员的角度去看Java&#xff0c;看看是…

java开发一款推箱子游戏

导读:社会在进步,人们生活质量也在日益提高。高强度的压力也接踵而来。社会中急需出现新的有效方式来缓解人们的压力。此次设计符合了社会需求,Java推箱子游戏可以让人们在闲暇之余,体验游戏的乐趣。具有操作简单,易于上手的特点。 推箱子游戏的玩法十分简单——控制人物绕…

java ee技术_Java EE 6与Spring Framework:技术决策过程

java ee技术在过去的几个月中&#xff0c;我们经历了这个决策过程&#xff1a;为Java平台上的企业开发选择哪种技术堆栈&#xff1f; 有多种选择。 但是&#xff0c;我们深入讨论的是&#xff1a;纯Java EE 6堆栈与带有Java EE的Spring。 以下博客文章总结了当您考虑这些技术堆…

java获取系统当前时间格式化_java 获取系统当前时间并格式化

java 获取系统当前时间并格式化CreateTime--2018年5月9日11:41:00Author:Marydon实现方式有三种updateTime--2018年7月23日09点32分准备工作&#xff1a;import java.text.SimpleDateFormat;import java.util.Calendar;import java.util.Date;方式一&#xff1a;/*** 获取系统当…

并行和并发

并行:同时运行,从微观的角度讲 并发:同时发生,从宏观的角度讲 并行是真正意义上的同时执行。而并发不是。 “并行”是指无论从微观还是宏观&#xff0c;二者都是一起执行的&#xff0c;就好像两个人各拿一把铁锨在挖坑&#xff0c;一小时后&#xff0c;每人一个大坑。 而“并发…

java 本地方法栈_Java虚拟机栈和本地方法栈

Java虚拟机栈的特征线程私有后进先出(LIFO)栈存储栈帧&#xff0c;支持Java方法的调用、执行和退出可能出现OutOfMemoryError异常和StackOverflowError异常Java本地方法栈的特征线程私有后进先出(LIFO)栈作用是支撑Native方法的调用、执行和退出可能出现OutOfMemoryError异常和…

如何使用Java,Maven,Jetty创建Web应用程序项目

在本文中&#xff0c;我们使用Maven Archetype插件创建一个简单的Web应用程序。 我们将在名为Jetty的Servlet容器中运行此Web应用程序&#xff0c;添加一些依赖项&#xff0c;编写简单的Servlet&#xff0c;并生成WAR文件。 在本文的最后&#xff0c;您还可以在Tomcat中部署该服…

java开发一款坦克大战游戏

导读:随着人们对生活质量的要求一天比一天高,为了让人们更好地开掘自身的智慧,游戏就此进入了大众的视野,在人们的生活中有着重要的位置,已然变得必不可少。游戏产业推动高新技术不断升级,极大地促进了经济的增长,推动了“第四产业”的经济腾飞。坦克大战游戏是童年时期…

java大转盘抽奖概率算法_幸运大转盘抽奖 抽奖算法 程序实现逻辑

近期碰到的一个需求&#xff0c;实现一个类似大转盘抽奖的功能&#xff0c;需自定义奖项&#xff0c;各奖项中奖概率&#xff0c;当日抽奖最大次数&#xff0c;抽奖成本等。分享一个简单的java代码的实现的思路&#xff0c;有不足之处感谢各位指正。初步方法首先要定义几个奖品…

java开发银行柜员业务绩效考核系统

导读:当今社会己进入信息社会时代,信息己经受到社会的广泛关注,被看作社会和科学技术发展的三大支柱(材料、能源、信息)之一。信息是管理的基础,是进行决策的的基本依据。在一个组织里,信息己作为人力、物力、财力之外的第四种能源,占有重要的地位。然而,信息是一种非…

mysql orderby count_mysql中count(),groupby,orderby使用方法分享

本文主要和大家分享mysql中count(), group by, order by使用方法&#xff0c;mysql中order by 排序查询、asc升序、desc降序&#xff0c;group by 分组查询、having 只能用于group by子句、作用于组内&#xff0c;having条件子句可以直接跟函数表达式。使用group by 子句的查询…

一文教你使用java开发一款坦克大战游戏

导读&#xff1a;随着人们对生活质量的要求一天比一天高&#xff0c;为了让人们更好地开掘自身的智慧&#xff0c;游戏就此进入了大众的视野&#xff0c;在人们的生活中有着重要的位置&#xff0c;已然变得必不可少。游戏产业推动高新技术不断升级&#xff0c;极大地促进了经济…

Java 8流中的常见SQL子句及其等效项

功能编程允许使用通用语言进行准声明性编程 。 通过使用功能强大的流畅API&#xff08;例如Java 8的Stream API &#xff09;或jOOλ的顺序Stream扩展Seq或更复杂的库&#xff08;例如javaslang或functionaljava&#xff09; &#xff0c;我们可以以一种非常简洁的方式来表示数…

php开启openssl的方法

windows下开启方法&#xff1a; 1&#xff1a; 首先检查php.ini中&#xff1b;extensionphp_openssl.dll是否存在&#xff0c; 如果存在的话去掉前面的注释符‘&#xff1b;’&#xff0c; 如果不存在这行&#xff0c;那么添加extensionphp_openssl.dll。 2&#xff1a; 讲php文…

mysql 短时大连接的问题_mysql长连接和短连接的问题

什么是长连接&#xff1f;其实长连接是相对于通常的短连接而说的&#xff0c;也就是长时间保持客户端与服务端的连接状态。通常的短连接操作步骤是&#xff1a;连接-》数据传输-》关闭连接&#xff1b;而长连接通常就是&#xff1a;连接-》数据传输-》保持连接-》数据传输-》保…

java实现动漫论坛

导读:作为文化产业的一部分,动漫影响了我国一代又一代青少年,据钱江晚报调查显示,有超过七成的95后愿意从事与动漫相关的行业,可见其对青少年影响力之大。 动漫论坛作为最先开始热爱动漫人士进行交流的方式之一,是爱好者们共享信息,寻找同伴的重要渠道之一。在这次毕业设…

混合使用Azure LB和ILB访问相同web服务(3)

接下来我们来配置Azure Load balancer&#xff0c;就是面向公网的负载均衡器&#xff1a; 1.在该测试中&#xff0c;为了保持内网访问和外网访问一样的体验&#xff0c;本地端口和public端口和ILB一样&#xff0c;同样是80&#xff1a; PS C:\> Get-AzureVM -ServiceName …