kafka安装使用

版本:kafka_2.11-0.10.1.0  (之前安装2.10-0.10.0.0,一直出问题)

 

  • 安装
  • Springboot结合Kafka的使用

 

安装

  1. 下载并解压代码
    wget http://mirrors.cnnic.cn/apache/kafka/0.10.0.0/kafka_2.10-0.10.0.0.tgz
    #http://kafka.apache.org/downloadstar -zxvf kafka_2.10-0.10.0.0.tgz
    cd kafka_2.10-0.10.0.0

     

  2. 修改每个broker安装目录下的配置文件
    # $targetID默认是0,每个broker的broker.id必须要唯一
    broker.id=$targetID#默认是注释的,$IP改成当前节点的IP即可。如果不改该配置项,在节点通过命令行可以收发消息,而在其他机器是无法通过IP去访问队列的
    #在之前的版本不叫listeners,而是advertised.host.name和host.name
    listeners=PLAINTEXT://$IP:9092

     

  3. 启动服务
    #kafka依赖于zookeeper
    #如果没有的话,可以通过kafka提供的脚本快速创建一个单节点zookeeper实例:
    #bin/zookeeper-server-start.sh config/zookeeper.properties#确认zookeeper服务已经启动后,启动kafka服务
    nohup bin/kafka-server-start.sh config/server.properties &

     

  4. 创建一个名为test,有一份备份,一个分区的topic
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    #查看所有topicbin/kafka-topics.sh --list --zookeeper localhost:2181

     

  5. 发送消息
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

     

  6. 开启一个消费者接收消息
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

     

  7. 查看topic信息
    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

     

Springboot结合Kafka的使用 

1.在pom文件添加依赖

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

 

2.在application.properties中添加配置

# APACHE KAFKA (KafkaProperties)
spring.kafka.bootstrap-servers=192.168.0.155:9092,192.168.0.156:9092
spring.kafka.client-id=K1
spring.kafka.consumer.auto-offset-reset= earliest spring.kafka.consumer.enable-auto-commit= true spring.kafka.consumer.group-id= test-consumer-group
spring.kafka.producer.batch-size=2 spring.kafka.producer.bootstrap-servers= 192.168.0.155:9092,192.168.0.156:9092 spring.kafka.producer.client-id= P1 spring.kafka.producer.retries=3 spring.kafka.template.default-topic= test

 

3.创建消费者类(订阅消息的对象)

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class ListenerBean {@KafkaListener(topics = "myTopic")public void processMessage(String content) {System.out.println("you have a new message:" + content);// ...
    }
}

 

4.创建生产者类(发布消息的对象)

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;@Component
@RestController
@RequestMapping("/send")
@EnableAutoConfiguration
public class SendMsgBean {private final KafkaTemplate<String,String> kafkaTemplate;@Autowiredpublic SendMsgBean(KafkaTemplate<String,String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}@RequestMapping(path="/{msg}",method=RequestMethod.GET)public String send(@PathVariable("msg") String msg) {System.out.println("==sending msg:" + msg);kafkaTemplate.send("test","test-"+msg);return "message has been sent!";}
}

 

 

只需这4步,就可以在springboot中使用kafka了,现在我们访问 http://localhost:8080/send/mymessage  就可以在控制台看到信息了。

源码下载

 

参考:

  • Kafka producer程序本地运行时发送信息失败解决方案

转载于:https://www.cnblogs.com/TiestoRay/p/6394602.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/541771.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

php获取上传文件路径 fakepath,JavaScript_js获取上传文件的绝对路径实现方法,在html中input type=file - phpStudy...

js获取上传文件的绝对路径实现方法在html中function upload() {var filename document.getElementById("importFile").value;// 这时的filename不是 importFile 框中的值alert(filename);}如上面的代码&#xff0c;用文件上传对话框选择文件后&#xff0c;如果选择&…

在Bootstrap中使用类的按钮类型

Bootstrap has 7 different types of buttons with contextual classes from which we can create buttons easily by using these classes (.btn-default, .btn-success, .btn-danger, .btn-primary, .btn-info, .btn-warning, .btn-link). Bootstrap具有上下文类型的 7种不同…

php json encode中文乱码,php json_encode中文乱码如何解决

php encode中文乱码的解决办法&#xff1a;首先打开相应的PHP文件&#xff1b;然后使用正则语句“preg_replace("#\\\u([0-9a-f]{4})#ie","iconv(UCS-2BE, UTF-8...)”将编码替换成中文即可。本文列举3个方法&#xff0c;实现json_encode()后的string显示中文问…

乡村图景(转载)

转自: http://cul.qq.com/a/20160205/046437.htm 我丈夫家在湖北孝感孝昌县的一个村子。2005年第一次过年回到他家&#xff0c;印象最深的就是嫂子。我暗自问当时的男友&#xff0c;“哥哥尽管算不上特别帅气&#xff0c;但为何找了这么难看的嫂子&#xff1f;”后来才发现&…

stl向量最大值_C ++ STL中向量的最小和最大元素

stl向量最大值Given a vector and we have to find the smallest (minimum) and largest (maximum) elements. 给定一个向量&#xff0c;我们必须找到最小(最小)和最大(最大)元素。 查找向量的最小和最大元素 (Finding vectors minimum & maximum elements) To find minim…

oracle如何设置备份计划任务,Oracle数据库设置任务计划备份一周的备份记录

Oracle 数据库备份&#xff1a;--保留最近一周的备份记录&#xff1b;正文&#xff1a;开始代码如下:echo 设置备份文件存放文件夹...set "tbufE:\Cway\backup"echo 设置备份文件名(以星期几命名&#xff0c;即备份文件只保存最近一周)...set name%date%set name%nam…

索引(转载自百度百科)

Oracle索引 编辑本词条缺少信息栏、名片图&#xff0c;补充相关内容使词条更完整&#xff0c;还能快速升级&#xff0c;赶紧来编辑吧&#xff01;在oracle索引是一种供服务器在表中快速查找一个行的数据库结构。合理使用索引能够大大提高数据库的运行效率。目录 1 概念及作用 2…

阿姆斯特朗数_阿姆斯特朗的功能依赖公理 数据库管理系统

阿姆斯特朗数Armstrong axioms are a complete set of inference rules or axioms, introduced and developed by William W. Armstrong in 1974. The inference rules are sound which is used to test logical inferences of functional dependencies. The axiom which also …

ORACLE JOB 失败 查看,Oracle JOB异常中断原因分析

注释今天研发同事找我确认 PKG_WMS.proc_TaskMain 存储的 job 是否还在运行&#xff0c;竟发现 dba_jobs.NEXT_DATE4000/1/1&#xff0c;如下看看究竟原因吧~JOB 信息&#xff1a;参数&#xff1a;BROKEN : 中断标记 ,N 启动、Y 中断 --> DBMS_JOBS.BROKEN(job_id,TRUE/FA…

ruby打印_Ruby程序打印一个数字的乘法表

ruby打印打印乘法表 (Printing multiplication table) This requires a very simple logic where we only have to multiply the number with digits from 1 to 10. This can be implemented by putting the multiplication statement inside a loop. We have mentioned two wa…

步骤1:JMeter 录制脚本接口测试

JMeter 常用测试方法简介 1.下载安装 http://jmeter.apache.org/download_jmeter.cgi 安装JDK&#xff0c;配置环境变量JAVA_HOME. 系统要求&#xff1a;JMeter2.11 需要JDK1.6以上的版本支持运行 2.学习Jmeter元件 http://jmeter.apache.org/usermanual/component_reference.h…

模拟断电oracle数据不一致,Oracle数据库案例整理-Oracle系统运行时故障-断电导致数据文件状态变为RECOVER...

1.1 现象描述异常断电&#xff0c;数据库数据文件的状态由ONLINE变为RECOVER。系统显示如下信息&#xff1a;SQL> select file_name ,tablespace_name ,online_status from dba_data_files;FILE_NAME---------------------------------------------------------------…

python日历模块_Python日历模块| prmonth()方法与示例

python日历模块Python calendar.prmonth()方法 (Python calendar.prmonth() Method) prmonth() method is an inbuilt method of the calendar module in Python. It works on simple text calendars and prints the calendar of the given month of the given year. Also, the…

多例模式

多例&#xff1a;只是单例的一种延伸 不必过于在意各种模式的名字&#xff0c;重要的是学会融会贯通&#xff0c;把生产的car放到集合中 类似JDBC 的连接池 把连接对象放到池中 多例模式特点&#xff1a; 1. 多例类可以有多个实例 2. 多例类必须自己创建自己的实例&a…

Oracle public view,【易错概念】以太坊Solidity函数的external/internal,public/private,view/pure/payable区别...

1. 函数类型&#xff1a;内部(internal)函数和外部(external)函数函数类型是一种表示函数的类型。可以将一个函数赋值给另一个函数类型的变量&#xff0c;也可以将一个函数作为参数进行传递&#xff0c;还能在函数调用中返回函数类型变量。 函数类型有两类&#xff1a;- 内部(i…

c-style字符字符串_C字符串-能力问题与解答

c-style字符字符串C programming String Aptitude Questions and Answers: In this section you will find C Aptitude Questions and Answers on Strings, String is the set of characters and String related Aptitude Questions and Answers you will find here. C编程Stri…

PHP Smarty template for website

/******************************************************************************* PHP Smarty template for website* 说明&#xff1a;* 之前一直在想将MVC的方式加在PHP做的网站上&#xff0c;这样比较好处理&#xff0c;相对来说比较好* 处理…

ftp连接oracle服务器,使用SSL加密连接FTP - 架建SSL安全加密的FTP服务器(图)_服务器应用_Linux公社-Linux系统门户网站...

四、使用SSL加密连接FTP启用Serv-U服务器的SSL功能后&#xff0c;就可以利用此功能安全传输数据了&#xff0c;但FTP客户端程序必须支持SSL功能才行。 如果我们直接使用IE浏览器进行登录则会出现图4显示的错误信息&#xff0c;一方面是以为没有修改默认的端口21为990&#xff0…

c# 情感倾向_C否则-能力倾向问题与解答

c# 情感倾向C programming if else Aptitude Questions and Answers: In this section you will find C Aptitude Questions and Answers on condition statements – if else, nested if else, ladder if else, conditional operators etc. C语言编程如果有问题&#xff0c;请…

springboot中使用缓存shiro-ehcache

在pom.xml中注入缓存依赖&#xff0c;版本(Sep 09, 2016)spring-context-support 包含支持UI模版&#xff08;Velocity&#xff0c;FreeMarker&#xff0c;JasperReports&#xff09;&#xff0c; 邮件服务&#xff0c; 脚本服务(JRuby)&#xff0c; 缓存Cache&#xff08;EHCa…