服务器推送_初探 Watermill 构建 Golang 事件驱动程序,SSE 进行 HTTP 服务器推送

使用 SSE(Server-Sent Events) 进行 HTTP 服务器推送

这个示例是一个类似 twitter 的 web 应用程序,使用 Server-Sent Events 来支持实时刷新。

c969a34fd1422caef18ea7e29f89ec8b.gif

运行

docker-compose up

然后, 浏览 http://localhost:8080

您可以添加自己的帖子或点击按钮获得随机生成的帖子。

无论哪种方式,feeds 列表和 feed 中的帖子都应该是最新的。尝试使用第二个浏览器窗口查看更新。

它是如何工作的

  • 可以创建和更新帖子。

  • 帖子可以包含标签。

  • 每个标签都有自己的 feed,其中包含来自该标签的所有帖子。

  • 所有的帖子都存储在 MySQL 中。这就是写模型。

  • 所有 feed 都异步更新并存储在 MongoDB 中。这是读模型。

为什么要使用单独的写和读模型?

对于这个示例应用程序,使用多语言持久性(两个数据库引擎)当然有些过头了。我们这样做是为了展示这个技术,以及如何很容易地将它应用到 Watermill。

专用的读模型对于具有高读/写比率的应用程序是一种有用的模式。所有写操作都被原子地应用到写模型(在我们的例子中是 MySQL)。事件处理程序异步更新读模型(我们使用 Mongo)。

读取模型中的数据可以按原样使用。也可以独立于写模型进行扩展。

请记住,要使用此模式,应用程序中必须接受最终的一致性。而且,在大多数用例中,您可能不需要使用它。务实!

1ee8f8c15bece1c8451d1b0cd17491d8.png

SSE Router

SSERouter 来自 watermill-http。当创建一个新的路由器时,你需要传递一个上游订阅者。来自该订阅服务器的消息将触发通过 HTTP 推送更新。

在本例中,我们使用 NATS 作为 Pub/Sub,但这可以是 Watermill 支持的任何 Pub/Sub。

sseRouter, err := watermillHTTP.NewSSERouter(
watermillHTTP.SSERouterConfig{
UpstreamSubscriber: router.Subscriber,
ErrorHandler: watermillHTTP.DefaultErrorHandler,
},
router.Logger,
)

Stream Adapters(流适配器)

要使用 SSERouter,你需要准备一个带有两个方法的 StreamAdapter

GetResponse 类似于标准的 HTTP 处理程序。修改现有的处理程序来匹配这个签名应该非常容易。

Validate 是一个额外的方法,它告诉我们是否应该为特定的 Message 推送更新。

type StreamAdapter interface {
// GetResponse returns the response to be sent back to client.
// Any errors that occur should be handled and written to `w`, returning false as `ok`.
GetResponse(w http.ResponseWriter, r *http.Request) (response interface{}, ok bool)
// Validate validates if the incoming message should be handled by this handler.
// Typically this involves checking some kind of model ID.
Validate(r *http.Request, msg *message.Message) (ok bool)
}

Validate 示例如下所示。它检查消息是否来自与用户通过 HTTP 请求发送的相同的 post ID。

func (p postStreamAdapter) Validate(r *http.Request, msg *message.Message) (ok bool) {
postUpdated := PostUpdated{}

err := json.Unmarshal(msg.Payload, &postUpdated)
if err != nil {
return false
}

postID := chi.URLParam(r, "id")

return postUpdated.OriginalPost.ID == postID
}

如果你想为每条消息触发一个更新,你可以简单地返回 true

func (f allFeedsStreamAdapter) Validate(r *http.Request, msg *message.Message) (ok bool) {
return true
}

在开始 SSERouter 之前,您需要添加带有特定主题的处理程序。 AddHandler 返回一个可以在任何路由库中使用的标准 HTTP 处理程序。

postHandler := sseRouter.AddHandler(PostUpdatedTopic, postStream)

// ...

r.Get("/posts/{id}", postHandler)

Event handlers(事件处理程序)

该示例使用 Watermill 进行所有异步通信,包括 SSE。

发布了以下事件:

  • PostCreated

    • 将 post 添加到贴子中包含标签的所有 feeds 中。

  • FeedUpdated

    • 将更新推送到当前访问 feed 页面的所有客户端。

  • PostUpdated

    • a) 对于现有标签,帖子内容将在标签中更新。

    • b) 如果添加了新的标签,文章将被添加到标签的 feed 中。

    • c) 如果标签已删除,则该帖子将从标签的 feed 中删除。

    • 将更新推送给所有当前访问 post 页面的客户端。

    • 使用帖子中存在的标签更新所有 feeds 中的帖子

前端 app

前端应用程序是使用 Vue.js 和 Bootstrap 构建的。

最有趣的部分是 EventSource 的使用。

this.es = new EventSource('/api/feeds/' + this.feed)

this.es.addEventListener('data', event => {
let data = JSON.parse(event.data);
this.posts_stream = data.posts;
}, false);

Refs

  • watermill.io

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/530909.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

extends 抽象方法_关于abstract抽象类的理解

abstract:抽象类不能被实例化(new),包含属性、方法、构造器(此构造器不用来初始化实例,只用来被子类调用,其构造函数是提供给子类创建对象的时候初始化父类的属性的),故只…

三张表有重复字段_什么?搞不定Kafka重复消费?

点戳蓝字“架构之美”关注我们哦!前言 今天我们聊一个话题,这个话题大家可能在面试过程中,或者是工作当中经常遇到 ?如何保证 Kafka 消息不重复消费?我们在做开发的时候为了程序的健壮性,在使用 Kafka 的时候一般都会…

如何利用扩展欧几里得算法求解不定方程_欧几里德算法、拓展欧几里德、中国剩余定理...

01.欧几里德算法(Euclidean algorithm)(辗转相除法)欧几里德算法又称辗转相除法,主要是用于计算两个整数a,b的最大公约数。简单点说一下算法原理:两个整数的最大公约数等于其中小的那个数跟大除以小余数的最…

mysql 先删后增 更新_MySQL 高级操作——新增数据、更新数据、删除数据、查询数据...

新增数据多数据插入只要写一次insert指令,但是可以插入多条记录语法:insert into 表名 [(字段列表)] values (值列表1),(值列表2),(值列表3);主键冲突主键冲突,在有的表中,使用的是业务主键(字段有业务含义),但是往往在…

python七段数码管倒计时_python实现七段数码管和倒计时效果

8是典型的七段数码管的例子,因为刚好七段都有经过,这里我写的代码是从1开始右转。这是看Mooc视频写的一个关于用七段数码管显示当前时间# -*-coding:utf-8 -*-import turtle as timport timedef drawGap():t.penup()t.fd(5)def drawLine(draw):drawGap()…

rda分析怎么做_数量生态学笔记||冗余分析(RDA)

上一节数量生态学笔记||冗余分析(RDA)概述中,我们回顾了RDA的计算过程,不管这个过程我们有没有理解透彻,我希望你能知道的是:RDA是响应变量矩阵与解释变量之间多元多重线性回归的拟合值矩阵的PCA分析。本节我们就是具体来看一个RD…

mysql 服务器管理员_mysql 查看数据库管理员

mysql 查看数据库管理员云服务器(Elastic Compute Service,简称ECS)是阿里云提供的性能卓越、稳定可靠、弹性扩展的IaaS(Infrastructure as a Service)级别云计算服务。云服务器ECS免去了您采购IT硬件的前期准备,让您像使用水、电、天然气等公共资源一样…

python中有哪些重要的书写规则_一文读懂Python代码的书写规范

Python代码的书写规范1. 一致性的建议打破一条既定规则的两个好理由当应用这个规则将导致代码可读性下降,即使对于某人来说他已经习惯于按照这条规则来阅读代码了为了和周围的代码保持一致而打破规则(也许是历史原因)2. 代码的布局缩进4个空格代码行行最大长度 : 79字符推荐长度…

java输入行数打印菱形_JAVA题,输入行数,输入列数,输出一个菱形

展开全部1,冒泡排序1. /**2. * JAVA排序算法实现代码-冒泡(Bubble Sort)排序。3. *4. *5. *6. */7. public class Test {8. public static void main(String[] args) {9. int[] a ;10.11. System.out.print("排序前: ");12.13. for (int i 0; i < a.length; i)1…

openshift 3 mysql_最新OpenShift免费空间申请与使用教程-1G内存1G空间支持PHP和MysqL

一、OpenShift空间申请使用前必备工具1、OpenShift官网&#xff1a;1、官方网站&#xff1a;https://www.openshift.com/2、OpenShift V3&#xff1a;https://manage.openshift.com/2、Github账号(或者其他的git仓库也可以..)。注册git仓库是为了方便的实现代码的同步&#xff…

cpython教程_python高性能扩展工具-cython教程1快速入门

Cython不仅仅是一种编程语言。它的起源可以追溯到SAGE数学软件包&#xff0c;它用于提高数学计算性能&#xff0c;例如涉及矩阵的计算。更一般地说&#xff0c;我倾向于将Cython视为SWIG的替代品&#xff0c;为本机代码生成非常好的Python绑定。SWIG是最早和最好之一&#xff0…

golang mysql封装_golang如何封装路由

封装方式一、路由写在 main函数中&#xff0c;数据库初始连接放在 init() 函数中。、首先看 main.go一个初始化函数&#xff0c;初始化 dbfunc init() {db.Connect()}第二&#xff0c;路由func main() {// Configurerouter : gin.Default()// Set html render optionshtmlRende…

java socket编程客户端_Java Socket编程 - 基于Socket实现HTTP下载客户端

没有借助任何第三方库&#xff0c;完全基于JAVA Socket实现一个最小化的HTTP文件下载客户端。完整的演示如何通过Socket实现下载文件的HTTP请求(request header)发送如何从Socket中接受HTTP响应(Response header, Response body)报文并解析与保存文件内容。如何通过SwingWork实…

java相遇问题_行程问题

行程问题 《行程问题》说课设计——现代教育信息技术与数学学科的整合福建省闽侯县尚干中心小学 林惠贞 邮编&#xff1a;350112 邮箱:zhenzi2277163.com众所周知,未来的教育&#xff0c;倡导开放式学习&#xff0c;把学习的地点扩展到社会、网络&…

java写一个99到0_Java中一个普通的循环为何从10开始到99连续相乘会得到0?

【套装4本】java编程思想4第4版402.5元包邮(需用券)去购买 >这是一块非常简单的Java代码片段&#xff1a;public class HelloWorld{public static void main(String []args){int product 1;for (int i 10; i < 99; i) {product * i;}System.out.println(product);}}为什…

neo4j java查找_Spring-Boot使用neo4j-java-driver-- 查找两个节点之间关系的最短路径

一、Cypher数据create (小北:朋友圈{姓名:"小北", 喜欢的书类:"Poetry"}),(小菲:朋友圈{姓名:"小菲", 喜欢的书类:"Science Fiction"}),(小鹏:朋友圈{姓名:"小鹏", 喜欢的书类:"Music"}),(小颖:朋友圈{姓名:"…

继承易错总结

1.继承会将所有的成员继承下来&#xff0c;但是继承方式限定的是继承下来成员的可见类型(如果是private继承&#xff0c;那么他不论哪里都是不可见的&#xff1b;如果是protected继承在类中是可见的&#xff0c;在类外是不可见的&#xff1b;如果是public继承&#xff0c;在任何…

hhkb适合写java吗_起底这届HHKB最强新品键盘,究竟好在哪儿?

2019年12月HHKB上市了3大品类的12款新品键盘&#xff0c;今天为大家分享外设天下为HHKB Professional HYBIRD Type-S 双模静音旗舰版静电容键盘做的评测&#xff0c;起底这届HHKB新品的最强新品。近日&#xff0c;HHKB更新了旗下的在售产品系列&#xff0c;为了满足严肃、安静办…

elementui树形复选框,element-ui checkbox 组件的树形联动

前言示例版本为 Element-ui 2.13.0 Vue 2.6.11最近想弄 Element-ui checkbox 的多级联动&#xff0c;网上相关的例子大多数为二级联动&#xff0c;自己研究了一下&#xff0c;弄了一个树形菜单的多级联动&#xff0c;常用于角色管理等业务。(仅供参考&#xff0c;未考虑性能问…

java 先入先出,java_阻塞队列(FIFO先进先出)

java_阻塞队列(FIFO先进先出)ArrayBlockingQueue&#xff1a;由数组结构组成的有界阻塞队列&#xff1b;LinkedBlockingQueue&#xff1a;由链表结构组成的有界阻塞队列(但大小默认值为&#xff1a;Integer.MAX_VALUE)&#xff1b;PriorityBlockingQueue&#xff1a;支持优先级…