通过Java 8流使用Oracle AQ

Oracle数据库最令人敬畏的功能之一是Oracle AQ:Oracle数据库高级队列 。 AQ API直接在数据库中实现了完整的事务性消息传递系统。

在数据库位于系统中心的经典体系结构中,使用AQ进行进程间通信时,多个应用程序(其中一些应用程序用Java编写,其他应用程序用Perl或PL / SQL编写)访问同一数据库。太好了 如果您更喜欢Java EE,则可以购买基于Java的MQ解决方案,并将该消息总线/中间件放在系统体系结构的中心。 但是,为什么不使用数据库呢?

如何在jOOQ中使用PL / SQL AQ API

用于AQ消息入队和出队的PL / SQL API非常简单,可以使用jOOQ的OracleDSL.DBMS_AQ API从Java轻松访问它。

此处使用的队列配置如下所示:

CREATE OR REPLACE TYPE message_t AS OBJECT (ID         NUMBER(7),title      VARCHAR2(100 CHAR)
)
/BEGINDBMS_AQADM.CREATE_QUEUE_TABLE(queue_table => 'message_aq_t',queue_payload_type => 'message_t');DBMS_AQADM.CREATE_QUEUE(queue_name => 'message_q',queue_table => 'message_aq_t');DBMS_AQADM.START_QUEUE(queue_name => 'message_q');COMMIT;
END;
/

并且jOOQ代码生成器将生成有用的类,并将所有类型信息直接与它们相关联(简化示例):

class Queues {static final Queue<MessageTRecord> MESSAGE_Q = new QueueImpl<>("NEW_AUTHOR_AQ", MESSAGE_T);
}class MessageTRecord {void setId(Integer id) { ... }Integer getId() { ... }void setTitle(String title) { ... }String getTitle() { ... }MessageTRecord(Integer id, String title) { ... }
}

然后,可以使用这些类直接在生成的队列引用上安全地使消息类型入队和出队:

// The jOOQ configuration
Configuration c = ...// Enqueue a message
DBMS_AQ.enqueue(c, MESSAGE_Q, new MessageTRecord(1, "test"));// Dequeue it again
MessageTRecord message = DBMS_AQ.dequeue(c, MESSAGE_Q);

很简单,不是吗?

现在,让我们利用Java 8功能

消息队列就是无限(阻塞)消息流。 从Java 8开始,我们为此类消息流提供了强大的API,即Stream API。

这就是为什么我们为即将到来的jOOQ 3.8添加了新的API,将现有的jOOQ AQ API与Jav​​a 8 Streams相结合的原因:

// The jOOQ configuration
Configuration c = ...DBMS_AQ.dequeueStream(c, MESSAGE_Q).filter(m -> "test".equals(m.getTitle())).forEach(System.out::println);

上面的流管道将侦听MESSAGE_Q队列,使用所有消息,过滤掉不包含"test"的消息,并打印其余消息。

阻止流

有趣的是,这是一个无限的阻塞流。 只要队列中没有新消息,流管道处理就会简单地在队列上阻塞,等待新消息。 这对于顺序流来说不是问题,但是在调用Stream.parallel()时会发生什么呢?

jOOQ将消耗事务中的每个消息。 jOOQ 3.8事务在ForkJoinPool.ManagedBlocker运行:

static <T> Supplier<T> blocking(Supplier<T> supplier) {return new Supplier<T>() {volatile T result;@Overridepublic T get() {try {ForkJoinPool.managedBlock(new ManagedBlocker() {@Overridepublic boolean block() {result = supplier.get();return true;}@Overridepublic boolean isReleasable() {return result != null;}});}catch (InterruptedException e) {throw new RuntimeException(e);}return asyncResult;}};
}

这不是很多魔术。 当ManagedBlockerForkJoinWorkerThread运行时,它会运行一些特殊的代码,以确保线程的ForkJoinPool不会由于线程耗尽而死锁。 有关更多信息,请在此处阅读此有趣的文章:http: //zeroturnaround.com/rebellabs/java-parallel-streams-are-bad-for-your-health

或以下堆栈溢出答案: http : //stackoverflow.com/a/35272153/521799

因此,如果您想要超快速的并行AQ出队过程,请运行:

// The jOOQ configuration. Make sure its referenced
// ConnectionPool has enough connections
Configuration c = ...DBMS_AQ.dequeueStream(c, MESSAGE_Q).parallel().filter(m -> "test".equals(m.getTitle())).forEach(System.out::println);

而且您将拥有几个线程,这些线程将使消息并行出队。

不想等待jOOQ 3.8?

没问题。 使用当前版本并将dequeue操作包装在您自己的Stream

Stream<MessageTRecord> stream = Stream.generate(() ->DSL.using(config).transactionResult(c ->dequeue(c, MESSAGE_Q))
);

做完了

奖励:异步出队

在我们讨论时,排队系统的另一个很好的功能是它们的异步性。 在Java 8中, CompletionStage是一个非常有用的用于建模(和组合)异步算法的类型,它是默认实现CompletableFuture ,它再次在ForkJoinPool执行任务。

使用jOOQ 3.8,您可以再次简单地调用

// The jOOQ configuration. Make sure its referenced
// ConnectionPool has enough connections
Configuration c = ...CompletionStage<MessageTRecord> stage =
DBMS_AQ.dequeueAsync(c, MESSAGE_Q).thenCompose(m -> ...)...;

敬请期待jOOQ博客上的另一篇文章,我们将研究更复杂的异步用例,并使用jOOQ 3.8和Java 8阻止SQL语句。

翻译自: https://www.javacodegeeks.com/2016/02/using-oracle-aq-via-java-8-streams.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/354921.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

藏经阁

藏经阁 收藏经典书籍&#xff0c; 文学 1.霍乱时期的爱情 2.巨人的陨落 3.群山之巅 4.无人生还 5.加缪文集&#xff0c;1995&#xff0c;译林出版 6.1984 7.黄金时代 8.太阳照常升起 9.月亮和六便士 10.白夜行 11.小王子 12.杀死一只知更鸟 社科 1.高效能人士的七个习惯 2.少有…

php private方法,php如何调用private方法

php调用private方法&#xff1a;首先定义一个parent类&#xff1b;然后在类的内部使用私有函数&#xff1b;接着实例化parent类&#xff0c;让其变成一个对象并赋值给“$obj”即可。将一个类实例化后就变成对象&#xff0c;私有函数只能在类内部使用&#xff0c;不能在类外&…

Centos7 开启端口

CentOS7默认没有使用iptables&#xff0c;所以通过编辑iptables的配置文件来开启80端口是不可以的。 CentOS 7 采用了 firewalld 防火墙。 如要查询是否开启80端口&#xff1a; [rootjoe-pc ~]# firewall-cmd --query-port80/tcp no 显然80端口没有开启。 下面我们开启80端口&a…

java flux api,SpringBoot学习系列-WebFlux REST API 全局异常处理

本文内容为什么要全局异常处理&#xff1f;WebFlux REST 全局异常处理实战小结摘录&#xff1a;只有不断培养好习惯&#xff0c;同时不断打破坏习惯&#xff0c;我们的行为举止才能够自始至终都是正确的。一、为什么要全局异常处理&#xff1f;前后端分离开发&#xff0c;一般提…

使您的Java 8方法引用生效

方法参考 众所周知&#xff0c;我们可以使用Java 8中的方法引用 &#xff08;例如String::isEmpty来引用例如在元素上流式传输时使用的方法。 看一下以下代码片段&#xff1a; Stream.of("A", "", "B").filter(Stream::isEmpty).count();它将产…

down.php无法打开,php下载文件 图片不能打开,该怎么解决

php下载文件 图片不能打开function fileDown($file_name){$file_name iconv("utf-8","gb2312",$file_name);$file_path "E:/php/down/".$file_name;if(!file_exists($file_path)){echo "文件不存在";return;}$fp fopen($file_path,…

【转】Linux如何在系统启动时自动加载模块

1、Linux安装驱动程序 tar zxf ixgbe-<x.x.x>.tar.gz cd ixgbe-<x.x.x>/src/ make install modprobe <ixgbe> 卸载驱动 cd ixgbe-<x.x.x>/src/ make uninstall 2、Linux如何在系统启动时自动加载模块 原文&#xff1a;http://www.cnblogs.com/image-ey…

MATLAB小记_fread的用法

“fread”以二进制形式&#xff0c;从文件读出数据。语法1&#xff1a;[a,count]fread(fid,size,precision)语法2&#xff1a;[a,count]fread(fid,size,precision,skip)size: 不指定 &#xff1a;到尾返回读。N : 读出N个数据&#xff0c;构成列向量。inf …

mvvm 后端_ZK实际应用:MVVM –与ZK客户端API一起使用

mvvm 后端在以前的文章中&#xff0c;我们已经使用ZK的MVVM实现了以下功能&#xff1a; 将数据加载到表中 使用表单绑定保存数据 删除条目并以编程方式更新视图 ZK MVVM和ZK MVC实现方式之间的主要区别是&#xff0c;我们不直接在controller&#xff08;ViewModel&#xff0…

微信分享朋友圈固定缩略图 php,微信转发或分享朋友圈带缩略图、标题和描述的实现方法...

自己做博客以来&#xff0c;很早之前分享过文章至朋友圈&#xff0c;那个时候分享过去的文章自动获取页面的比例适合的图片为所缩略图&#xff1a;后期就很少分享至朋友圈&#xff0c; 近来分享文章给朋友后&#xff0c;发现不带缩略图和简介了&#xff0c;觉得这样很不好看&am…

Oracle 用户、表空间、授权、备份、导入等操作相关

一、基础操作 闲来无事&#xff0c;整理oracle数据库相关操作&#xff0c;以后备用。。。。。 ps&#xff1a; satp 为用户 satp_data 为表空间 1 1.删除表空间2 DROP TABLESPACE satp_data INCLUDING CONTENTS AND DATAFILES;3 4 2.删除用户5 drop user satp cascade;6 …

看一下即将发布的JSF 2.3 Push支持

如前几篇文章所述&#xff0c;下一版本的JavaServer Faces&#xff08;Mojarra&#xff09;已添加了许多增强功能。 JSF 2.3计划于2017年与Java EE 8一起发布&#xff0c;但是您现在可以通过从源代码构建或运行里程碑版本来尝试JSF的一些增强和更新&#xff0c;以进行测试。 对…

java mvc中重复提交表单,spring mvc 防止重复提交表单的两种方法,推荐第二种

第一种方法&#xff1a;判断session中保存的token比较麻烦&#xff0c;每次在提交表单时都必须传入上次的token。而且当一个页面使用ajax时&#xff0c;多个表单提交就会有问题。注解Token代码&#xff1a;package com.thinkgem.jeesite.common.repeat_form_validator;import j…

3415 最小和

3415 最小和 链接&#xff1a;http://codevs.cn/problem/3415/ CodeVS原创 时间限制: 1 s空间限制: 64000 KB题目描述 Description小浣熊松松来到文具店&#xff0c;选择了K支自己喜欢的水彩笔&#xff0c;并抄下了它们的价格。可是到结算时&#xff0c;他发现自己抄价格时抄…

java监控rabbitMq服务状态,SpringCloud-Turbine【RabbitMQ服务监控】

前面我们介绍了通过turbine直接聚合多个服务的监控信息&#xff0c;实现了服务的监控&#xff0c;但是这种方式有个不太好的地方就是turbine和服务的耦合性太强了&#xff0c;针对这个问题&#xff0c;我们可以将服务的监控消息发送到RabbitMQ中&#xff0c;然后turbine中Rabbi…

SVN提示被锁定的解决方法(转)

1、&#xff08;常用&#xff09;出现这个问题后使用“清理”即"Clean up"功能&#xff0c;如果还不行&#xff0c;就直接到上一级目录&#xff0c;再执行“清理”&#xff0c;然后再“更新”。 2、&#xff08;没试过&#xff09;有时候如果看到某个包里面的文件夹没…

jpa 查询 列表_终极JPA查询和技巧列表–第1部分

jpa 查询 列表我们可以在Internet上找到一些JPA“如何做”&#xff0c;在本博客的此处&#xff0c;教您如何使用JPA执行多项任务。 通常&#xff0c;我看到有人问有关使用JPA进行查询的问题。 通常&#xff0c;为了回答此类问题&#xff0c;提供了几个链接以尝试找到该问题的解…

windows下php swoole扩展,Windows 下安装 swoole 图文教程(php)

Windows 下安装 swoole 具体步骤&#xff1a;Swoole,原本不支持在Windows下安装的&#xff0c;所以我们要安装Cygwin来使用。在安装Cygwin下遇到了很多坑&#xff0c;百度经验上的文档不是很全&#xff0c;所以我把自己安装Cygwin和Swoole写下来相当于对自己的沉淀吧。首先准备…

软件工程实践2017第一次作业

&#xff08;1&#xff09;回想一下你初入大学时对计算机专业的畅想 当初你是如何做出选择计算机专业的决定的&#xff1f; 高考之后&#xff0c;综合分数、地理位置、专业考虑&#xff0c;搭上了福州大学这趟动车&#xff0c;不过选报的专业给我打了个折&#xff0c;意外的在生…

将JQGrid与Spring MVC和Gson集成

我在一个单页面应用程序上工作&#xff0c;我想在使用Spring MVC的应用程序的一部分中使用网格功能。 自从我上次使用JQGrid以来已经有一段时间了&#xff0c;找到让我起床所需的信息有点困难。 在这篇文章中&#xff0c;我想整理所有信息并将其放入教程中以供遵循&#xff0c;…