kafka java编程demo_Kafka简单客户端编程实例

今天,我们给大家带来一篇如何利用Kafka的API进行客户端编程的文章,这篇文章很简单,就是利用Kafka的API创建一个生产者和消费者,生产者不断向Kafka写入消息,消费者则不断消费Kafka的消息。下面是具体的实例代码。

一、创建配置类Config

这个类很简单,只是存放了两个常量,一个是话题TOPIC,一个是线程数THREADS

package com.lya.kafka;

/**

* 配置项

* @author liuyazhuang

*

*/

public class Config {

/**

* 话题

*/

public static final String TOPIC = "wordcount";

/**

* 线程数

*/

public static final Integer THREADS = 1;

}

二、编程生产者类ProducerDemo

这个类的主要作用就是向Kafka写入相应的消息,并且将消息写入wordcount话题。

package com.lya.kafka;

import java.util.Properties;

import kafka.javaapi.producer.Producer;

import kafka.producer.KeyedMessage;

import kafka.producer.ProducerConfig;

/**

* 生产者实例

* @author liuyazhuang

*

*/

public class ProducerDemo {

public static void main(String[] args) throws Exception {

Properties props = new Properties();

props.put("zk.connect", "192.168.209.121:2181");

props.put("metadata.broker.list","192.168.209.121:9092");

props.put("serializer.class", "kafka.serializer.StringEncoder");

props.put("zk.connectiontimeout.ms", "15000");

ProducerConfig config = new ProducerConfig(props);

Producer producer = new Producer(config);

// 发送业务消息

// 读取文件 读取内存数据库 读socket端口

for (int i = 1; i <= 100; i++) {

Thread.sleep(500);

producer.send(new KeyedMessage(Config.TOPIC,

"this number ===>>> " + i));

}

}

}

三、编写消息者类ConsumerDemo

这个类的主要作用就是消费Kafka中wordcount话题的消息。

package com.lya.kafka;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;

import kafka.consumer.Consumer;

import kafka.consumer.ConsumerConfig;

import kafka.consumer.KafkaStream;

import kafka.javaapi.consumer.ConsumerConnector;

import kafka.message.MessageAndMetadata;

/**

* 消费者实例

* @author liuyazhuang

*

*/

public class ConsumerDemo {

public static void main(String[] args) {

Properties props = new Properties();

props.put("zookeeper.connect", "192.168.209.121:2181");

props.put("group.id", "1111");

props.put("auto.offset.reset", "smallest");

props.put("zk.connectiontimeout.ms", "15000");

ConsumerConfig config = new ConsumerConfig(props);

ConsumerConnector consumer =Consumer.createJavaConsumerConnector(config);

Map topicCountMap = new HashMap();

topicCountMap.put(Config.TOPIC, Config.THREADS);

Map>> consumerMap = consumer.createMessageStreams(topicCountMap);

List> streams = consumerMap.get(Config.TOPIC);

for(final KafkaStream kafkaStream : streams){

new Thread(new Runnable() {

@Override

public void run() {

for(MessageAndMetadata mm : kafkaStream){

String msg = new String(mm.message());

System.out.println(msg);

}

}

}).start();

}

}

}

四、运行实例

首先,运行消费者类ConsumerDemo

运行结果如下:

37afe700aba87b547f0b5bf40a828cb6.png

没有打印任何信息。

此时,我们运行生产者类ProducerDemo

我们再次打开消费者的控制台查看如下:

d636c6f7f921352861b10cb626f2aa05.png

打印出了生产者生产的消息。

至此,Kafka简单客户端编程实例结束。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/339733.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java我的世界极限生存_我的世界 1.7.10 极限生存整合包

整合包介绍&#xff1a;最近总有人觉得Minecraft很无聊&#xff0c;没有什么可玩的&#xff0c;或者觉得生存太简单 那么就来试试这个吧&#xff0c;全部是增强怪物的MOD&#xff0c;保证不无聊&#xff0c;保证不简单 基本上没有增加一些新的东西&#xff0c;只增加了几种怪物…

具有InlfuxDB的Spring Boot和Micrometer第1部分:基础项目

对于那些关注此博客的人来说&#xff0c;难怪我倾向于大量使用InfluxDB。 我喜欢这样一个事实&#xff0c;它是一个真正的单一用途的数据库&#xff08;时间序列&#xff09;&#xff0c;具有许多功能&#xff0c;并且还带有企业支持。 Spring也是我选择的工具之一。 因此&…

PIT,JUnit 5和Gradle –仅需额外的一行配置

在Gradle&#xff08;带有gradle-pitest-plugin 1.4.7&#xff09;中发现简单&#xff0c;经过改进的PIT和JUnit 5配置。 不可否认&#xff0c;如今JUnit 5越来越受欢迎。 虽然为JUnit 5提供了一个专用于PIT的插件&#xff0c;并且gradle-pitest-plugin支持了很多年&#xff0…

apache camel_使用WildFly 8在Java EE7中自举Apache Camel

apache camel从Camel版本2.10开始&#xff0c;支持CDI&#xff08;JSR-299&#xff09;和DI&#xff08;JSR-330&#xff09;。 这为在Java EE容器中以及在独立的Java SE或CDI容器中开发和部署Apache Camel项目提供了新的机会。 是时候尝试一下并熟悉它了。 骆驼到底是什么&am…

Hibernate中保存与持久性以及saveOrUpdate之间的区别

保存与保存或更新与持久保存在Hibernate中 save和saveOrUpdate之间的区别是什么或save和persist之间的区别是任何Hibernate面试中常见的面试问题&#xff0c;就像Hibernate中get和load方法之间的区别一样。 Hibernate Session类提供了几种方法&#xff0c;可以通过诸如save&am…

java中的语句有哪些_java中的循环语句有哪些

Java中有三种主要的循环结构&#xff1a;while 循环do…while 循环for 循环顺序结构的程序语句只能被执行一次。如果您想要同样的操作执行多次,&#xff0c;就需要使用循环结构。一、while循环语法&#xff1a;while( 布尔表达式 ) {     //循环内容   }只要符合布尔表达…

php无法新数据类型,新手入门PHP必知的七种数据类型

想要入门PHP&#xff0c;首先要学会搭建环境&#xff0c;其次是学习基础语法。PHP的基础包括数据类型&#xff0c;运算符&#xff0c;变量和常量等。在这篇文章中&#xff0c;我们主要了解什么是数据类型。数据类型是指同种数据的一个统称&#xff0c;一般会描述为XX数据类型。…

攻防世界web高手进阶php_rce,php_rce 攻防世界xctf web

php_rce首先了解ThinkPHP5.x rec 漏洞分析与复现https://blog.csdn.net/qq_40884727/article/details/101452478var_pathinfo的默认配置为s,我们可以通过$_GET[‘s’]来传参于是构造payloadhttp://111.198.29.45:30600/index.php?sindex/\think\App/invokefunction&functi…

具有InlfuxDB的Spring Boot和Micrometer第2部分:添加InfluxDB

自从我们添加了基本应用程序以来&#xff0c;是时候启动InfluxDB实例了。 我们将按照之前的教程进行操作&#xff0c;并添加一个docker实例。 docker run –rm -p 8086&#xff1a;8086 –name influxdb-本地influxdb 是时候在我们的pom上添加微米InfluxDB依赖项了 < dep…

使用比较器的nulls对具有null值的列表进行排序

你好朋友&#xff0c; 在本教程中&#xff0c;我们将看到如何使用Java 8 Comparator.nullsFirst在列表中的项目很少为空时如何对项目列表进行排序&#xff0c;以便将null视为列表中的最小元素。 –什么是比较器 – nullsFirst方法在Comparator中做什么 –排序具有非空名称的…

Jar Hell变得轻松–用jHades揭开类路径的神秘面纱

Java开发人员将不得不面对的最困难的问题是类路径错误&#xff1a; ClassNotFoundException &#xff0c; NoClassDefFoundError &#xff0c;Jar Hell&#xff0c; Xerces Hell和公司。 在本文中&#xff0c;我们将探究这些问题的根本原因&#xff0c;并了解最小的工具&#…

分度器中硒定位器的完整指南(示例)

在测试网站的功能时&#xff0c;特别是Web元素&#xff08;例如单选按钮&#xff0c;文本框&#xff0c;下拉列表等&#xff09;&#xff0c;您需要确保能够访问这些元素。 Selenium定位器正是出于这个目的&#xff0c;通过使用此命令&#xff0c;我们可以识别这些Web元素DOM&a…

wildfly管理控制台_WildFly 9 –别希望您的控制台像这样!

wildfly管理控制台每个人都可能听到这个消息。 周一发布了第一个WildFly 9.0.0.Alpha1版本。 您可以从wildfly.org网站上下载它&#xff0c;最大的变化是它是由一个新的功能配置工具构建的&#xff0c;该工具位于现在单独的核心发行版上&#xff0c;还包含一个新的Servlet发行版…

azure mysql sql,UiPath连接Azure Sql Server数据库

一、创建数据库在Azure中创建SQL数据库image更改防火墙设置&#xff0c;并设置客户端IP访问规则image二、安装数据源驱动在本地安装数据源驱动程序&#xff0c;保证可以正常接入到远程的数据库。如果不安装驱动程序&#xff0c;则会出现以下报错&#xff1a;[Microsoft][ODBC D…

linux 误删除mysql表能恢复吗,Linux误删数据恢复

引子指在键上飘&#xff0c;难免会湿手套。当你按下shiftdel键后&#xff0c;会不会突然心里凉透&#xff0c;当你执行rm -rf后&#xff0c;会不会马上去搜索哪个国家入境不需要签证。或者你还会遇到如下的情况&#xff1a;root4xem7:~# aliasalias cdrm -rfalias ddocker数据恢…

Apache Camel 3.1 –更多骆驼核心优化(第3部分)

我以前曾在博客中介绍过我们在下一个Camel 3.1版本中所做的优化 博客第1部分 博客第2部分 今天&#xff0c;我想简短介绍一下我们已经完成的最新开发&#xff0c;因为我们准备在本周末或下半年准备好构建和发布Camel 3.1。 从第2部分开始&#xff0c;我们设法在路由过程中将…

jvm jinfo 参数_jinfo:JVM运行时配置的命令行浏览

jvm jinfo 参数在最近的一些博客中&#xff08;特别是在对Java EE 7性能调优和优化以及WildFly性能调优的书中的评论中&#xff09;&#xff0c;我引用了自己过去在某些Oracle JDK命令行工具上的博客文章。 令我震惊的是&#xff0c;我从来没有专门解决过漂亮的jinfo工具&#…

49自动化测试中最常见的硒异常

开发人员将始终在编写代码时牢记不同的场景&#xff0c;但是在某些情况下&#xff0c;实现可能无法按预期工作。 相同的原则也适用于测试代码&#xff0c;该代码主要用于测试现有产品的功能&#xff0c;发现错误以及使产品100&#xff05;不受错误影响。 正确地说&#xff0c;…

鹰式价差matlab,鹰式期权:什么叫铁鹰式期权组合,蝶式价差期权?

蝶式期权套利 是利用 交割月份的价差进行 套期获利&#xff0c; 个方向相 反、 共享居中交割月份合约的跨期套利组成。是一种期权策略&#xff0c;风险有限&#xff0c;盈利也有限&#xff0c;是由一手牛市套利和一手熊市套利组合而成的。铁鹰式期权组合是牛市看跌价差期权组合…

angular8 rest_带有Angular JS的Java EE 7 – CRUD,REST,验证–第2部分

angular8 rest这是Angular JS承诺的Java EE 7的后续版本–第1部分 。 花了比我预期更长的时间&#xff08;找到时间来准备代码和博客文章&#xff09;&#xff0c;但是终于到了&#xff01; 应用程序 第1部分中的原始应用程序只是带有分页的简单列表&#xff0c;以及提供列表数…