kafka 发布订阅_在Kafka中发布订阅模型

kafka 发布订阅

这是第四个柱中的一系列关于同步客户端集成与异步系统( 1, 2, 3 )。 在这里,我们将尝试了解Kafka的工作方式,以便正确利用其发布-订阅实现。

卡夫卡概念

根据官方文件 :

Kafka是一种分布式的,分区的,复制的提交日志服务。 它提供消息传递系统的功能,但具有独特的设计。

Kafka作为集群运行,这些节点称为代理。 代理可以是领导者或副本,以提供高可用性和容错能力。 代理负责分区,分区是存储消息的分发单元。 这些消息是有序的,可以通过名为offset的索引进行访问。 一组分区构成一个主题,是消息的提要。 分区可以具有不同的使用者,并且它们使用自己的偏移量访问消息。 生产者将消息发布到Kafka主题中。 Kafka文档中的以下图表可以帮助您理解这一点:

话题

排队与发布-订阅

消费者群体是另一个关键概念,有助于解释为什么Kafka比RabbitMQ等其他消息传递解决方案更灵活,功能更强大。 消费者与消费者群体相关联。 如果每个使用者都属于同一个使用者组,则主题的消息将在各个使用者之间平均负载均衡; 这就是所谓的“排队模型”。 相反,如果每个使用者都属于不同的使用者组,则所有消息都将在每个客户端中使用。 这就是所谓的“发布-订阅”模型。

您可以混合使用这两种方法,分别针对不同的需求使用不同的逻辑使用者组,并在每个组中有多个使用者以通过并行提高吞吐量。 同样, Kafka文档中的另一个图表:

消费者群体

了解我们的需求

执法

正如我们在以前的文章(见1, 2, 3 )该项目服务发布消息到卡夫卡的话题叫item_deleted 。 此消息将位于该主题的一个分区中。 为了定义消息将驻留在哪个分区,Kafka提供了三种选择 :

  • 如果记录中指定了分区,请使用它
  • 如果未指定分区但存在密钥,则根据密钥的哈希值选择一个分区
  • 如果不存在分区或密钥,则以循环方式选择一个分区

我们将使用item_id作为密钥。 执法服务的不同实例中包含的消费者仅对特定分区感兴趣,因为他们保留某些项目的内部状态。 让我们检查不同的Kafka使用者实现,以了解哪种使用最方便。

卡夫卡消费者

卡夫卡共有三个消费者: 高级消费者 , 简单消费者和新消费者

在这三个消费者中, 简单消费者在最低级别上运行。 它满足我们的要求,因为它允许消费者“在流程中仅使用主题中分区的子集”。 但是,如文档所述:

SimpleConsumer确实需要使用者组中不需要的大量工作:

  • 您必须跟踪应用程序中的偏移量,才能知道从何处停止消费
  • 您必须确定哪个Broker是主题和分区的主要Broker。
  • 您必须处理经纪人负责人变更

如果您阅读了建议的用于处理这些问题的代码,则将不鼓励您使用此使用者。

新使用者提供正确的抽象级别,并允许我们订阅特定的分区。 他们在文档中建议以下用例:

第一种情况是,如果进程正在维护与该分区相关联的某种本地状态(例如本地磁盘上的键值存储),因此该进程应仅获取其在磁盘上维护的分区的记录。

不幸的是,我们的系统使用的是Kafka 0.8,而该使用者仅从0.9开始可用。 我们没有足够的资源来迁移到该版本,因此我们需要坚持使用高级消费者

该使用者提供了一个不错的API,但不允许我们订阅特定的分区。 这意味着,执法服务的每个实例都将使用每条消息,甚至是无关的消息。 我们可以通过为每个实例定义不同的消费者组来实现这一目标。

利用Akka Event Bus

在上一篇文章中,我们定义了一些等待ItemDeleted消息的有限状态机ItemDeleted

when(Active) {case Event(ItemDeleted(item), currentItemsToBeDeleted@ItemsToBeDeleted(items)) =>val newItemsToBeDeleted = items.filterNot(_ == item)newItemsToBeDeleted.size match {case 0 => finishWorkWith(CensorResult(Right()))case _ => stay using currentItemsToBeDeleted.copy(items = newItemsToBeDeleted)}}

我们的卡夫卡消费者可以将所有消息转发给那些演员,并让他们丢弃/过滤不相关的物品。 但是,我们不想让演员浪费很多多余的工作,因此我们将添加一层抽象,让他们以真正有效的方式丢弃适当的消息。

final case class MsgEnvelope(partitionKey: String, payload: ItemDeleted)class ItemDeletedBus extends EventBus with LookupClassification {override type Event = MsgEnvelopeoverride type Classifier = Stringoverride type Subscriber = ActorRefoverride protected def mapSize(): Int = 128override protected def publish(event: Event, subscriber: Subscriber): Unit = subscriber ! event.payloadoverride protected def classify(event: Event): Classifier = event.partitionKeyoverride protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = a.compareTo(b)
}

Akka Event Bus按分区为我们提供订阅,而我们的Kafka高级消费者中缺少该分区。 我们将从卡夫卡消费者处发布每条消息到公交车上:

itemDeletedBus.publish(MsgEnvelope(item.partitionKey, ItemDeleted(item)))

在上一篇文章中,我们展示了如何使用该分区键订阅消息:

itemDeletedBus.subscribe(self, item.partitionKey)

LookupClassification将过滤不需要的消息,因此我们的参与者不会过载。

摘要

得益于Kafka提供的灵活性,我们能够设计我们的系统以了解不同的折衷方案。 在接下来的文章中,我们将看到如何协调这些FSM的结果以向客户端提供同步响应。

第一部分 | 第2部分 | 第三部分

翻译自: https://www.javacodegeeks.com/2016/05/publish-subscribe-model-kafka.html

kafka 发布订阅

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/335889.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

apache camel_使用Apache Camel进行负载平衡

apache camel在此示例中,我们将向您展示如何使用Apache Camel作为系统的负载平衡器。 在计算机世界中,负载平衡器是一种充当反向代理并在许多服务器之间分配网络或应用程序流量的设备。 负载平衡器用于增加容量(并发用户)和应用程…

lombok 自动使用_Lombok,自动值和不可变项

lombok 自动使用我喜欢布兰登(Brandon )在博客文章中比较Project Lombok , AutoValue和Immutables的建议 ,而这篇文章试图做到这一点。 我已经简要概述了Project Lombok , AutoValue和Immutables ,但是这篇…

邮箱批量登录接验证码_记一次莫名的需求(临时邮箱|企业邮箱)

目录:前言行情伪需求过程1.前戏2.买域名3.网易企业邮箱4.模糊的需求5.晚饭后6.临时邮箱16.临时邮箱27.域名版临时邮箱8.遇见问题8.1.DNSPOD8.2.换种思路拓展1.思路2.后续2.1.简单2.2.自建临时邮箱后话记一次需求不明的亏看完这篇文章你会学到: 免费企业邮…

java 补充日期_Java 9对可选的补充

java 补充日期哇&#xff0c;人们真的对Java 9对Stream API的添加感兴趣。 想要更多&#xff1f; 让我们看一下…… 可选的 可选::流 无需解释&#xff1a; Stream<T> stream();想到的第一个词是&#xff1a; 终于 &#xff01; 最后&#xff0c;我们可以轻松地从可选…

【Python科学计算系列】行列式

1.二元线性方程组求解 import numpy as np a np.array([[3, -2], [2, 1]]) b np.array([12, 1]) d np.linalg.solve(a, b) print(d) 2.三阶行列式求值 import numpy as np a np.array([[1, 2, -4], [-2, 2, 1], [-3, 4, -2]]) d np.linalg.det(a) print(d) 3.行列式的余…

【Python科学计算系列】矩阵

1.矩阵的幂计算&#xff08;设计思想&#xff1a;递归&#xff09; #!/usr/bin/env python # -*- coding: utf-8 -*- import numpy as np def matrixPow(Matrix,n):if(type(Matrix)list):Matrixnp.array(Matrix)if(n1):return Matrixelse:return np.matmul(Matrix,matrixPow(…

swarm 本地管理远程_带有WildFly Swarm的远程JMS

swarm 本地管理远程我再次在博客中谈论WildFly群&#xff1f; 简短的版本是&#xff1a;我需要对远程JMS访问进行测试&#xff0c;并且拒绝设置复杂的功能&#xff08;如完整的应用程序服务器&#xff09;。 这个想法是要有一个简单的WildFly Swarm应用程序&#xff0c;该应用程…

java解码_Java数组已排序解码

java解码排序是我们在计算机科学中学习的第一个算法。 排序是一个非常有趣的领域&#xff0c;它有大约20多种算法&#xff0c;而且总是很难确定哪种算法最好。 排序算法的效率是根据占用的时间和所需的空间来衡量的。 一些时间气泡排序是最好的&#xff0c;因为它没有空间需求&…

【数论系列】反函数

一、判断反函数是否存在&#xff1a; 由反函数存在定理&#xff1a;严格单调函数必定有严格单调的反函数&#xff0c;并且二者单调性相同&#xff1a; 1、先判读这个函数是否为单调函数&#xff0c;若非单调函数&#xff0c;则其反函数不存在。 设yf(x)的定义域为D&#xff…

java附加属性_Java 9附加流

java附加属性Java 9即将发布&#xff01; 它不仅仅是Jigsaw项目 。 &#xff08;我也很惊讶。&#xff09;它给平台带来了很多小的变化&#xff0c;我想一一看一下。 我将标记所有这些帖子&#xff0c;您可以在这里找到它们。 让我们从…开始 流 Streams学习了两个新技巧。 第…

envi最大似然分类_闲谈最大后验概率估计(MAP estimate)amp;极大似然估计(MLE)和机器学习中的误差分类...

上一篇文章中提到了一个有趣的实验&#xff0c;简单来说就是1-100中有若干个数字是“正确的”&#xff0c;只告诉其中一部分“正确的”数字&#xff0c;去猜全部“正确的”数字。为了严谨的去研究这个问题&#xff0c;我们需要将一些概念进行抽象。首先&#xff0c;把提前告知的…

html 完全复制div中的内容_LOL手游现在远非完全体,未来还有哪些端游内容会加入手游中?...

LOL手游上线已经有一段时间了&#xff0c;虽然绝大多数情况下LOL端游的内容被继承到了手游当中&#xff0c;但是仍然有一部分端游的内容尚未出现在手游之内。今天小编就带领大家来盘点一下&#xff0c;那些未来可能出现在手游当中的端游内容。排位赛ban选英雄机制Moba游戏排位赛…

光盘 机密_使用保险柜管理机密

光盘 机密您如何存储秘密&#xff1f; 密码&#xff0c;API密钥&#xff0c;安全令牌和机密数据属于秘密类别。 那是不应该存在的数据。 在容易猜测的位置&#xff0c;不得以纯文本格式提供。 实际上&#xff0c;不得在任何位置以明文形式存储它。 可以使用Spring Cloud Confi…

junit5 动态测试_JUnit 5 –动态测试

junit5 动态测试在定义测试时&#xff0c;JUnit 4有一个很大的弱点&#xff1a;它必须在编译时发生。 现在&#xff0c;JUnit 5将解决此问题&#xff01; Milestone 1 刚刚发布 &#xff0c;它带有全新的动态测试&#xff0c;可以在运行时创建测试。 总览 本系列中有关JUnit 5…

C++ 11 深度学习(十)原始字面量

你是否曾经为了各种json格式无法写入string中而烦恼&#xff0c;为了各种转义而烦恼。如下图 c11为我们带来了全新的解决方法 其新特性为使用. R"(xxxxxxxxxxxx)" ,此种形式可以使得以原有形式进行表现出来

交流伺服系统设计指南_交流设计

交流伺服系统设计指南软件设计至关重要。 它是应用程序的基础。 就像蓝图一样&#xff0c;它为所有背景的聚会提供了一个通用平台。 它有助于理解&#xff0c;协作和发展。 设计不应仅视为开发的要素。 它不应该仅仅存在于开发人员的脑海中&#xff0c;否则团队将发现它几乎无…

maven 父maven_Maven神秘化

maven 父maven由于我的Android开发的背景下&#xff0c;我比较习惯到Gradle &#xff0c;而不是Maven的 。 尽管我知道Gradle基于Maven&#xff0c;但我从未调查过幕后发生的事情。 在过去的一周中&#xff0c;我一直在尝试了解细节并找出Maven的不同组成部分。 什么是Maven M…

【WebRTC---序篇】(一)为什么要使用WebRTC

1.1.1自研直播客户端架构 一个最简单的直播客户端至少应该包括音视频采集模块,音视频编码模块,网络传输模块,音视频解码模块和音视频渲染模块五大部分。如下图所示 1.1.2拆分音视频模块 在实际开发中,音频和视频处理完全是独立的。如下图所示,经过细分后,音频采集与视频…

DFS深搜与BFS广搜专题

一般搜索算法的流程框架 DFS和BFS与一般搜索流程的关系 如果一般搜索算法流程4使用的是stack栈结构(先进后出&#xff0c;后进先出)那么就会越搜越深。即&#xff0c;DFS&#xff0c;DFS只保存当前一条路径&#xff0c;其目的是枚举出所有可能性。反之&#xff0c;如果流程4使…

cloud foundry_使用“另类” Cloud Foundry Gradle插件无需停机

cloud foundry我一直在尝试编写用于将应用程序部署到Cloud Foundry的gradle插件 &#xff0c;并在上一篇文章中写了有关此插件的文章 。 现在&#xff0c;我通过使用两种方法支持将无停机时间部署到Cloud Foundry中来增强此插件&#xff1a; 自动驾驶风格部署和更常用的蓝绿色风…