说明:kafka是一款消息中间件,可实现微服务之间的异步调用。本文介绍kafka的简单使用。windows操作系统下的kafka安装,参考下面这篇文章
- Kafka安装
启动
按照上面博客的介绍,使用CMD命令启动,如下:
Demo
Github上有一个现成的Demo,地址:https://github.com/xiaour/SpringBootDemo,clone到本地,里面有一个kafka的demo,打开。
启动
打开后pom文件中这个版本号需要修改成如下,不然启动会提示一个方法没找到
其他配置都可以不改(当然如果端口冲突了,可以换个端口),启动
运行
接着来看下代码,代码中消息生产者、消费者如下:
(消息生产者,手动创建一个对象,推到名为test的topic里)
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import java.util.Date;
import java.util.UUID;@Component
public class Producer {@Autowiredprivate KafkaTemplate kafkaTemplate;private static Gson gson = new GsonBuilder().create();//发送消息方法public void send() {Message message = new Message();message.setId("KFK_"+System.currentTimeMillis());message.setMsg(UUID.randomUUID().toString());message.setSendTime(new Date());kafkaTemplate.send("test", gson.toJson(message));}
}
(Message是自定义对象,如下)
import java.util.Date;public class Message {private String id;private String msg;private Date sendTime;public String getId() {return id;}public void setId(String id) {this.id = id;}public String getMsg() {return msg;}public void setMsg(String msg) {this.msg = msg;}public Date getSendTime() {return sendTime;}public void setSendTime(Date sendTime) {this.sendTime = sendTime;}
}
(消息消费者,监听名为test的topic,打印消息内容)
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;import java.util.Optional;@Component
public class Consumer {@KafkaListener(topics = {"test"})public void listen(ConsumerRecord<?, ?> record){Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {Object message = kafkaMessage.get();System.out.println("---->"+record);System.out.println("---->"+message);}}
}
定义一个controller,手动触发,发送一个消息
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/kafka")
public class SendController {@Autowiredprivate Producer producer;@RequestMapping(value = "/send")public String send() {producer.send();return "{\"code\":0}";}
}
调用接口,往kafka中发送一个消息
可在kafka的可视化界面中,看到消息内容
控制台,可见消息消费者这边消费了消息,打印了消息内容
到这,kafka的简单使用就完成了。
实际开发中,可以在业务需要的地方发送消息到kafka中,如发送验证码、数据存入缓存、资源上传到OSS、生成静态资源文件等一些不需要实时进行的操作,可以发个消息到kafka,在消息消费者这边完成对应的业务逻辑。
总结
本文介绍了kafka在Spring Boot中的简单使用