java 集成 kafka 0.8.2.1 适配jdk1.6

文章目录

          • 一、版本说明
          • 二、实战
            • 2.1. 依赖
            • 2.2. 生产者代码
            • 2.3. 消费端代码
            • 2.4. 测试
          • 三、小伙伴疑难解答
            • 3.1. 首先新建一个maven项目
            • 3.2. 把我的依赖和代码复制过去
            • 3.3. 把我写的case调试通
            • 3.4. 找到左边External Libraries
            • 3.5. jar处理
            • 3.6. 打开非maven项目,添加jar
            • 3.7. 等待项目编译
          • 四、 项目jar和引入的jar冲突建议
            • 4.1. 定位问题
            • 4.2. 分析问题
          • 五、解决问题
            • 5.1. 解决问题场景
            • 5.2. 方案1
            • 5.2. 方案2
            • 5.2. 方案3

一、版本说明
linux服务器环境软件版本
jdkjdk-8u144-linux-x64.tar.gz
kafkakafka_2.9.2-0.8.2.1.tgz
应用服务器软件版本
jdkjdk1.6.0_24
kafkakafka_2.9.2-0.8.2.1
二、实战
2.1. 依赖
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.9.2</artifactId><version>0.8.2.1</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.15</version><exclusions><exclusion><artifactId>jmxtools</artifactId><groupId>com.sun.jdmk</groupId></exclusion><exclusion><artifactId>jmxri</artifactId><groupId>com.sun.jmx</groupId></exclusion><exclusion><artifactId>jms</artifactId><groupId>javax.jms</groupId></exclusion><exclusion><artifactId>mail</artifactId><groupId>javax.mail</groupId></exclusion></exclusions></dependency>
2.2. 生产者代码
package com.sinosoft.d;import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;import java.util.Properties;/*** kafka生产端** @author gblfy* @date 2020-08-07** Kafka生产者测试* http://kafka.apache.org/documentation.html#introduction* http://blog.csdn.net/hmsiwtv/article/details/46960053*/
public class KafkaProducetest {private final Producer<String, String> producer;public final static String TOPIC = "clicki_info_topic";private KafkaProducetest() {Properties props = new Properties();//此处配置的是kafka的端口props.put("metadata.broker.list", "10.5.6.19:9092");//配置value的序列化类props.put("serializer.class", "kafka.serializer.StringEncoder");//配置key的序列化类props.put("key.serializer.class", "kafka.serializer.StringEncoder");//0表示不确认主服务器是否收到消息,马上返回,低延迟但最弱的持久性,数据可能会丢失//1表示确认主服务器收到消息后才返回,持久性稍强,可是如果主服务器死掉,从服务器数据尚未同步,数据可能会丢失//-1表示确认所有服务器都收到数据,完美!props.put("request.required.acks", "-1");//异步生产,批量存入缓存后再发到服务器去props.put("producer.type", "async");//填充配置,初始化生产者producer = new Producer<String, String>(new ProducerConfig(props));}void produce() {int messageNo = 1000;final int COUNT = 2000;while (messageNo < COUNT) {String key = String.valueOf(messageNo);String data = "hello kafka message " + key;String data1 = "{\"c\":0,\"i\":16114765323924126,\"n\":\"http://www.abbo.cn/clicki.html\",\"s\":0,\"sid\":0,\"t\":\"info_url\",\"tid\":0,\"unix\":0,\"viewId\":0}";// 发送消息//            producer.send(new KeyedMessage<String, String>(TOPIC,data1));// 消息类型key:valueproducer.send(new KeyedMessage<String, String>(TOPIC, key, data));System.out.println(data);messageNo++;}producer.close();//必须关闭}public static void main(String[] args) {new KafkaProducetest().produce();}
}
2.3. 消费端代码
package com.sinosoft.d;import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;/*** kafka生产端** @author gblfy* @date 2020-08-07* <p>* Kafka消费者测试*/
public class KafkaConsumertest {private final ConsumerConnector consumer;private KafkaConsumertest() {Properties props = new Properties();//zookeeper 配置props.put("zookeeper.connect", "10.5.6.19:2181");//group 代表一个消费组,加入组里面,消息只能被该组的一个消费者消费//如果所有消费者在一个组内,就是传统的队列模式,排队拿消息//如果所有的消费者都不在同一个组内,就是发布-订阅模式,消息广播给所有组//如果介于两者之间,那么广播的消息在组内也是要排队的props.put("group.id", "jd-group");//zk连接超时props.put("zookeeper.session.timeout.ms", "4000");//ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大props.put("zookeeper.sync.time.ms", "200");//zk follower落后于zk leader的最长时间props.put("auto.commit.interval.ms", "1000");//往zookeeper上写offset的频率/** 此配置参数表示当此groupId下的消费者,在ZK中没有offset值时(比如新的groupId,或者是zk数据被清空),consumer应该从哪个offset开始消费.* largest表示接受接收最大的offset(即最新消息),smallest表示最小offset,即从topic的开始位置消费所有消息.* */props.put("auto.offset.reset", "smallest");  //消费最老消息,最新为largest//序列化类props.put("serializer.class", "kafka.serializer.StringEncoder");ConsumerConfig config = new ConsumerConfig(props);consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);}void consume() {// 描述读取哪个topic,需要几个线程读Map<String, Integer> topicCountMap = new HashMap<String, Integer>();topicCountMap.put(KafkaProducetest.TOPIC, new Integer(1));/* 默认消费时的数据是byte[]形式的,可以传入String编码器*/StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());Map<String, List<KafkaStream<String, String>>> consumerMap =consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);//消费数据时每个Topic有多个线程在读,所以取List第一个流KafkaStream<String, String> stream = consumerMap.get(KafkaProducetest.TOPIC).get(0);ConsumerIterator<String, String> it = stream.iterator();while (it.hasNext()) {System.out.println(it.next().topic() + ":" + it.next().partition() + ":" + it.next().offset() + ":" + it.next().key() + ":" + it.next().message());}}public static void main(String[] args) {new KafkaConsumertest().consume();}
}
2.4. 测试

先启动消费端,在启动生产端
在这里插入图片描述
在这里插入图片描述

三、小伙伴疑难解答

有的小伙伴问我,我们的工程师非maven项目该怎么办呢?
我给这个小伙伴的建议是以下几点:

3.1. 首先新建一个maven项目
3.2. 把我的依赖和代码复制过去
3.3. 把我写的case调试通
3.4. 找到左边External Libraries

在这里插入图片描述

3.5. jar处理

在本地的maven仓库中把这些以来的jar复制出来,建议先发到一个空的文件夹里面,建议和我一样
在这里插入图片描述

3.6. 打开非maven项目,添加jar

打开你的非maven项目,把这些jar添加进去
在这里插入图片描述
在这里插入图片描述

3.7. 等待项目编译
四、 项目jar和引入的jar冲突建议

关于这个问题,很多小伙伴应该也遇到过很正常,首先要保证引入的jar不能和以前的jar产生冲突?
那又该怎么办?
有的小伙伴说,把以前和本次引入jar冲突的jar删除呗!要三思

4.1. 定位问题

首先定位引入的jar和项目中的哪一个jar发生冲突

4.2. 分析问题

冲突的原因是什么?
1.引用的对象的包路径一样并且对象名也一样
2.二个冲突的kar项目中加入都存在,但是,代码中引入jar的优先级问题,引入的jar非自己需要的jar,而自己需要的jar默认不会引入,导致代码报错
3.版本问题,jar向下不兼容

五、解决问题
5.1. 解决问题场景

首先解决问题要基于场景:

我引入的jar,只有我自己用,但是,我的代码中默认引入的jar非我需要的,但是,把项目中以前的jar(冲突的jar)删除,代码就不报错。

5.2. 方案1

这种场景有3种解决方案
方案1(风险可控):如果,可以确保删除以前的jar不会项目的其他功能造成影响,可以考虑删除以前旧的jar

5.2. 方案2

方案2(风险不可控):如果,不能评估删除以前jar的风险范围,建议换一种方式,完成你的需求

5.2. 方案3

方案3(风险不可控):这种方案在方案2的基础上做的,找到刚引入冲突jar的源码,把需要的类和api(方法),单独复制出来,这样也可以解决,但是,需要小伙伴具备阅读源码的能力和调试的时间。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/519717.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

阿里云MWC 2019发布7款重磅产品,助力全球企业迈向智能化

当地时间2月25日&#xff0c;在巴塞罗那举行的MWC 2019上&#xff0c;阿里云面向全球发布了7款重磅产品&#xff0c;涵盖无服务器计算、高性能存储、全球网络、企业级数据库、大数据计算等主要云产品&#xff0c;可满足电子商务、物流、金融科技以及制造等各行业企业的数字化转…

linux环境安装 kafka 0.8.2.1 jdk1.6

文章目录一、环境分布二、实战1. kafka下载2. 解压3. 配置4. 编写启动脚本5. 编写关闭脚本6. 赋予脚本可执行权限7. 脚本使用案例三、Config配置四、Consumer配置五、Producer配置很多小伙伴问我&#xff0c;为什么不用最新版本的kafka呢&#xff1f;关于这个问题&#xff0c;都…

元旦限时特惠,耳机、书籍等大降价

戳蓝字“CSDN云计算”关注我们哦&#xff01;今天是12月31日离2020年仅有不到一天的时间你们的2019年目标都实现了吗&#xff1f;在这一年你写了多少行代码改了多少个bug呢&#xff1f;2020年的愿望是否也是希望自己写的代码bug能少一些&#xff1f;小编的2020年希望能买到更多…

ant编译web项目

文章目录1.下载ant2. 解压ant3. 配置an环境变量4. 验证二、编译项目2.1. 新建一个build.xml2.2. 编译项目测试1.下载ant 官网链接&#xff1a; https://ant.apache.org/srcdownload.cgi 2. 解压ant 3. 配置an环境变量 4. 验证 ant -v二、编译项目 2.1. 新建一个build.xml…

Spark in action on Kubernetes - Playground搭建与架构浅析

前言 Spark是非常流行的大数据处理引擎&#xff0c;数据科学家们使用Spark以及相关生态的大数据套件完成了大量又丰富场景的数据分析与挖掘。Spark目前已经逐渐成为了业界在数据处理领域的行业标准。但是Spark本身的设计更偏向使用静态的资源管理&#xff0c;虽然Spark也支持了…

阿里云发布时间序列数据库TSDB,关于时序你了解多少?

概要介绍 时间序列数据是一种表示物理设备&#xff0c;系统、应用过程或行为随时间变化的数据&#xff0c;广泛应用于物联网&#xff0c;工业物联网&#xff0c;基础运维系统等场景。阿里云TSDB 时间序列数据库可以解决大规模时序数据的可靠写入&#xff0c;降低数据存储成本&…

VMware宣布完成27亿美元收购Pivotal;日本成功研发出6G芯片:单载波速度高达100Gbps;联想手机再换新掌门……...

关注并标星星CSDN云计算 速递、最新、绝对有料。这里有企业新动、这里有业界要闻&#xff0c;打起十二分精神&#xff0c;紧跟fashion你可以的&#xff01;每周两次&#xff0c;打卡即read更快、更全了解泛云圈精彩newsgo go go【1月1日 星期三】云の声音5G医疗爆发箭在弦上&am…

使用Logtail采集Kubernetes上挂载的NAS日志

采集k8s挂载Nas后的日志 该文档主要介绍使用logtail以两种不同的方式进行k8s挂载Nas后的日志采集。两种采集方式的实现原理是一样的&#xff0c;都是通过将Logtail和业务容器挂载到相同的NAS上&#xff0c;使Logtail和业务容器的日志数据共享&#xff0c;以此实现日志采集。下…

linux目录挂载

挂载前声明&#xff1a; 执行挂载后&#xff0c;源本地目录下的文件会不显示。 挂载前&#xff1a;需要提前将原目录下面的日志文件备份转移&#xff0c;挂载成功后&#xff0c;在转移到挂载的本地目录下面即可。操作流程如下&#xff1a; 1. 将/app/fis/xml中148G多的日志文件…

深度揭秘“蚂蚁双链通”

今年年初&#xff0c;蚂蚁金服ATEC城市峰会在上海举行。在ATEC区块链行业研讨会分论坛上&#xff0c;蚂蚁金服区块链高级产品专家杨俊带来了主题为《供应链金融&#xff0c;不止于金融&#xff1a;蚂蚁双链通——基于区块链的供应链协作网络》的精彩分享。 蚂蚁金服区块链高级产…

@程序员,不要瞎努力!比起熬夜更可怕的是“熬日”!

最近&#xff0c;笔者在经常后台看到小伙伴留言在问&#xff0c;想学Python&#xff0c;但不知道如何入门&#xff1f;其实对于这个问题&#xff0c;真是仁者见仁智者见智。有句老话说的好“一千个读者&#xff0c;就有一千个哈姆雷特”不过对于此疑惑&#xff0c;笔者就想直接…

配置nginx作为静态资源服务器 css,js,image等资源直接访问

1.传统的web项目&#xff0c;一般都将静态资源存放在 webroot 的目录下&#xff0c;这样做很方便获取静态资源&#xff0c;但是如果说web项目很大&#xff0c;用户很多&#xff0c;静态资源也很多时&#xff0c;服务器的性能 或许就会很低下了。这种情况下一般都会需要一个静态…

分布式事务中间件 Fescar - 全局写排它锁解读

前言 一般&#xff0c;数据库事务的隔离级别会被设置成 读已提交&#xff0c;已满足业务需求&#xff0c;这样对应在Fescar中的分支&#xff08;本地&#xff09;事务的隔离级别就是 读已提交&#xff0c;那么Fescar中对于全局事务的隔离级别又是什么呢&#xff1f;如果认真阅…

云+X案例展 | 民生类:京东云突破数据中心光互联瓶颈

本案例由京东云投递并参与评选&#xff0c;CSDN云计算独家全网首发&#xff1b;更多关于【云X 案例征集】的相关信息&#xff0c;点击了解详情丨挖掘展现更多优秀案例&#xff0c;为不同行业领域带来启迪&#xff0c;进而推动整个“云行业”的健康发展。随着数字化的进程&#…

UI2Code智能生成Flutter代码--整体设计篇

背景: 随着移动互联网时代的到来&#xff0c;人类的科学技术突飞猛进。然而软件工程师们依旧需要花费大量精力在重复的还原UI视觉稿的工作。 UI视觉研发拥有明显的特征&#xff1a;组件&#xff0c;位置和布局&#xff0c;符合机器学习处理范畴。能否通过机器视觉和深度学习等手…

如何成为优秀的技术主管?你要做到这三点

阿里妹导读&#xff1a;技术主管&#xff0c;又叫「技术经理」&#xff0c;英文一般是 Tech Leader &#xff0c;简称 TL。随着工作经验的不断积累&#xff0c;能力的不断提升&#xff0c;每个人都有机会成为Team Leader。然而在机会到来前&#xff0c;我们必须提前做好准备&am…

达摩院2020十大科技趋势发布:云成IT技术创新中心

2020年第一个工作日&#xff0c;“达摩院2020十大科技趋势”发布。这是继2019年之后&#xff0c;阿里巴巴达摩院第二次预测年度科技趋势。 回望2019年的科技领域&#xff0c;静水流深之下仍有暗潮涌动。AI芯片崛起、智能城市诞生、5G催生全新应用场景……达摩院去年预测的科技…

SpringBoot Mybatisplus 多数据源使用

文章目录一、mybatisplus3.x1. 依赖2. 启动类添加注解3. 添加多数据源注解4. yml5. 测试类6. 源码地址为了适配新的需求&#xff0c;需要同时支持mysql和oracle数据库操作多数据源&#xff0c;因此项目中集成dynamic-datasource-spring-boot-starter,支持很多场景。 例如&#…

数据流被污染?数据质量不高?蚂蚁金服数据资产管理平台了解一下

今年年初&#xff0c;蚂蚁金服ATEC城市峰会在上海举办。金融智能专场分论坛上&#xff0c;蚂蚁金服数据平台部高级数据技术专家李俊华做了主题为《蚂蚁金服数据治理之数据质量治理实践》的精彩分享。 演讲中&#xff0c;李俊华介绍了蚂蚁金服数据架构体系的免疫系统——数据质…

努力≠上进!那些“熬夜”持续精进的人有多可怕!

经常听到一些同学说&#xff1a;某个公司薪资上调30-50%&#xff0c;我可以跳槽入吗&#xff1f;最近收到几个比较好的offer&#xff01;该去哪家&#xff1f;纠结&#xff01;目前岗位和环境对自己成长非常慢&#xff01;更看不到公司的前景特别迷茫&#xff01;想成为人工智能…