kafka netty
介绍
在上一个博客中 ,我介绍了Netty用作Web服务器。 该示例运行良好……只要需要广播服务器即可。
大多数情况下不是很有用。 更有可能的是,每个客户端仅接收针对其的数据,并保留了特殊情况下的广播,例如“服务器在15分钟内停机!” 关于该特定服务器示例的另一件事是,一切都是独立的。 例如,单片应用程序很好,但是在当今环境中,分布式微服务要好得多。 可伸缩性和可靠性至关重要。
Netty和Kafka在一起很棒。 Netty擅长处理大量客户,Kafka擅长使大量服务协同工作。 结合起来,它们是开发中的最佳选择。 但是,有些“陷阱”可能会使其变得繁琐。 该博客以及示例微服务/ Netty体系结构和功能全面的代码将有望帮助减轻烦恼并实现甜味。
第一要务
示例代码位于此处 。
有详细的自述文件,描述了设置环境所需的内容。 我试图将需求降到最低,仅Java 8和Maven 。 SLF4J和Logback用于记录日志。 我为Mac OSX和Ubuntu设置了脚本(我在Parallels容器中运行的14.04版本是我测试过的脚本),因此如果您在Windows上进行开发,则表示歉意。 该代码全是Java,并且我在Windows上看到过Kafka教程,因此所有内容都应在此处运行。 Maven构建也应该产生可以启动的目标,因此,在安装Zookeeper / Kafka的时候加了一点肘油(您可以按照脚本查看需要的设置),手动运行它并不重要。视窗。
注:如README.md
中所述,该脚本将删除任何现有的Zookeeper / Kafka安装和数据。 如果您已有设置,请不要使用脚本!
安装和配置必备mvn package
如果不使用脚本,请运行mvn package
如果是,则运行maclocal_run.sh
(或linuxlocal_run.sh
)。 该脚本将下载Zk / Kafka(如果尚未下载),进行安装,配置,启动它们,运行mvn package
,启动服务并最终启动服务器。 一旦启动,就抵制离开外壳的冲动,因为它会自动为架构的每个部分弹出新的选项卡。 启动Whirlpool服务器之后,就可以开始了。
我强烈建议创建一个脚本,以在本地安装,配置,构建和启动微服务环境。 创建每个单独的服务是一个很大的痛苦。 必要时也可以使用Docker,但我发现只需本地运行所有内容,下载所需的内容就少得多。
作为一个预告片,这里是UI(您也可以从GitHub上的README.md看到它)。
- 要添加股票代码,请输入它(即“ GOOG”),然后单击“股票”下的A按钮。 要删除它,请单击X。
- 要添加一个网站来测试它是打开还是关闭,请键入完全限定的URL(即http://facebook.com ),然后单击“ UpDown”下的A按钮。 要删除它,请单击X。
- 要添加天气检查,请在中键入城市,州(即“芝加哥,il”),然后单击“城市,州”下的A按钮。 要删除它,请单击X。
- 由于订阅与每个服务一起存储在内存中,因此订阅在页面刷新甚至登录/注销(具有相同的用户ID)后都不会丢失。 当然,“真实”系统将使用数据库。
- 订阅每10秒钟更新一次,因此我不会压倒Yahoo API,因此添加数据后请耐心等待。
建筑
在此示例中,我试图考虑可能有用的良好通用服务。 我最终选择了股票报价服务,“此网站是否正常运转”服务以及气象服务。 这些中的每一个都独立于各自具有Kafka主题的其他主题运行。
我选择配置Kafka的方式是每个服务使用一个命令主题,每个服务使用一个数据主题。 一切都可以只使用一个全局主题,读者可以决定要处理的内容,但将其分离出来可以使其更加清晰和整洁。
这是数据如何通过Kafka流动的示意图。 它是通过一个免费的基于Keyhole的基于Web的实用程序Mockola完成的 。 请注意,服务器知道所有主题,但是服务仅知道它们自己的主题。 cmd
主题用于将命令发送到服务,而数据主题(在其上没有-cmd
主题)用于从服务发送数据。 同样,所有这些都可以在一个bus
主题上进行处理,但是通过将它们分离出来,可以更轻松地了解发生了什么。
服务
现在让我们谈谈服务。 这三者非常相似,因此有一项基本服务可以完成大部分工作。 每个服务都有三个线程,由Java ExecutorService处理。 关于Executor服务的一件好事是,如果出现问题,它将自动重新启动线程。 这有助于弹性。
每个服务通过告诉基类使用什么主题和命令主题来启动自己。 然后,基类启动三个线程:一个用于从cmd主题读取命令,一个用于定期为客户端收集数据,一个用于在数据主题上发送数据。 这些线程使用非阻塞Java并发类ConcurrentLinkedQueue
和ConcurrentHashMap
。 哈希映射存储每个用户的订阅集,队列存储准备发送给数据主题的响应。
每个服务的流程是同时工作的三个线程。 阅读器使用Kafka使用者从其命令主题读取命令。 根据命令,添加或删除订阅。 该线程非常笨拙,因为它不要求服务对请求进行任何验证,而只是盲目地将发送给订阅的内容添加进去。 生产代码显然会添加一个调用,要求服务在允许成功订阅之前验证命令。 创建一个响应以放置主题,然后等待下一个命令。
注意 :关于数据的一些话题。 我使用JSON作为传输格式,但是XML或您想要的其他任何内容也可以使用。 重要的是,每个人都同意数据格式并坚持使用。 通用模块具有POJO类,这些类定义了数据将遵循的协定。 通常对所有消息有用的是时间戳,消息类型和客户端的ID。
另一个有用的东西是到期时间戳。 这些示例消息永远存在。 Message
类仅查看Message
的类型和ID。 服务器使用它来确定需要处理哪种类型的消息以及谁对该消息感兴趣。 没有这些,就很难甚至不可能处理数据。 现在,消息格式可以涉及很多,其中一些格式使用标题和部分来描述复杂的数据。 本示例尝试使所有内容尽可能简单。
净值服务器
让我们一次上一堂课。
NettyHttpFileHandler
与以前的博客相比,该类几乎没有变化。 可重用的片段已移至WebSocketHelper
类。 该文件的主要用途是提供浏览器要求的文件。
WebSocket助手
可能令人困惑的第一项是类变量clientAttr
。 在Netty Channel中存储数据要求将其附加到AttributeKey
。 这类似于Java并发类中的Atomic实例-它提供了数据容器。 我们将存储客户端ID(在本例中为用户名,但也可以很容易地作为会话ID),以便我们确定哪个Channel需要接收消息。
realWriteAndFlush()
方法设置适当的标题,内容长度和cookie。 然后,它写入并刷新HTTP响应。 线
channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
告诉Netty这是需要写入客户端的数据的结尾,因此Netty会将其发送出去。
特别说明 :关于cookie的创建,请确保未设置HTTP Only
标志。 如果是,则JavaScript无法看到Cookie,也不会与WebSocket升级请求一起发送。 这样一来,您就必须创建自己的页面刷新管理和会话管理方法。
关于cookie的另一件事是使用Netty cookie编码器的STRICT版本,因此它将不允许多个具有相同名称的cookie。 我不确定何时允许这种情况发生。
WebSocketMessageHandler
这个类只是定义了一个接口WhirlpoolServerHandler
使用交谈的WhirlpoolMessageHandler
。
WhirlpoolMessageHandler
这是Netty和Kafka之间存在连接的地方。 两个执行器处理一个读取器线程和一个写入器线程。
编写器线程在请求队列中查找消息(有关这些消息在一分钟内来自何处的更多信息),并将消息放置在适当的Kafka命令主题上。
阅读器线程在Kafka数据主题上查找传入消息,为每个主题查找正确的Channel,然后将消息写入这些主题。
当客户端通过WebSockets发送消息时, WhirlpoolServerHandler
将确保已handleMessage()
完整的消息,然后调用handleMessage()
。 该方法确定是否为有效消息,然后将请求添加到请求队列中,以便读取器线程可以将其提取并提供给Kafka。
WhirlpoolServerHandler
这堂课有几件有趣的事。 首先,它可以区分HTTP,REST和WebSocket消息之间的区别。 执行此操作的Netty重写方法是channelRead0
。 这是Netty用来告诉我们消息何时到达以及消息是哪种类型的方法。 对于HTTP和REST调用, handleHttpRequest
调用handleHttpRequest
,对于handleWebSocketFrame
将调用handleWebSocketFrame
。
如果存在cookie方法,则handleHttpRequest
方法handleHttpRequest
读取该cookie。 在POST上,它会查找登录和注销信息。 对于登录,它将找出用户名/密码,创建cookie,并防止多次使用相同的名称登录。 所有这些代码将在应用程序的生产版本中添加额外的安全性进行拆分。 要注销,它会查找Channel,清理,关闭它并使cookie过期。
对于WebSocketUpgrade
,它要求Netty处理启动websocket所需的复杂握手。 完成此操作后,会将用户添加到握手期间创建的Channel。 这是用户连接到Channel的地方,如果cookie没有在请求中出现,那将不是一件容易的事。
在此唯一需要注意的另一件事是,此类设置为处理为SPA(单页应用程序)编码的客户端,因为它将将所有无法识别的调用重定向到index.html
。
该类中的其他方法更多地是为了提供信息,将在高级情况下使用。
漩涡服务器
此类启动Netty服务器并创建通道管道。 这是Netty的一个标准类,紧随Netty示例。
最后的想法
显然,此代码中还有更多内容。 每个服务和服务器的多个实例可以同时运行,并且Zk / Kafka可以集群以帮助提高弹性。 一个测试微服务应用程序弹性的强大实用程序是另一个名为TroubleMaker的免费开源Keyhole实用程序。 我还没有机会测试这个例子,但是我很期待这个机会。
我们没有涉及安全性,尽管我以前希望展示Netty与Shiro的集成,但这是一个非常复杂的话题。 我只能说这是有可能的,但是我还没有将所有内容都包裹在脑海中,以至于无法形成一个连贯的博客。
希望您喜欢该博客,并找到有用的代码。 通过博客或Twitter与我联系( @johnwboardman ,在这里我总是很欣赏新的关注)。
翻译自: https://www.javacodegeeks.com/2016/05/whirlpool-microservices-using-netty-kafka.html
kafka netty