JavaWeb_LeadNews_Day6-Kafka

JavaWeb_LeadNews_Day6-Kafka

  • Kafka
    • 概述
    • 安装配置
    • kafka入门
    • kafka高可用方案
    • kafka详解
      • 生产者同步异步发送消息
      • 生产者参数配置
      • 消费者同步异步提交偏移量
    • SpringBoot集成kafka
  • 自媒体文章上下架
    • 实现思路
    • 具体实现
  • 来源
  • Gitee

Kafka

概述

  • 对比
  • 选择
  • 介绍
    • producer: 发布消息的对象称之为主题生产者 (Kafka topic producer)
    • topic: Kafka将消息分门别类,每一类的消息称之为一个主题 (Topic)
    • consumer:订阅消息并处理发布的消息的对象称之为主题消费者 (consumers)
    • broker:已发布的消息保存在一组服务器中,称之为Kafka集群,集群中的每一个服务器都是一个代理(Broker)。消费者可以订阅个或多个主题 (topic),并从Broker拉数据,从而消费这些已发布的消息

安装配置

  • 安装zookeeper
    // 下载zookeeper镜像
    docker pull zookeeper:3.4.14
    // 创建容器
    docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
    
  • 安装kafka
    // 下载kafka镜像
    docker pull wurstmeister/kafka:2.12-2.3.1
    // 创建容器
    docker run -d --name kafka \
    --env KAFKA_ADVERTISED_HOST_NAME=192.168.174.133 \
    --env KAFKA_ZOOKEEPER_CONNECT=192.168.174.133:2181 \
    --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.174.133:9092 \
    --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
    --env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
    --net=host wurstmeister/kafka:2.12-2.3.1// 解释
    --net=host,直接使用容器宿主机的网络命名空间,即没有独立的网络环境。它使用宿主机的ip和端口(云主机会不好使)
    

kafka入门

  • 依赖
    <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId>
    </dependency>
    
  • Producer
    public class ProducerQuickStart {public static void main(String[] args) {// 1. kafka链接配置信息Properties prop = new Properties();// 1.1 kafka链接地址prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.174.133:9092");// 1.2 key和value的序列化prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 2. 创建kafka生产者对象KafkaProducer<String, String> producer = new KafkaProducer<>(prop);// 3. 发送信息// 参数列表: topic, key, valueProducerRecord<String, String> record = new ProducerRecord<>("topic-first", "key1", "Hello Kafka!");producer.send(record);// 4. 关闭消息通道// 必须关闭, 否则消息发送bucgproducer.close();}
    }
    
  • Consumer
    public class ConsumerQuickStart {public static void main(String[] args) {// 1. kafka的配置信息Properties prop = new Properties();// 1.1 链接地址prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.174.133:9092");// 1.2 key和value的反序列化器prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 1.3 设置消费者组prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");// 2. 创建消费者对象KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);// 3. 订阅主题consumer.subscribe(Collections.singleton("topic-first"));// 4. 拉取信息while(true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println(record.key());System.out.println(record.value());}}}
    }
    
  • 总结
    • 同一组只有一个消费者能够接收到消息, 如果需要所有消费者都能接收到消息, 需要消费者在不同的组

kafka高可用方案

  • 集群

  • 备份

    kafka定义了两类副本:

    • 领导者副本
    • 追随者副本

    数据在领导者副本存储后, 会同步到追随者副本

    同步方式
    leader失效后, 选择leader的原则

    1. 优先从ISR中选取, 因为ISR的数据和leader是同步的.
    2. ISR中的follower都不行了, 就从其他的follower中选取.
    3. 当所有的follower都失效了, 第一种是等待ISR中的follower活过来, 数据可靠, 但等待时间不确定, 第二种是等待任意follower活过来, 最快速度恢复可用性, 但数据不一定完整.

kafka详解

生产者同步异步发送消息

// 同步发送
RecordMetadata metadata = producer.send(record).get();
System.out.println(metadata.offset());// 异步发送
producer.send(record, new Callback(){@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e != null) {System.out.println("记录异常信息到日志表中");}System.out.println(recordMetadata.offset());}
});

生产者参数配置

  • 消息确认
    确认机制说明
    acks=0生产者在成功写入消息之前不会等待任何来自服务器的响应,消息有丢失的风险,但是速度最快
    acks=1(默认值)只要集群首领节点收到消息,生产者就会收到一个来自服务器的成功响应
    acks=all只有当所有参与赋值的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应
    prop.put(ProducerConfig.ACKS_CONFIG, "all");
    
  • 消息重传
    设置消息重传次数, 默认每次重试之间等待100ms
    prop.put(ProducerConfig.RETRIES_CONFIG, 10);
    
  • 消息压缩
    默认情况, 消息发送不会压缩
    使用压缩可以降低网络传输开销和存储开销, 而这往往是向kafka发送消息的瓶颈所在
    压缩算法说明
    snappy占用较少的 CPU,却能提供较好的性能和相当可观的压缩比,如果看重性能和网络带宽,建议采用
    lz4占用较少的 CPU,压缩和解压缩速度较快,压缩比也很客观
    gzip占用较多的CPU,但会提供更高的压缩比,网络带宽有限,可以使用这种算法
    prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
    

消费者同步异步提交偏移量

// 同步提交偏移量
consumer.commitSync();// 异步提交偏移量
consumer.commitAsync(new OffsetCommitCallback(){@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {if(e!=null){System.out.println("记录错误的提交偏移量"+map+", 异常信息为"+e);}}
});// 同步异步提交
try {while(true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println(record.key());System.out.println(record.value());System.out.println(record.partition());System.out.println(record.offset());}// 异步提交偏移量consumer.commitAsync();}
} catch (Exception e) {e.printStackTrace();System.out.println("记录错误的信息:"+e);
}finally {// 同步consumer.commitSync();
}

SpringBoot集成kafka

  • 依赖
    <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId>
    </dependency>
    
  • 配置
    server:port: 9991
    spring:application:name: kafka-demokafka:bootstrap-servers: 192.168.174.133:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: ${spring.application.name}-testkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    
  • Producer
    @RestController
    public class HelloController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@GetMapping("/hello")public String hello(){kafkaTemplate.send("itcast-topic", "黑马程序员");return "ok";}
    }
    
  • Consumer
    @Component
    public class HelloListener {@KafkaListener(topics = "itcast-topic")public void onMessage(String message){if(!StringUtils.isEmpty(message)){System.out.println(message);}}
    }
    
  • 传递对象
    // Producer
    User user = new User();
    user.setName("tom");
    user.setAge(18);
    kafkaTemplate.send("itcast-topic", JSON.toJSONString(user));// Consumer
    System.out.println(JSON.parseObject(message, User.class));
    

自媒体文章上下架

实现思路

具体实现

  • Producer
    public ResponseResult downOrUp(WmNewsDto dto) {// 1. 检验参数// 1.0 检查文章dto是否为空if(dto == null){return ResponseResult.errorRe(AppHttpCodeEnum.PARAM_INVALID, "不可缺少");}// 1.1 检查文章上架参数是否合法if(dto.getEnable() != 0 && dto.getEnabl!= 1){// 默认上架dto.setEnable((short) 1);}// 2. 查询文章WmNews news = getById(dto.getId());if(news == null){return ResponseResult.errorRe(AppHttpCodeEnum.DATA_NOT_EXIST, 存在");}// 3. 查询文章状态if(news.getStatus() != WmNews.StaPUBLISHED.getCode()){return ResponseResult.errorRe(AppHttpCodeEnum.PARAM_INVALID, 章不是发布状态, 不能上下架");}// 4. 上下架news.setEnable(dto.getEnable());updateById(new// 5. 发送消息, 通知article修改文章的配置if(news.getArticleId() != null){HashMap<String, Object> map = HashMap<>();map.put("articleId", news.getArtic());map.put("enable", news.getEnable());kafkaTemplate.(WmNewsMessageConstaWM_NEWS_UP_OR_DOWN_TOPIC, JtoJSONString(map));return ResponseResult.okRe(AppHttpCodeEnum.SUCCESS);
    }
    
  • Consumer
// Listener
@KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)
public void onMessage(String message)
{if(StringUtils.isNotBlank(message)){Map map = JSON.parseObject(message, Map.class);apArticleConfigService.updateByMap(map);}
}// Service
public void updateByMap(Map map) {// 0 下架, 1 上架Object enable = map.get("enable");boolean isDown = true;if(enable.equals(1)){isDown = false;}// 修改文章update(Wrappers.<ApArticleConfig>lambdaUpdate().eq(ApArticleConfig::getArticleId, map.get("articleId")).set(ApArticleConfig::getIsDown, isDown));
}

来源

黑马程序员. 黑马头条

Gitee

https://gitee.com/yu-ba-ba-ba/leadnews

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/45045.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

寻路算法小游戏

寻路算法小demo 寻路算法有两种&#xff0c;一种是dfs 深度优先算法&#xff0c;一种是 dfs 深度优先算法 深度优先搜索的步骤分为 1.递归下去 2.回溯上来。顾名思义&#xff0c;深度优先&#xff0c;则是以深度为准则&#xff0c;先一条路走到底&#xff0c;直到达到目标。这…

矩形重叠问题

矩形重叠 文章目录 题目描述解题思路方法一方法二 题目描述 矩形以列表 [x1, y1, x2, y2] 的形式表示&#xff0c;其中 (x1, y1) 为左下角的坐标&#xff0c;(x2, y2) 是右上角的坐标。矩形的上下边平行于 x 轴&#xff0c;左右边平行于 y 轴。 如果相交的面积为 正 &#xff0…

linux tomcat server.xml 项目访问路径变更不生效

如果想改成默认的127.0.0.1:8080 访问项目 先确定更改的作用文件 server.xml 的 host:appBase 标签 默认找到appBase webapps 下的war包&#xff0c;并解压&#xff0c;解压后的appname为访问路径 也就变成了 127.0.0.1:8080/appname host:Context:path 标签 appBase的 优先…

深入探索:Kali Linux 网络安全之旅

目录 前言 访问官方网站 导航到下载页面 启动后界面操作 前言 "Kali" 可能指的是 Kali Linux&#xff0c;它是一种基于 Debian 的 Linux 发行版&#xff0c;专门用于渗透测试、网络安全评估、数字取证和相关的安全任务。Kali Linux 旨在提供一系列用于测试网络和…

菜鸟Vue教程 - 实现带国际化的注册登陆页面

初接触vue的时候觉得vue好难&#xff0c;因为项目中要用到&#xff0c;就硬着头皮上&#xff0c;慢慢的发现也不难&#xff0c;无外乎画个布局&#xff0c;然后通过样式调整界面。在通过属性和方法跟js交互。js就和我们写的java代码差不多了&#xff0c;复杂一点的就是引用这种…

Python数据分析实战-多线程并发处理列表(附源码和实现效果)

实现功能 Python数据分析实战-多线程并发处理列表 实现代码 import threading有15个列表&#xff0c;尝试多进程并发处理&#xff0c;每个列表一个进程&#xff0c;进程数和 CPU 核数一致def sum_list(lst):return sum(lst)if __name__ __main__:lists [[1,2,3], [4,5,6], …

BDA初级分析——SQL清洗和整理数据

一、数据处理 数据处理之类型转换 字符格式与数值格式存储的数据&#xff0c;同样是进行大小排序&#xff0c; 会有什么区别&#xff1f; 以rev为例&#xff0c;看看字符格式与数值格式存储时&#xff0c;排序会有什么区别&#xff1f; 用cast as转换为字符后进行排序 SEL…

mysql+jdbc+servlet+java实现的学生在校疫情信息打卡系统

摘 要 I Abstract II 主 要 符 号 表 i 1 绪论 1 1.1 研究背景 1 1.2 研究目的与意义 2 1.3 国内外的研究情况 2 1.4 研究内容 2 2 系统的开发方法和关键技术 4 2.1 开发方法 4 2.1.1 结构化开发方法 4 2.1.2 面向对象方法 4 2.2 开发技术 4 2.2.1 小程序开发MINA框架 4 2.2.2 …

快速搭建图书商城小程序的简易流程与优势

很多人喜欢阅读电子书&#xff0c;又有很多人依旧喜欢实体书&#xff0c;而实体书店拥有一个图书商城小程序便成为了满足用户需求的理想选择。如果您也想进入这一充满潜力的领域&#xff0c;但担心开发难度和复杂流程&#xff0c;别担心&#xff01;您能做到快速搭建一个专业、…

【令牌桶算法与漏桶算法】

&#x1f4a7; 令牌桶算法与漏桶算法 \color{#FF1493}{令牌桶算法与漏桶算法} 令牌桶算法与漏桶算法&#x1f4a7; &#x1f337; 仰望天空&#xff0c;妳我亦是行人.✨ &#x1f984; 个人主页——微风撞见云的博客&#x1f390; &#x1f433; 《数据结构与算法》专…

2023年Java核心技术面试第五篇(篇篇万字精讲)

目录 十 . HashMap&#xff0c;ConcurrentHashMap源码解析 10.1 HashMap 的源码解析&#xff1a; 10.1.1数据结构&#xff1a; 10.1.2哈希算法&#xff1a; 10.1.3解决哈希冲突&#xff1a; 10.1.4扩容机制&#xff1a; 10.1.5如何使用 HashMap&#xff1a; 10.2 HashMap 关注…

开源后台管理系统Geekplus Admin

本系统采用前后端分离开发模式&#xff0c;后端采用springboot开发技术栈&#xff0c;mybatis持久层框架&#xff0c;redis缓存&#xff0c;shiro认证授权框架&#xff0c;freemarker模版在线生成代码&#xff0c;websocket消息推送等&#xff0c;后台管理包含用户管理&#xf…

一例Vague病毒的分析

这是一例通过U盘传播的文件夹病毒&#xff0c;有收集用户文件的行为&#xff0c;但是&#xff0c;没有回传和远控行为&#xff0c;有点奇怪&#xff0c;其中的字符串进行了加密。 样本比较简单&#xff0c;使用IDA很容易就看明白了。 根据匹配到威胁情报&#xff0c;有叫Vague蠕…

【AI】文心一言的使用

一、获得内测资格&#xff1a; 1、点击网页链接申请&#xff1a;https://yiyan.baidu.com/ 2、点击加入体验&#xff0c;等待通过 二、获得AI伙伴内测名额 1、收到短信通知&#xff0c;点击链接 网页Link&#xff1a;https://chat.baidu.com/page/launch.html?fa&sourc…

人工智能在网络安全中的作用:当前的局限性和未来的可能性

人工智能 (AI) 激发了网络安全行业的想象力&#xff0c;有可能彻底改变安全和 IT 团队处理网络危机、漏洞和勒索软件攻击的方式。 然而&#xff0c;对人工智能的能力和局限性的现实理解至关重要&#xff0c;并且存在许多挑战阻碍人工智能对网络安全产生直接的变革性影响。 在…

【ES6】—使用 const 声明

一、不属于顶层对象window 使用const关键字 声明的变量&#xff0c;不会挂载到window属性上 const a 5 console.log(a) console.log(window.a) // 5 // undefined二、不允许重复声明 使用const关键字不允许重复声明相同的变量 cosnt a 5 cosnt a 6 // Uncaught SyntaxEr…

Postman如何做接口测试:什么?postman 还可以做压力测试?

我们都知道&#xff0c; postman 是一款很好用的接口测试工具。不过 postman 还可以做简单的压力测试&#xff0c;而且步骤只需要 2 步。 首先&#xff0c;打开 postman, 编写接口的请求参数。 然后&#xff0c;点击右下方的 runner 运行器&#xff0c;把需要测试的接口拖动到…

Linux实用运维脚本分享

Linux实用运维脚本分享&#x1f343; MySQL备份 目录备份 PING查询 磁盘IO检查 性能相关 进程相关 javadump.sh 常用工具安装 常用lib库安装 系统检查脚本 sed进阶 MySQL备份 #!/bin/bashset -eUSER"backup" PASSWORD"backup" # 数据库数据目录…

学习网络编程No.3【socket理论实战】

引言&#xff1a; 北京时间&#xff1a;2023/8/12/15:32&#xff0c;自前天晚上更新完文章&#xff0c;看了一下鹅厂新出的《扫毒3》摆烂至现在&#xff0c;不知道是长大了&#xff0c;还是近年港片就那样&#xff0c;给我的感觉不是很好&#xff0c;也可能是国内市场对港片不…

Vue 项目搭建

环境配置 1. 安装node.js 官网&#xff1a;nodejs&#xff08;推荐 v10 以上&#xff09; 官网&#xff1a;npm 是什么&#xff1f; 由于vue的安装与创建依赖node.js&#xff08;JavaScript的运行环境&#xff09;里的npm&#xff08;包管理和分发工具&#xff09;&#xff…