kafka命令及启动

默认内网访问,要在外网访问的话,需要在修改config/server.properties中的配置

将listeners和advertised.listeners的值用主机名进行替换,在外用使用java进行生产者或消费者连接的时候,不填写具体的IP,填写安装kafka的主机名,然后,在hosts目录中,配置该主机名对应的真是IP地址即可;

以下命令都是摘抄与官网http://kafka.apache.org/quickstart

先启动zookeeper,默认自带的

bin/zookeeper-server-start.sh config/zookeeper.properties

然后启动kafka服务

bin/kafka-server-start.sh config/server.properties

列举拥有哪些topics

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

在服务器上打开一个生产者,然后把输入的每行数据发送到kafka中的命令

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
#后面光标提示数据数据,然后回车就会发送到kafka中了

打开一个消费者

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

当有数据往kafka的test主题发送消息,这边就会进行消费。

java调用作为生产者和消费者代码:

项目需要引入的依赖pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.theorydance</groupId><artifactId>kafkademo</artifactId><version>0.0.1-SNAPSHOT</version><packaging>jar</packaging><name>kafkademo</name><url>http://maven.apache.org</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.1.1</version></dependency></dependencies>
</project>

生产者代码ProducerDemo.java

package com.theorydance.kafkademo;import java.util.Properties;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;public class ProducerDemo {public static void main(String[] args){Properties properties = new Properties();properties.put("bootstrap.servers", "node125:9092");properties.put("acks", "all");properties.put("retries", 0);properties.put("batch.size", 16384);properties.put("linger.ms", 1);properties.put("buffer.memory", 33554432);properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = null;try {producer = new KafkaProducer<String, String>(properties);for (int i = 0; i < 100; i++) {String msg = "This is Message " + i;producer.send(new ProducerRecord<String, String>("HelloWorld", msg));System.out.println("Sent:" + msg);}} catch (Exception e) {e.printStackTrace();} finally {producer.close();}}
}

消费者代码ConsumerDemo.java

package com.theorydance.kafkademo;import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;public class ConsumerDemo {public static void main(String[] args) throws InterruptedException {Properties properties = new Properties();properties.put("bootstrap.servers", "node125:9092");properties.put("group.id", "group-1");properties.put("enable.auto.commit", "true");properties.put("auto.commit.interval.ms", "1000");properties    .put("auto.offset.reset", "earliest");properties.put("session.timeout.ms", "30000");properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);while(true){Map<String, List<PartitionInfo>> maps = kafkaConsumer.listTopics();System.out.println("监听topics="+maps.keySet());Set<String> sets = new HashSet<>();for (String topic : maps.keySet()) {if(topic.startsWith("Hello")){ // 制定规则,监听哪一些的topicsets.add(topic);}}kafkaConsumer.subscribe(sets);long startTime = System.currentTimeMillis();while (true) {ConsumerRecords<String, String> records = kafkaConsumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, value = %s, topic = %s", record.offset(), record.value(), record.topic());System.out.println("=====================>");}long endTime = System.currentTimeMillis();if(endTime - startTime > 30000){System.out.println("------------------------------------------------------------------");break;}}}}
}

说明:在实际需求中,我需要收集在不同服务器上的日志(微服务相同模块和不同模块,或其他程序的日志),采用的是flume进行收集,希望能够对收集的日志进行分类(区别是哪个程序产生的),去网上找了一下,在flume进行收集的时候,能不能在日志前面加上应用的标识进行区别,我没有找到,如果有,看到该博客的同行,请不吝赐教。我这边就换了种思路,就像前面我写的消费者示例一样,不同的程序日志,我往不同的topic中进行发送消息,在消费者监听一定规则的topic,然后进行消费,这样就可以区分不同的应用程序的日志了。

 

转载于:https://www.cnblogs.com/TheoryDance/p/11183181.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/550150.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

编程判断元素归类_编程练习-判断是否为易混淆数

题目来源&#xff1a;LeetCode给定一个数字 N&#xff0c;当它满足以下条件的时候返回 true&#xff1a;把原数字旋转180以后得到新的数字。如 0, 1, 6, 8, 9 旋转 180 以后&#xff0c;得到了新的数字 0, 1, 9, 8, 6 。2, 3, 4, 5, 7 旋转 180 后,得到的不是数字。易混淆数字 …

vue引入51la流量监控

main.js router.afterEach( ( to, from, next ) > {setTimeout(()>{var _hmt _hmt || [];(function() {//每次执行前&#xff0c;先移除上次插入的代码document.getElementById(51_la) && document.getElementById(51_la).remove();var hm document.createEle…

斯蒂夫乔布斯传 读后感

斯蒂夫乔布斯传 读后感 失败和命运&#xff0c;成长和感受刺激着乔布斯成长。他和普通人一样。但是&#xff0c;不一样的环境&#xff0c;不一样的职位&#xff0c;不一样的体验给了他不一样的生命。所以,慢慢成长吧。当你的经历足够支撑你的命运时&#xff0c;你会站到你想要的…

cad画直角命令_炸了,CAD fro命令配合tk命令,极轴追踪无敌!

文尾左下角阅读原文看视频教程好课推荐&#xff1a;零基础CAD&#xff1a;点我CAD室内&#xff1a;点我 周站长CAD&#xff1a;点我CAD机械&#xff1a;点我 Bim教程&#xff1a;点我CAD建筑&#xff1a;点我CAD三维&#xff1a;点我全屋定制&#xff1a;点我 ps教程&#xff1…

linux frp 内网穿透 nginx反向代理

wget https://github.com/fatedier/frp/releases/download/v0.20.0/frp_0.20.0_linux_amd64.tar.gz frps.ini 服务端 有外网IP服务器 frpc.ini 客户端 局域网服务器 服务端: vi frps.ini [common] bind_port 8700 token asdasdasdasdasdasdasd vi start_frp_server.s…

[转载]Tensorflow 的reduce_sum()函数的axis,keep_dim这些参数到底是什么意思?

转载链接&#xff1a;https://www.zhihu.com/question/51325408/answer/125426642来源&#xff1a;知乎 这个问题无外乎有三个难点&#xff1a; 什么是sum什么是reduce什么是维度(indices, 现在均改为了axis和numpy等包一致)sum很简单&#xff0c;就是求和&#xff0c;那么问题…

多多进宝推广团队_多多进宝推广形式及推手寻找、佣金结算方式是怎样的?

多多进宝是拼多多下的一款推广工具&#xff0c;主要是针对于想要提高自己店铺产品的销量以及获取更多流量的拼多多商家来展开的&#xff01;但是&#xff0c;目前为止还有很多拼多多商家对多多进宝的一个运用不太了解&#xff0c;以至于有一些拼多多商家在使用多多进宝时有很大…

rtsp 通过 浏览器播放

安装docker ffmpeg https://blog.csdn.net/m1f2c3/article/details/93624289 docker pull jrottenberg/ffmpeg docker run -it --entrypointbash jrottenberg/ffmpeg docker start ... 下载jsmpeg npm install -g ws npm install ws npm install http-server -g 进入j…

025 程序的循环结构

目录 一、概述二、遍历循环三、遍历循环的应用3.1 计数循环(N次)3.2 计数循环(特定次)3.3 字符串遍历循环3.4 列表遍历循环3.5 文件遍历循环四、无限循环五、循环控制保留字5.1 break 和 continue5.1.1 for5.1.2 while六、循环的高级用法6.1 循环的扩展6.1.1 for6.1.2 while七、…

灰度值怎么降级_微服务生态的灰度发布如何实现?

前言相信很多小伙伴们都听说过灰度发布&#xff0c;但是不一定知道如何实现&#xff1f;今天我们就介绍一下基本原理&#xff0c;以及提供代码实现给小伙伴们。灰度概念即原来的生产环境是1.0版本&#xff0c;那现在我们需要升级到2.0版本&#xff0c;但是我们需要验证2.0版本&…

wget抓取网站, 模拟手机端抓取

nohup wget --mirror -p --tries100 --convert-links -P . –user-agent"Mozilla/5.0 (iPad; U; CPU OS 3_2 like Mac OS X; en-us) AppleWebKit/531.21.10 (KHTML, like Gecko) Version/4.0.4 Mobile/7B334b Safari/531.21.10" http://网址 >1.log 2>&1 …

在电脑上显示未知发布者怎么办_笔记本电脑显示器花屏怎么办?电脑屏幕花屏的解决方法...

笔记本电脑显示器花屏怎么办&#xff1f;笔记本电脑屏幕花屏该如何解决呢&#xff1f;近日有用户反映在使用笔记本电脑时候&#xff0c;会出现花屏的问题&#xff0c;下面就给大家介绍具体解决方法。一&#xff1a;检查显示器与显卡的连线是否松动若显示屏花屏的话&#xff0c;…

php 网页转pdf

linux 安装 wkhtmltopdf 下载地址 https://wkhtmltopdf.org/downloads.html 服务器是centos7 wget https://github.com/wkhtmltopdf/packaging/releases/download/0.12.6-1/wkhtmltox-0.12.6-1.centos7.x86_64.rpm 下载有点慢 rpm -ivh wkhtmltox-0.12.6-1.centos7.x86_6…

deluser命令

deluser命令 2012年07月03日 ⁄ 系统管理 ⁄ 暂无评论 1.功能作用 从系统中删除一个用户或组 2.位置 /usr/sbin/deluser 3.格式用法 deluser [options] USER deluser USER GROUP 4.主要参数 --quiet | -q 不将进程信息发给 stdout --help | -h 帮助信息 …

mongodb 字段出现次数_MongoDB数据库

内容回顾Xpath选择器不要求记忆&#xff0c;只要混个眼熟即可基于openpyxl模块爬取豆瓣电影单页爬取多页爬取1.校验请求头里面是否有User-Agent参数 请求头里面加上即可2.限制IP规定时间内的访问次数 1.人为的加上时间延迟 在你的程序里面加上time.sleep()让你的程序间歇一段时…

相似三角形·中考

概述相似&#xff0c;主要是相似三角形&#xff0c;在中考中有举足轻重的地位&#xff0c;难度也较高&#xff0c;往往倒三题中至少有一题是圆和相似的结合相似常常和四边形、反比例函数、圆、二次函数等结合&#xff0c;十分灵活 比例性质 概念若$\displaystyle \frac{a}{b}\f…

php 对接 北向数据接口 socket

function encode($msgType, $timeStamp, $body) {return "\xFF\xFF".pack(CNn, $msgType, $timeStamp, strlen($body)).$body;}$connection->send(encode(1, time(), reqLoginAlarm;userxxx;keyxxx;typexxx)); 详情参考: 请教使用socket做为客户端和北向接口对接&…

编译后没有taget文件夹_maven资源文件的相关配置才会在编译后的target里面有

建Maven项目的时候&#xff0c;如果没有进行特殊的配置&#xff0c;Maven会按照标准的目录结构查找和处理各种类型文件。src/main/java和src/test/java这两个目录中的所有*.java文件会分别在comile和test-comiple阶段被编译&#xff0c;编译结果分别放到了target/classes和targ…

配置gunicorn、 nginx、supervisor

1、建立引导&#xff0c;参考博文&#xff1a;https://www.cnblogs.com/wxzbk/p/10335859.html2、启动测试gunicorn -b 0.0.0.0:8080 run:app #run是引导文件&#xff0c;APP是模块名3、配置nginx1.跳转&#xff1a;cd /etc/nginx2.跳转&#xff1a;cd sites-available3.创建配…

添加icon_在zotero中添加百度学术、中国知网的文章检索引擎

方法关于添加文章检索引擎的方法&#xff0c;首先是参考官方文档&#xff1a;locate [Zotero Documentation]。里面提到有一个官方随时更新的engines.json文件&#xff0c;下载下来&#xff0c;保存到相应的文件夹就可以了。另外官方文档里还提供了一张检索引擎列表&#xff0c…