java 集成 kafka 0.8.2.1 适配jdk1.6

文章目录

          • 一、版本说明
          • 二、实战
            • 2.1. 依赖
            • 2.2. 生产者代码
            • 2.3. 消费端代码
            • 2.4. 测试
          • 三、小伙伴疑难解答
            • 3.1. 首先新建一个maven项目
            • 3.2. 把我的依赖和代码复制过去
            • 3.3. 把我写的case调试通
            • 3.4. 找到左边External Libraries
            • 3.5. jar处理
            • 3.6. 打开非maven项目,添加jar
            • 3.7. 等待项目编译
          • 四、 项目jar和引入的jar冲突建议
            • 4.1. 定位问题
            • 4.2. 分析问题
          • 五、解决问题
            • 5.1. 解决问题场景
            • 5.2. 方案1
            • 5.2. 方案2
            • 5.2. 方案3

一、版本说明
linux服务器环境软件版本
jdkjdk-8u144-linux-x64.tar.gz
kafkakafka_2.9.2-0.8.2.1.tgz
应用服务器软件版本
jdkjdk1.6.0_24
kafkakafka_2.9.2-0.8.2.1
二、实战
2.1. 依赖
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.9.2</artifactId><version>0.8.2.1</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.15</version><exclusions><exclusion><artifactId>jmxtools</artifactId><groupId>com.sun.jdmk</groupId></exclusion><exclusion><artifactId>jmxri</artifactId><groupId>com.sun.jmx</groupId></exclusion><exclusion><artifactId>jms</artifactId><groupId>javax.jms</groupId></exclusion><exclusion><artifactId>mail</artifactId><groupId>javax.mail</groupId></exclusion></exclusions></dependency>
2.2. 生产者代码
package com.sinosoft.d;import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;import java.util.Properties;/*** kafka生产端** @author gblfy* @date 2020-08-07** Kafka生产者测试* http://kafka.apache.org/documentation.html#introduction* http://blog.csdn.net/hmsiwtv/article/details/46960053*/
public class KafkaProducetest {private final Producer<String, String> producer;public final static String TOPIC = "clicki_info_topic";private KafkaProducetest() {Properties props = new Properties();//此处配置的是kafka的端口props.put("metadata.broker.list", "10.5.6.19:9092");//配置value的序列化类props.put("serializer.class", "kafka.serializer.StringEncoder");//配置key的序列化类props.put("key.serializer.class", "kafka.serializer.StringEncoder");//0表示不确认主服务器是否收到消息,马上返回,低延迟但最弱的持久性,数据可能会丢失//1表示确认主服务器收到消息后才返回,持久性稍强,可是如果主服务器死掉,从服务器数据尚未同步,数据可能会丢失//-1表示确认所有服务器都收到数据,完美!props.put("request.required.acks", "-1");//异步生产,批量存入缓存后再发到服务器去props.put("producer.type", "async");//填充配置,初始化生产者producer = new Producer<String, String>(new ProducerConfig(props));}void produce() {int messageNo = 1000;final int COUNT = 2000;while (messageNo < COUNT) {String key = String.valueOf(messageNo);String data = "hello kafka message " + key;String data1 = "{\"c\":0,\"i\":16114765323924126,\"n\":\"http://www.abbo.cn/clicki.html\",\"s\":0,\"sid\":0,\"t\":\"info_url\",\"tid\":0,\"unix\":0,\"viewId\":0}";// 发送消息//            producer.send(new KeyedMessage<String, String>(TOPIC,data1));// 消息类型key:valueproducer.send(new KeyedMessage<String, String>(TOPIC, key, data));System.out.println(data);messageNo++;}producer.close();//必须关闭}public static void main(String[] args) {new KafkaProducetest().produce();}
}
2.3. 消费端代码
package com.sinosoft.d;import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;/*** kafka生产端** @author gblfy* @date 2020-08-07* <p>* Kafka消费者测试*/
public class KafkaConsumertest {private final ConsumerConnector consumer;private KafkaConsumertest() {Properties props = new Properties();//zookeeper 配置props.put("zookeeper.connect", "10.5.6.19:2181");//group 代表一个消费组,加入组里面,消息只能被该组的一个消费者消费//如果所有消费者在一个组内,就是传统的队列模式,排队拿消息//如果所有的消费者都不在同一个组内,就是发布-订阅模式,消息广播给所有组//如果介于两者之间,那么广播的消息在组内也是要排队的props.put("group.id", "jd-group");//zk连接超时props.put("zookeeper.session.timeout.ms", "4000");//ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大props.put("zookeeper.sync.time.ms", "200");//zk follower落后于zk leader的最长时间props.put("auto.commit.interval.ms", "1000");//往zookeeper上写offset的频率/** 此配置参数表示当此groupId下的消费者,在ZK中没有offset值时(比如新的groupId,或者是zk数据被清空),consumer应该从哪个offset开始消费.* largest表示接受接收最大的offset(即最新消息),smallest表示最小offset,即从topic的开始位置消费所有消息.* */props.put("auto.offset.reset", "smallest");  //消费最老消息,最新为largest//序列化类props.put("serializer.class", "kafka.serializer.StringEncoder");ConsumerConfig config = new ConsumerConfig(props);consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);}void consume() {// 描述读取哪个topic,需要几个线程读Map<String, Integer> topicCountMap = new HashMap<String, Integer>();topicCountMap.put(KafkaProducetest.TOPIC, new Integer(1));/* 默认消费时的数据是byte[]形式的,可以传入String编码器*/StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());Map<String, List<KafkaStream<String, String>>> consumerMap =consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);//消费数据时每个Topic有多个线程在读,所以取List第一个流KafkaStream<String, String> stream = consumerMap.get(KafkaProducetest.TOPIC).get(0);ConsumerIterator<String, String> it = stream.iterator();while (it.hasNext()) {System.out.println(it.next().topic() + ":" + it.next().partition() + ":" + it.next().offset() + ":" + it.next().key() + ":" + it.next().message());}}public static void main(String[] args) {new KafkaConsumertest().consume();}
}
2.4. 测试

先启动消费端,在启动生产端
在这里插入图片描述
在这里插入图片描述

三、小伙伴疑难解答

有的小伙伴问我,我们的工程师非maven项目该怎么办呢?
我给这个小伙伴的建议是以下几点:

3.1. 首先新建一个maven项目
3.2. 把我的依赖和代码复制过去
3.3. 把我写的case调试通
3.4. 找到左边External Libraries

在这里插入图片描述

3.5. jar处理

在本地的maven仓库中把这些以来的jar复制出来,建议先发到一个空的文件夹里面,建议和我一样
在这里插入图片描述

3.6. 打开非maven项目,添加jar

打开你的非maven项目,把这些jar添加进去
在这里插入图片描述
在这里插入图片描述

3.7. 等待项目编译
四、 项目jar和引入的jar冲突建议

关于这个问题,很多小伙伴应该也遇到过很正常,首先要保证引入的jar不能和以前的jar产生冲突?
那又该怎么办?
有的小伙伴说,把以前和本次引入jar冲突的jar删除呗!要三思

4.1. 定位问题

首先定位引入的jar和项目中的哪一个jar发生冲突

4.2. 分析问题

冲突的原因是什么?
1.引用的对象的包路径一样并且对象名也一样
2.二个冲突的kar项目中加入都存在,但是,代码中引入jar的优先级问题,引入的jar非自己需要的jar,而自己需要的jar默认不会引入,导致代码报错
3.版本问题,jar向下不兼容

五、解决问题
5.1. 解决问题场景

首先解决问题要基于场景:

我引入的jar,只有我自己用,但是,我的代码中默认引入的jar非我需要的,但是,把项目中以前的jar(冲突的jar)删除,代码就不报错。

5.2. 方案1

这种场景有3种解决方案
方案1(风险可控):如果,可以确保删除以前的jar不会项目的其他功能造成影响,可以考虑删除以前旧的jar

5.2. 方案2

方案2(风险不可控):如果,不能评估删除以前jar的风险范围,建议换一种方式,完成你的需求

5.2. 方案3

方案3(风险不可控):这种方案在方案2的基础上做的,找到刚引入冲突jar的源码,把需要的类和api(方法),单独复制出来,这样也可以解决,但是,需要小伙伴具备阅读源码的能力和调试的时间。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/519717.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

阿里云MWC 2019发布7款重磅产品,助力全球企业迈向智能化

当地时间2月25日&#xff0c;在巴塞罗那举行的MWC 2019上&#xff0c;阿里云面向全球发布了7款重磅产品&#xff0c;涵盖无服务器计算、高性能存储、全球网络、企业级数据库、大数据计算等主要云产品&#xff0c;可满足电子商务、物流、金融科技以及制造等各行业企业的数字化转…

Spring Cloud Alibaba迁移指南(一):一行代码从 Hystrix 迁移到 Sentinel

自 Spring Cloud 官方宣布 Spring Cloud Netflix 进入维护状态后&#xff0c;我们开始制作《Spring Cloud Alibaba迁移指南》系列文章&#xff0c;向开发者提供更多的技术选型方案&#xff0c;并降低迁移过程中的技术难度。 第一篇&#xff0c;我们对Hystrix、Resilience4j 和…

util中注入service

Autowiredprivate GovCustomerService service;private static GovCustomerService govCustomerService;PostConstruct //完成对service的注入public void init() {govCustomerService service;}

linux环境安装 kafka 0.8.2.1 jdk1.6

文章目录一、环境分布二、实战1. kafka下载2. 解压3. 配置4. 编写启动脚本5. 编写关闭脚本6. 赋予脚本可执行权限7. 脚本使用案例三、Config配置四、Consumer配置五、Producer配置很多小伙伴问我&#xff0c;为什么不用最新版本的kafka呢&#xff1f;关于这个问题&#xff0c;都…

元旦限时特惠,耳机、书籍等大降价

戳蓝字“CSDN云计算”关注我们哦&#xff01;今天是12月31日离2020年仅有不到一天的时间你们的2019年目标都实现了吗&#xff1f;在这一年你写了多少行代码改了多少个bug呢&#xff1f;2020年的愿望是否也是希望自己写的代码bug能少一些&#xff1f;小编的2020年希望能买到更多…

深入解读MySQL8.0 新特性 :Crash Safe DDL

前言 在MySQL8.0之前的版本中&#xff0c;由于架构的原因&#xff0c;mysql在server层使用统一的frm文件来存储表元数据信息&#xff0c;这个信息能够被不同的存储引擎识别。而实际上innodb本身也存储有元数据信息。这给ddl带来了一定的挑战&#xff0c;因为这种架构无法做到d…

mysql查询包含字符串(模糊查询)

mysql查询包含字符串更高效率的方法一、LOCATE语句SELECT column from table where locate(‘keyword’, condition)>0二、或是 locate 的別名 positionSELECT column from table where position(‘keyword’ IN condition)三、INSTR语句SELECT column from table where ins…

ant编译web项目

文章目录1.下载ant2. 解压ant3. 配置an环境变量4. 验证二、编译项目2.1. 新建一个build.xml2.2. 编译项目测试1.下载ant 官网链接&#xff1a; https://ant.apache.org/srcdownload.cgi 2. 解压ant 3. 配置an环境变量 4. 验证 ant -v二、编译项目 2.1. 新建一个build.xml…

Spark in action on Kubernetes - Playground搭建与架构浅析

前言 Spark是非常流行的大数据处理引擎&#xff0c;数据科学家们使用Spark以及相关生态的大数据套件完成了大量又丰富场景的数据分析与挖掘。Spark目前已经逐渐成为了业界在数据处理领域的行业标准。但是Spark本身的设计更偏向使用静态的资源管理&#xff0c;虽然Spark也支持了…

阿里云发布时间序列数据库TSDB,关于时序你了解多少?

概要介绍 时间序列数据是一种表示物理设备&#xff0c;系统、应用过程或行为随时间变化的数据&#xff0c;广泛应用于物联网&#xff0c;工业物联网&#xff0c;基础运维系统等场景。阿里云TSDB 时间序列数据库可以解决大规模时序数据的可靠写入&#xff0c;降低数据存储成本&…

VMware宣布完成27亿美元收购Pivotal;日本成功研发出6G芯片:单载波速度高达100Gbps;联想手机再换新掌门……...

关注并标星星CSDN云计算 速递、最新、绝对有料。这里有企业新动、这里有业界要闻&#xff0c;打起十二分精神&#xff0c;紧跟fashion你可以的&#xff01;每周两次&#xff0c;打卡即read更快、更全了解泛云圈精彩newsgo go go【1月1日 星期三】云の声音5G医疗爆发箭在弦上&am…

mysql自定义排序以及优化like模糊查询

**1. 自定义排序函数FIELD()**SELECT id,username,city FROM sy_user order byFIELD(city,郑州, 开封, 平顶山,洛阳, 商丘, 安阳, 新乡, 许昌, 鹤壁, 焦作, 濮阳, 漯河, 三门峡, 周口,驻马店, 南阳, 信阳, 济源,省本部,河南) **2.使用 case when**SELECT id,username,city FR…

使用Logtail采集Kubernetes上挂载的NAS日志

采集k8s挂载Nas后的日志 该文档主要介绍使用logtail以两种不同的方式进行k8s挂载Nas后的日志采集。两种采集方式的实现原理是一样的&#xff0c;都是通过将Logtail和业务容器挂载到相同的NAS上&#xff0c;使Logtail和业务容器的日志数据共享&#xff0c;以此实现日志采集。下…

linux目录挂载

挂载前声明&#xff1a; 执行挂载后&#xff0c;源本地目录下的文件会不显示。 挂载前&#xff1a;需要提前将原目录下面的日志文件备份转移&#xff0c;挂载成功后&#xff0c;在转移到挂载的本地目录下面即可。操作流程如下&#xff1a; 1. 将/app/fis/xml中148G多的日志文件…

深度揭秘“蚂蚁双链通”

今年年初&#xff0c;蚂蚁金服ATEC城市峰会在上海举行。在ATEC区块链行业研讨会分论坛上&#xff0c;蚂蚁金服区块链高级产品专家杨俊带来了主题为《供应链金融&#xff0c;不止于金融&#xff1a;蚂蚁双链通——基于区块链的供应链协作网络》的精彩分享。 蚂蚁金服区块链高级产…

@程序员,不要瞎努力!比起熬夜更可怕的是“熬日”!

最近&#xff0c;笔者在经常后台看到小伙伴留言在问&#xff0c;想学Python&#xff0c;但不知道如何入门&#xff1f;其实对于这个问题&#xff0c;真是仁者见仁智者见智。有句老话说的好“一千个读者&#xff0c;就有一千个哈姆雷特”不过对于此疑惑&#xff0c;笔者就想直接…

配置nginx作为静态资源服务器 css,js,image等资源直接访问

1.传统的web项目&#xff0c;一般都将静态资源存放在 webroot 的目录下&#xff0c;这样做很方便获取静态资源&#xff0c;但是如果说web项目很大&#xff0c;用户很多&#xff0c;静态资源也很多时&#xff0c;服务器的性能 或许就会很低下了。这种情况下一般都会需要一个静态…

分布式事务中间件 Fescar - 全局写排它锁解读

前言 一般&#xff0c;数据库事务的隔离级别会被设置成 读已提交&#xff0c;已满足业务需求&#xff0c;这样对应在Fescar中的分支&#xff08;本地&#xff09;事务的隔离级别就是 读已提交&#xff0c;那么Fescar中对于全局事务的隔离级别又是什么呢&#xff1f;如果认真阅…

使用xfire webservice接口开发,obj与xml相互转换好用工具类,不需要写大量的转换代码,亲测可用

webservice接口开发&#xff0c;旧工程中存在使用xfire开发的接口&#xff0c;对象转换为xml和xml转换为对象的时候需要些大量的代码&#xff0c;工作量很大。现在提供一个比较好的对象转换为xml的工具。 <!-- https://mvnrepository.com/artifact/commons-betwixt/commons…

Spring Cloud Alibaba迁移指南(二):零代码替换 Eureka

自 Spring Cloud 官方宣布 Spring Cloud Netflix 进入维护状态后&#xff0c;我们开始制作《Spring Cloud Alibaba迁移指南》系列文章&#xff0c;向开发者提供更多的技术选型方案&#xff0c;并降低迁移过程中的技术难度。 第二篇&#xff0c;Spring Cloud Alibaba 实现了 Sp…