文章目录
- 一、版本说明
- 二、实战
- 2.1. 依赖
- 2.2. 生产者代码
- 2.3. 消费端代码
- 2.4. 测试
- 三、小伙伴疑难解答
- 3.1. 首先新建一个maven项目
- 3.2. 把我的依赖和代码复制过去
- 3.3. 把我写的case调试通
- 3.4. 找到左边External Libraries
- 3.5. jar处理
- 3.6. 打开非maven项目,添加jar
- 3.7. 等待项目编译
- 四、 项目jar和引入的jar冲突建议
- 4.1. 定位问题
- 4.2. 分析问题
- 五、解决问题
- 5.1. 解决问题场景
- 5.2. 方案1
- 5.2. 方案2
- 5.2. 方案3
一、版本说明
linux服务器环境软件 | 版本 |
---|---|
jdk | jdk-8u144-linux-x64.tar.gz |
kafka | kafka_2.9.2-0.8.2.1.tgz |
应用服务器软件 | 版本 |
---|---|
jdk | jdk1.6.0_24 |
kafka | kafka_2.9.2-0.8.2.1 |
二、实战
2.1. 依赖
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.9.2</artifactId><version>0.8.2.1</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.15</version><exclusions><exclusion><artifactId>jmxtools</artifactId><groupId>com.sun.jdmk</groupId></exclusion><exclusion><artifactId>jmxri</artifactId><groupId>com.sun.jmx</groupId></exclusion><exclusion><artifactId>jms</artifactId><groupId>javax.jms</groupId></exclusion><exclusion><artifactId>mail</artifactId><groupId>javax.mail</groupId></exclusion></exclusions></dependency>
2.2. 生产者代码
package com.sinosoft.d;import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;import java.util.Properties;/*** kafka生产端** @author gblfy* @date 2020-08-07** Kafka生产者测试* http://kafka.apache.org/documentation.html#introduction* http://blog.csdn.net/hmsiwtv/article/details/46960053*/
public class KafkaProducetest {private final Producer<String, String> producer;public final static String TOPIC = "clicki_info_topic";private KafkaProducetest() {Properties props = new Properties();//此处配置的是kafka的端口props.put("metadata.broker.list", "10.5.6.19:9092");//配置value的序列化类props.put("serializer.class", "kafka.serializer.StringEncoder");//配置key的序列化类props.put("key.serializer.class", "kafka.serializer.StringEncoder");//0表示不确认主服务器是否收到消息,马上返回,低延迟但最弱的持久性,数据可能会丢失//1表示确认主服务器收到消息后才返回,持久性稍强,可是如果主服务器死掉,从服务器数据尚未同步,数据可能会丢失//-1表示确认所有服务器都收到数据,完美!props.put("request.required.acks", "-1");//异步生产,批量存入缓存后再发到服务器去props.put("producer.type", "async");//填充配置,初始化生产者producer = new Producer<String, String>(new ProducerConfig(props));}void produce() {int messageNo = 1000;final int COUNT = 2000;while (messageNo < COUNT) {String key = String.valueOf(messageNo);String data = "hello kafka message " + key;String data1 = "{\"c\":0,\"i\":16114765323924126,\"n\":\"http://www.abbo.cn/clicki.html\",\"s\":0,\"sid\":0,\"t\":\"info_url\",\"tid\":0,\"unix\":0,\"viewId\":0}";// 发送消息// producer.send(new KeyedMessage<String, String>(TOPIC,data1));// 消息类型key:valueproducer.send(new KeyedMessage<String, String>(TOPIC, key, data));System.out.println(data);messageNo++;}producer.close();//必须关闭}public static void main(String[] args) {new KafkaProducetest().produce();}
}
2.3. 消费端代码
package com.sinosoft.d;import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;/*** kafka生产端** @author gblfy* @date 2020-08-07* <p>* Kafka消费者测试*/
public class KafkaConsumertest {private final ConsumerConnector consumer;private KafkaConsumertest() {Properties props = new Properties();//zookeeper 配置props.put("zookeeper.connect", "10.5.6.19:2181");//group 代表一个消费组,加入组里面,消息只能被该组的一个消费者消费//如果所有消费者在一个组内,就是传统的队列模式,排队拿消息//如果所有的消费者都不在同一个组内,就是发布-订阅模式,消息广播给所有组//如果介于两者之间,那么广播的消息在组内也是要排队的props.put("group.id", "jd-group");//zk连接超时props.put("zookeeper.session.timeout.ms", "4000");//ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大props.put("zookeeper.sync.time.ms", "200");//zk follower落后于zk leader的最长时间props.put("auto.commit.interval.ms", "1000");//往zookeeper上写offset的频率/** 此配置参数表示当此groupId下的消费者,在ZK中没有offset值时(比如新的groupId,或者是zk数据被清空),consumer应该从哪个offset开始消费.* largest表示接受接收最大的offset(即最新消息),smallest表示最小offset,即从topic的开始位置消费所有消息.* */props.put("auto.offset.reset", "smallest"); //消费最老消息,最新为largest//序列化类props.put("serializer.class", "kafka.serializer.StringEncoder");ConsumerConfig config = new ConsumerConfig(props);consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);}void consume() {// 描述读取哪个topic,需要几个线程读Map<String, Integer> topicCountMap = new HashMap<String, Integer>();topicCountMap.put(KafkaProducetest.TOPIC, new Integer(1));/* 默认消费时的数据是byte[]形式的,可以传入String编码器*/StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());Map<String, List<KafkaStream<String, String>>> consumerMap =consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);//消费数据时每个Topic有多个线程在读,所以取List第一个流KafkaStream<String, String> stream = consumerMap.get(KafkaProducetest.TOPIC).get(0);ConsumerIterator<String, String> it = stream.iterator();while (it.hasNext()) {System.out.println(it.next().topic() + ":" + it.next().partition() + ":" + it.next().offset() + ":" + it.next().key() + ":" + it.next().message());}}public static void main(String[] args) {new KafkaConsumertest().consume();}
}
2.4. 测试
先启动消费端,在启动生产端
三、小伙伴疑难解答
有的小伙伴问我,我们的工程师非maven项目该怎么办呢?
我给这个小伙伴的建议是以下几点:
3.1. 首先新建一个maven项目
3.2. 把我的依赖和代码复制过去
3.3. 把我写的case调试通
3.4. 找到左边External Libraries
3.5. jar处理
在本地的maven仓库中把这些以来的jar复制出来,建议先发到一个空的文件夹里面,建议和我一样
3.6. 打开非maven项目,添加jar
打开你的非maven项目,把这些jar添加进去
3.7. 等待项目编译
四、 项目jar和引入的jar冲突建议
关于这个问题,很多小伙伴应该也遇到过很正常,首先要保证引入的jar不能和以前的jar产生冲突?
那又该怎么办?
有的小伙伴说,把以前和本次引入jar冲突的jar删除呗!要三思
4.1. 定位问题
首先定位引入的jar和项目中的哪一个jar发生冲突
4.2. 分析问题
冲突的原因是什么?
1.引用的对象的包路径一样并且对象名也一样
2.二个冲突的kar项目中加入都存在,但是,代码中引入jar的优先级问题,引入的jar非自己需要的jar,而自己需要的jar默认不会引入,导致代码报错
3.版本问题,jar向下不兼容
五、解决问题
5.1. 解决问题场景
首先解决问题要基于场景:
我引入的jar,只有我自己用,但是,我的代码中默认引入的jar非我需要的,但是,把项目中以前的jar(冲突的jar)删除,代码就不报错。
5.2. 方案1
这种场景有3种解决方案
方案1(风险可控):如果,可以确保删除以前的jar不会项目的其他功能造成影响,可以考虑删除以前旧的jar
5.2. 方案2
方案2(风险不可控):如果,不能评估删除以前jar的风险范围,建议换一种方式,完成你的需求
5.2. 方案3
方案3(风险不可控):这种方案在方案2的基础上做的,找到刚引入冲突jar的源码,把需要的类和api(方法),单独复制出来,这样也可以解决,但是,需要小伙伴具备阅读源码的能力和调试的时间。