Kafka的Spring Cloud Stream

总览

该示例项目演示了如何使用事件驱动的体系结构 , Spring Boot ,Spring Cloud Stream, Apache Kafka和Lombok构建实时流应用程序。

在本教程结束时,您将运行一个简单的基于Spring Boot的Greetings微服务

  1. 从REST API获取消息
  2. 将其写入Kafka主题
  3. 从主题中读取
  4. 将其输出到控制台

让我们开始吧!

顺便说一句,您可以在此处找到源代码。

什么是Spring Cloud Streaming?

Spring Cloud Stream是基于Spring Boot构建的框架,用于构建消息驱动的微服务。

什么是卡夫卡?

Kafka是最初由LinkedIn开发的流行的高性能和水平可伸缩的消息传递平台。

安装Kafka

从这里下载Kafka并将其解压缩:

> tar -xzf kafka_2.11-1.0.0.tgz
> cd kafka_2.11-1.0.0

启动Zookeeper和Kafka

在Windows上:

> bin\windows\zookeeper-server-start.bat config\zookeeper.properties
> bin\windows\kafka-server-start.bat config\server.properties

在Linux或Mac上:

> bin/zookeeper-server-start.sh config/zookeeper.properties
> bin/kafka-server-start.sh config/server.properties

如果计算机从休眠状态唤醒后,Kafka没有运行并且无法启动,请删除<TMP_DIR>/kafka-logs文件夹,然后再次启动Kafka。

什么是Lombok?

Lombok是一个Java框架,可在代码中自动生成getter,setter,toString(),构建器,记录器等。

Maven依赖

转到https://start.spring.io创建一个Maven项目:

  1. 添加必要的依赖项: Spring Cloud StreamKafkaDevtools (用于在开发过程中进行热重新部署,可选), Actuator (用于监视应用程序,可选), Lombok (确保在IDE中也安装了Lombok插件)
  2. 单击生成项目按钮以zip文件形式下载项目
  3. 解压缩zip文件并将maven项目导入到您喜欢的IDE

注意pom.xml文件中的Maven依赖项:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId></dependency><!-- Also install the Lombok plugin in your IDE --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><!-- hot reload - press Ctrl+F9 in IntelliJ after a code change while application is running --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><optional>true</optional></dependency>

…还有<dependencyManagement>部分:

<dependencyManagement><dependencies><dependency><!-- Import dependency management from Spring Boot --><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>${spring-boot.version}</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-dependencies</artifactId><version>${spring-cloud-stream.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement>

…和<repository>部分:

<repository><id>spring-milestones</id><name>Spring Milestones</name><url>http://repo.spring.io/libs-milestone</url><snapshots><enabled>false</enabled></snapshots>
</repository>

定义卡夫卡流

package com.kaviddiss.streamkafka.stream;import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;  public interface GreetingsStreams {String INPUT = "greetings-in";String OUTPUT = "greetings-out";@Input(INPUT)SubscribableChannel inboundGreetings();@Output(OUTPUT)MessageChannel outboundGreetings();
}

为了使我们的应用程序能够与Kafka进行通信,我们需要定义一个出站流以将消息写入Kafka主题,并定义一个入站流以读取来自Kafka主题的消息。

通过简单地创建一个接口为每个流定义单独的方法,Spring Cloud提供了一种方便的方法。

inboundGreetings()方法定义要从Kafka读取的入站流,而outboundGreetings()方法定义要写入Kafka的出站流。

在运行时,Spring将为GreetingsStreams接口创建一个基于Java代理的实现,该实现可以作为Spring Bean注入到代码中的任何位置,以访问我们的两个流。

配置Spring Cloud Stream

下一步是将Spring Cloud Stream配置为绑定到GreetingsStreams接口中的流。 这可以通过使用以下代码创建@Configurationcom.kaviddiss.streamkafka.config.StreamsConfig来完成:

package com.kaviddiss.streamkafka.config;import com.kaviddiss.streamkafka.stream.GreetingsStreams;
import org.springframework.cloud.stream.annotation.EnableBinding;@EnableBinding(GreetingsStreams.class)
public class StreamsConfig {
}

使用@EnableBinding批注(将GreatingsService接口传递到该批注)完成@EnableBindingGreatingsService

Kafka的配置属性

默认情况下,配置属性存储在src/main/resources/application.properties文件中。

但是,我更喜欢使用YAML格式,因为它不太冗长,并且允许将公共属性和特定于环境的属性保留在同一文件中。

现在,让我们将application.properties重命名为application.yaml并将config片段下方粘贴到文件中:

spring:cloud:stream:kafka:binder:brokers: localhost:9092bindings:greetings-in:destination: greetingscontentType: application/jsongreetings-out:destination: greetingscontentType: application/json

上面的配置属性配置要连接的Kafka服务器的地址,以及我们用于代码中的入站和出站流的Kafka主题。 他们俩都必须使用相同的Kafka主题!

contentType属性告诉Spring Cloud Stream在流中以String的形式发送/接收我们的消息对象。

创建消息对象

使用下面的代码创建一个简单的com.kaviddiss.streamkafka.model.Greetings类,该代码将表示我们从中读取并写入的greetings Kafka主题:

package com.kaviddiss.streamkafka.model;// lombok autogenerates getters, setters, toString() and a builder (see https://projectlombok.org/):
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;@Getter @Setter @ToString @Builder
public class Greetings {private long timestamp;private String message;
}

注意,由于Lombok批注,该类如何没有任何getter和setter。 @ToString将使用类的字段生成toString()方法,而@Builder批注将允许我们使用流畅的生成器创建Greetings对象(请参见下文)。

创建服务层以写入Kafka

让我们创建的com.kaviddiss.streamkafka.service.GreetingsService下面的代码,将写一个类Greetings对象的greetings卡夫卡话题:

package com.kaviddiss.streamkafka.service;import com.kaviddiss.streamkafka.model.Greetings;
import com.kaviddiss.streamkafka.stream.GreetingsStreams;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.MimeTypeUtils;@Service
@Slf4j
public class GreetingsService {private final GreetingsStreams greetingsStreams;public GreetingsService(GreetingsStreams greetingsStreams) {this.greetingsStreams = greetingsStreams;}public void sendGreeting(final Greetings greetings) {log.info("Sending greetings {}", greetings);MessageChannel messageChannel = greetingsStreams.outboundGreetings();messageChannel.send(MessageBuilder.withPayload(greetings).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON).build());}

@Service批注会将此类配置为Spring Bean,并通过构造函数注入GreetingsService依赖项。

@Slf4j批注将生成一个SLF4J记录器字段,可用于记录日志。

sendGreeting()方法中,我们使用注入的GreetingsStream对象发送由Greetings对象表示的消息。

创建REST API

现在,我们将创建一个REST api端点,该端点将触发使用GreetingsService Spring Bean向Kafka发送消息:

package com.kaviddiss.streamkafka.web;import com.kaviddiss.streamkafka.model.Greetings;
import com.kaviddiss.streamkafka.service.GreetingsService;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController; @RestController
public class GreetingsController {private final GreetingsService greetingsService;public GreetingsController(GreetingsService greetingsService) {this.greetingsService = greetingsService;}@GetMapping("/greetings")@ResponseStatus(HttpStatus.ACCEPTED)public void greetings(@RequestParam("message") String message) {Greetings greetings = Greetings.builder().message(message).timestamp(System.currentTimeMillis()).build();greetingsService.sendGreeting(greetings);}
}

@RestController注释告诉Spring这是一个Controller bean(MVC中的C)。 greetings()方法定义一个HTTP GET /greetings端点,该端点接受message请求参数,并将其传递给GreetingsServicesendGreeting()方法。

听问候卡夫卡主题

让我们创建一个com.kaviddiss.streamkafka.service.GreetingsListener类,该类将侦听greetings Kafka主题上的消息并将其记录在控制台上:

package com.kaviddiss.streamkafka.service;import com.kaviddiss.streamkafka.model.Greetings;
import com.kaviddiss.streamkafka.stream.GreetingsStreams;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class GreetingsListener {@StreamListener(GreetingsStreams.INPUT)public void handleGreetings(@Payload Greetings greetings) {log.info("Received greetings: {}", greetings);}
}

@Component批注类似于@Service @Component@Service @RestController定义了一个Spring Bean。

GreetingsListener有一个方法, handleGreetings()将通过云春流与每一个新的调用Greetings的消息对象greetings卡夫卡的话题。 这要感谢为handleGreetings()方法配置的@StreamListener批注。

运行应用程序

最后一个难题是由Spring Initializer自动生成的com.kaviddiss.streamkafka.StreamKafkaApplication类:

package com.kaviddiss.streamkafka;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class StreamKafkaApplication {public static void main(String[] args) {SpringApplication.run(StreamKafkaApplication.class, args);}
}

无需在此处进行任何更改。 您可以在您的IDE中将此类作为Java应用程序运行,也可以使用Spring Boot maven插件从命令行运行该应用程序:

> mvn spring-boot:run

应用程序运行后,在浏览器中转到http:// localhost:8080 / greetings?message = hello并检查您的控制台。

摘要

我希望您喜欢本教程。 随时提出任何问题并留下您的反馈。

翻译自: https://www.javacodegeeks.com/2018/03/spring-cloud-stream-kafka.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/348842.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

常见的股票技术因子学习以及计算

最近在看《量化投资数据挖掘技术与实践&#xff08;MATLAB版&#xff09;》。学习了其中的常见的股票衍生变量&#xff0c;并且利用WIND金融数据终端的matlab借口windmatlab导出一些数据进行了一个简单的学习。特此记录。 下面是我对于书中提到的几个因子的学习总结&#xff1…

Java – HashMap详细说明

HashMap基于哈希算法工作&#xff0c;根据Java文档HashMap具有以下四个构造函数&#xff0c; 建设者 描述 HashMap ​() 构造一个空的 具有默认初始容量&#xff08;16&#xff09;和默认加载因子&#xff08;0.75&#xff09;的HashMap 。 HashMap ​(int initialCapaci…

Python实现石头-剪刀-布小游戏

近日在学习Python的一些基础知识&#xff0c;觉得还是很有趣的一个一门语言&#xff01;就目前的学习的一些知识&#xff0c;编写了一些一个简单的石头剪刀布的游戏。主要是熟悉一些Python的一些控制语句。 import random while 1:sint(random.randint(1,3))print(s)print()if…

Python:递归输出斐波那契数列

今天学习Python的时候做一道练习题&#xff0c;题目是这样的&#xff1a; 题目 导入 问题 有一对兔子&#xff0c;从出生后第3个月起每个月都生一对兔子&#xff0c;小兔子长到第三个月后每个月又生一对兔子&#xff0c;假如兔子都不死&#xff0c;问每个月的兔子总对数为多…

排序算法二:快速排序算法原理以及MATLAB与Python实现

今天继续学习排序算法。今天的主角是快速排序算法。 1. 快速排序基本原理 快速排序是C.R.A.Hoare于1962年提出的一种划分交换排序。它采用了一种分治的策略&#xff0c;通常称其为分治法(Divide-and-ConquerMethod)。 该方法的基本思想是&#xff1a; 1&#xff0e;先从数列…

排序算法三:堆排序基本原理以及Python实现

1. 基本原理 堆排序就是利用堆的特性进行一个无序序列的排序工作。 堆的特点 堆分为最大堆和最小堆&#xff0c;其实就是完全二叉树。 最大堆要求节点的元素都要不小于其孩子最小堆要求节点元素都不大于其左右孩子。 两者对左右孩子的大小关系不做任何要求&#xff0c;其实…

spring jms 消息_Spring JMS,消息自动转换,JMS模板

spring jms 消息在我的一个项目中&#xff0c;我应该创建一个消息路由器&#xff0c;就像所有路由器一样&#xff0c;它应该从一个主题获取JMS消息并将其放入另一个主题。 该消息本身是JMS文本消息&#xff0c;实际上包含XML消息。 收到消息后&#xff0c;我还应该添加一些其他…

排序算法四:归并排序基本原理以及Python实现

1. 基本原理 归并排序建立在归并操作上的一种算法。该算法是采用分治法&#xff08;Divide and Conquer&#xff09;的一个非常典型的应用。归并排序是将两 个已经有序的序列合成一个有序的序列的过程。 因此&#xff0c;对于一个待排序的序列来说&#xff0c;首先要将其进行…

如何将JAR添加到Jetbrains MPS项目

Jetbrains MPS是创建DSL的绝佳工具。 我们喜欢它&#xff0c;并在我们的咨询工作中定期使用它。 因此&#xff0c;我们之前已经写过关于Jetbrains MPS的文章 。 作为投影编辑器&#xff0c;您可以轻松创建可通过图形界面或数学公式之类使用的DSL。 尽管所有这些功能都需要做一…

Python 3实现k-邻近算法以及 iris 数据集分类应用

前言 这个周基本在琢磨这个算法以及自己利用Python3 实现自主编程实现该算法。持续时间比较长&#xff0c;主要是Pyhton可能还不是很熟练&#xff0c;走了很多路&#xff0c;基本是一边写一边学。不过&#xff0c;总算是基本搞出来了。不多说&#xff0c;进入正题。 1. K-邻近…

UART原理

UART原理 通用异步收发传输器&#xff08;Universal Asynchronous Receiver / Transmitter)&#xff0c;通常称作UART&#xff0c;是一种异步收发传输器&#xff0c;是电脑硬件的一部分。将资料由串行通信与并行通信间作传输转换&#xff0c;作为并行输入成为串行输出的芯片&am…

AttributeError: module 'tensorflow' has no attribute 'placeholder'等一系列tensorflow版本导致的问题

新人tensorflow2.1版本导致程序我无法运行最简单的办法 法1 tensorflow.compat.v1 import tensorflow.compat..v1 as tf tf.disable_v2_behavior() 亲测不好用 法2 卸载2.1&#xff0c;安装老版本 在Terminal界面输入 pip uninstall tensorflow接着输入Y确定卸载。 安装t…

word中一直提示校对错误,如何关闭当前文档校对功能

关闭当前文档校对功能 文件>选项>校对>例外项&#xff0c;选中两个&#xff0c;如图 对比效果&#xff1a;

将测微仪与Spring Boot 2一起使用

这是快速入门&#xff0c;介绍了如何使用出色的Micrometer库来检测基于Spring Boot 2的应用程序并在Prometheus中记录指标 介绍 Micrometer在各种监视工具提供的客户端库上提供了基于Java的外观。 以Prometheus为例&#xff0c;如果我要将Java应用程序与Prometheus集成&#…

与Maven的集成测试

用Maven实施单元测试是很普通的事情&#xff0c;我们大多数人都熟悉项目结构以及单元测试所在的位置。 但是&#xff0c;集成测试是一种不同的情况&#xff0c;大多数情况下它们具有完全不同的要求。 例如&#xff0c;可以让您的单元测试在内存数据库中的h2上运行&#xff0c;…

通信原理-通信系统的组成

第一章 通信系统的组成 1、通信系统一般模型 发送设备&#xff1a;将信源产生的原始电信号变换成适合在信道中传输的形式。变换方式有调制、放大、滤波、编码、多路复用等。 信道&#xff1a;传输信号的通道.即传输媒质。在给子信号通道的同时&#xff0c;信道也会对信号产生损…

使用log4j2免费分配日志记录

介绍 最近&#xff0c;我正在为一个客户端工作&#xff0c;试图为大型精心制作的Java系统消除一些GC暂停。 经过分析后&#xff0c;我意识到大部分垃圾都是通过日志记录产生的&#xff01; 是否有一种简单的方法来删除所有分配&#xff1f; 原来有:) 我应该使用哪个框架进行GC…

Verilog中fork...join 的用法

特点 中间的语句并行执行&#xff1b;&#xff08;延时不累加&#xff09; 不能用于综合&#xff1b; 代码 module signal_gen; reg wave; parameter cycle 5; initial beginforkwave 0;#(cycle) wave 1;#(2*cycle) wave 0;#(3*cycle) wave 1;#(4*cycle…

使用JWT的Cloud Native应用程序

本机云应用程序是为云计算环境开发的应用程序。 对于“ 什么是云原生应用程序 ”这个问题没有具体答案&#xff0c;但是必须满足不同的概念。 在我看来&#xff0c;最重要的功能之一就是能够快速缩放 。 这意味着我们的应用程序在每台服务器上都无法具有任何状态&#xff0c;…

开源项目GoodView点赞效果

点赞1效果&#xff1a; GoodView方法&#xff1a; 使用GoodView的Demo: public class MainActivity extends Activity { Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_main)final Good…