文章目录
- 今日内容
- 1 Kafka
- 1.1 消息中间件对比
- 1.2 kafka介绍
- 1.3 kafka安装及配置
- 1.4 kafka案例
- 1.4.1 导入kafka客户端
- 1.4.2 编写生产者消费者
- 1.4.3 启动测试
- 1.4.4 多消费者启动
- 1.5 kafka分区机制
- 1.5.1 topic剖析
- 1.6 kafka高可用设计
- 1.7 kafka生产者详解
- 1.7.1 同步发送
- 1.7.2 异步发送
- 1.7.3 参数详解
- 1.7.3.1 ack
- 1.7.3.2 retries
- 1.7.3.3 消息压缩
- 1.8 kafka消费者详解
- 1.8.1 消费者组
- 1.8.2 消息有序性
- 1.8.3 提交和偏移量
- 1.8.3.1 同步提交
- 1.8.3.2 异步提交
- 1.8.3.3 同步异步混合提交
- 1.9 Spring集成kafka
- 1.9.1 导入依赖
- 1.9.2 创建配置文件
- 1.9.3 创建生产者
- 1.9.4 创建消费者
- 1.9.5 启动类
- 1.9.6 测试
- 1.10 kafka传递对象
- 1.10.1 创建User
- 1.10.2 添加User的发送和接收
- 2 自媒体文章上下架
- 2.1 接口定义
- 2.2 Controller
- 2.3 Service
- 2.4 通知Article修改文章配置
- 2.4.1 导入kafka依赖
- 2.4.2 在Nacos中配置kafka的生产者
- 2.4.3 自媒体通知Article
- 2.4.4 在Nacos中配置kafka的消费者
- 2.4.5 配置ap_article_config表
- 2.4.6 article端监听
- 2.4.7 测试
今日内容
1 Kafka
1.1 消息中间件对比
1.2 kafka介绍
1.3 kafka安装及配置
- Docker安装zookeeper
拉取镜像
docker pull zookeeper:3.4.14
创建容器
docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
- Docker安装kafka
下载镜像
docker pull wurstmeister/kafka:2.12-2.3.1
创建容器
docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.204.129 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.204.129:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.204.129:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--net=host wurstmeister/kafka:2.12-2.3.1
docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.204.129 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.204.129:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.204.129:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
-p 9092:9092 wurstmeister/kafka:2.12-2.3.1
-p 9092:9092做端口映射
1.4 kafka案例
1.4.1 导入kafka客户端
在heima-leadnews-test模块中创建kafka-demo的模块
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId>
</dependency>
1.4.2 编写生产者消费者
创建com.heima.kafka.sample包
下面两个类ConsumerQuickStart和ProducerQuickStart类
生产者:
package com.heima.kafka.sample;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/*** 生产者*/
public class ProducerQuickStart {public static void main(String[] args) {//1.kafka的配置信息Properties properties = new Properties();//kafka的连接地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.204.129:9092");//发送失败,失败的重试次数properties.put(ProducerConfig.RETRIES_CONFIG,5);//消息key的序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//消息value的序列化器properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//2.生产者对象KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);/*** 第一个参数:topic 第二个参数:key 第三个参数:value*///封装发送的消息ProducerRecord<String,String> record = new ProducerRecord<String, String>("topic-first","key-001","hello kafka");//3.发送消息producer.send(record);//4.关闭消息通道,必须关闭,否则消息发送不成功producer.close();}}
消费者:
public class ConsumerQuickStart {public static void main(String[] args) {//1.添加kafka的配置信息Properties properties = new Properties();//kafka的连接地址properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.204.129:9092");//消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");//消息的反序列化器properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//2.消费者对象KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);//3.订阅主题consumer.subscribe(Collections.singletonList("topic-first"));//当前线程一直处于监听状态while (true) {//4.获取消息ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord.key());System.out.println(consumerRecord.value());}}}}
1.4.3 启动测试
消费者成功收到消息
1.4.4 多消费者启动
同一个组下只能有一个消费者的收到消息
如果想一对多,则需要将消费者放在不同组中
1.5 kafka分区机制
1.5.1 topic剖析
ProducerRecord<String,String> record = new ProducerRecord<String, String>("topic-first","key-001",0,"hello kafka");
在发送消息时可以指定分区partition
1.6 kafka高可用设计
1.7 kafka生产者详解
1.7.1 同步发送
/*** 第一个参数:topic 第二个参数:key 第三个参数:value*/
//封装发送的消息
ProducerRecord<String,String> record = new ProducerRecord<String, String>("topic-first","key-001","hello kafka");//3.发送消息
//producer.send(record);//3.1 同步发送消息
RecordMetadata recordMetadata = producer.send(record).get();
System.out.println("同步发送消息结果:topic="+recordMetadata.topic()+",partition="+recordMetadata.partition()+",offset="+recordMetadata.offset());
发送结果:
同步发送消息结果:topic=topic-first,partition=0,offset=1
1.7.2 异步发送
//3.2 异步发送消息
producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e!=null){e.printStackTrace();}else{System.out.println("异步发送消息结果:topic="+recordMetadata.topic()+",partition="+recordMetadata.partition()+",offset="+recordMetadata.offset());}}
});
发送结果:
异步发送消息结果:topic=topic-first,partition=0,offset=2
1.7.3 参数详解
1.7.3.1 ack
1.7.3.2 retries
1.7.3.3 消息压缩
1.8 kafka消费者详解
1.8.1 消费者组
1.8.2 消息有序性
1.8.3 提交和偏移量
手动提交
//手动提交偏移量
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
1.8.3.1 同步提交
把enable.auto.commit设置为false,让应用程序决定何时提交偏移量。使用commitSync()提交偏移量,commitSync()将会提交poll返回的最新的偏移量,所以在处理完所有记录后要确保调用了commitSync()方法。否则还是会有消息丢失的风险。
只要没有发生不可恢复的错误,commitSync()方法会一直尝试直至提交成功,如果提交失败也可以记录到错误日志里。
while (true){ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println(record.value());System.out.println(record.key());try {consumer.commitSync();//同步提交当前最新的偏移量}catch (CommitFailedException e){System.out.println("记录提交失败的异常:"+e);}}
}
1.8.3.2 异步提交
手动提交有一个缺点,那就是当发起提交调用时应用会阻塞。当然我们可以减少手动提交的频率,但这个会增加消息重复的概率(和自动提交一样)。另外一个解决办法是,使用异步提交的API :commitAsync()。
while (true){ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println(record.value());System.out.println(record.key());}consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {if(e!=null){System.out.println("记录错误的提交偏移量:"+ map+",异常信息"+e);}}});
}
1.8.3.3 同步异步混合提交
异步提交也有个缺点,那就是如果服务器返回提交失败,异步提交不会进行重试。
相比较起来,同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为,如果同时存在多个异步提交,进行重试可能会导致位移覆盖。
举个例子,假如我们发起了一个异步提交commitA,此时的提交位移为2000,随后又发起了一个异步提交commitB且位移为3000;commitA提交失败但commitB提交成功,此时commitA进行重试并成功的话,会将实际上将已经提交的位移从3000回滚到2000,导致消息重复消费。
try {while (true){ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println(record.value());System.out.println(record.key());}consumer.commitAsync();}
}catch (Exception e){+e.printStackTrace();System.out.println("记录错误信息:"+e);
}finally {try {consumer.commitSync();}finally {consumer.close();}
}
1.9 Spring集成kafka
1.9.1 导入依赖
在kafka-demo中导入依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- kafkfa --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId></dependency>
</dependencies>
1.9.2 创建配置文件
在resources下创建文件application.yml
server:port: 9991
spring:application:name: kafka-demokafka:bootstrap-servers: 192.168.204.129:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: ${spring.application.name}-testkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
1.9.3 创建生产者
创建com.heima.kafka.controller.HelloController类,负责发送消息
@RestController
public class HelloController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@GetMapping("/hello")public String hello() {kafkaTemplate.send("itcast-topic", "hello kafka");return "success";}
}
1.9.4 创建消费者
建com.heima.kafka.listener.HelloListener类,负责监听消息
@Component
public class HelloListener {@KafkaListener(topics = "itcast-topic")public void listen(String message) {if(!StringUtils.isEmpty(message)) {System.out.println("message = " + message);}}
}
1.9.5 启动类
@SpringBootApplication
public class KafkaAppication {public static void main(String[] args) {SpringApplication.run(KafkaAppication.class, args);}
}
1.9.6 测试
打开localhost:9991/hello
已经接收到消息
1.10 kafka传递对象
1.10.1 创建User
创建com.heima.kafka.pojo.User
@Data
public class User {private String username;private Integer age;
}
1.10.2 添加User的发送和接收
使用fastjson进行转换
Controller:
@GetMapping("/user")
public String user() {User user = new User();user.setUsername("zhangsan");user.setAge(20);kafkaTemplate.send("user-topic", JSON.toJSONString(user));return "success";
}
Listener:
@KafkaListener(topics = "user-topic")
public void listenUser(String message) {if(!StringUtils.isEmpty(message)) {User user = JSON.parseObject(message, User.class);System.out.println(user);}
}
2 自媒体文章上下架
2.1 接口定义
2.2 Controller
@PostMapping("/down_or_up")
public ResponseResult downOrUp(@RequestBody WmNewsDto wmNewsDto){return wmNewsService.downOrUp(wmNewsDto);
}
2.3 Service
接口
ResponseResult downOrUp(WmNewsDto wmNewsDto);
实现
@Override
public ResponseResult downOrUp(WmNewsDto wmNewsDto) {// 1.参数检查if(wmNewsDto.getId()==null){return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,"文章id不能为空");}// 2.查询文章WmNews wmNews = getById(wmNewsDto.getId());if(wmNews == null){return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,"文章不存在");}// 3.修改文章状态if(!wmNews.getStatus().equals(WmNews.Status.PUBLISHED.getCode())){return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,"只有已发布的文章才能上下架");}if(wmNewsDto.getEnable()==null){return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,"enable不能为空");}wmNews.setEnable(wmNewsDto.getEnable());updateById(wmNews);// 4.返回结果return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}
2.4 通知Article修改文章配置
2.4.1 导入kafka依赖
在heima-leadnews-common模块下导入kafka依赖
<!-- kafkfa -->
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId>
</dependency>
2.4.2 在Nacos中配置kafka的生产者
在自媒体端的nacos配置中心配置kafka的生产者,在heima-leadnews-wemedia下的配置文件中配置kafka
spring:kafka:bootstrap-servers: 192.168.204.129:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer
2.4.3 自媒体通知Article
创建com.heima.common.constants.mNewsMessageConstants常量类,保存kafka的topic.
public class WmNewsMessageConstants {public static final String WM_NEWS_UP_OR_DOWN_TOPIC="wm.news.up.or.down.topic";
}
注入kafka
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
发送消息,通知article端修改文章配置
//发送消息,通知article端修改文章配置
if(wmNews.getArticleId() != null){Map<String,Object> map = new HashMap<>();map.put("articleId",wmNews.getArticleId());map.put("enable",dto.getEnable());kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONString(map));
}
2.4.4 在Nacos中配置kafka的消费者
在article端的nacos配置中心配置kafka的消费者
spring:kafka:bootstrap-servers: 192.168.204.129:9092consumer:group-id: ${spring.application.name}key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
2.4.5 配置ap_article_config表
因为需要修改ap_article_config,所以需要创建对应service和mapper
创捷Service,com.heima.article.service.ApArticleConfigService接口
public interface ApArticleConfigService extends IService<ApArticleConfig> {/*** 修改文章配置* @param map*/public void updateByMap(Map map);
}
实现
@Service
@Slf4j
@Transactional
public class ApArticleConfigServiceImpl extends ServiceImpl<ApArticleConfigMapper, ApArticleConfig> implements ApArticleConfigService {/*** 修改文章配置* @param map*/@Overridepublic void updateByMap(Map map) {//0 下架 1 上架Object enable = map.get("enable");boolean isDown = true;if(enable.equals(1)){isDown = false;}//修改文章配置update(Wrappers.<ApArticleConfig>lambdaUpdate().eq(ApArticleConfig::getArticleId,map.get("articleId")).set(ApArticleConfig::getIsDown,isDown));}
}
2.4.6 article端监听
在article端编写监听,接收数据
@Component
@Slf4j
public class ArtilceIsDownListener {@Autowiredprivate ApArticleConfigService apArticleConfigService;@KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)public void onMessage(String message){if(StringUtils.isNotBlank(message)){Map map = JSON.parseObject(message, Map.class);apArticleConfigService.updateByMap(map);log.info("article端文章配置修改,articleId={}",map.get("articleId"));}}
}
2.4.7 测试
启动相应启动类
打开自媒体管理界面,准备下架这个新闻
下架该文件,发现两张表都已经修改,完美进行下架
说明我们kafka的消息传递已经成功。