spring boot学习第八篇:kafka

目录

1、安装kafka

1.1确认jdk是否安装OK

1.2下载&&安装kafka

1.3验证kafka

2、连接kafka

3、在java中操作kafka


1、安装kafka

1.1确认jdk是否安装Ok

java -version

1.2下载&&安装kafka

wget http://archive.apache.org/dist/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz

mkdir zk-3.4.14
tar -xvzf zookeeper-3.4.14.tar.gz -C /home/lighthouse/zk-3.4.14

配置

进入该目录下的conf文件夹中。

zoo_sample.cfg是一个配置文件的样本,需要将这个文件复制并重命名为zoo.cfg:   

cp zoo_sample.cfg zoo.cfg

修改配置文件:

vi zoo.cfg

配置环境变量, 使用vim打开etc目录下的profile文件:vim /etc/profile

在末尾配置环境变量,这里需要写入的是:

export ZOOKEEPER_HOME=/home/lighthouse/zk-3.4.14/zookeeper-3.4.14

export PATH=$PATH:$ZOOKEEPER_HOME/bin

写入信息并保存后,需要使配置文件生效,所用的命令为:source /etc/profile

启动zookeeper, 由于配置了环境变量,可以在系统中的任意目录执行启动zookeeper的命令,其执行的实际上是zookeeper的bin文件夹中的zkServer.sh的命令:zkServer.sh start

Zookeeper启动成功:

下载kafka2.2.1:

wget https://archive.apache.org/dist/kafka/2.2.1/kafka_2.12-2.2.1.tgz

解压:

tar -zxvf kafka_2.12-2.2.1.tgz

启动:

nohup bin/kafka-server-start.sh config/server.properties > output.txt &

其中server.properties文件内容如下:

1.3验证kafka

执行命令:bin/kafka-topics.sh –version

看不到版本号

2、连接kafka,并执行命令

2.1创建topic:执行命令:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

2.2查看topic:执行命令:

bin/kafka-topics.sh --list --zookeeper localhost:2181 

bin/kafka-topics.sh --list --zookeeper 43.138.0.199:2181

2.3使用kafka-console-producer.sh 发送消息:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

 bin/kafka-console-producer.sh --broker-list 43.138.0.199:9092 --topic test

2.4使用kafka-console-consumer.sh消费消息:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

bin/kafka-console-consumer.sh --bootstrap-server 43.138.0.199:9092 --topic test --from-beginning

3、在java中操作kafka

pom.xml增加如下依赖:

<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.11</artifactId><version>1.0.1</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>1.0.1</version></dependency></dependencies>

Producer.java代码如下:

package com.hmblogs.backend.util;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.log4j.Logger;import java.util.Properties;/************************************************************ @ClassName   : Producer.java****** @author      : milo ^ ^****** @date        : 2018 03 14 11:34****** @version     : v1.0.x*******************************************************/
public class Producer {static Logger log = Logger.getLogger(Producer.class);private static final String TOPIC = "test";private static final String BROKER_LIST = "43.138.0.199:9092";private static KafkaProducer<String,String> producer = null;/*初始化生产者*/static {Properties configs = initConfig();producer = new KafkaProducer<String, String>(configs);}/*初始化配置*/private static Properties initConfig(){Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,BROKER_LIST);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());return properties;}public static void main(String[] args) throws InterruptedException {//消息实体ProducerRecord<String , String> record = null;for (int i = 0; i < 3; i++) {record = new ProducerRecord<String, String>(TOPIC, "value"+(int)(10*(Math.random())));//发送消息producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (null != e){System.out.println("send error" + e.getMessage());}else {System.out.println(String.format("offset:%s,partition:%s",recordMetadata.offset(),recordMetadata.partition()));}}});}producer.close();}
}

执行报错,如下:

send errorExpiring 3 record(s) for test-0: 30034 ms has passed since batch creation plus linger time
send errorExpiring 3 record(s) for test-0: 30034 ms has passed since batch creation plus linger time
send errorExpiring 3 record(s) for test-0: 30034 ms has passed since batch creation plus linger time

搜索资料,尝试解决

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/627240.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

面试Java岗老喜欢盯着JVM问,有那么多项目要调优吗?

面试Java岗老喜欢盯着JVM问&#xff0c;有那么多项目要调优吗&#xff1f; 在开始前我有一些资料&#xff0c;是我根据网友给的问题精心整理了一份「Java的资料从专业入门到高级教程」&#xff0c; 点个关注在评论区回复“888”之后私信回复“888”&#xff0c;全部无偿共享给…

Qt vs开发将Graphics view提升 QChartview

1提升部件 2 添加Charts模块 3 在包含ui类的头文件添加QT_CHARTS_USE_NAMESPACE 或者添加 using namespace QtCharts #include <QtWidgets/QMainWindow> #include <QtCharts> #include "ui_mainwin.h" #include <qchartview.h> QT_CHARTS_USE_N…

【MATLAB源码-第114期】基于matlab的孔雀优化算法(POA)无人机三维路径规划,输出做短路径图和适应度曲线。

操作环境&#xff1a; MATLAB 2022a 1、算法描述 POA&#xff08;孔雀优化算法&#xff09;是一种基于孔雀羽毛开屏行为启发的优化算法。这种算法模仿孔雀通过展开其色彩斑斓的尾羽来吸引雌性的自然行为。在算法中&#xff0c;每个孔雀代表一个潜在的解决方案&#xff0c;而…

Java爬虫爬取图片壁纸

Java爬虫 以sougou图片为例&#xff1a;https://pic.sogou.com/ JDK17、SpringBoot3.2.X、hutool5.8.24实现Java爬虫&#xff0c;爬取页面图片 项目介绍 开发工具&#xff1a;IDEA2023.2.5 JDK&#xff1a;Java17 SpringBoot&#xff1a;3.2.x 通过 SpringBoot 快速构建开发环境…

窗口虽小,功能良多:一篇文章带你玩转鸿蒙4的实况窗

在更新了鸿蒙4系统后&#xff0c;华为手机上多出了一个实况窗的功能。 实况窗功能可以实时显示部分应用的工作状态或者进度&#xff0c;比如外卖的进度、录音的时长等等&#xff0c;可以让我们在不打开手机应用的前提下也能了解到关键信息&#xff0c;极大地提升我们的生活体验…

闲鱼宝库亮相!闲鱼商品详情关键词搜索电商API接口助你畅享无尽好货!

随着互联网的快速发展&#xff0c;电商平台的崛起已经改变了人们的购物习惯。而在众多电商平台中&#xff0c;闲鱼作为一款社区二手交易平台&#xff0c;一直备受用户喜爱。如今&#xff0c;闲鱼宝库正式亮相&#xff0c;为用户带来了更加全面、详细的商品详情关键词搜索电商AP…

windows server 2019 云服务器看不见硬盘的解决方案

刚拿的windows server 服务器看不见硬盘&#xff0c;这是因为没有初始化数据盘的原因。 解决方案如下&#xff1a; 单击“服务器管理器”仪表盘。 弹出“服务器管理器”窗口&#xff0c;如图1所示。 “服务器管理器”页面右上方选择“工具 > 计算机管理”。 弹出“计算机管…

【转载】MyBatisCodeHelperPro最新版使用教程

在开发中编写生成bean&#xff0c;mapper&#xff0c;mapper.xml费时也费力&#xff0c;可以通过MyBatisCodeHelper-Pro自动生成bean&#xff0c;dao&#xff0c;mapper.xml等文件。 MyBatisCodeHelper-Pro是IDEA下的一个插件&#xff0c;类似于mybatis plugin&#xff0c;但可…

书生·浦语大模型实战营第四节课笔记及作业

XTuner 大模型单卡低成本微调实战 1 Finetune简介 大语言模型LLM是在海量的文本内容基础上&#xff0c;以无监督或半监督方式进行训练的。海量的文本内容赋予了大模型各种各样的行业知识。但是如果直接把大模型的知识用于生产实践&#xff0c;会发现回答不大满意。微调的目的…

adb wifi 远程调试 安卓手机 命令

使用adb wifi 模式调试需要满足以下前提条件&#xff1a; 手机 和 PC 需要在同一局域网下。手机需要开启开发者模式&#xff0c;然后打开 USB 调试模式。 具体操作步骤如下&#xff1a; 将安卓手机通过 USB 线连接到 PC。&#xff08;连接的时候&#xff0c;会弹出请求&#x…

零花钱也能设计精美网页!推荐几个免费设计素材站点!

UI设计师最怕什么&#xff1f; 没有创意&#xff0c;没有灵感&#xff0c;没有思路&#xff01; 在哪里可以得到idea&#xff1f;别担心&#xff0c;往下看&#xff01; 你知道网络有多大&#xff0c;你想要什么吗&#xff1f;今天&#xff0c;我想和大家分享一些宝藏网页设…

解决flask中jinja2插值变量变成字符串的办法

今天在通过使用{{ variable_name }}这种方式插入html内容时&#xff0c;发现变量内容到了页面中全部变成了字符串&#xff0c; python代码&#xff1a; return render_template(FilePath.file_path_to_page,md_contenthtml_content # 返回html内容 )html代码中插入&#xff1…

最简单爱心的解析

首先你需要了解爱心代码在直角坐标系的方程 数学知识&#xff1a;x 属于 -1.5 ~ 1.5 y 属于 -1 ~ 1.5 和 高中所学的线性规划 请看代码 #include <math.h> #include <stdlib.h> #include <Windows.h> #include <stdio.h> int main() { …

【REST2SQL】10 REST2SQL操作指南

【REST2SQL】01RDB关系型数据库REST初设计 【REST2SQL】02 GO连接Oracle数据库 【REST2SQL】03 GO读取JSON文件 【REST2SQL】04 REST2SQL第一版Oracle版实现 【REST2SQL】05 GO 操作 达梦 数据库 【REST2SQL】06 GO 跨包接口重构代码 【REST2SQL】07 GO 操作 Mysql 数据库 【RE…

谭浩强C语言程序设计习题-循环结构程序设计

最大公约数与最小公倍数 //辗转相除法求公约公倍 #include <stdio.h>int gcd(int a, int b) {return (a % b 0) ? b : gcd(b, a % b); }int main() {int m, n;scanf("%d %d", &m, &n); int ans gcd(m, n);printf("%d %d\n", ans, m * n…

ESP系列入门教程(四)——之MQTT通信实现设备反控【分别附上 ESP32 + ESP8266 的具体代码】

ESP系列入门教程<四> 概要技术名词简介● ESP系列简介● MQTT简介 硬件连接实现&#xff08;同教程2&#xff0c;没有变化&#xff09;代码实现●Demo&#xff1a;通过MQTT进行开关灯反控○ ESP8266代码○ ESP32代码 特别鸣谢 概要 最近在跟着几个大佬的教学视频做项目。…

02--数据定义语言DDL

1、数据定义语言DDL 1.1 操作数据库-DDL 创建数据库 create database 数据库名称; 创建数据库&#xff0c;并指定字符集 create database 数据库名称 character set 字符集名; 查询所有数据库的名称 show databases; 查询某个数据库的字符集:查询某个数据库的创建语句及字…

微服务介绍

背景 微服务是什么?杜克大学教授DanAriely说过一段非常出名的话&#xff0c;用来表述Big Data的发展现状。我觉得把这句话放到微服务身上也极其贴切。 Micro-services is like teenage sex: Everyone talks about it, nobody really knows how to do it, everyo ne thinks ev…

excel(vab)删除空行

删除第一、二、三列位空的所有行&#xff08;8000)行范围以内 代码如下&#xff1a; Sub Macro1()Dim hang As Integer For hang 8000 To 1 Step -1If Sheet1.Cells(hang, 1) "" And Sheet1.Cells(hang, 2) "" And Sheet1.Cells(hang, 3) "&quo…

系统性学习vue-vue中的ajax

vue中的ajax 配置代理常用发送Ajax请求方式跨域方式一方式二 vue-resource插槽默认插槽具名插槽作用域插槽 配置代理 常用发送Ajax请求方式 xhr new XMLHttpRequest() 在真正开发中不常用&#xff0c;比较麻烦jQuery 封装了xhraxios 封装了xhr 与jQuery相比优势是&#xff1a…