编写基于事件的CQRS读取模型

关于事件源和CQRS的讨论似乎通常集中在CQRS上下文中的整体系统架构或领域驱动设计的各种形式。 但是,尽管也有一些有趣的考虑,但读取模型经常被忽略。 在本文中,我们将展示一个通过使用事件流填充视图模型的示例实现。 es_cqrs_projection_funnel_300_2

总览

读取模型的想法非常简单。 您获取事件日志,使用适当的功能在最初为空的数据模型上应用(重放)所有事件,然后获得填充的模型。 代码看起来像:

List<Event> events = getEvents();
Model model = Model.empty();
for (Event event : events) {apply(model, event);
}

通过函数式编程,我们可以使此过程更短:

Model m = reduce(getEvents(),Model.empty(),(m, e) -> apply(m, e));

这就是本质。 请注意,这仅仅是抽象的轮廓,实际的实现可能会有所不同,包括缓冲,批处理(或流式传输),持久性等。

申请活动

应用事件的实际Java代码可能类似于以下内容:

EventProcessingResult processEvents() {if (getState().isRunning()) {int batchSize = getEventsPerIteration();List<Event> events = eventStore.getEventsForAllStreams(getLastEventId(),batchSize);if (events.isEmpty()) {return NO_EVENTS_TO_PROCESS;} else {return processEvents(events);}} else {return NOT_RUNNING;}
}EventProcessingResult processEvents(List<Event> events) {try {for (Event event : events) {dispatchEvent(event);}return SUCCESS;} catch (RuntimeException e) {return FAILURE;}
}

总而言之,它非常简单明了。 在处理单个事件和整个批处理之前和之后,可以使用钩子来增强它。 这样的钩子可以用来:

  • 实施交易,
  • 插入监控,
  • 实施错误处理,
  • 根据速度计算批次大小,
  • 执行任意操作,例如设置某些内容或每批重新计算一次。

最后一个有趣的部分是dispatchEvent方法。 除了遍历类型层次结构,错误处理并将其全部设置为可选项之外,它还可以归结为:

void dispatchEvent(Event e) {Method handler = projector.getClass().findMethod("on", e.getClass());handler.invoke(projector, e);
}

换句话说,对于每种事件类型(如OrderCreated ),我们在projector对象上寻找一个名为on的公共方法on该方法采用匹配类型的单个参数。

以上所有都是引擎的一部分,是支持许多视图模型的基础架构。 实际上,实现投影所需的全部工作就是为投影仪提供有趣的事件类型的处理程序。 所有其他事件都将被忽略。

它可能看起来像这样:

public class OrderProjector {@Injectprivate OrderDao orders;public void on(OrderCreated e) {orders.save(new Order(e.getOrderNumber()));}public void on(OrderApproved e) {Order o = orders.find(e.getOrderNumber());o.setApproved(true);}
}

投影线

让我们讨论一下多线程。 共享的可变状态会立即带来许多问题, 应尽可能避免 。 处理它的方法之一是首先没有并发性,例如,通过限制对单个线程的写操作。 在大多数情况下 ,单线程写入器与ACID事务相结合足以应付写入负载。 (读取/查询负载可能很重,并且使用许多线程–此处所有详细信息仅与写入有关。)

从查询事件存储到更新视图模型数据库,该线程负责将事件应用于读取的模型。 通常,它只是从商店加载一批事件并应用它们。 只要有更多事件要处理,它就会继续,并在捕获到事件后进入睡眠状态。 在一定时间后或事件存储通知新事件时,它将唤醒。

我们还可以控制此线程的生命周期。 例如,我们提供了一种以编程方式暂停和恢复每个投影线程的方法,即使在管理GUI中也是如此。

projection_admin

推还是拉?

使用数据库支持的事件存储,可以很容易地重复查询新事件。 这是模型。 不幸的是,这也意味着您可能最终会轮询过多并产生不必要的负载,或者轮询频率太低,因此可能需要更长的时间才能将更改传播到视图模型。

这就是为什么除了轮询事件存储之外,最好引入通知,以在保存新事件后立即唤醒读取的模型。 这有效地成为了具有最小延迟和负载的推送模型。 我们发现JGroups是完成这项工作的非常好的工具–它支持多种协议,并且易于设置,与成熟的消息队列相比,所涉及的麻烦要少得多。

通知可能包含也可能不包含实际事件。

在后一种(和更简单的)设计中,它们仅传播已保存新事件的信息及其顺序ID(以便所有预测都可以估算出它们背后的数量)。 唤醒后,执行程序可以从查询事件存储开始沿其正常路径继续。

为什么? 因为处理来自单一来源的事件比较容易,但是更重要的是,由于数据库支持的事件存储可轻松保证排序,并且消息丢失或重复不存在任何问题。 鉴于我们正在按主键顺序读取单个表,而且大多数情况下数据仍在RAM高速缓存中,因此查询数据库的速度非常快。 瓶颈在于投影线程正在更新其读取模型数据库。

但是,将事件数据放入通知中没有任何障碍(可能是出于大小或网络流量方面的考虑)。 这可能会减少事件存储上的负载并节省一些数据库往返时间。 投影仪将需要维护一个缓冲区,并在需要时退回查询事件存储。 或者系统可以使用更可靠的消息队列。

重新开始投影

除了暂停/恢复之外,上面的屏幕截图还显示了另一项操作:重新启动。 从外观上看是无害的,这是一个非常不错且功能强大的功能。

由于视图模型是完全从事件日志派生的,因此可以随时将其丢弃(从某个初始状态/足够旧的快照开始)并重新创建。 在事件日志中,数据是安全的,这是事实的最终来源。

当有关视图的任何内容发生更改时,此功能很有用:添加了字段或表,修复了错误,计算出的内容有所不同。 当发生这种情况时,通常从一开始就更容易(或需要),而不是例如实施大量的SQL迁移脚本。

甚至可以进行完全自动化,以便在系统启动并检测到DB模式与相应的Java模型不匹配时,它可以自动重新创建模式并重新处理事件日志。 就像使用Hibernate create-drop策略运行一样,不同之处在于它不会丢失数据。

性能

该解决方案在性能方面可能看起来非常有限。

单线程作家可能引起人们的注意。 实际上,单个线程通常足够快,可以轻松地跟上负载。 并发不仅更难以实现和维护,而且还引入了争用。 读取(查询)可以是多线程的,并且易于扩展。

通过拥有多个读取模型,例如从管理和“交易”数据中分离分析,我们也获得了很多收益。 每个模型都是单线程的(用于编写),但是多个模型并行使用事件。 最后,可以将解决方案修改为使用分片或某种fork-join处理。

另一个有趣的观点是从头开始重新启动投影

一个好的解决方案是类似于kappa体系结构 :

  • 保持过时的预测正常运行并回答所有查询。
  • 开始新的投影,例如到另一个数据库。 只要让它处理事件,就不要指向任何流量。
  • 当新的预测赶上时,请重定向流量并关闭旧的预测。

在很小的情况下,尤其是对于开发而言,甚至可以在同一情况下在线重新启动。 它取决于以下问题的答案:重新处理所有事件需要多长时间? 这个投影陈旧30分钟是否可以接受? 如果没人使用系统,我们可以在晚上或周末进行部署吗? 我们需要重播所有历史吗?

这里要考虑的另一个因素是持久性。 如果瓶颈太大,无法进一步优化,请考虑使用内存视图模型。

加起来

本质上,这就是实现使用事件存储区的读取模型所需要的全部。 有了线性事件存储并在单个线程中处理所有内容,它变得非常简单。 如此之多,最终实际上只是一个循环,实现了开头所示的减少。

在以后的文章中,我将更深入地探讨实施预测的实际问题。

翻译自: https://www.javacodegeeks.com/2015/09/writing-an-event-sourced-cqrs-read-model.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/356456.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java else if和switch_如何优雅地优化代码中的的if else和switch

引言一般来说&#xff0c;随着我们项目的迭代以及业务的越来越复杂&#xff0c;项目中的分支判断会原来越多。当项目中涉及到复杂的业务判断或者分支逻辑时&#xff0c;我们就需要考虑是否需要对项目进行重构了&#xff0c;或者if else和switch case是否能够满足当前项目的复杂…

jQuery data

一个简单的Cache (function(){var __cache {},Cache {get: function(__name){return __cache[__name] || undefined;}, set: function(__name, __value){return (__cache[__name] __value)}};this.Cache Cache; })();alert(Cache.get("name")); //undefine…

设计模式(中介者模式-对象去耦)

声明&#xff1a;本系列文章内容摘自《iOS设计模式》 中介者模式 用一个对象来封装一系列对象的交互方式。中介者使个对象不需要显式地相互调用&#xff0c;从而使其耦合松散&#xff0c;而且可以独立地改变它们之间的交互。 何时使用中介者模式 1.对象间的交互虽定义明确然而非…

windows服务器的DDOS防御,

抵御 SYN 攻击 SYN 攻击利用了 TCP/IP 连接建立机制中的安全漏洞。要实施 SYN 洪水攻击&#xff0c;攻击者会使用程序发送大量 TCP SYN 请求来填满服务器上的挂起连接队列。这会禁止其他用户建立网络连接。 要保护网络抵御 SYN 攻击&#xff0c;请按照下面这些通用步骤操作&…

java程序运行结果题_2016年关于Java编程与程序运行结果笔试题

2016年关于Java编程与程序运行结果笔试题1.Java编程,打印昨天的当前时刻public class YesterdayCurrent{public void main(String[] args){Calendar cal Calendar.getInstance();cal.add(Calendar.DATE, -1);System.out.println(cal.getTime());}}2.文件读写,实现一个计数器pu…

Hystrix简介– Hello World

在先前的博客文章中&#xff0c;我谈到了需要像Netflix Hystrix这样的库的动机。 在这里&#xff0c;我将跳入一些非常基本的方法来开始使用Hystrix&#xff0c;并在更复杂的用例中进行后续介绍。 你好&#xff0c;世界 以下是“ Hystrix命令”的一个简单的Hello World示例&am…

js base64编码 java 解码_JavaScript字符串的Base64编码与解码

有时文本里包含一些不可打印的符号&#xff0c;而你需要把它们传输到服务器&#xff0c;这时我们会需要用到Base64编码。或者你需要把一个图片内容以文本格式嵌入到网页中&#xff0c;这时你也会用到 Base64 编码。所谓 Base64 是一种基于64个可打印字符来表示二进制数据的方法…

ssh图片上传 java_ssh上传并显示图片

struts部分&#xff1a;attribute"upfileForm"input"/upload/uploadfile.jsp"name"upfileForm"path"/upfile"scope"request"validate"true"type"com.yourcompany.struts.action.UpfileAction">publi…

责任链设计模式示例

本文是我们名为“ Java设计模式 ”的学院课程的一部分。 在本课程中&#xff0c;您将深入研究大量的设计模式&#xff0c;并了解如何在Java中实现和利用它们。 您将了解模式如此重要的原因&#xff0c;并了解何时以及如何应用模式中的每一个。 在这里查看 &#xff01; 目录 …

使用JUnit规则进行干净的集成测试

JUnit Rules的优势&#xff0c;尤其是在进行集成测试时&#xff0c;几乎不能被高估。 在本文中&#xff0c;我们将阐明ExternalResource扩展的有用性。 在我们必须使用抽象外部资源的第三方库的情况下&#xff0c;这些简化了灯具控制。 作为示例&#xff0c;我们将看看如何基于…

winform基础窗体设置及基础控件

WinForm - 也叫做C/S 客户端 另&#xff1a;B/S是 网页端 客户端应用程序 - 是需要安装在用户电脑上才可以使用的程序 特点&#xff1a; 不需要联网也可以打开使用部分功能&#xff0c;但是现在的情况是许多功能依然需要互联网的支持&#xff0c;代码部分在用户电脑上执行 使用…

【FastJSON】解决FastJson中“$ref 循环引用”的问题

0、开发环境 SSH&#xff0c;EasyUI&#xff0c;MySQL 1、需求要求&#xff1a; (1)首先获取所有的贷款订单数据&#xff0c;即List <LoanOrder>。 (2)然后从单个贷款订单实体LoanOrder去访问贷款人实体Loaner的信息。 2、实体之间的关系描述 (1)LoanOrder实体与Loaner…

JavaFX真实世界应用程序:EIZO CuratOR Caliop

JavaFX Real-World应用程序第四号称为Caliop 。 它是EIZO为医院手术室开发的CuratOR解决方案的前端。 前端在壁挂式控制台上运行&#xff0c;并允许操作团队查找有关患者的信息&#xff0c;控制各种视频源到不同监视器的路由&#xff0c;录制视频&#xff0c;拍摄照片/剧照。 …

netbeans代码提示_NetBeans可用性提示

netbeans代码提示的Java IDE都来了&#xff0c;因为在很长的路要走天的JBuilder的 &#xff08;尽管JBuilder中似乎是一个值得欢迎提前在时间&#xff09;。 当今的Java IDE&#xff08;例如NetBeans &#xff0c; Eclipse &#xff0c; IntelliJ IDEA和JDeveloper &#xff09…

推荐文章:机器学习:“一文读懂机器学习,大数据/自然语言处理/算法全有了...

PS:文章主要转载自CSDN大神"黑夜路人"的文章: http://blog.csdn.NET/heiyeshuwu/article/details/43483655 本文主要对机器学习进行科普,包括机器学习的定义、范围、方法,包括机器学习的研究领域&#xff1a;模式识别、计算机视觉、语音识别、自然语言…

java比较炫的登录界面_html+css实现漂亮的透明登录页面,HTML实现炫酷登录页面...

承蒙各位小伙伴的支持&#xff0c;鄙人有幸入围了《CSDN 2020博客之星》的前200名&#xff0c;现在进入投票环节&#xff0c;如果我平时写的文章和分享对你有用的话&#xff0c;请每天点击一下这个链接&#xff0c;投上你们宝贵的一票吧&#xff01;谢谢&#xff01;❤️ 每一票…

OpenMap教程第2部分–使用MapHandler构建基本地图应用程序–第1部分

1.简介 在第一个教程中&#xff0c;我们创建了一个基本的OpenMap GIS应用程序&#xff0c;该应用程序在JFrame中显示一个从文件系统加载的具有一个形状图层的地图。 该教程基于com.bbn.openmap.app.example.SimpleMap 。 在该教程中&#xff0c;我们使用了以下OpenMap类&#x…

java rx.observable_Rxjava2 Observable的条件操作符详解及实例

简要&#xff1a;需求了解&#xff1a;在使用 Rxjava 开发中&#xff0c;经常有一些各种条件的操作 &#xff0c;如比较两个 Observable 谁先发射了数据、跳过指定条件的 Observable 等一系列的条件操作需求&#xff0c;那么很幸运&#xff0c; Rxjava 中已经有了很多条件操作符…

Linux poll 和 select 机制

poll select 介绍 使用非阻塞 I/O 的应用程序常常使用 poll, select, 和 epoll 系统调用. poll, select 和 epoll 本质上有相同的功能: 每个允许一个进程来决定它是否可读或者写一个 或多个文件而不阻塞. 这些调用也可阻塞进程直到任何一个给定集合的文件描述符可用来 读或写.…

hprof 不大 泄露_HPROF –内存泄漏分析教程

hprof 不大 泄露本文将为您提供有关如何通过生成和分析Sun HotSpot JVM HPROF堆转储文件来分析JVM内存泄漏问题的教程。 一个现实的案例研究将用于此目的&#xff1a;Weblogic 9.2内存泄漏影响Weblogic Admin服务器。 环境规格 Java EE服务器&#xff1a;Oracle Weblogic Ser…