java 集成kafka单机版 适配jdk1.8

文章目录

          • 一、环境分布
            • 1. 版本声明
            • 2. 依赖
            • 2. case测试
            • 2. case2测试

一、环境分布
1. 版本声明
linux服务器软件版本
jdk1.8
kafkakafka_2.13-2.4.0
注:建议版本和应用依赖的客户端版本依赖保持一致,如果需要更高版本,可以尝试

但是有一点,小伙伴们要记住:linux服务器的kafka版本向下兼容,但是,kafka的客户端版本不向下兼容,这一点很重要!

2. 依赖
    <!-- kafka连接 --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.0</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.13</artifactId><version>2.4.0</version></dependency>
2. case测试
package com.sinosoft.a;import kafka.consumer.ConsumerConfig;
import kafka.producer.ProducerConfig;import java.util.Arrays;
import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;public class KafkaDemo {public static void main(String[] args) throws InterruptedException {// 生产者示例providerDemo();// 消费者示例consumerDemo();}/*** 生产者示例*/public static void providerDemo() {Properties properties = new Properties();/*** kafka的服务地址*/properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.521.1314:9092");
//        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.521.1314:9092,192.168.17.137:9092");/*** 在考虑完成请求之前,生产者要求leader收到的确认数量。这可以控制发送记录的持久性。允许以下设置:* acks = 0,生产者将不会等待来自服务器的任何确认。该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且retries配置将不会生效(因为客户端通常不会知道任何故障)。* acks = 1,这意味着leader会将记录写入其本地日志,但无需等待所有follower的完全确认即可做出回应。在这种情况下,如果leader在确认记录后立即失败但在关注者复制之前,则记录将丢失。* acks = all,这意味着leader将等待完整的同步副本集以确认记录。这保证了只要至少一个同步副本仍然存活,记录就不会丢失。这是最强有力的保证。这相当于acks = -1设置*/properties.put(ProducerConfig.ACKS_CONFIG, "all");/*** 当从broker接收到的是临时可恢复的异常时,生产者会向broker重发消息,但是不能无限制重发,如果重发次数达到限制值,生产者将不会重试并返回错误。* 通过retries属性设置。默认情况下生产者会在重试后等待100ms,可以通过 retries.backoff.ms属性进行修改*/properties.put(ProducerConfig.RETRIES_CONFIG, 0);/*** 当有多条消息要被发送到同一分区时,生产者会把他们放到同一批里。kafka通过批次的概念来 提高吞吐量,但是也会在增加延迟。* 以下配置,当缓存数量达到16kb,就会触发网络请求,发送消息*/properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);/*** 每条消息在缓存中的最长时间(单位ms),如果超过这个时间就会忽略batch.size的限制,由客户端立即将消息发送出去*/properties.put(ProducerConfig.LINGER_MS_CONFIG, 50);/*** Kafka的客户端发送数据到服务器,不是来一条就发一条,而是经过缓冲的,也就是说,通过KafkaProducer发送出去的消息都是先进入到客户端本地的内存缓冲里,然后把很多消息收集成一个一个的Batch,再发送到Broker上去的,这样性能才可能高。* buffer.memory的本质就是用来约束KafkaProducer能够使用的内存缓冲的大小的,默认值32MB。* 如果buffer.memory设置的太小,可能导致的问题是:消息快速的写入内存缓冲里,但Sender线程来不及把Request发送到Kafka服务器,会造成内存缓冲很快就被写满。而一旦被写满,就会阻塞用户线程,不让继续往Kafka写消息了。* 所以“buffer.memory”参数需要结合实际业务情况压测,需要测算在生产环境中用户线程会以每秒多少消息的频率来写入内存缓冲。经过压测,调试出来一个合理值。*/properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);/*** key的序列化方式*/properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");/*** value序列化方式*/properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = null;try {producer = new KafkaProducer<String, String>(properties);
//            for (int i = 0; i < 100; i++) {
//                String msg = "------Message " + i;
//                producer.send(new ProducerRecord<String, String>("mytest", msg));
//                System.out.println("Sent:" + msg);
//            }String msg = "------Message hello world!";// mytest 为topicproducer.send(new ProducerRecord<String, String>("mytest", msg));System.out.println("Sent:" + msg);} catch (Exception e) {e.printStackTrace();} finally {producer.close();}}/*** 消费者示例* @throws InterruptedException*/public static void consumerDemo() throws InterruptedException {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.521.1314:9092");
//        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.17.136:9092,192.168.17.137:9092");// 每个消费者分配独立的组号props.put(ConsumerConfig.GROUP_ID_CONFIG, "test1");// 如果value合法,则自动提交偏移量props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");// 设置自动更新被消费消息的偏移量的时间间隔props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");// 设置会话响应的时间,超过这个时间kafka可以选择放弃消费或者消费下一条消息props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");// 设置服务返回的最大数据量,这不是绝对最大值,如果提取的第一个非空分区中的第一条消息大于此值,则仍将返回该消息以确保使用者使用。此处设置5MBprops.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "5242880");// 设置服务返回的每个分区的最大数据量,此大小必须至少与服务器允许的最大消息大小(fetch.max.bytes)一样大,否则,生产者有可能发送大于消费者可以获取的消息。此处设置5MBprops.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5242880");/*** earliest,当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费* latest,当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据* none,topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常*/props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);// mytest 为topicconsumer.subscribe(Arrays.asList("mytest"));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records){System.out.printf("------------------offset = %d, key = %s, value = %s",record.offset(), record.key(), record.value());System.out.println();}/*** 手动提交偏移量* 保证同一个consumer group中,下一次读取(不论进行了rebalance)时,既不会重复消费消息,也不会遗漏消息。* 防止consumer莫名挂掉后,下次进行数据fetch时,不能从上次读到的数据开始读而导致Consumer消费的数据丢失*/consumer.commitSync();Thread.sleep(2000);}}}
2. case2测试
package com.sinosoft.b;import java.util.Arrays;
import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;public class kafkaConsumerTest {public static void main(String[] args) {Properties properties = new Properties();properties.put("bootstrap.servers", "192.168.521.1314:9092"); // 指向kafka集群的IP地址properties.put("group.id", "	group-1"); // Consumer分组IDproperties.put("enable.auto.commit", "true");properties.put("auto.commit.interval.ms", "1000"); /* 自动确认offset的时间间隔 */properties.put("auto.offset.reset", "earliest");properties.put("session.timeout.ms", "30000");properties.put("max.poll.records", "100");// max.poll.records条数据需要在在session.timeout.ms这个时间内处理完properties.put("fetch.min.bytes", "1");//server发送到消费端的最小数据,若是不满足这个数值则会等待直到满足指定大小。默认为1表示立即接收。properties.put("fetch.wait.max.ms", "1000");properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 反序列化properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 反序列化KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);kafkaConsumer.subscribe(Arrays.asList("xuhaitao")); // 设置消费的主题while (true) {ConsumerRecords<String, String> records = kafkaConsumer.poll(100); // 调用poll方法来轮循Kafka集群的消息,其中参数100是超时时间for (ConsumerRecord<String, String> record : records) {System.out.printf("offsetConsumer = %d, value = %s", record.offset(), record.value());System.out.println();}}}
}
package com.sinosoft.b;import java.util.Properties;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;public class ProducerTest {public static void main(String[] args) {Properties properties = new Properties();properties.put("bootstrap.servers", "192.168.521.1314:9092");// 指向kafka集群的IP地址properties.put("acks", "all");properties.put("retries", 0);properties.put("batch.size", 16384);properties.put("linger.ms", 1);properties.put("buffer.memory", 33554432);properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = null;try {producer = new KafkaProducer<String, String>(properties);for (int i = 0; i < 100; i++) {String msg = "This is Message " + i;producer.send(new ProducerRecord<String, String>("xuhaitao", msg));System.out.println("Sent:" + msg);}} catch (Exception e) {e.printStackTrace();} finally {producer.close();}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/519722.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

微服务架构下,解决数据一致性问题的实践

随着业务的快速发展&#xff0c;应用单体架构暴露出代码可维护性差、容错率低、测试难度大和敏捷交付能力差等诸多问题&#xff0c;微服务应运而生。微服务的诞生一方面解决了上述问题&#xff0c;但是另一方面却引入新的问题&#xff0c;其中主要问题之一就是&#xff1a;如何…

2019阿里云开年Hi购季满返活动火热报名中!

2019阿里云云上采购季活动已经于2月25日正式开启&#xff0c;从已开放的活动页面来看&#xff0c;活动分为三个阶段&#xff1a; 2月25日-3月04日的活动报名阶段、3月04日-3月16日的新购满返5折抢购阶段、3月16日-3月31日的续费抽豪礼5折抢购阶段。 整个大促活动包含1个主会场…

mybatis中resultType取出数据顺序不一致解决方法

原来我的查询返回resultType “map” &#xff0c; 也就是这个map&#xff0c;打乱了顺序。因为map并不能保证存入取出数据一致。 解决方法&#xff1a;resultType "map" 改为 resultType"java.util.LinkedHashMap"

2019云计算高光时刻:乱云飞渡 传统IT大溃败

前言&#xff1a;2019年&#xff0c;物理机最后一张王牌也败给了云计算&#xff0c;无论从成本还是性能的角度&#xff0c;都没有不选云计算的理由&#xff0c;这是一个时代的终结。 2019的云计算市场格局&#xff0c;依旧是马太效应凸显、大者恒大的趋势继续&#xff0c;但在…

java 集成 kafka 0.8.2.1 适配jdk1.6

文章目录一、版本说明二、实战2.1. 依赖2.2. 生产者代码2.3. 消费端代码2.4. 测试三、小伙伴疑难解答3.1. 首先新建一个maven项目3.2. 把我的依赖和代码复制过去3.3. 把我写的case调试通3.4. 找到左边External Libraries3.5. jar处理3.6. 打开非maven项目&#xff0c;添加jar3.…

阿里云MWC 2019发布7款重磅产品,助力全球企业迈向智能化

当地时间2月25日&#xff0c;在巴塞罗那举行的MWC 2019上&#xff0c;阿里云面向全球发布了7款重磅产品&#xff0c;涵盖无服务器计算、高性能存储、全球网络、企业级数据库、大数据计算等主要云产品&#xff0c;可满足电子商务、物流、金融科技以及制造等各行业企业的数字化转…

Spring Cloud Alibaba迁移指南(一):一行代码从 Hystrix 迁移到 Sentinel

自 Spring Cloud 官方宣布 Spring Cloud Netflix 进入维护状态后&#xff0c;我们开始制作《Spring Cloud Alibaba迁移指南》系列文章&#xff0c;向开发者提供更多的技术选型方案&#xff0c;并降低迁移过程中的技术难度。 第一篇&#xff0c;我们对Hystrix、Resilience4j 和…

util中注入service

Autowiredprivate GovCustomerService service;private static GovCustomerService govCustomerService;PostConstruct //完成对service的注入public void init() {govCustomerService service;}

linux环境安装 kafka 0.8.2.1 jdk1.6

文章目录一、环境分布二、实战1. kafka下载2. 解压3. 配置4. 编写启动脚本5. 编写关闭脚本6. 赋予脚本可执行权限7. 脚本使用案例三、Config配置四、Consumer配置五、Producer配置很多小伙伴问我&#xff0c;为什么不用最新版本的kafka呢&#xff1f;关于这个问题&#xff0c;都…

元旦限时特惠,耳机、书籍等大降价

戳蓝字“CSDN云计算”关注我们哦&#xff01;今天是12月31日离2020年仅有不到一天的时间你们的2019年目标都实现了吗&#xff1f;在这一年你写了多少行代码改了多少个bug呢&#xff1f;2020年的愿望是否也是希望自己写的代码bug能少一些&#xff1f;小编的2020年希望能买到更多…

深入解读MySQL8.0 新特性 :Crash Safe DDL

前言 在MySQL8.0之前的版本中&#xff0c;由于架构的原因&#xff0c;mysql在server层使用统一的frm文件来存储表元数据信息&#xff0c;这个信息能够被不同的存储引擎识别。而实际上innodb本身也存储有元数据信息。这给ddl带来了一定的挑战&#xff0c;因为这种架构无法做到d…

mysql查询包含字符串(模糊查询)

mysql查询包含字符串更高效率的方法一、LOCATE语句SELECT column from table where locate(‘keyword’, condition)>0二、或是 locate 的別名 positionSELECT column from table where position(‘keyword’ IN condition)三、INSTR语句SELECT column from table where ins…

ant编译web项目

文章目录1.下载ant2. 解压ant3. 配置an环境变量4. 验证二、编译项目2.1. 新建一个build.xml2.2. 编译项目测试1.下载ant 官网链接&#xff1a; https://ant.apache.org/srcdownload.cgi 2. 解压ant 3. 配置an环境变量 4. 验证 ant -v二、编译项目 2.1. 新建一个build.xml…

Spark in action on Kubernetes - Playground搭建与架构浅析

前言 Spark是非常流行的大数据处理引擎&#xff0c;数据科学家们使用Spark以及相关生态的大数据套件完成了大量又丰富场景的数据分析与挖掘。Spark目前已经逐渐成为了业界在数据处理领域的行业标准。但是Spark本身的设计更偏向使用静态的资源管理&#xff0c;虽然Spark也支持了…

阿里云发布时间序列数据库TSDB,关于时序你了解多少?

概要介绍 时间序列数据是一种表示物理设备&#xff0c;系统、应用过程或行为随时间变化的数据&#xff0c;广泛应用于物联网&#xff0c;工业物联网&#xff0c;基础运维系统等场景。阿里云TSDB 时间序列数据库可以解决大规模时序数据的可靠写入&#xff0c;降低数据存储成本&…

VMware宣布完成27亿美元收购Pivotal;日本成功研发出6G芯片:单载波速度高达100Gbps;联想手机再换新掌门……...

关注并标星星CSDN云计算 速递、最新、绝对有料。这里有企业新动、这里有业界要闻&#xff0c;打起十二分精神&#xff0c;紧跟fashion你可以的&#xff01;每周两次&#xff0c;打卡即read更快、更全了解泛云圈精彩newsgo go go【1月1日 星期三】云の声音5G医疗爆发箭在弦上&am…

mysql自定义排序以及优化like模糊查询

**1. 自定义排序函数FIELD()**SELECT id,username,city FROM sy_user order byFIELD(city,郑州, 开封, 平顶山,洛阳, 商丘, 安阳, 新乡, 许昌, 鹤壁, 焦作, 濮阳, 漯河, 三门峡, 周口,驻马店, 南阳, 信阳, 济源,省本部,河南) **2.使用 case when**SELECT id,username,city FR…

使用Logtail采集Kubernetes上挂载的NAS日志

采集k8s挂载Nas后的日志 该文档主要介绍使用logtail以两种不同的方式进行k8s挂载Nas后的日志采集。两种采集方式的实现原理是一样的&#xff0c;都是通过将Logtail和业务容器挂载到相同的NAS上&#xff0c;使Logtail和业务容器的日志数据共享&#xff0c;以此实现日志采集。下…

linux目录挂载

挂载前声明&#xff1a; 执行挂载后&#xff0c;源本地目录下的文件会不显示。 挂载前&#xff1a;需要提前将原目录下面的日志文件备份转移&#xff0c;挂载成功后&#xff0c;在转移到挂载的本地目录下面即可。操作流程如下&#xff1a; 1. 将/app/fis/xml中148G多的日志文件…

深度揭秘“蚂蚁双链通”

今年年初&#xff0c;蚂蚁金服ATEC城市峰会在上海举行。在ATEC区块链行业研讨会分论坛上&#xff0c;蚂蚁金服区块链高级产品专家杨俊带来了主题为《供应链金融&#xff0c;不止于金融&#xff1a;蚂蚁双链通——基于区块链的供应链协作网络》的精彩分享。 蚂蚁金服区块链高级产…