在Kafka中发布订阅模型

这是第四个柱中的一系列关于同步客户端集成与异步系统( 1, 2, 3 )。 在这里,我们将尝试了解Kafka的工作方式,以便正确利用其发布-订阅实现。

卡夫卡概念

根据官方文件 :

Kafka是一种分布式的,分区的,复制的提交日志服务。 它提供消息传递系统的功能,但具有独特的设计。

Kafka作为集群运行,这些节点称为代理。 代理可以是领导者或副本,以提供高可用性和容错能力。 代理负责分区,分区是存储消息的分发单元。 这些消息是有序的,可以通过名为offset的索引进行访问。 一组分区构成一个主题,是消息的提要。 一个分区可以有不同的使用者,它们使用自己的偏移量访问消息。 生产者将消息发布到Kafka主题中。 Kafka文档中的以下图表可以帮助您理解以下内容:

话题

排队与发布-订阅

消费者群体是另一个关键概念,有助于解释为什么Kafka比RabbitMQ等其他消息传递解决方案更灵活,功能更强大。 消费者与消费者群体相关联。 如果每个使用者都属于同一个使用者组,则主题的消息将在各个使用者之间平均负载均衡; 这就是所谓的“排队模型”。 相反,如果每个使用者都属于不同的使用者组,则所有消息都将在每个客户端中使用。 这就是所谓的“发布-订阅”模型。

您可以混合使用这两种方法,分别针对不同的需求使用不同的逻辑使用者组,并在每个组中有多个使用者以通过并行提高吞吐量。 同样, Kafka文档中的另一个图表:

消费群体

了解我们的需求

执法

正如我们在以前的文章(见1, 2, 3 )该项目服务发布消息到卡夫卡的话题叫item_deleted 。 此消息将位于该主题的一个分区中。 为了定义消息将驻留在哪个分区中,Kafka提供了三种选择 :

  • 如果记录中指定了分区,请使用它
  • 如果未指定分区但存在键,则根据键的哈希值选择一个分区
  • 如果没有分区或密钥,则以循环方式选择一个分区

我们将使用item_id作为密钥。 执法服务的不同实例中包含的消费者仅对特定分区感兴趣,因为他们保留某些商品的内部状态。 让我们检查不同的Kafka使用者实现,以了解哪种使用最方便。

卡夫卡消费者

卡夫卡共有三个消费者: 高级消费者 , 简单消费者和新消费者

在这三个消费者中, 简单消费者在最低级别上运行。 它满足我们的要求,因为它允许消费者“在流程中仅使用主题中分区的子集”。 但是,正如文档所述:

SimpleConsumer确实需要使用者组中不需要的大量工作:

  • 您必须跟踪应用程序中的偏移量,才能知道从何处停止消费
  • 您必须确定哪个Broker是主题和分区的主要Broker。
  • 您必须处理经纪人负责人变更

如果您阅读了建议的用于处理这些问题的代码,则将不鼓励您使用此使用者。

新使用者提供正确的抽象级别,并允许我们订阅特定的分区。 他们在文档中建议以下用例:

第一种情况是,如果进程正在维护与该分区关联的某种本地状态(例如本地磁盘上的键值存储),因此该进程应仅获取其在磁盘上维护的分区的记录。

不幸的是,我们的系统使用的是Kafka 0.8,而该使用者仅从0.9开始可用。 我们没有足够的资源来迁移到该版本,因此我们需要坚持使用高级消费者

该使用者提供了一个不错的API,但不允许我们订阅特定的分区。 这意味着,执法服务的每个实例都将使用每条消息,即使那些无关的消息也是如此。 我们可以通过为每个实例定义不同的消费者组来实现。

利用Akka Event Bus

在上一篇文章中,我们定义了一些等待ItemDeleted消息的有限状态机ItemDeleted

when(Active) {case Event(ItemDeleted(item), currentItemsToBeDeleted@ItemsToBeDeleted(items)) =>val newItemsToBeDeleted = items.filterNot(_ == item)newItemsToBeDeleted.size match {case 0 => finishWorkWith(CensorResult(Right()))case _ => stay using currentItemsToBeDeleted.copy(items = newItemsToBeDeleted)}}

我们的卡夫卡消费者可以将所有消息转发给那些演员,并让他们丢弃/过滤不相关的物品。 但是,我们不想让参与者浪费大量的冗余和低效的工作,因此我们将添加一层抽象,使他们能够以一种非常有效的方式丢弃适当的消息。

final case class MsgEnvelope(partitionKey: String, payload: ItemDeleted)class ItemDeletedBus extends EventBus with LookupClassification {override type Event = MsgEnvelopeoverride type Classifier = Stringoverride type Subscriber = ActorRefoverride protected def mapSize(): Int = 128override protected def publish(event: Event, subscriber: Subscriber): Unit = subscriber ! event.payloadoverride protected def classify(event: Event): Classifier = event.partitionKeyoverride protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = a.compareTo(b)
}

Akka Event Bus按分区为我们提供订阅,而我们的Kafka高级消费者中缺少该分区。 我们将从卡夫卡消费者处发布每条消息到公交车上:

itemDeletedBus.publish(MsgEnvelope(item.partitionKey, ItemDeleted(item)))

在上一篇文章中,我们展示了如何使用该分区键订阅消息:

itemDeletedBus.subscribe(self, item.partitionKey)

LookupClassification将过滤不需要的消息,因此我们的参与者不会过载。

摘要

由于Kafka提供的灵活性,我们能够设计我们的系统以了解不同的折衷方案。 在接下来的文章中,我们将看到如何协调这些FSM的结果以向客户端提供同步响应。

第一部分 | 第2部分 | 第三部分

翻译自: https://www.javacodegeeks.com/2016/05/publish-subscribe-model-kafka.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/353668.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深入理解C++中的mutable关键字

2006-12-16 05:00 来源:BLOG 作者:寒星轩 责任编辑:方舟yesky 评论(32)推荐:经典教程专区mutalbe的中文意思是“可变的,易变的”,跟constant(既C中的const)是反义词。在C中&…

使用Boxfuse为您的REST API设置https

在我的上 一篇 文章中,我展示了在Boxfuse的帮助下,基于Spring Boot框架建立REST API并在AWS上运行非常容易 。 下一步是利用SSL与API进行通信。 通过使用SSL,我们确保在REST API服务器和API客户端之间的传输过程中保存了数据 。 要为Spring B…

Python类与对象实验

一、任务描述 本实验任务主要对Python类与对象进行一些基本操作,通过完成本实验任务,要求学生熟练掌握Python类与对象的关系,并对Python类与对象的基本操作进行整理并填写工作任务报告。 二、任务目标 1、掌握Python类的创建 2、掌握类对象 三…

matlab 五点三次平滑算法

(2012-04-23 21:01:31) 转载▼标签: 杂谈 分类: matlab http://www.ilovematlab.cn/thread-71818-1-1.html 这里提供一个函数mean5_3(五点三次平滑算法)对数据进行平滑处理: load V1.mat subplot 211; plot(V1); ylim([2000 7000]); grid; y…

您在2016年OpenStack峰会上错过的事情

今年我第一次参加了4月25日至29日在德克萨斯州奥斯汀举行的OpenStack峰会。 今天结束了,我要回家了,我想回顾一下,从我的角度分享你错过的事情。 作为以应用程序开发人员为重点的技术传播者,转移到包含Red Hat产品组合的基础架构…

C/C++中的常量指针与指针常量

常量指针 常量指针是指向常量的指针,指针指向的内存地址的内容是不可修改的。 常量指针定义“const int *p&a;”告诉编译器,*p是常量,不能将*p作为左值进行操作。但这里的指针p还是一个变量,它的内容存放常量的地址&#xff0…

基于javafx的五子棋_JavaFX中基于表达式的PathTransitions

基于javafx的五子棋在JavaFX中,您可以使用PathTransition对象为路径上的节点设置动画。 PathTransitions使用Shape对象来描述它们需要沿其动画的路径。 JavaFX提供了各种类型的形状(例如,多边形,圆形,多边形&#xff0…

Drools 6.4.0.Final提供

最新和最出色的Drools 6.4.0.Final版本现已可供下载。 这是我们先前构建的增量版本,对核心引擎和Web工作台进行了一些改进。 您可以在此处找到更多详细信息,下载和文档: Drools网站 资料下载 文献资料 发行说明 请阅读下面的一些发行要…

软考解析:2014年下半年下午试题

软考解析:2014年下半年下午试题 第一题:数据流图 第四题:算法题 第五题:Java设计模式 转载于:https://www.cnblogs.com/MrSaver/p/9073778.html

malloc()参数为0的情况

问题来自于《程序员面试宝典(第三版)》第12.2节问题9(这里不评价《程序员面试宝典》,就题论题): 下面的代码片段输出是什么?为什么? char *ptr;if((ptr (char *)malloc(0))NULL)put…

C++ 关键字typeid

转载网址:http://www.cppblog.com/smagle/archive/2010/05/14/115286.aspx 在揭开typeid神秘面纱之前,我们先来了解一下RTTI(Run-Time Type Identification,运行时类型识别),它使程序能够获取由基指针或引用…

使用Apache Camel进行负载平衡

在此示例中,我们将向您展示如何使用Apache Camel作为系统的负载平衡器。 在计算机世界中,负载均衡器是一种充当反向代理并在许多服务器之间分配网络或应用程序流量的设备。 负载平衡器用于增加容量(并发用户)和应用程序的可靠性。…

Java8-Guava实战示例

示例一&#xff1a; 跟示例三对比一下&#xff0c;尽量用示例三 List<InvoiceQueryBean> invoiceQueryBeanList new ArrayList<>(); List<String> invoices Lists.newArrayList(Iterators.transform(invoiceQueryBeanList.iterator(), new Function<Inv…

java项目构建部署包

博客分类&#xff1a; JAVA Java 工程在生产环境运行时&#xff0c;一般需要构建成一个jar&#xff0c;同时在运行时需要把依赖的jar添加到classpath中去&#xff0c;如果直接运行添加classpath很不方便&#xff0c;比较方便的是创建一个shell脚本。在公司项目中看到把工程代码…

表达式前后缀表达形式 [zz]

(2012-09-12 13:08:39) 转载▼标签&#xff1a; 杂谈 转自&#xff1a;http://blog.csdn.net/whatforever/article/details/673853835,15,,80,70,-,*,20,/ //后缀表达方式(((3515)*(80-70))/20&#xff09;25 //中缀表达方式 /,*,,35,15,-,80,70, 2…

引用:初探Sql Server 执行计划及Sql查询优化

引用:初探Sql Server 执行计划及Sql查询优化 原文:引用:初探Sql Server 执行计划及Sql查询优化初探Sql Server 执行计划及Sql查询优化 收藏MSSQL优化之————探索MSSQL执行计划作者&#xff1a;no_mIss最近总想整理下对MSSQL的一些理解与感悟&#xff0c;却一直没有心思和时间…

Lombok,自动值和不可变项

我喜欢布兰登&#xff08;Brandon &#xff09;在博客文章中比较Project Lombok &#xff0c; AutoValue和Immutables的建议 &#xff0c;而这篇文章试图做到这一点。 我已经简要概述了Project Lombok &#xff0c; AutoValue和Immutables &#xff0c;但是这篇文章有所不同&am…

用interrupt()中断Java线程

Javathread 最近在学习Java线程相关的东西&#xff0c;和大家分享一下&#xff0c;有错误之处欢迎大家指正&#xff0e; 假如我们有一个任务如下&#xff0c;交给一个Java线程来执行&#xff0c;如何才能保证调用interrupt()来中断它呢&#xff1f; Java代码 class ATask imple…

JAVA分代收集机制详解

Java堆中是JVM管理的最大一块内存空间。主要存放对象实例。在JAVA中堆被分为两块区域&#xff1a;新生代&#xff08;young&#xff09;、老年代&#xff08;old&#xff09;。堆大小新生代老年代&#xff1b;&#xff08;新生代占堆空间的1/3、老年代占堆空间2/3&#xff09;新…

高可用架构

转载于:https://www.cnblogs.com/138026310/p/9088341.html