Kafka的Spring Cloud Stream

总览

该示例项目演示了如何使用事件驱动的体系结构 , Spring Boot ,Spring Cloud Stream, Apache Kafka和Lombok构建实时流应用程序。

在本教程结束时,您将运行一个简单的基于Spring Boot的Greetings微服务

  1. 从REST API获取消息
  2. 将其写入Kafka主题
  3. 从主题中读取
  4. 将其输出到控制台

让我们开始吧!

顺便说一句,您可以在此处找到源代码。

什么是Spring Cloud Streaming?

Spring Cloud Stream是基于Spring Boot构建的框架,用于构建消息驱动的微服务。

什么是卡夫卡?

Kafka是最初由LinkedIn开发的流行的高性能和水平可伸缩的消息传递平台。

安装Kafka

从这里下载Kafka并将其解压缩:

> tar -xzf kafka_2.11-1.0.0.tgz
> cd kafka_2.11-1.0.0

启动Zookeeper和Kafka

在Windows上:

> bin\windows\zookeeper-server-start.bat config\zookeeper.properties
> bin\windows\kafka-server-start.bat config\server.properties

在Linux或Mac上:

> bin/zookeeper-server-start.sh config/zookeeper.properties
> bin/kafka-server-start.sh config/server.properties

如果计算机从休眠状态唤醒后,Kafka没有运行并且无法启动,请删除<TMP_DIR>/kafka-logs文件夹,然后再次启动Kafka。

什么是Lombok?

Lombok是一个Java框架,可在代码中自动生成getter,setter,toString(),构建器,记录器等。

Maven依赖

转到https://start.spring.io创建一个Maven项目:

  1. 添加必要的依赖项: Spring Cloud StreamKafkaDevtools (用于在开发过程中进行热重新部署,可选), Actuator (用于监视应用程序,可选), Lombok (确保在IDE中也安装了Lombok插件)
  2. 单击生成项目按钮以zip文件形式下载项目
  3. 解压缩zip文件并将maven项目导入到您喜欢的IDE

注意pom.xml文件中的Maven依赖项:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId></dependency><!-- Also install the Lombok plugin in your IDE --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><!-- hot reload - press Ctrl+F9 in IntelliJ after a code change while application is running --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><optional>true</optional></dependency>

…还有<dependencyManagement>部分:

<dependencyManagement><dependencies><dependency><!-- Import dependency management from Spring Boot --><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>${spring-boot.version}</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-dependencies</artifactId><version>${spring-cloud-stream.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement>

…和<repository>部分:

<repository><id>spring-milestones</id><name>Spring Milestones</name><url>http://repo.spring.io/libs-milestone</url><snapshots><enabled>false</enabled></snapshots>
</repository>

定义卡夫卡流

package com.kaviddiss.streamkafka.stream;import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;  public interface GreetingsStreams {String INPUT = "greetings-in";String OUTPUT = "greetings-out";@Input(INPUT)SubscribableChannel inboundGreetings();@Output(OUTPUT)MessageChannel outboundGreetings();
}

为了使我们的应用程序能够与Kafka进行通信,我们需要定义一个出站流以将消息写入Kafka主题,并定义一个入站流以读取来自Kafka主题的消息。

通过简单地创建一个接口为每个流定义单独的方法,Spring Cloud提供了一种方便的方法。

inboundGreetings()方法定义要从Kafka读取的入站流,而outboundGreetings()方法定义要写入Kafka的出站流。

在运行时,Spring将为GreetingsStreams接口创建一个基于Java代理的实现,该实现可以作为Spring Bean注入到代码中的任何位置,以访问我们的两个流。

配置Spring Cloud Stream

下一步是将Spring Cloud Stream配置为绑定到GreetingsStreams接口中的流。 这可以通过使用以下代码创建@Configurationcom.kaviddiss.streamkafka.config.StreamsConfig来完成:

package com.kaviddiss.streamkafka.config;import com.kaviddiss.streamkafka.stream.GreetingsStreams;
import org.springframework.cloud.stream.annotation.EnableBinding;@EnableBinding(GreetingsStreams.class)
public class StreamsConfig {
}

使用@EnableBinding批注(将GreatingsService接口传递到该批注)完成@EnableBindingGreatingsService

Kafka的配置属性

默认情况下,配置属性存储在src/main/resources/application.properties文件中。

但是,我更喜欢使用YAML格式,因为它不太冗长,并且允许将公共属性和特定于环境的属性保留在同一文件中。

现在,让我们将application.properties重命名为application.yaml并将config片段下方粘贴到文件中:

spring:cloud:stream:kafka:binder:brokers: localhost:9092bindings:greetings-in:destination: greetingscontentType: application/jsongreetings-out:destination: greetingscontentType: application/json

上面的配置属性配置要连接的Kafka服务器的地址,以及我们用于代码中的入站和出站流的Kafka主题。 他们俩都必须使用相同的Kafka主题!

contentType属性告诉Spring Cloud Stream在流中以String的形式发送/接收我们的消息对象。

创建消息对象

使用下面的代码创建一个简单的com.kaviddiss.streamkafka.model.Greetings类,该代码将表示我们从中读取并写入的greetings Kafka主题:

package com.kaviddiss.streamkafka.model;// lombok autogenerates getters, setters, toString() and a builder (see https://projectlombok.org/):
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;@Getter @Setter @ToString @Builder
public class Greetings {private long timestamp;private String message;
}

注意,由于Lombok批注,该类如何没有任何getter和setter。 @ToString将使用类的字段生成toString()方法,而@Builder批注将允许我们使用流畅的生成器创建Greetings对象(请参见下文)。

创建服务层以写入Kafka

让我们创建的com.kaviddiss.streamkafka.service.GreetingsService下面的代码,将写一个类Greetings对象的greetings卡夫卡话题:

package com.kaviddiss.streamkafka.service;import com.kaviddiss.streamkafka.model.Greetings;
import com.kaviddiss.streamkafka.stream.GreetingsStreams;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.MimeTypeUtils;@Service
@Slf4j
public class GreetingsService {private final GreetingsStreams greetingsStreams;public GreetingsService(GreetingsStreams greetingsStreams) {this.greetingsStreams = greetingsStreams;}public void sendGreeting(final Greetings greetings) {log.info("Sending greetings {}", greetings);MessageChannel messageChannel = greetingsStreams.outboundGreetings();messageChannel.send(MessageBuilder.withPayload(greetings).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON).build());}

@Service批注会将此类配置为Spring Bean,并通过构造函数注入GreetingsService依赖项。

@Slf4j批注将生成一个SLF4J记录器字段,可用于记录日志。

sendGreeting()方法中,我们使用注入的GreetingsStream对象发送由Greetings对象表示的消息。

创建REST API

现在,我们将创建一个REST api端点,该端点将触发使用GreetingsService Spring Bean向Kafka发送消息:

package com.kaviddiss.streamkafka.web;import com.kaviddiss.streamkafka.model.Greetings;
import com.kaviddiss.streamkafka.service.GreetingsService;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController; @RestController
public class GreetingsController {private final GreetingsService greetingsService;public GreetingsController(GreetingsService greetingsService) {this.greetingsService = greetingsService;}@GetMapping("/greetings")@ResponseStatus(HttpStatus.ACCEPTED)public void greetings(@RequestParam("message") String message) {Greetings greetings = Greetings.builder().message(message).timestamp(System.currentTimeMillis()).build();greetingsService.sendGreeting(greetings);}
}

@RestController注释告诉Spring这是一个Controller bean(MVC中的C)。 greetings()方法定义一个HTTP GET /greetings端点,该端点接受message请求参数,并将其传递给GreetingsServicesendGreeting()方法。

听问候卡夫卡主题

让我们创建一个com.kaviddiss.streamkafka.service.GreetingsListener类,该类将侦听greetings Kafka主题上的消息并将其记录在控制台上:

package com.kaviddiss.streamkafka.service;import com.kaviddiss.streamkafka.model.Greetings;
import com.kaviddiss.streamkafka.stream.GreetingsStreams;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class GreetingsListener {@StreamListener(GreetingsStreams.INPUT)public void handleGreetings(@Payload Greetings greetings) {log.info("Received greetings: {}", greetings);}
}

@Component批注类似于@Service @Component@Service @RestController定义了一个Spring Bean。

GreetingsListener有一个方法, handleGreetings()将通过云春流与每一个新的调用Greetings的消息对象greetings卡夫卡的话题。 这要感谢为handleGreetings()方法配置的@StreamListener批注。

运行应用程序

最后一个难题是由Spring Initializer自动生成的com.kaviddiss.streamkafka.StreamKafkaApplication类:

package com.kaviddiss.streamkafka;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class StreamKafkaApplication {public static void main(String[] args) {SpringApplication.run(StreamKafkaApplication.class, args);}
}

无需在此处进行任何更改。 您可以在您的IDE中将此类作为Java应用程序运行,也可以使用Spring Boot maven插件从命令行运行该应用程序:

> mvn spring-boot:run

应用程序运行后,在浏览器中转到http:// localhost:8080 / greetings?message = hello并检查您的控制台。

摘要

我希望您喜欢本教程。 随时提出任何问题并留下您的反馈。

翻译自: https://www.javacodegeeks.com/2018/03/spring-cloud-stream-kafka.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/348842.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用JShell的Java 9 Streams API

这篇文章着眼于使用JShell的Java 9 Streams API。 Streams API的更改以Java 8中Streams的成功为基础&#xff0c;并引入了许多实用程序方法– takeWhile&#xff0c;dropWhile和iterate。 这篇文章延续了My Top Java 9功能&#xff0c;并使用Jshell探索了这些方法。 流API Str…

常见的股票技术因子学习以及计算

最近在看《量化投资数据挖掘技术与实践&#xff08;MATLAB版&#xff09;》。学习了其中的常见的股票衍生变量&#xff0c;并且利用WIND金融数据终端的matlab借口windmatlab导出一些数据进行了一个简单的学习。特此记录。 下面是我对于书中提到的几个因子的学习总结&#xff1…

算法题:输入一个表示整数的字符串,把该字符串转换成整数并输出。例如输入字符串“12345”,则输出整数“12345”

今天这道算法题比较简单&#xff0c;主要考察的思考问题的全面性。这个需要考虑的几种情况。 如果输入的整数字符串是个负数&#xff0c;怎么处理&#xff1f; 如果输入的第一个字符是0&#xff0c;则怎么处理&#xff1f; 如果输入的是非0~9之间的字符怎么处理&#xff1f;…

排序算法一:冒泡排序,插入排序以及选择排序原理与MATLAB实现

最近在学习排序算法的一些知识。还是比较有趣的。所以好好研究了一下各个算法。并且使用matlab进行了个基本的实现&#xff0c;目前仅仅是实现吧&#xff0c;优化什么的可能目前的水平达不到吧&#xff0c;毕竟是用matlab实现&#xff0c;还是比较简单。以后还是希望使用C/C&am…

Java – HashMap详细说明

HashMap基于哈希算法工作&#xff0c;根据Java文档HashMap具有以下四个构造函数&#xff0c; 建设者 描述 HashMap ​() 构造一个空的 具有默认初始容量&#xff08;16&#xff09;和默认加载因子&#xff08;0.75&#xff09;的HashMap 。 HashMap ​(int initialCapaci…

Python实现石头-剪刀-布小游戏

近日在学习Python的一些基础知识&#xff0c;觉得还是很有趣的一个一门语言&#xff01;就目前的学习的一些知识&#xff0c;编写了一些一个简单的石头剪刀布的游戏。主要是熟悉一些Python的一些控制语句。 import random while 1:sint(random.randint(1,3))print(s)print()if…

Python:递归输出斐波那契数列

今天学习Python的时候做一道练习题&#xff0c;题目是这样的&#xff1a; 题目 导入 问题 有一对兔子&#xff0c;从出生后第3个月起每个月都生一对兔子&#xff0c;小兔子长到第三个月后每个月又生一对兔子&#xff0c;假如兔子都不死&#xff0c;问每个月的兔子总对数为多…

Spring Webflux –编写过滤器

Spring Webflux是Spring 5的一部分提供的新的响应式Web框架。 在传统的基于Spring MVC的应用程序&#xff08; Servlet Filter &#xff0c; HandlerInterceptor &#xff09;中编写过滤器的方式与在基于Spring Webflux的应用程序中编写过滤器的方式非常不同&#xff0c;本文将…

排序算法二:快速排序算法原理以及MATLAB与Python实现

今天继续学习排序算法。今天的主角是快速排序算法。 1. 快速排序基本原理 快速排序是C.R.A.Hoare于1962年提出的一种划分交换排序。它采用了一种分治的策略&#xff0c;通常称其为分治法(Divide-and-ConquerMethod)。 该方法的基本思想是&#xff1a; 1&#xff0e;先从数列…

排序算法三:堆排序基本原理以及Python实现

1. 基本原理 堆排序就是利用堆的特性进行一个无序序列的排序工作。 堆的特点 堆分为最大堆和最小堆&#xff0c;其实就是完全二叉树。 最大堆要求节点的元素都要不小于其孩子最小堆要求节点元素都不大于其左右孩子。 两者对左右孩子的大小关系不做任何要求&#xff0c;其实…

spring jms 消息_Spring JMS,消息自动转换,JMS模板

spring jms 消息在我的一个项目中&#xff0c;我应该创建一个消息路由器&#xff0c;就像所有路由器一样&#xff0c;它应该从一个主题获取JMS消息并将其放入另一个主题。 该消息本身是JMS文本消息&#xff0c;实际上包含XML消息。 收到消息后&#xff0c;我还应该添加一些其他…

排序算法四:归并排序基本原理以及Python实现

1. 基本原理 归并排序建立在归并操作上的一种算法。该算法是采用分治法&#xff08;Divide and Conquer&#xff09;的一个非常典型的应用。归并排序是将两 个已经有序的序列合成一个有序的序列的过程。 因此&#xff0c;对于一个待排序的序列来说&#xff0c;首先要将其进行…

如何将JAR添加到Jetbrains MPS项目

Jetbrains MPS是创建DSL的绝佳工具。 我们喜欢它&#xff0c;并在我们的咨询工作中定期使用它。 因此&#xff0c;我们之前已经写过关于Jetbrains MPS的文章 。 作为投影编辑器&#xff0c;您可以轻松创建可通过图形界面或数学公式之类使用的DSL。 尽管所有这些功能都需要做一…

Python 3实现k-邻近算法以及 iris 数据集分类应用

前言 这个周基本在琢磨这个算法以及自己利用Python3 实现自主编程实现该算法。持续时间比较长&#xff0c;主要是Pyhton可能还不是很熟练&#xff0c;走了很多路&#xff0c;基本是一边写一边学。不过&#xff0c;总算是基本搞出来了。不多说&#xff0c;进入正题。 1. K-邻近…

spring mvc 异步_DeferredResult – Spring MVC中的异步处理

spring mvc 异步DeferredResult是一个可能尚未完成的计算的容器&#xff0c;它将在将来提供。 Spring MVC使用它来表示异步计算&#xff0c;并利用Servlet 3.0 AsyncContext异步请求处理。 简要介绍一下它是如何工作的&#xff1a; RequestMapping("/") ResponseBod…

切换表达式到Java吗?

已创建一个标题为“ Java语言的开关表达式”的JEP草案 。 当前的“摘要”状态为&#xff1a;“扩展switch语句&#xff0c;以便可以将其用作语句或表达式&#xff0c;并改善switch处理null的方式。 这些将简化日常编码&#xff0c;并为在switch使用模式匹配做好准备。” 除了启…

WildFly Kubernetes exec探针

活动和就绪探针会告诉Kubernetes吊舱是否正在运行并准备进行一些工作。 企业应用程序可以通过HTTP探测应用程序的状态。 如果没有暴露HTTP端点&#xff0c;Kubernetes也可以通过执行命令进行探测。 WildFly附带了有用的jboss-cli.sh 。 此CLI检索有关服务器和部署状态的信息&a…

FPGA硬件学习基础知识点总结(1)

FPGA硬件学习基础知识点总结&#xff08;1&#xff09;锁存器与触发器 总结一下数电&#xff0c;FPGA的一些基础知识&#xff0c;涉及到硬件电路的设计。主要是记录自己的学习过程。 锁存器与触发器 锁存器&#xff08;latch&#xff09;&#xff1a;锁存器是电平触发的存储单…

ejb java_EJB继承与Java继承不同

ejb java尽管EJB继承有时使用Java继承&#xff0c;但事实并非总是如此。 就像您在我以前的文章中可以读到的那样 &#xff0c;EJB不必实现任何接口即可公开业务接口。 反之亦然-仅仅是因为EJB实现了某个接口或扩展了其他EJB&#xff0c;并不意味着它公开了全部或任何视图。 假…

UART原理

UART原理 通用异步收发传输器&#xff08;Universal Asynchronous Receiver / Transmitter)&#xff0c;通常称作UART&#xff0c;是一种异步收发传输器&#xff0c;是电脑硬件的一部分。将资料由串行通信与并行通信间作传输转换&#xff0c;作为并行输入成为串行输出的芯片&am…